diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala index f4281abe..32c0a904 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala @@ -354,6 +354,34 @@ class Triggers( } } + def triggerJobOneTime(group: String, name: String, id: Long): Action[AnyContent] = Action { request => + val jobKey = new JobKey(name, group) + + if (scheduler.checkExists(jobKey)) { + try { + // Only run a trigger, if we haven't seen this id before + if (jobHistoryModel.addOneTimeJobIfNotExists(jobKey, id)) { + // Single run trigger has its id passed to the scheduler via the job-data-map. The WorkerJobListener will + // use that id to update the existing record in job_history table + val jobDataMap = jobHistoryModel.createJobDataMapForOneTimeJob(id) + scheduler.triggerJob(jobKey, jobDataMap) + } + Ok + } catch { + case e: SchedulerException => { + logger.error( + "Exception caught triggering job one-time %s %s - %s. -- %s" + .format(group, name, id, e.getLocalizedMessage), + e, + ) + InternalServerError + } + } + } else { + NotFound + } + } + def patchTrigger(group: String, name: String): Action[AnyContent] = Action { implicit request => val triggerKey = new TriggerKey(name, group) val triggerExists = scheduler.checkExists(triggerKey) diff --git a/admin/conf/routes b/admin/conf/routes index 719982ec..0b27666f 100644 --- a/admin/conf/routes +++ b/admin/conf/routes @@ -3,39 +3,40 @@ # ~~~~ # Home page -GET / com.lucidchart.piezo.admin.controllers.ApplicationController.index -GET /jobs com.lucidchart.piezo.admin.controllers.Jobs.getIndex -GET /jobs/new com.lucidchart.piezo.admin.controllers.Jobs.getNewJobForm(templateGroup: Option[String] ?= None, templateName: Option[String] ?= None) -GET /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJob(group: String, name: String) -POST /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.putJob(group: String, name: String) -DELETE /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.deleteJob(group: String, name: String) -GET /jobs/:group/:name/editor com.lucidchart.piezo.admin.controllers.Jobs.getEditJobAction(group: String, name: String) -POST /jobs com.lucidchart.piezo.admin.controllers.Jobs.postJob - -POST /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.postJobs -GET /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.getJobsDetail -GET /data/jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJobDetail(group: String, name: String) - -GET /typeahead/jobs/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobGroupTypeAhead(sofar: String) -GET /typeahead/jobs/:group/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobNameTypeAhead(group: String, sofar: String) - -GET /triggers com.lucidchart.piezo.admin.controllers.Triggers.getIndex -GET /triggers/new/:triggerType com.lucidchart.piezo.admin.controllers.Triggers.getNewTriggerForm(triggerType, jobGroup: String ?= "", jobName: String ?= "", templateGroup: Option[String] ?= None, templateName: Option[String] ?= None) -GET /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.getTrigger(group: String, name: String) -POST /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.putTrigger(group: String, name: String) -DELETE /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.deleteTrigger(group: String, name: String) -GET /triggers/:group/:name/editor com.lucidchart.piezo.admin.controllers.Triggers.getEditTriggerAction(group: String, name: String) -POST /triggers/:group/:name/runner com.lucidchart.piezo.admin.controllers.Triggers.triggerJob(group: String, name: String) -POST /triggers com.lucidchart.piezo.admin.controllers.Triggers.postTrigger() -PATCH /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.patchTrigger(group: String, name: String) - -GET /typeahead/triggers/:sofar com.lucidchart.piezo.admin.controllers.Triggers.triggerGroupTypeAhead(sofar: String) - -GET /favicon.ico controllers.Assets.at(path="/public/img", file="favicon.ico") +GET / com.lucidchart.piezo.admin.controllers.ApplicationController.index +GET /jobs com.lucidchart.piezo.admin.controllers.Jobs.getIndex +GET /jobs/new com.lucidchart.piezo.admin.controllers.Jobs.getNewJobForm(templateGroup: Option[String] ?= None, templateName: Option[String] ?= None) +GET /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJob(group: String, name: String) +POST /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.putJob(group: String, name: String) +DELETE /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.deleteJob(group: String, name: String) +GET /jobs/:group/:name/editor com.lucidchart.piezo.admin.controllers.Jobs.getEditJobAction(group: String, name: String) +POST /jobs com.lucidchart.piezo.admin.controllers.Jobs.postJob + +POST /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.postJobs +GET /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.getJobsDetail +GET /data/jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJobDetail(group: String, name: String) + +GET /typeahead/jobs/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobGroupTypeAhead(sofar: String) +GET /typeahead/jobs/:group/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobNameTypeAhead(group: String, sofar: String) + +GET /triggers com.lucidchart.piezo.admin.controllers.Triggers.getIndex +GET /triggers/new/:triggerType com.lucidchart.piezo.admin.controllers.Triggers.getNewTriggerForm(triggerType, jobGroup: String ?= "", jobName: String ?= "", templateGroup: Option[String] ?= None, templateName: Option[String] ?= None) +GET /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.getTrigger(group: String, name: String) +POST /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.putTrigger(group: String, name: String) +DELETE /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.deleteTrigger(group: String, name: String) +GET /triggers/:group/:name/editor com.lucidchart.piezo.admin.controllers.Triggers.getEditTriggerAction(group: String, name: String) +POST /triggers/:group/:name/runner com.lucidchart.piezo.admin.controllers.Triggers.triggerJob(group: String, name: String) +POST /triggers/:group/:name/onetime/:id com.lucidchart.piezo.admin.controllers.Triggers.triggerJobOneTime(group: String, name: String, id: Long) +POST /triggers com.lucidchart.piezo.admin.controllers.Triggers.postTrigger() +PATCH /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.patchTrigger(group: String, name: String) + +GET /typeahead/triggers/:sofar com.lucidchart.piezo.admin.controllers.Triggers.triggerGroupTypeAhead(sofar: String) + +GET /favicon.ico controllers.Assets.at(path="/public/img", file="favicon.ico") # Worker Health Check -GET /health com.lucidchart.piezo.admin.controllers.HealthCheck.main() +GET /health com.lucidchart.piezo.admin.controllers.HealthCheck.main() # Map static resources from the /public folder to the /assets URL path -GET /assets/*file controllers.Assets.at(path="/public", file) +GET /assets/*file controllers.Assets.at(path="/public", file) diff --git a/worker/src/main/resources/piezo_mysql_9.sql b/worker/src/main/resources/piezo_mysql_9.sql new file mode 100644 index 00000000..eb060886 --- /dev/null +++ b/worker/src/main/resources/piezo_mysql_9.sql @@ -0,0 +1,3 @@ +ALTER TABLE job_history + DROP INDEX start_key, + ADD INDEX start_key (start, trigger_group); diff --git a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala index 05c75f91..ff823aef 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala @@ -2,10 +2,11 @@ package com.lucidchart.piezo import java.sql.{ResultSet, Timestamp} import java.util.Date -import org.quartz.{JobKey, TriggerKey} +import org.quartz.{JobDataMap, JobKey, TriggerKey} import org.slf4j.LoggerFactory import org.slf4j.Logger import java.sql.Connection +import java.time.Instant case class JobRecord( name: String, @@ -21,6 +22,20 @@ case class JobRecord( class JobHistoryModel(getConnection: () => Connection) { val logger: Logger = LoggerFactory.getLogger(this.getClass) + // Trigger Group for records that aren't deletable + private final val oneTimeJobTriggerGroup = "ONE_TIME_JOB" + final def oneTimeTriggerKey(fireInstanceId: Long): TriggerKey = + TriggerKey(fireInstanceId.toString, oneTimeJobTriggerGroup) + + // Methods to store the one-time-job id in a job-data-map + final val jobDataMapOneTimeJobKey = "OneTimeJobId" + final def getOneTimeJobIdFromDataMap(jobDataMap: JobDataMap): Option[String] = Option( + jobDataMap.getString(jobDataMapOneTimeJobKey), + ) + final def createJobDataMapForOneTimeJob(id: Long): JobDataMap = new JobDataMap( + java.util.Map.of(jobDataMapOneTimeJobKey, id), + ) + def addJob( fireInstanceId: String, jobKey: JobKey, @@ -66,10 +81,11 @@ class JobHistoryModel(getConnection: () => Connection) { val connection = getConnection() try { val prepared = connection.prepareStatement( - """ + s""" DELETE FROM job_history WHERE start < ? + AND trigger_group != '$oneTimeJobTriggerGroup' """.stripMargin, ) prepared.setTimestamp(1, new Timestamp(minStart)) @@ -190,4 +206,99 @@ class JobHistoryModel(getConnection: () => Connection) { rs.getString("fire_instance_id"), ) } + + /** + * Check if we have already triggered a one-time-job with the given trigger key and fireInstanceId. + * + * This is useful for seeing if a one-time job has already been triggered, to ensure that triggering a one-time job + * with the same instance id is an idempotent operation. If the one-time job has not been triggered, the same + * transaction is used to add the one-time-job to the database, to avoid race conditions + */ + def addOneTimeJobIfNotExists(jobKey: JobKey, fireInstanceId: Long): Boolean = { + val connection = getConnection() + connection.setAutoCommit(false) + + // Use a trigger key that the database won't clean up in "JobHistoryCleanup" + val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId) + + try { + val prepared = connection.prepareStatement( + """ + SELECT EXISTS( + SELECT 1 + FROM job_history + WHERE fire_instance_id=? + LIMIT 1 + ) as record_exists + """.stripMargin, + ) + prepared.setString(1, fireInstanceId.toString) + val rs = prepared.executeQuery() + if (rs.next() && rs.getBoolean(1)) { + // Record already exists, don't add it again + false + } else { + // Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests + // trigger the same job + val triggerStartTime: Instant = Instant.now() + // Same as "addJob()" but without the finish + val prepared = connection.prepareStatement( + """ + INSERT INTO job_history( + fire_instance_id, + job_name, + job_group, + trigger_name, + trigger_group, + success, + start + ) + VALUES(?, ?, ?, ?, ?, ?, ?) + """.stripMargin, + ) + prepared.setString(1, fireInstanceId.toString) + prepared.setString(2, jobKey.getName) + prepared.setString(3, jobKey.getGroup) + prepared.setString(4, triggerKey.getName) + prepared.setString(5, triggerKey.getGroup) + prepared.setBoolean(6, true) + prepared.setObject(7, triggerStartTime) + prepared.executeUpdate() + connection.commit() + true + } + } finally { + connection.close() + } + } + + def completeOneTimeJob( + fireInstanceId: String, + fireTime: Instant, + instanceDurationInMillis: Long, + success: Boolean, + ): Unit = { + val connection = getConnection() + try { + val prepared = connection.prepareStatement( + """ + UPDATE job_history + SET + success=?, + start=?, + finish=? + WHERE fire_instance_id=? + """.stripMargin, + ) + prepared.setBoolean(1, success) + prepared.setObject(2, fireTime) + prepared.setObject(3, fireTime.plusMillis(instanceDurationInMillis)) + prepared.setString(4, fireInstanceId) + prepared.executeUpdate() + } catch { + case e: Exception => logger.error("error in recording completion of one-time-job", e) + } finally { + connection.close() + } + } } diff --git a/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala index 8c1771c5..ef43e287 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/TriggerHistoryModel.scala @@ -1,11 +1,10 @@ package com.lucidchart.piezo -import java.sql.Timestamp +import java.sql.{Connection, Timestamp} import org.quartz.TriggerKey import org.slf4j.LoggerFactory import java.util.Date import org.slf4j.Logger -import java.sql.Connection case class TriggerRecord( name: String, diff --git a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala index 154c8461..7aadae70 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala @@ -23,14 +23,25 @@ class WorkerJobListener(getConnection: () => Connection, statsd: StatsDClient, u def jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException): Unit = { try { val success = jobException == null - jobHistoryModel.addJob( - context.getFireInstanceId, - context.getTrigger.getJobKey, - context.getTrigger.getKey, - context.getFireTime, - context.getJobRunTime, - success = success, - ) + val oneTimeJobIdOption: Option[String] = jobHistoryModel.getOneTimeJobIdFromDataMap(context.getMergedJobDataMap) + oneTimeJobIdOption match { + case Some(oneTimeJobId) => // Update the existing record from the job_history table + jobHistoryModel.completeOneTimeJob( + oneTimeJobId, + context.getFireTime.toInstant, + context.getJobRunTime, + success = success, + ) + case None => + jobHistoryModel.addJob( + context.getFireInstanceId, + context.getTrigger.getJobKey, + context.getTrigger.getKey, + context.getFireTime, + context.getJobRunTime, + success = success, + ) + } val suffix = if (success) "succeeded" else "failed" val jobKey = s"${context.getTrigger.getJobKey.getGroup}.${context.getTrigger.getJobKey.getName}" diff --git a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala index ac5011c3..bde9248f 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala @@ -11,8 +11,14 @@ import scala.jdk.CollectionConverters.* import scala.util.Using import java.util.Date import java.io.InputStream +import java.time.Instant +import scala.concurrent.ExecutionContext.global +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} class ModelTest extends Specification with BeforeAll with AfterAll { + sequential + val propertiesStream: InputStream = getClass().getResourceAsStream("/quartz_test_mysql.properties") val properties = new Properties properties.load(propertiesStream) @@ -44,7 +50,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { } override def beforeAll(): Unit = { - val piezoSchema = for (num <- 0 to 8) yield getPatchFile(s"piezo_mysql_$num.sql") + val piezoSchema = for (num <- 0 to 9) yield getPatchFile(s"piezo_mysql_$num.sql") val quartzSchema = getPatchFile("quartz_mysql_0.sql") val schema = (quartzSchema +: piezoSchema) .map { path => @@ -83,6 +89,14 @@ class ModelTest extends Specification with BeforeAll with AfterAll { jobHistoryModel.getJob(jobKey).headOption must beSome jobHistoryModel.getLastJobSuccessByTrigger(triggerKey) must beSome jobHistoryModel.getJobs().nonEmpty must beTrue + + // Delete the remaining record, so it doesn't affect other tests + val connection = getConnectionProvider()() + val prepared = connection.prepareStatement(s"""DELETE FROM job_history""") + prepared.executeUpdate() + connection.close() + + jobHistoryModel.getJob(jobKey).toSet mustEqual Set.empty } "work correctly with a failover for every connection to the database" in { @@ -93,13 +107,21 @@ class ModelTest extends Specification with BeforeAll with AfterAll { jobHistoryModel.addJob("abc", jobKey, triggerKey, new Date(), 1000, true) jobHistoryModel.getJob(jobKey).headOption must beSome jobHistoryModel.getLastJobSuccessByTrigger(triggerKey) must beSome + + // Delete the remaining record, so it doesn't affect other tests + val connection = getConnectionProvider()() + val prepared = connection.prepareStatement(s"""DELETE FROM job_history""") + prepared.executeUpdate() + connection.close() + + jobHistoryModel.getJob(jobKey).toSet mustEqual Set.empty } } "TriggerMonitoringModel" should { "work correctly" in { val triggerMonitoringPriorityModel = new TriggerMonitoringModel(getConnectionProvider()) - val triggerKey = new TriggerKey("blahj", "blahg") + val triggerKey = new TriggerKey("blahz", "blahz") triggerMonitoringPriorityModel.getTriggerMonitoringRecord(triggerKey) must beNone triggerMonitoringPriorityModel.setTriggerMonitoringRecord( triggerKey, @@ -113,6 +135,92 @@ class ModelTest extends Specification with BeforeAll with AfterAll { } } + "JobHistoryCleanup" should { + "cleanup only non-permanent records" in { + val jobHistoryModel = new JobHistoryModel(getConnectionProvider()) + val temporaryTriggerKey = new TriggerKey("blahjz", "blahzg") + val jobKey = new JobKey("blahjz123", "blahzg123") + val scheduledStart = java.util.Date.from(java.time.Instant.now()) + val temporaryFireInstanceId = "FireInstanceId" + val permanentFireInstanceId = 123456789 + val permanentFireInstanceIdString = permanentFireInstanceId.toString + jobHistoryModel.addJob(temporaryFireInstanceId, jobKey, temporaryTriggerKey, scheduledStart, 1, true) + jobHistoryModel.addOneTimeJobIfNotExists(jobKey, permanentFireInstanceId) + + jobHistoryModel + .getJob(jobKey) + .map(_.fire_instance_id) + .toSet mustEqual Set(temporaryFireInstanceId, permanentFireInstanceIdString) + jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3).toEpochMilli) mustEqual 1 + jobHistoryModel + .getJob(jobKey) + .map(_.fire_instance_id) + .toSet mustEqual Set(permanentFireInstanceIdString) + + // Delete the remaining record, so it doesn't affect other tests + val connection = getConnectionProvider()() + val prepared = connection.prepareStatement(s"""DELETE FROM job_history""") + prepared.executeUpdate() + connection.close() + + jobHistoryModel.getJob(jobKey).toSet mustEqual Set.empty + } + + "only triggers job once, when given the same fireInstanceId" in { + given scala.concurrent.ExecutionContext = global + + val jobHistoryModel = new JobHistoryModel(getConnectionProvider()) + val jobKey = new JobKey("blahjzasd", "blahzgasd") + val fireInstanceId: Long = 123123123 + + val combinedFutures: Future[Set[Boolean]] = Future.sequence( + Set( + Future { + jobHistoryModel.addOneTimeJobIfNotExists(jobKey, fireInstanceId) + }, + Future { + jobHistoryModel.addOneTimeJobIfNotExists(jobKey, fireInstanceId) + }, + ), + ) + + // Doesn't matter which one inserted the record, as long as one did, and one didn't + Await.result(combinedFutures, Duration.Inf) mustEqual Set(true, false) + + val fireTime = java.time.Instant.now() + val instanceDurationInMillis: Long = 1000 + jobHistoryModel.completeOneTimeJob( + fireInstanceId.toString, + fireTime, + instanceDurationInMillis, + true, + ) + + // Verify that only one of the one-time-jobs was added to the table, and that it was "completed" with the correct time + // Calculate expected finish time the same way completeOneTimeJob does to avoid rounding issues + val expectedFinishSeconds = fireTime.plusMillis(instanceDurationInMillis).getEpochSecond + jobHistoryModel + .getJob(jobKey) + .map(record => (record.fire_instance_id, record.finish.toInstant.getEpochSecond)) must containTheSameElementsAs( + List( + ( + fireInstanceId.toString, + expectedFinishSeconds, + ), + ), + ) + jobHistoryModel.deleteJobs(Instant.now().plusSeconds(3).toEpochMilli) mustEqual 0 + + // Delete the remaining record, so it doesn't affect other tests + val connection = getConnectionProvider()() + val prepared = connection.prepareStatement(s"""DELETE FROM job_history""") + prepared.executeUpdate() + connection.close() + + jobHistoryModel.getJob(jobKey).toSet mustEqual Set.empty + } + } + "TriggerHistoryModel" should { "work correctly" in { val triggerHistoryModel = new TriggerHistoryModel(getConnectionProvider())