From 3e18b515e83a04916fa87a049cc242045dfc375d Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Fri, 13 Feb 2026 01:36:18 -0700 Subject: [PATCH 1/4] fix: Avoid races for one time jobs Fix a race condition in inserting a one-time job. It could also be avoided by using a "FOR UPDATE" on the select, but I think this solution is cleaner. Also fix a bug in the test where if we rounded up to the nearest second, the finish time was one second off from the value we compared it to, and the test failed 50% of the time. --- .../lucidchart/piezo/JobHistoryModel.scala | 50 ++++++------------- .../com/lucidchart/piezo/ModelTest.scala | 9 +++- 2 files changed, 23 insertions(+), 36 deletions(-) diff --git a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala index ff823aef..51c8cb0a 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala @@ -216,35 +216,18 @@ class JobHistoryModel(getConnection: () => Connection) { */ def addOneTimeJobIfNotExists(jobKey: JobKey, fireInstanceId: Long): Boolean = { val connection = getConnection() - connection.setAutoCommit(false) // Use a trigger key that the database won't clean up in "JobHistoryCleanup" val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId) try { + // Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests + // trigger the same job + val triggerStartTime: Instant = Instant.now() + // Same as "addJob()" but without the finish val prepared = connection.prepareStatement( """ - SELECT EXISTS( - SELECT 1 - FROM job_history - WHERE fire_instance_id=? - LIMIT 1 - ) as record_exists - """.stripMargin, - ) - prepared.setString(1, fireInstanceId.toString) - val rs = prepared.executeQuery() - if (rs.next() && rs.getBoolean(1)) { - // Record already exists, don't add it again - false - } else { - // Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests - // trigger the same job - val triggerStartTime: Instant = Instant.now() - // Same as "addJob()" but without the finish - val prepared = connection.prepareStatement( - """ - INSERT INTO job_history( + INSERT IGNORE INTO job_history( fire_instance_id, job_name, job_group, @@ -255,18 +238,17 @@ class JobHistoryModel(getConnection: () => Connection) { ) VALUES(?, ?, ?, ?, ?, ?, ?) """.stripMargin, - ) - prepared.setString(1, fireInstanceId.toString) - prepared.setString(2, jobKey.getName) - prepared.setString(3, jobKey.getGroup) - prepared.setString(4, triggerKey.getName) - prepared.setString(5, triggerKey.getGroup) - prepared.setBoolean(6, true) - prepared.setObject(7, triggerStartTime) - prepared.executeUpdate() - connection.commit() - true - } + ) + prepared.setString(1, fireInstanceId.toString) + prepared.setString(2, jobKey.getName) + prepared.setString(3, jobKey.getGroup) + prepared.setString(4, triggerKey.getName) + prepared.setString(5, triggerKey.getGroup) + prepared.setBoolean(6, true) + prepared.setObject(7, triggerStartTime) + // Check if we actually inserted the row, + // if we didn't, then the firs_instance_id was already in use + prepared.executeUpdate() > 0 } finally { connection.close() } diff --git a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala index bde9248f..ed60651e 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala @@ -12,6 +12,7 @@ import scala.util.Using import java.util.Date import java.io.InputStream import java.time.Instant +import java.time.temporal.ChronoUnit.SECONDS import scala.concurrent.ExecutionContext.global import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} @@ -187,8 +188,12 @@ class ModelTest extends Specification with BeforeAll with AfterAll { // Doesn't matter which one inserted the record, as long as one did, and one didn't Await.result(combinedFutures, Duration.Inf) mustEqual Set(true, false) - val fireTime = java.time.Instant.now() - val instanceDurationInMillis: Long = 1000 + // Truncate to the second, so that we don't end up with a rounding + // error when we do the comparison. + // Otherwise, on insert, the second might be rounded up, and we end up a second + // of from adding one second to the actual date time, and truncating it. + val fireTime = java.time.Instant.now().truncatedTo(SECONDS) + val instanceDurationInMillis: Long = 3000 jobHistoryModel.completeOneTimeJob( fireInstanceId.toString, fireTime, From f9623af357dff838acd4cceae07178b7341de20f Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Thu, 12 Feb 2026 16:56:38 -0700 Subject: [PATCH 2/4] refactor: Use java.time instead of Date --- .../piezo/admin/controllers/Jobs.scala | 2 +- .../piezo/admin/controllers/Triggers.scala | 17 +++++----- .../piezo/admin/controllers/TestUtil.scala | 6 ++-- .../lucidchart/piezo/JobHistoryModel.scala | 27 ++++++++-------- .../piezo/TriggerHistoryModel.scala | 32 +++++++++---------- .../piezo/TriggerMonitoringModel.scala | 10 +++--- .../lucidchart/piezo/WorkerJobListener.scala | 2 +- .../piezo/WorkerTriggerListener.scala | 6 ++-- .../jobs/cleanup/JobHistoryCleanup.scala | 14 ++++---- .../com/lucidchart/piezo/ModelTest.scala | 19 ++++++----- 10 files changed, 67 insertions(+), 68 deletions(-) diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala b/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala index e0bdd132..89851956 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/Jobs.scala @@ -83,7 +83,7 @@ class Jobs( .flatMap { job => jobHistoryModel.getJob(job).headOption } - .sortWith(_.start after _.start) + .sortWith((a, b) => a.start.isAfter(b.start)) val triggeredJobs: List[JobKey] = TriggerHelper .getTriggersByGroup(scheduler) .flatMap { case (group, triggerKeys) => diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala index 32c0a904..23a6c903 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala @@ -1,17 +1,17 @@ package com.lucidchart.piezo.admin.controllers import com.lucidchart.piezo.TriggerMonitoringPriority -import java.util.Date -import org.quartz.Trigger.TriggerState +import com.lucidchart.piezo.admin.models.{ModelComponents, MonitoringTeams} +import java.time.Instant import org.quartz.* +import org.quartz.Trigger.TriggerState import org.quartz.impl.triggers.{CronTriggerImpl, SimpleTriggerImpl} +import play.api.Logging import play.api.libs.json.* import play.api.mvc.* -import com.lucidchart.piezo.admin.models.{ModelComponents, MonitoringTeams} import scala.collection.mutable import scala.jdk.CollectionConverters.* import scala.util.Try -import play.api.Logging class Triggers( scheduler: Scheduler, @@ -27,9 +27,10 @@ class Triggers( val triggerFormHelper = new TriggerFormHelper(scheduler, monitoringTeams) - def firesFirst(time: Date)(trigger1: Trigger, trigger2: Trigger): Boolean = { - val time1 = trigger1.getFireTimeAfter(time) - val time2 = trigger2.getFireTimeAfter(time) + def firesFirst(time: Instant)(trigger1: Trigger, trigger2: Trigger): Boolean = { + val d = java.util.Date.from(time) + val time1 = trigger1.getFireTimeAfter(d) + val time2 = trigger2.getFireTimeAfter(d) if (time2 == null) true else if (time1 == null) false else if (time1 != time2) time1 before time2 @@ -37,7 +38,7 @@ class Triggers( } def getIndex: Action[AnyContent] = Action { implicit request => - val now = new Date() + val now = Instant.now() val allTriggers: List[Trigger] = TriggerHelper .getTriggersByGroup(scheduler) .flatMap { case (group, triggerKeys) => diff --git a/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala b/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala index 9f85b2bd..3ae7919d 100644 --- a/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala +++ b/admin/test/com/lucidchart/piezo/admin/controllers/TestUtil.scala @@ -3,7 +3,7 @@ package com.lucidchart.piezo.admin.controllers import org.quartz.{JobBuilder, Scheduler, SimpleScheduleBuilder, TriggerBuilder} import com.lucidchart.piezo.jobs.monitoring.HeartBeat import com.lucidchart.piezo.admin.models.ModelComponents -import java.util.Date +import java.time.Instant /** */ @@ -13,7 +13,7 @@ object TestUtil { val triggerGroup = "testTriggerGroup" val triggerName = "testTriggerName" - def createJob(scheduler: Scheduler): Date = { + def createJob(scheduler: Scheduler): Instant = { val jobDetail = JobBuilder .newJob(classOf[HeartBeat]) .withIdentity(jobName, jobGroup) @@ -30,7 +30,7 @@ object TestUtil { .withDescription("test schedule description") .build() scheduler.deleteJob(jobDetail.getKey()) - scheduler.scheduleJob(jobDetail, trigger) + scheduler.scheduleJob(jobDetail, trigger).toInstant } val mockModelComponents = new ModelComponents(() => throw new Exception("fake connection")) diff --git a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala index 51c8cb0a..f97f106c 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala @@ -1,12 +1,11 @@ package com.lucidchart.piezo -import java.sql.{ResultSet, Timestamp} -import java.util.Date -import org.quartz.{JobDataMap, JobKey, TriggerKey} -import org.slf4j.LoggerFactory -import org.slf4j.Logger import java.sql.Connection +import java.sql.ResultSet import java.time.Instant +import org.quartz.{JobDataMap, JobKey, TriggerKey} +import org.slf4j.Logger +import org.slf4j.LoggerFactory case class JobRecord( name: String, @@ -14,8 +13,8 @@ case class JobRecord( trigger_name: String, trigger_group: String, success: Int, - start: Date, - finish: Date, + start: Instant, + finish: Instant, fire_instance_id: String, ) @@ -40,7 +39,7 @@ class JobHistoryModel(getConnection: () => Connection) { fireInstanceId: String, jobKey: JobKey, triggerKey: TriggerKey, - fireTime: Date, + fireTime: Instant, instanceDurationInMillis: Long, success: Boolean, ): Unit = { @@ -67,8 +66,8 @@ class JobHistoryModel(getConnection: () => Connection) { prepared.setString(4, triggerKey.getName) prepared.setString(5, triggerKey.getGroup) prepared.setBoolean(6, success) - prepared.setTimestamp(7, new Timestamp(fireTime.getTime)) - prepared.setTimestamp(8, new Timestamp(fireTime.getTime + instanceDurationInMillis)) + prepared.setObject(7, fireTime) + prepared.setObject(8, fireTime.plusMillis(instanceDurationInMillis)) prepared.executeUpdate() } catch { case e: Exception => logger.error("error in recording start of job", e) @@ -77,7 +76,7 @@ class JobHistoryModel(getConnection: () => Connection) { } // TODO: close statement? } - def deleteJobs(minStart: Long): Int = { + def deleteJobs(minStart: Instant): Int = { val connection = getConnection() try { val prepared = connection.prepareStatement( @@ -88,7 +87,7 @@ class JobHistoryModel(getConnection: () => Connection) { AND trigger_group != '$oneTimeJobTriggerGroup' """.stripMargin, ) - prepared.setTimestamp(1, new Timestamp(minStart)) + prepared.setObject(1, minStart) prepared.executeUpdate() } catch { case e: Exception => @@ -201,8 +200,8 @@ class JobHistoryModel(getConnection: () => Connection) { rs.getString("trigger_name"), rs.getString("trigger_group"), rs.getInt("success"), - rs.getTimestamp("start"), - rs.getTimestamp("finish"), + rs.getTimestamp("start").toInstant, + rs.getTimestamp("finish").toInstant, rs.getString("fire_instance_id"), ) } diff --git a/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala index ef43e287..a129990d 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala @@ -1,17 +1,17 @@ package com.lucidchart.piezo -import java.sql.{Connection, Timestamp} +import java.time.Instant +import java.sql.Connection import org.quartz.TriggerKey -import org.slf4j.LoggerFactory -import java.util.Date import org.slf4j.Logger +import org.slf4j.LoggerFactory case class TriggerRecord( name: String, group: String, - scheduled_start: Date, - actual_start: Option[Date], - finish: Date, + scheduled_start: Instant, + actual_start: Option[Instant], + finish: Instant, misfire: Int, fire_instance_id: String, ) @@ -21,8 +21,8 @@ class TriggerHistoryModel(getConnection: () => Connection) { def addTrigger( triggerKey: TriggerKey, - triggerFireTime: Option[Date], - actualStart: Option[Date], + triggerFireTime: Option[Instant], + actualStart: Option[Instant], misfire: Boolean, fireInstanceId: Option[String], ): Unit = { @@ -51,9 +51,9 @@ class TriggerHistoryModel(getConnection: () => Connection) { ) prepared.setString(1, triggerKey.getName) prepared.setString(2, triggerKey.getGroup) - prepared.setTimestamp(3, new Timestamp(triggerFireTime.getOrElse(new Date).getTime)) - prepared.setTimestamp(4, actualStart.map(date => new Timestamp(date.getTime)).getOrElse(null)) - prepared.setTimestamp(5, new Timestamp(System.currentTimeMillis)) + prepared.setObject(3, triggerFireTime.getOrElse(Instant.now())) + prepared.setObject(4, actualStart.getOrElse(null)) + prepared.setObject(5, Instant.now()) prepared.setBoolean(6, misfire) prepared.setString(7, fireInstanceId.getOrElse("")) prepared.executeUpdate() @@ -64,11 +64,11 @@ class TriggerHistoryModel(getConnection: () => Connection) { } } - def deleteTriggers(minScheduledStart: Long): Int = { + def deleteTriggers(minScheduledStart: Instant): Int = { val connection = getConnection() try { val prepared = connection.prepareStatement("""DELETE FROM trigger_history WHERE scheduled_start < ?""") - prepared.setTimestamp(1, new Timestamp(minScheduledStart)) + prepared.setObject(1, minScheduledStart) prepared.executeUpdate() } catch { case e: Exception => @@ -95,9 +95,9 @@ class TriggerHistoryModel(getConnection: () => Connection) { result :+= new TriggerRecord( rs.getString("trigger_name"), rs.getString("trigger_group"), - rs.getTimestamp("scheduled_start"), - Option(rs.getTimestamp("actual_start")), - rs.getTimestamp("finish"), + rs.getTimestamp("scheduled_start").toInstant, + Option(rs.getTimestamp("actual_start")).map(_.toInstant), + rs.getTimestamp("finish").toInstant, rs.getInt("misfire"), rs.getString("fire_instance_id"), ) diff --git a/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala b/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala index 3e20a037..245fe500 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/TriggerMonitoringModel.scala @@ -1,11 +1,11 @@ package com.lucidchart.piezo import com.lucidchart.piezo.TriggerMonitoringPriority.TriggerMonitoringPriority -import java.util.Date import org.quartz.TriggerKey import org.slf4j.LoggerFactory import org.slf4j.Logger import java.sql.Connection +import java.time.Instant object TriggerMonitoringPriority { case class Value(id: Int, name: String) { @@ -31,8 +31,8 @@ case class TriggerMonitoringRecord( priority: TriggerMonitoringPriority, maxSecondsInError: Int, monitoringTeam: Option[String], - created: Date, - modified: Date, + created: Instant, + modified: Instant, ) class TriggerMonitoringModel(getConnection: () => Connection) { @@ -127,8 +127,8 @@ class TriggerMonitoringModel(getConnection: () => Connection) { priority, rs.getInt("max_error_time"), Option(rs.getString("monitoring_team")), - rs.getDate("created"), - rs.getDate("modified"), + rs.getTimestamp("created").toInstant, + rs.getTimestamp("modified").toInstant, ) } } else { diff --git a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala index 7aadae70..3c0a3480 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala @@ -37,7 +37,7 @@ class WorkerJobListener(getConnection: () => Connection, statsd: StatsDClient, u context.getFireInstanceId, context.getTrigger.getJobKey, context.getTrigger.getKey, - context.getFireTime, + context.getFireTime.toInstant, context.getJobRunTime, success = success, ) diff --git a/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala b/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala index 0c79e09c..65445e03 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/WorkerTriggerListener.scala @@ -49,8 +49,8 @@ class WorkerTriggerListener(getConnection: () => Connection, statsd: StatsDClien try { triggerHistoryModel.addTrigger( trigger.getKey, - Option(trigger.getPreviousFireTime), - Some(context.getFireTime), + Option(trigger.getPreviousFireTime).map(_.toInstant), + Some(context.getFireTime.toInstant), misfire = false, Some(context.getFireInstanceId), ) @@ -70,7 +70,7 @@ class WorkerTriggerListener(getConnection: () => Connection, statsd: StatsDClien try { triggerHistoryModel.addTrigger( trigger.getKey, - Option(trigger.getPreviousFireTime), + Option(trigger.getPreviousFireTime).map(_.toInstant), None, misfire = true, None, diff --git a/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala b/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala index d79e7c65..22ae2c94 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/jobs/cleanup/JobHistoryCleanup.scala @@ -3,28 +3,28 @@ package com.lucidchart.piezo.jobs.cleanup import org.quartz.{Job, JobExecutionContext} import org.slf4j.LoggerFactory import com.lucidchart.piezo.{JobHistoryModel, TriggerHistoryModel, Worker} -import java.util.Date +import java.time.{Duration, Instant} import java.sql.Connection class JobHistoryCleanup extends Job { private val logger = LoggerFactory.getLogger(this.getClass) def execute(context: JobExecutionContext): Unit = { - val maxAge = context.getMergedJobDataMap.getLong("maxAgeDays") * 24L * 3600L * 1000L - val minStart = System.currentTimeMillis() - maxAge + val maxAge = Duration.ofDays(context.getMergedJobDataMap.getLong("maxAgeDays")) + val minStart = Instant.now().minus(maxAge) val getConnection = Worker.connectionFactory(context.getScheduler.getContext) deleteTriggerHistories(getConnection, minStart) deleteJobHistories(getConnection, minStart) } - private[this] def deleteTriggerHistories(getConnection: () => Connection, minStart: Long): Unit = { - logger.info("Deleting triggers older than " + new Date(minStart)) + private[this] def deleteTriggerHistories(getConnection: () => Connection, minStart: Instant): Unit = { + logger.info("Deleting triggers older than " + minStart) val numDeleted = new TriggerHistoryModel(getConnection).deleteTriggers(minStart) logger.info("Deleted " + numDeleted + " trigger histories") } - private[this] def deleteJobHistories(getConnection: () => Connection, minStart: Long): Unit = { - logger.info("Deleting jobs older than " + new Date(minStart)) + private[this] def deleteJobHistories(getConnection: () => Connection, minStart: Instant): Unit = { + logger.info("Deleting jobs older than " + minStart) val numDeleted = new JobHistoryModel(getConnection).deleteJobs(minStart) logger.info("Deleted " + numDeleted + " job histories") } diff --git a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala index ed60651e..7eceacde 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala @@ -9,7 +9,6 @@ import java.sql.DriverManager import java.util.Properties import scala.jdk.CollectionConverters.* import scala.util.Using -import java.util.Date import java.io.InputStream import java.time.Instant import java.time.temporal.ChronoUnit.SECONDS @@ -86,7 +85,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobKey = new JobKey("blah", "blah") val triggerKey = new TriggerKey("blahtn", "blahtg") jobHistoryModel.getJobs().isEmpty must beTrue - jobHistoryModel.addJob("ab", jobKey, triggerKey, new Date(), 1000, true) + jobHistoryModel.addJob("ab", jobKey, triggerKey, Instant.now(), 1000, true) jobHistoryModel.getJob(jobKey).headOption must beSome jobHistoryModel.getLastJobSuccessByTrigger(triggerKey) must beSome jobHistoryModel.getJobs().nonEmpty must beTrue @@ -105,7 +104,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobKey = new JobKey("blahc", "blahc") val triggerKey = new TriggerKey("blahtnc", "blahtgc") jobHistoryModel.getJob(jobKey).headOption must beNone - jobHistoryModel.addJob("abc", jobKey, triggerKey, new Date(), 1000, true) + jobHistoryModel.addJob("abc", jobKey, triggerKey, Instant.now(), 1000, true) jobHistoryModel.getJob(jobKey).headOption must beSome jobHistoryModel.getLastJobSuccessByTrigger(triggerKey) must beSome @@ -141,7 +140,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobHistoryModel = new JobHistoryModel(getConnectionProvider()) val temporaryTriggerKey = new TriggerKey("blahjz", "blahzg") val jobKey = new JobKey("blahjz123", "blahzg123") - val scheduledStart = java.util.Date.from(java.time.Instant.now()) + val scheduledStart = java.time.Instant.now() val temporaryFireInstanceId = "FireInstanceId" val permanentFireInstanceId = 123456789 val permanentFireInstanceIdString = permanentFireInstanceId.toString @@ -152,7 +151,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { .getJob(jobKey) .map(_.fire_instance_id) .toSet mustEqual Set(temporaryFireInstanceId, permanentFireInstanceIdString) - jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3).toEpochMilli) mustEqual 1 + jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3)) mustEqual 1 jobHistoryModel .getJob(jobKey) .map(_.fire_instance_id) @@ -206,7 +205,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val expectedFinishSeconds = fireTime.plusMillis(instanceDurationInMillis).getEpochSecond jobHistoryModel .getJob(jobKey) - .map(record => (record.fire_instance_id, record.finish.toInstant.getEpochSecond)) must containTheSameElementsAs( + .map(record => (record.fire_instance_id, record.finish.getEpochSecond)) must containTheSameElementsAs( List( ( fireInstanceId.toString, @@ -214,7 +213,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { ), ), ) - jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3).toEpochMilli) mustEqual 0 + jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3)) mustEqual 0 // Delete the remaining record, so it doesn't affect other tests val connection = getConnectionProvider()() @@ -241,12 +240,12 @@ class ModelTest extends Specification with BeforeAll with AfterAll { insertedRecord.actual_start must beNone insertedRecord.fire_instance_id mustEqual "" // increase the time by 1 second so that the condition for the test satisfies. - triggerHistoryModel.deleteTriggers(new Date().getTime + 1000) mustEqual 1 + triggerHistoryModel.deleteTriggers(Instant.now().plusSeconds(1)) mustEqual 1 val triggerKey2 = new TriggerKey("blahj2", "blahg") triggerHistoryModel.addTrigger( triggerKey2, triggerFireTime = None, - actualStart = Some(new Date()), + actualStart = Some(Instant.now()), misfire = true, fireInstanceId = Some("blah"), ) @@ -254,7 +253,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { newRecord.actual_start must beSome newRecord.fire_instance_id mustEqual "blah" - triggerHistoryModel.deleteTriggers(new Date().getTime + 1000) mustEqual 1 + triggerHistoryModel.deleteTriggers(Instant.now().plusSeconds(1)) mustEqual 1 } } } From e650cee7398588c0fd4b53da2a5af20ea758060c Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Thu, 12 Feb 2026 17:48:55 -0700 Subject: [PATCH 3/4] refactor: Use java.time instead of joda --- .../piezo/admin/controllers/HealthCheck.scala | 20 ++++++++----- .../piezo/admin/views/TimeFormat.scala | 30 +++++++++++++++++++ .../piezo/admin/views/jobs.scala.html | 9 ++---- .../piezo/admin/views/triggers.scala.html | 13 ++++---- .../admin/controllers/HealthCheckTest.scala | 11 ++++--- worker/build.sbt | 2 -- .../scala/com/lucidchart/piezo/Worker.scala | 10 ++++--- .../com/lucidchart/piezo/WorkerTest.scala | 5 ++-- 8 files changed, 65 insertions(+), 35 deletions(-) create mode 100644 admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala b/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala index 58a4da1c..be986f54 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/HealthCheck.scala @@ -1,14 +1,17 @@ package com.lucidchart.piezo.admin.controllers -import org.joda.time.{DateTime, Minutes} -import org.joda.time.format.ISODateTimeFormat import play.api.* import play.api.libs.json.* import play.api.Logging import play.api.mvc.* import scala.io.Source +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit.{MINUTES, SECONDS} class HealthCheck(configuration: Configuration, cc: ControllerComponents) extends AbstractController(cc) with Logging { + import HealthCheck.timeFormatter val heartbeatFilename: String = configuration.getOptional[String]("com.lucidchart.piezo.heartbeatFile").getOrElse { logger.warn("heartbeat file not specified") @@ -34,13 +37,16 @@ class HealthCheck(configuration: Configuration, cc: ControllerComponents) extend try { val heartbeatFileLines = heartbeatFile.getLines().toList val heartbeatTimestamp = heartbeatFileLines(0) - val formatter = ISODateTimeFormat.dateTimeNoMillis().withZoneUTC() - val heartbeatTime = formatter.parseDateTime(heartbeatTimestamp) - val currentTime = new DateTime - val isTimestampRecent = Minutes.minutesBetween(heartbeatTime, currentTime).getMinutes < minutesBetweenBeats - (isTimestampRecent, formatter.print(heartbeatTime)) + val heartbeatTime = timeFormatter.parse(heartbeatTimestamp, Instant.from) + val currentTime = Instant.now() + val isTimestampRecent = heartbeatTime.until(currentTime, MINUTES) < minutesBetweenBeats + (isTimestampRecent, timeFormatter.format(heartbeatTime.truncatedTo(SECONDS))) } finally { heartbeatFile.close() } } } + +object HealthCheck { + private[piezo] val timeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(UTC) +} diff --git a/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala b/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala new file mode 100644 index 00000000..581f0a54 --- /dev/null +++ b/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala @@ -0,0 +1,30 @@ +package com.lucidchart.piezo.admin.views + +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter + +object TimeFormat { + + private val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(UTC) + + /** + * Print a java.util.Date in a standard format. + * + * Will prilnt "--" if null is passed + */ + def printDate(d: java.util.Date): String = { + if (d == null) { + "--" + } else { + dtf.format(d.toInstant) + } + } + + /** + * Print an instant in a standard format. + */ + def printInstant(i: Instant): String = { + dtf.format(i) + } +} diff --git a/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html b/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html index d7b4db65..ffb90e76 100644 --- a/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html +++ b/admin/app/com/lucidchart/piezo/admin/views/jobs.scala.html @@ -1,7 +1,6 @@ @import com.lucidchart.piezo.admin.controllers.{routes=>piezoRoutes} -@import org.joda.time.format.DateTimeFormat -@import org.joda.time.DateTime @import org.quartz.{JobKey, SchedulerMetaData} +@import com.lucidchart.piezo.admin.views.TimeFormat.printInstant @( jobsByGroup: scala.collection.mutable.Buffer[(String, scala.collection.immutable.List[org.quartz.JobKey])], currentJob: Option[org.quartz.JobDetail], @@ -63,11 +62,10 @@

Jobs History

- @defining(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")) { dtf => @jobsHistory.get.map { record => - @dtf.print(new DateTime(record.start)) - @dtf.print(new DateTime(record.finish)) + @printInstant(record.start) + @printInstant(record.finish) @record.group @record.name @record.trigger_group @@ -76,7 +74,6 @@

Jobs History

} - } } diff --git a/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html b/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html index 76e4b81b..53b73f9c 100644 --- a/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html +++ b/admin/app/com/lucidchart/piezo/admin/views/triggers.scala.html @@ -9,9 +9,7 @@ request: play.api.mvc.Request[AnyContent] ) @import com.lucidchart.piezo.admin.views -@import java.util.Date -@import org.joda.time.format.DateTimeFormat -@import org.joda.time.DateTime +@import com.lucidchart.piezo.admin.views.TimeFormat.printDate @com.lucidchart.piezo.admin.views.html.triggersLayout(triggersByGroup, currentTrigger) {

Select a trigger

@@ -42,12 +40,12 @@

Upcoming Triggers

- @defining(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")) { dtf => - @defining(new Date()) { now => + *{ quartz still uses java.util.Date }* + @defining(new java.util.Date()) { now => @upcomingTriggers.map { trigger => - @dtf.print(new DateTime(trigger.getFireTimeAfter(now))) - @dtf.print(new DateTime(trigger.getFireTimeAfter(trigger.getFireTimeAfter(now)))) + @printDate(trigger.getFireTimeAfter(now)) + @printDate(trigger.getFireTimeAfter(trigger.getFireTimeAfter(now))) @trigger.getKey.getGroup @trigger.getKey.getName @trigger.getJobKey.getGroup @@ -56,7 +54,6 @@

Upcoming Triggers

} } - } } diff --git a/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala b/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala index 802f7ac7..5802571c 100644 --- a/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala +++ b/admin/test/com/lucidchart/piezo/admin/controllers/HealthCheckTest.scala @@ -1,18 +1,17 @@ package com.lucidchart.piezo.admin.controllers import java.io.{File, FileWriter} -import org.joda.time.DateTime -import org.joda.time.format.ISODateTimeFormat import org.specs2.mutable.* import play.api.Configuration import play.api.test.Helpers.* import play.api.test.* -import org.joda.time.format.DateTimeFormatter +import java.time.Instant +import java.time.temporal.ChronoUnit.MINUTES class HealthCheckTest extends Specification { val filename = "HeartbeatTestFile" - val dtf: DateTimeFormatter = ISODateTimeFormat.dateTimeNoMillis().withZoneUTC() + private val dtf = HealthCheck.timeFormatter trait FileCleaner extends After { def after: Unit = new File(filename).delete @@ -24,7 +23,7 @@ class HealthCheckTest extends Specification { "send 200 when the worker timestamp is recent" in new FileCleaner { val file = new File(filename) val fileWrite = new FileWriter(file) - val heartbeatTime = dtf.print(new DateTime(System.currentTimeMillis())) + val heartbeatTime = dtf.format(Instant.now()) fileWrite.write(heartbeatTime) fileWrite.close() val healthCheck = new HealthCheck(testConfig(filename), Helpers.stubControllerComponents()) @@ -35,7 +34,7 @@ class HealthCheckTest extends Specification { "send 503 when the worker timestamp is too far in the past" in new FileCleaner { val file = new File(filename) val fileWrite = new FileWriter(file) - val heartbeatTime = dtf.print(new DateTime(System.currentTimeMillis()).minusMinutes(10)) + val heartbeatTime = dtf.format(Instant.now().minus(10, MINUTES)) fileWrite.write(heartbeatTime) fileWrite.close() val healthCheck = new HealthCheck(testConfig(filename), Helpers.stubControllerComponents()) diff --git a/worker/build.sbt b/worker/build.sbt index 45f0c0b6..2708a0f1 100644 --- a/worker/build.sbt +++ b/worker/build.sbt @@ -15,8 +15,6 @@ libraryDependencies ++= Seq( "org.specs2" %% "specs2-core" % "4.20.9" % Test, "mysql" % "mysql-connector-java" % "8.0.33", "javax.transaction" % "jta" % "1.1", - "joda-time" % "joda-time" % "2.13.1", - "org.joda" % "joda-convert" % "3.0.1", "com.typesafe" % "config" % "1.4.3", "com.datadoghq" % "java-dogstatsd-client" % "4.4.3", ) diff --git a/worker/src/main/scala/com/lucidchart/piezo/Worker.scala b/worker/src/main/scala/com/lucidchart/piezo/Worker.scala index 7838f162..9852fa20 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/Worker.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/Worker.scala @@ -4,13 +4,15 @@ import com.timgroup.statsd.NonBlockingStatsDClientBuilder import java.io.* import java.util.Properties import java.util.concurrent.{Semaphore, TimeUnit} -import org.joda.time.DateTime -import org.joda.time.format.ISODateTimeFormat import org.quartz.Scheduler import org.slf4j.LoggerFactory import scala.util.Try import scala.util.control.NonFatal import org.quartz.utils.DBConnectionManager +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit.SECONDS import java.sql.Connection import org.quartz.SchedulerContext @@ -29,7 +31,7 @@ object Worker { private val logger = LoggerFactory.getLogger(this.getClass) private[piezo] val runSemaphore = new Semaphore(0) private val shutdownSemaphore = new Semaphore(1) - private[piezo] val dtf = ISODateTimeFormat.dateTimeNoMillis().withZoneUTC() + private[piezo] val dtf = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(UTC) def main(args: Array[String]): Unit = { logger.info("worker starting") @@ -129,7 +131,7 @@ object Worker { val file = new File(filePath) file.getParentFile.mkdirs() val fileWrite = new FileWriter(file) - val heartbeatTime = dtf.print(new DateTime(System.currentTimeMillis())) + val heartbeatTime = dtf.format(Instant.now().truncatedTo(SECONDS)) fileWrite.write(heartbeatTime) fileWrite.close() } catch { diff --git a/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala b/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala index 4de8d169..66ea53ef 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/WorkerTest.scala @@ -8,6 +8,7 @@ import org.quartz.JobBuilder.* import org.quartz.TriggerBuilder.* import org.quartz.SimpleScheduleBuilder.* import org.quartz.impl.StdSchedulerFactory +import java.time.Instant import java.util.Properties import scala.util.Random @@ -95,8 +96,8 @@ class WorkerTest extends Specification { val heartbeat = reader.readLine() reader.close() println("heartbeat timestamp: " + heartbeat) - val heartbeatTime = Worker.dtf.parseDateTime(heartbeat.trim) - val inRange = heartbeatTime.isAfter(System.currentTimeMillis() - 5 * 1000) + val heartbeatTime = Worker.dtf.parse(heartbeat.trim, Instant.from) + val inRange = heartbeatTime.isAfter(Instant.now().minusSeconds(5)) inRange must equalTo(true) } From da7dec0ff6006bd390bbbd243fc5338b216c7e5e Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Fri, 13 Feb 2026 11:35:36 -0700 Subject: [PATCH 4/4] fix: Make finish time for job record optional Because the database record is nullable, and we set it to null when we insert a one-time job. --- admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala | 5 +++++ .../main/scala/com/lucidchart/piezo/JobHistoryModel.scala | 4 ++-- worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala | 4 ++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala b/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala index 581f0a54..660296e7 100644 --- a/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala +++ b/admin/app/com/lucidchart/piezo/admin/views/TimeFormat.scala @@ -27,4 +27,9 @@ object TimeFormat { def printInstant(i: Instant): String = { dtf.format(i) } + + def printInstant(instant: Option[Instant]): String = instant match { + case Some(i) => dtf.format(i) + case None => "--" + } } diff --git a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala index f97f106c..b72d664d 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala @@ -14,7 +14,7 @@ case class JobRecord( trigger_group: String, success: Int, start: Instant, - finish: Instant, + finish: Option[Instant], fire_instance_id: String, ) @@ -201,7 +201,7 @@ class JobHistoryModel(getConnection: () => Connection) { rs.getString("trigger_group"), rs.getInt("success"), rs.getTimestamp("start").toInstant, - rs.getTimestamp("finish").toInstant, + Option(rs.getTimestamp("finish")).map(_.toInstant), rs.getString("fire_instance_id"), ) } diff --git a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala index 7eceacde..22f798b5 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala @@ -205,11 +205,11 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val expectedFinishSeconds = fireTime.plusMillis(instanceDurationInMillis).getEpochSecond jobHistoryModel .getJob(jobKey) - .map(record => (record.fire_instance_id, record.finish.getEpochSecond)) must containTheSameElementsAs( + .map(record => (record.fire_instance_id, record.finish.map(_.getEpochSecond))) must containTheSameElementsAs( List( ( fireInstanceId.toString, - expectedFinishSeconds, + Some(expectedFinishSeconds), ), ), )