diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala b/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala index 58a4da1c..be986f54 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala @@ -1,14 +1,17 @@ package com.lucidchart.piezo.admin.controllers -import org.joda.time.{DateTime, Minutes} -import org.joda.time.format.ISODateTimeFormat import play.api.* import play.api.libs.json.* import play.api.Logging import play.api.mvc.* import scala.io.Source +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit.{MINUTES, SECONDS} class HealthCheck(configuration: Configuration, cc: ControllerComponents) extends AbstractController(cc) with Logging { + import HealthCheck.timeFormatter val heartbeatFilename: String = configuration.getOptional[String]("com.lucidchart.piezo.heartbeatFile").getOrElse { logger.warn("heartbeat file not specified") @@ -34,13 +37,16 @@ class HealthCheck(configuration: Configuration, cc: ControllerComponents) extend try { val heartbeatFileLines = heartbeatFile.getLines().toList val heartbeatTimestamp = heartbeatFileLines(0) - val formatter = ISODateTimeFormat.dateTimeNoMillis().withZoneUTC() - val heartbeatTime = formatter.parseDateTime(heartbeatTimestamp) - val currentTime = new DateTime - val isTimestampRecent = Minutes.minutesBetween(heartbeatTime, currentTime).getMinutes < minutesBetweenBeats - (isTimestampRecent, formatter.print(heartbeatTime)) + val heartbeatTime = timeFormatter.parse(heartbeatTimestamp, Instant.from) + val currentTime = Instant.now() + val isTimestampRecent = heartbeatTime.until(currentTime, MINUTES) < minutesBetweenBeats + (isTimestampRecent, timeFormatter.format(heartbeatTime.truncatedTo(SECONDS))) } finally { heartbeatFile.close() } } } + +object HealthCheck { + private[piezo] val timeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(UTC) +} diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala b/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala index e0bdd132..89851956 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala @@ -83,7 +83,7 @@ class Jobs( .flatMap { job => jobHistoryModel.getJob(job).headOption } - .sortWith(_.start after _.start) + .sortWith((a, b) => a.start.isAfter(b.start)) val triggeredJobs: List[JobKey] = TriggerHelper .getTriggersByGroup(scheduler) .flatMap { case (group, triggerKeys) => diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala index 32c0a904..23a6c903 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala @@ -1,17 +1,17 @@ package com.lucidchart.piezo.admin.controllers import com.lucidchart.piezo.TriggerMonitoringPriority -import java.util.Date -import org.quartz.Trigger.TriggerState +import com.lucidchart.piezo.admin.models.{ModelComponents, MonitoringTeams} +import java.time.Instant import org.quartz.* +import org.quartz.Trigger.TriggerState import org.quartz.impl.triggers.{CronTriggerImpl, SimpleTriggerImpl} +import play.api.Logging import play.api.libs.json.* import play.api.mvc.* -import com.lucidchart.piezo.admin.models.{ModelComponents, MonitoringTeams} import scala.collection.mutable import scala.jdk.CollectionConverters.* import scala.util.Try -import play.api.Logging class Triggers( scheduler: Scheduler, @@ -27,9 +27,10 @@ class Triggers( val triggerFormHelper = new TriggerFormHelper(scheduler, monitoringTeams) - def firesFirst(time: Date)(trigger1: Trigger, trigger2: Trigger): Boolean = { - val time1 = trigger1.getFireTimeAfter(time) - val time2 = trigger2.getFireTimeAfter(time) + def firesFirst(time: Instant)(trigger1: Trigger, trigger2: Trigger): Boolean = { + val d = java.util.Date.from(time) + val time1 = trigger1.getFireTimeAfter(d) + val time2 = trigger2.getFireTimeAfter(d) if (time2 == null) true else if (time1 == null) false else if (time1 != time2) time1 before time2 @@ -37,7 +38,7 @@ class Triggers( } def getIndex: Action[AnyContent] = Action { implicit request => - val now = new Date() + val now = Instant.now() val allTriggers: List[Trigger] = TriggerHelper .getTriggersByGroup(scheduler) .flatMap { case (group, triggerKeys) => diff --git a/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala b/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala new file mode 100644 index 00000000..660296e7 --- /dev/null +++ b/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala @@ -0,0 +1,35 @@ +package com.lucidchart.piezo.admin.views + +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter + +object TimeFormat { + + private val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(UTC) + + /** + * Print a java.util.Date in a standard format. + * + * Will prilnt "--" if null is passed + */ + def printDate(d: java.util.Date): String = { + if (d == null) { + "--" + } else { + dtf.format(d.toInstant) + } + } + + /** + * Print an instant in a standard format. + */ + def printInstant(i: Instant): String = { + dtf.format(i) + } + + def printInstant(instant: Option[Instant]): String = instant match { + case Some(i) => dtf.format(i) + case None => "--" + } +} diff --git a/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html b/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html index d7b4db65..ffb90e76 100644 --- a/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html +++ b/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html @@ -1,7 +1,6 @@ @import com.lucidchart.piezo.admin.controllers.{routes=>piezoRoutes} -@import org.joda.time.format.DateTimeFormat -@import org.joda.time.DateTime @import org.quartz.{JobKey, SchedulerMetaData} +@import com.lucidchart.piezo.admin.views.TimeFormat.printInstant @( jobsByGroup: scala.collection.mutable.Buffer[(String, scala.collection.immutable.List[org.quartz.JobKey])], currentJob: Option[org.quartz.JobDetail], @@ -63,11 +62,10 @@

Jobs History

- @defining(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")) { dtf => @jobsHistory.get.map { record => - @dtf.print(new DateTime(record.start)) - @dtf.print(new DateTime(record.finish)) + @printInstant(record.start) + @printInstant(record.finish) @record.group @record.name @record.trigger_group @@ -76,7 +74,6 @@

Jobs History

} - } } diff --git a/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html b/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html index 76e4b81b..53b73f9c 100644 --- a/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html +++ b/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html @@ -9,9 +9,7 @@ request: play.api.mvc.Request[AnyContent] ) @import com.lucidchart.piezo.admin.views -@import java.util.Date -@import org.joda.time.format.DateTimeFormat -@import org.joda.time.DateTime +@import com.lucidchart.piezo.admin.views.TimeFormat.printDate @com.lucidchart.piezo.admin.views.html.triggersLayout(triggersByGroup, currentTrigger) {

Select a trigger

@@ -42,12 +40,12 @@

Upcoming Triggers

- @defining(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")) { dtf => - @defining(new Date()) { now => + *{ quartz still uses java.util.Date }* + @defining(new java.util.Date()) { now => @upcomingTriggers.map { trigger => - @dtf.print(new DateTime(trigger.getFireTimeAfter(now))) - @dtf.print(new DateTime(trigger.getFireTimeAfter(trigger.getFireTimeAfter(now)))) + @printDate(trigger.getFireTimeAfter(now)) + @printDate(trigger.getFireTimeAfter(trigger.getFireTimeAfter(now))) @trigger.getKey.getGroup @trigger.getKey.getName @trigger.getJobKey.getGroup @@ -56,7 +54,6 @@

Upcoming Triggers

} } - } } diff --git a/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala b/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala index 802f7ac7..5802571c 100644 --- a/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala +++ b/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala @@ -1,18 +1,17 @@ package com.lucidchart.piezo.admin.controllers import java.io.{File, FileWriter} -import org.joda.time.DateTime -import org.joda.time.format.ISODateTimeFormat import org.specs2.mutable.* import play.api.Configuration import play.api.test.Helpers.* import play.api.test.* -import org.joda.time.format.DateTimeFormatter +import java.time.Instant +import java.time.temporal.ChronoUnit.MINUTES class HealthCheckTest extends Specification { val filename = "HeartbeatTestFile" - val dtf: DateTimeFormatter = ISODateTimeFormat.dateTimeNoMillis().withZoneUTC() + private val dtf = HealthCheck.timeFormatter trait FileCleaner extends After { def after: Unit = new File(filename).delete @@ -24,7 +23,7 @@ class HealthCheckTest extends Specification { "send 200 when the worker timestamp is recent" in new FileCleaner { val file = new File(filename) val fileWrite = new FileWriter(file) - val heartbeatTime = dtf.print(new DateTime(System.currentTimeMillis())) + val heartbeatTime = dtf.format(Instant.now()) fileWrite.write(heartbeatTime) fileWrite.close() val healthCheck = new HealthCheck(testConfig(filename), Helpers.stubControllerComponents()) @@ -35,7 +34,7 @@ class HealthCheckTest extends Specification { "send 503 when the worker timestamp is too far in the past" in new FileCleaner { val file = new File(filename) val fileWrite = new FileWriter(file) - val heartbeatTime = dtf.print(new DateTime(System.currentTimeMillis()).minusMinutes(10)) + val heartbeatTime = dtf.format(Instant.now().minus(10, MINUTES)) fileWrite.write(heartbeatTime) fileWrite.close() val healthCheck = new HealthCheck(testConfig(filename), Helpers.stubControllerComponents()) diff --git a/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala b/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala index 9f85b2bd..3ae7919d 100644 --- a/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala +++ b/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala @@ -3,7 +3,7 @@ package com.lucidchart.piezo.admin.controllers import org.quartz.{JobBuilder, Scheduler, SimpleScheduleBuilder, TriggerBuilder} import com.lucidchart.piezo.jobs.monitoring.HeartBeat import com.lucidchart.piezo.admin.models.ModelComponents -import java.util.Date +import java.time.Instant /** */ @@ -13,7 +13,7 @@ object TestUtil { val triggerGroup = "testTriggerGroup" val triggerName = "testTriggerName" - def createJob(scheduler: Scheduler): Date = { + def createJob(scheduler: Scheduler): Instant = { val jobDetail = JobBuilder .newJob(classOf[HeartBeat]) .withIdentity(jobName, jobGroup) @@ -30,7 +30,7 @@ object TestUtil { .withDescription("test schedule description") .build() scheduler.deleteJob(jobDetail.getKey()) - scheduler.scheduleJob(jobDetail, trigger) + scheduler.scheduleJob(jobDetail, trigger).toInstant } val mockModelComponents = new ModelComponents(() => throw new Exception("fake connection")) diff --git a/worker/build.sbt b/worker/build.sbt index 45f0c0b6..2708a0f1 100644 --- a/worker/build.sbt +++ b/worker/build.sbt @@ -15,8 +15,6 @@ libraryDependencies ++= Seq( "org.specs2" %% "specs2-core" % "4.20.9" % Test, "mysql" % "mysql-connector-java" % "8.0.33", "javax.transaction" % "jta" % "1.1", - "joda-time" % "joda-time" % "2.13.1", - "org.joda" % "joda-convert" % "3.0.1", "com.typesafe" % "config" % "1.4.3", "com.datadoghq" % "java-dogstatsd-client" % "4.4.3", ) diff --git a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala index ff823aef..b72d664d 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala @@ -1,12 +1,11 @@ package com.lucidchart.piezo -import java.sql.{ResultSet, Timestamp} -import java.util.Date -import org.quartz.{JobDataMap, JobKey, TriggerKey} -import org.slf4j.LoggerFactory -import org.slf4j.Logger import java.sql.Connection +import java.sql.ResultSet import java.time.Instant +import org.quartz.{JobDataMap, JobKey, TriggerKey} +import org.slf4j.Logger +import org.slf4j.LoggerFactory case class JobRecord( name: String, @@ -14,8 +13,8 @@ case class JobRecord( trigger_name: String, trigger_group: String, success: Int, - start: Date, - finish: Date, + start: Instant, + finish: Option[Instant], fire_instance_id: String, ) @@ -40,7 +39,7 @@ class JobHistoryModel(getConnection: () => Connection) { fireInstanceId: String, jobKey: JobKey, triggerKey: TriggerKey, - fireTime: Date, + fireTime: Instant, instanceDurationInMillis: Long, success: Boolean, ): Unit = { @@ -67,8 +66,8 @@ class JobHistoryModel(getConnection: () => Connection) { prepared.setString(4, triggerKey.getName) prepared.setString(5, triggerKey.getGroup) prepared.setBoolean(6, success) - prepared.setTimestamp(7, new Timestamp(fireTime.getTime)) - prepared.setTimestamp(8, new Timestamp(fireTime.getTime + instanceDurationInMillis)) + prepared.setObject(7, fireTime) + prepared.setObject(8, fireTime.plusMillis(instanceDurationInMillis)) prepared.executeUpdate() } catch { case e: Exception => logger.error("error in recording start of job", e) @@ -77,7 +76,7 @@ class JobHistoryModel(getConnection: () => Connection) { } // TODO: close statement? } - def deleteJobs(minStart: Long): Int = { + def deleteJobs(minStart: Instant): Int = { val connection = getConnection() try { val prepared = connection.prepareStatement( @@ -88,7 +87,7 @@ class JobHistoryModel(getConnection: () => Connection) { AND trigger_group != '$oneTimeJobTriggerGroup' """.stripMargin, ) - prepared.setTimestamp(1, new Timestamp(minStart)) + prepared.setObject(1, minStart) prepared.executeUpdate() } catch { case e: Exception => @@ -201,8 +200,8 @@ class JobHistoryModel(getConnection: () => Connection) { rs.getString("trigger_name"), rs.getString("trigger_group"), rs.getInt("success"), - rs.getTimestamp("start"), - rs.getTimestamp("finish"), + rs.getTimestamp("start").toInstant, + Option(rs.getTimestamp("finish")).map(_.toInstant), rs.getString("fire_instance_id"), ) } @@ -216,35 +215,18 @@ class JobHistoryModel(getConnection: () => Connection) { */ def addOneTimeJobIfNotExists(jobKey: JobKey, fireInstanceId: Long): Boolean = { val connection = getConnection() - connection.setAutoCommit(false) // Use a trigger key that the database won't clean up in "JobHistoryCleanup" val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId) try { + // Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests + // trigger the same job + val triggerStartTime: Instant = Instant.now() + // Same as "addJob()" but without the finish val prepared = connection.prepareStatement( """ - SELECT EXISTS( - SELECT 1 - FROM job_history - WHERE fire_instance_id=? - LIMIT 1 - ) as record_exists - """.stripMargin, - ) - prepared.setString(1, fireInstanceId.toString) - val rs = prepared.executeQuery() - if (rs.next() && rs.getBoolean(1)) { - // Record already exists, don't add it again - false - } else { - // Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests - // trigger the same job - val triggerStartTime: Instant = Instant.now() - // Same as "addJob()" but without the finish - val prepared = connection.prepareStatement( - """ - INSERT INTO job_history( + INSERT IGNORE INTO job_history( fire_instance_id, job_name, job_group, @@ -255,18 +237,17 @@ class JobHistoryModel(getConnection: () => Connection) { ) VALUES(?, ?, ?, ?, ?, ?, ?) """.stripMargin, - ) - prepared.setString(1, fireInstanceId.toString) - prepared.setString(2, jobKey.getName) - prepared.setString(3, jobKey.getGroup) - prepared.setString(4, triggerKey.getName) - prepared.setString(5, triggerKey.getGroup) - prepared.setBoolean(6, true) - prepared.setObject(7, triggerStartTime) - prepared.executeUpdate() - connection.commit() - true - } + ) + prepared.setString(1, fireInstanceId.toString) + prepared.setString(2, jobKey.getName) + prepared.setString(3, jobKey.getGroup) + prepared.setString(4, triggerKey.getName) + prepared.setString(5, triggerKey.getGroup) + prepared.setBoolean(6, true) + prepared.setObject(7, triggerStartTime) + // Check if we actually inserted the row, + // if we didn't, then the firs_instance_id was already in use + prepared.executeUpdate() > 0 } finally { connection.close() } diff --git a/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala index ef43e287..a129990d 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala @@ -1,17 +1,17 @@ package com.lucidchart.piezo -import java.sql.{Connection, Timestamp} +import java.time.Instant +import java.sql.Connection import org.quartz.TriggerKey -import org.slf4j.LoggerFactory -import java.util.Date import org.slf4j.Logger +import org.slf4j.LoggerFactory case class TriggerRecord( name: String, group: String, - scheduled_start: Date, - actual_start: Option[Date], - finish: Date, + scheduled_start: Instant, + actual_start: Option[Instant], + finish: Instant, misfire: Int, fire_instance_id: String, ) @@ -21,8 +21,8 @@ class TriggerHistoryModel(getConnection: () => Connection) { def addTrigger( triggerKey: TriggerKey, - triggerFireTime: Option[Date], - actualStart: Option[Date], + triggerFireTime: Option[Instant], + actualStart: Option[Instant], misfire: Boolean, fireInstanceId: Option[String], ): Unit = { @@ -51,9 +51,9 @@ class TriggerHistoryModel(getConnection: () => Connection) { ) prepared.setString(1, triggerKey.getName) prepared.setString(2, triggerKey.getGroup) - prepared.setTimestamp(3, new Timestamp(triggerFireTime.getOrElse(new Date).getTime)) - prepared.setTimestamp(4, actualStart.map(date => new Timestamp(date.getTime)).getOrElse(null)) - prepared.setTimestamp(5, new Timestamp(System.currentTimeMillis)) + prepared.setObject(3, triggerFireTime.getOrElse(Instant.now())) + prepared.setObject(4, actualStart.getOrElse(null)) + prepared.setObject(5, Instant.now()) prepared.setBoolean(6, misfire) prepared.setString(7, fireInstanceId.getOrElse("")) prepared.executeUpdate() @@ -64,11 +64,11 @@ class TriggerHistoryModel(getConnection: () => Connection) { } } - def deleteTriggers(minScheduledStart: Long): Int = { + def deleteTriggers(minScheduledStart: Instant): Int = { val connection = getConnection() try { val prepared = connection.prepareStatement("""DELETE FROM trigger_history WHERE scheduled_start < ?""") - prepared.setTimestamp(1, new Timestamp(minScheduledStart)) + prepared.setObject(1, minScheduledStart) prepared.executeUpdate() } catch { case e: Exception => @@ -95,9 +95,9 @@ class TriggerHistoryModel(getConnection: () => Connection) { result :+= new TriggerRecord( rs.getString("trigger_name"), rs.getString("trigger_group"), - rs.getTimestamp("scheduled_start"), - Option(rs.getTimestamp("actual_start")), - rs.getTimestamp("finish"), + rs.getTimestamp("scheduled_start").toInstant, + Option(rs.getTimestamp("actual_start")).map(_.toInstant), + rs.getTimestamp("finish").toInstant, rs.getInt("misfire"), rs.getString("fire_instance_id"), ) diff --git a/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala b/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala index 3e20a037..245fe500 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala @@ -1,11 +1,11 @@ package com.lucidchart.piezo import com.lucidchart.piezo.TriggerMonitoringPriority.TriggerMonitoringPriority -import java.util.Date import org.quartz.TriggerKey import org.slf4j.LoggerFactory import org.slf4j.Logger import java.sql.Connection +import java.time.Instant object TriggerMonitoringPriority { case class Value(id: Int, name: String) { @@ -31,8 +31,8 @@ case class TriggerMonitoringRecord( priority: TriggerMonitoringPriority, maxSecondsInError: Int, monitoringTeam: Option[String], - created: Date, - modified: Date, + created: Instant, + modified: Instant, ) class TriggerMonitoringModel(getConnection: () => Connection) { @@ -127,8 +127,8 @@ class TriggerMonitoringModel(getConnection: () => Connection) { priority, rs.getInt("max_error_time"), Option(rs.getString("monitoring_team")), - rs.getDate("created"), - rs.getDate("modified"), + rs.getTimestamp("created").toInstant, + rs.getTimestamp("modified").toInstant, ) } } else { diff --git a/worker/src/main/scala/com/lucidchart/piezo/Worker.scala b/worker/src/main/scala/com/lucidchart/piezo/Worker.scala index 7838f162..9852fa20 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/Worker.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/Worker.scala @@ -4,13 +4,15 @@ import com.timgroup.statsd.NonBlockingStatsDClientBuilder import java.io.* import java.util.Properties import java.util.concurrent.{Semaphore, TimeUnit} -import org.joda.time.DateTime -import org.joda.time.format.ISODateTimeFormat import org.quartz.Scheduler import org.slf4j.LoggerFactory import scala.util.Try import scala.util.control.NonFatal import org.quartz.utils.DBConnectionManager +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit.SECONDS import java.sql.Connection import org.quartz.SchedulerContext @@ -29,7 +31,7 @@ object Worker { private val logger = LoggerFactory.getLogger(this.getClass) private[piezo] val runSemaphore = new Semaphore(0) private val shutdownSemaphore = new Semaphore(1) - private[piezo] val dtf = ISODateTimeFormat.dateTimeNoMillis().withZoneUTC() + private[piezo] val dtf = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(UTC) def main(args: Array[String]): Unit = { logger.info("worker starting") @@ -129,7 +131,7 @@ object Worker { val file = new File(filePath) file.getParentFile.mkdirs() val fileWrite = new FileWriter(file) - val heartbeatTime = dtf.print(new DateTime(System.currentTimeMillis())) + val heartbeatTime = dtf.format(Instant.now().truncatedTo(SECONDS)) fileWrite.write(heartbeatTime) fileWrite.close() } catch { diff --git a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala index 7aadae70..3c0a3480 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala @@ -37,7 +37,7 @@ class WorkerJobListener(getConnection: () => Connection, statsd: StatsDClient, u context.getFireInstanceId, context.getTrigger.getJobKey, context.getTrigger.getKey, - context.getFireTime, + context.getFireTime.toInstant, context.getJobRunTime, success = success, ) diff --git a/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala b/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala index 0c79e09c..65445e03 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala @@ -49,8 +49,8 @@ class WorkerTriggerListener(getConnection: () => Connection, statsd: StatsDClien try { triggerHistoryModel.addTrigger( trigger.getKey, - Option(trigger.getPreviousFireTime), - Some(context.getFireTime), + Option(trigger.getPreviousFireTime).map(_.toInstant), + Some(context.getFireTime.toInstant), misfire = false, Some(context.getFireInstanceId), ) @@ -70,7 +70,7 @@ class WorkerTriggerListener(getConnection: () => Connection, statsd: StatsDClien try { triggerHistoryModel.addTrigger( trigger.getKey, - Option(trigger.getPreviousFireTime), + Option(trigger.getPreviousFireTime).map(_.toInstant), None, misfire = true, None, diff --git a/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala b/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala index d79e7c65..22ae2c94 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala @@ -3,28 +3,28 @@ package com.lucidchart.piezo.jobs.cleanup import org.quartz.{Job, JobExecutionContext} import org.slf4j.LoggerFactory import com.lucidchart.piezo.{JobHistoryModel, TriggerHistoryModel, Worker} -import java.util.Date +import java.time.{Duration, Instant} import java.sql.Connection class JobHistoryCleanup extends Job { private val logger = LoggerFactory.getLogger(this.getClass) def execute(context: JobExecutionContext): Unit = { - val maxAge = context.getMergedJobDataMap.getLong("maxAgeDays") * 24L * 3600L * 1000L - val minStart = System.currentTimeMillis() - maxAge + val maxAge = Duration.ofDays(context.getMergedJobDataMap.getLong("maxAgeDays")) + val minStart = Instant.now().minus(maxAge) val getConnection = Worker.connectionFactory(context.getScheduler.getContext) deleteTriggerHistories(getConnection, minStart) deleteJobHistories(getConnection, minStart) } - private[this] def deleteTriggerHistories(getConnection: () => Connection, minStart: Long): Unit = { - logger.info("Deleting triggers older than " + new Date(minStart)) + private[this] def deleteTriggerHistories(getConnection: () => Connection, minStart: Instant): Unit = { + logger.info("Deleting triggers older than " + minStart) val numDeleted = new TriggerHistoryModel(getConnection).deleteTriggers(minStart) logger.info("Deleted " + numDeleted + " trigger histories") } - private[this] def deleteJobHistories(getConnection: () => Connection, minStart: Long): Unit = { - logger.info("Deleting jobs older than " + new Date(minStart)) + private[this] def deleteJobHistories(getConnection: () => Connection, minStart: Instant): Unit = { + logger.info("Deleting jobs older than " + minStart) val numDeleted = new JobHistoryModel(getConnection).deleteJobs(minStart) logger.info("Deleted " + numDeleted + " job histories") } diff --git a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala index bde9248f..22f798b5 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala @@ -9,9 +9,9 @@ import java.sql.DriverManager import java.util.Properties import scala.jdk.CollectionConverters.* import scala.util.Using -import java.util.Date import java.io.InputStream import java.time.Instant +import java.time.temporal.ChronoUnit.SECONDS import scala.concurrent.ExecutionContext.global import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} @@ -85,7 +85,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobKey = new JobKey("blah", "blah") val triggerKey = new TriggerKey("blahtn", "blahtg") jobHistoryModel.getJobs().isEmpty must beTrue - jobHistoryModel.addJob("ab", jobKey, triggerKey, new Date(), 1000, true) + jobHistoryModel.addJob("ab", jobKey, triggerKey, Instant.now(), 1000, true) jobHistoryModel.getJob(jobKey).headOption must beSome jobHistoryModel.getLastJobSuccessByTrigger(triggerKey) must beSome jobHistoryModel.getJobs().nonEmpty must beTrue @@ -104,7 +104,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobKey = new JobKey("blahc", "blahc") val triggerKey = new TriggerKey("blahtnc", "blahtgc") jobHistoryModel.getJob(jobKey).headOption must beNone - jobHistoryModel.addJob("abc", jobKey, triggerKey, new Date(), 1000, true) + jobHistoryModel.addJob("abc", jobKey, triggerKey, Instant.now(), 1000, true) jobHistoryModel.getJob(jobKey).headOption must beSome jobHistoryModel.getLastJobSuccessByTrigger(triggerKey) must beSome @@ -140,7 +140,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobHistoryModel = new JobHistoryModel(getConnectionProvider()) val temporaryTriggerKey = new TriggerKey("blahjz", "blahzg") val jobKey = new JobKey("blahjz123", "blahzg123") - val scheduledStart = java.util.Date.from(java.time.Instant.now()) + val scheduledStart = java.time.Instant.now() val temporaryFireInstanceId = "FireInstanceId" val permanentFireInstanceId = 123456789 val permanentFireInstanceIdString = permanentFireInstanceId.toString @@ -151,7 +151,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { .getJob(jobKey) .map(_.fire_instance_id) .toSet mustEqual Set(temporaryFireInstanceId, permanentFireInstanceIdString) - jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3).toEpochMilli) mustEqual 1 + jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3)) mustEqual 1 jobHistoryModel .getJob(jobKey) .map(_.fire_instance_id) @@ -187,8 +187,12 @@ class ModelTest extends Specification with BeforeAll with AfterAll { // Doesn't matter which one inserted the record, as long as one did, and one didn't Await.result(combinedFutures, Duration.Inf) mustEqual Set(true, false) - val fireTime = java.time.Instant.now() - val instanceDurationInMillis: Long = 1000 + // Truncate to the second, so that we don't end up with a rounding + // error when we do the comparison. + // Otherwise, on insert, the second might be rounded up, and we end up a second + // of from adding one second to the actual date time, and truncating it. + val fireTime = java.time.Instant.now().truncatedTo(SECONDS) + val instanceDurationInMillis: Long = 3000 jobHistoryModel.completeOneTimeJob( fireInstanceId.toString, fireTime, @@ -201,15 +205,15 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val expectedFinishSeconds = fireTime.plusMillis(instanceDurationInMillis).getEpochSecond jobHistoryModel .getJob(jobKey) - .map(record => (record.fire_instance_id, record.finish.toInstant.getEpochSecond)) must containTheSameElementsAs( + .map(record => (record.fire_instance_id, record.finish.map(_.getEpochSecond))) must containTheSameElementsAs( List( ( fireInstanceId.toString, - expectedFinishSeconds, + Some(expectedFinishSeconds), ), ), ) - jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3).toEpochMilli) mustEqual 0 + jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3)) mustEqual 0 // Delete the remaining record, so it doesn't affect other tests val connection = getConnectionProvider()() @@ -236,12 +240,12 @@ class ModelTest extends Specification with BeforeAll with AfterAll { insertedRecord.actual_start must beNone insertedRecord.fire_instance_id mustEqual "" // increase the time by 1 second so that the condition for the test satisfies. - triggerHistoryModel.deleteTriggers(new Date().getTime + 1000) mustEqual 1 + triggerHistoryModel.deleteTriggers(Instant.now().plusSeconds(1)) mustEqual 1 val triggerKey2 = new TriggerKey("blahj2", "blahg") triggerHistoryModel.addTrigger( triggerKey2, triggerFireTime = None, - actualStart = Some(new Date()), + actualStart = Some(Instant.now()), misfire = true, fireInstanceId = Some("blah"), ) @@ -249,7 +253,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { newRecord.actual_start must beSome newRecord.fire_instance_id mustEqual "blah" - triggerHistoryModel.deleteTriggers(new Date().getTime + 1000) mustEqual 1 + triggerHistoryModel.deleteTriggers(Instant.now().plusSeconds(1)) mustEqual 1 } } } diff --git a/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala b/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala index 4de8d169..66ea53ef 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala @@ -8,6 +8,7 @@ import org.quartz.JobBuilder.* import org.quartz.TriggerBuilder.* import org.quartz.SimpleScheduleBuilder.* import org.quartz.impl.StdSchedulerFactory +import java.time.Instant import java.util.Properties import scala.util.Random @@ -95,8 +96,8 @@ class WorkerTest extends Specification { val heartbeat = reader.readLine() reader.close() println("heartbeat timestamp: " + heartbeat) - val heartbeatTime = Worker.dtf.parseDateTime(heartbeat.trim) - val inRange = heartbeatTime.isAfter(System.currentTimeMillis() - 5 * 1000) + val heartbeatTime = Worker.dtf.parse(heartbeat.trim, Instant.from) + val inRange = heartbeatTime.isAfter(Instant.now().minusSeconds(5)) inRange must equalTo(true) }