Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,34 @@ class Triggers(
}
}

def triggerJobOneTime(group: String, name: String, id: Long): Action[AnyContent] = Action { request =>
val jobKey = new JobKey(name, group)

if (scheduler.checkExists(jobKey)) {
try {
// Only run a trigger, if we haven't seen this id before
if (jobHistoryModel.addOneTimeJobIfNotExists(jobKey, id)) {
// Single run trigger has its id passed to the scheduler via the job-data-map. The WorkerJobListener will
// use that id to update the existing record in job_history table
val jobDataMap = jobHistoryModel.createJobDataMapForOneTimeJob(id)
scheduler.triggerJob(jobKey, jobDataMap)
}
Ok
} catch {
case e: SchedulerException => {
logger.error(
"Exception caught triggering job one-time %s %s - %s. -- %s"
.format(group, name, id, e.getLocalizedMessage),
e,
)
InternalServerError
}
}
} else {
NotFound
}
}

def patchTrigger(group: String, name: String): Action[AnyContent] = Action { implicit request =>
val triggerKey = new TriggerKey(name, group)
val triggerExists = scheduler.checkExists(triggerKey)
Expand Down
63 changes: 32 additions & 31 deletions admin/conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,40 @@
# ~~~~

# Home page
GET / com.lucidchart.piezo.admin.controllers.ApplicationController.index
GET /jobs com.lucidchart.piezo.admin.controllers.Jobs.getIndex
GET /jobs/new com.lucidchart.piezo.admin.controllers.Jobs.getNewJobForm(templateGroup: Option[String] ?= None, templateName: Option[String] ?= None)
GET /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJob(group: String, name: String)
POST /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.putJob(group: String, name: String)
DELETE /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.deleteJob(group: String, name: String)
GET /jobs/:group/:name/editor com.lucidchart.piezo.admin.controllers.Jobs.getEditJobAction(group: String, name: String)
POST /jobs com.lucidchart.piezo.admin.controllers.Jobs.postJob

POST /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.postJobs
GET /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.getJobsDetail
GET /data/jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJobDetail(group: String, name: String)

GET /typeahead/jobs/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobGroupTypeAhead(sofar: String)
GET /typeahead/jobs/:group/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobNameTypeAhead(group: String, sofar: String)

GET /triggers com.lucidchart.piezo.admin.controllers.Triggers.getIndex
GET /triggers/new/:triggerType com.lucidchart.piezo.admin.controllers.Triggers.getNewTriggerForm(triggerType, jobGroup: String ?= "", jobName: String ?= "", templateGroup: Option[String] ?= None, templateName: Option[String] ?= None)
GET /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.getTrigger(group: String, name: String)
POST /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.putTrigger(group: String, name: String)
DELETE /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.deleteTrigger(group: String, name: String)
GET /triggers/:group/:name/editor com.lucidchart.piezo.admin.controllers.Triggers.getEditTriggerAction(group: String, name: String)
POST /triggers/:group/:name/runner com.lucidchart.piezo.admin.controllers.Triggers.triggerJob(group: String, name: String)
POST /triggers com.lucidchart.piezo.admin.controllers.Triggers.postTrigger()
PATCH /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.patchTrigger(group: String, name: String)

GET /typeahead/triggers/:sofar com.lucidchart.piezo.admin.controllers.Triggers.triggerGroupTypeAhead(sofar: String)

GET /favicon.ico controllers.Assets.at(path="/public/img", file="favicon.ico")
GET / com.lucidchart.piezo.admin.controllers.ApplicationController.index
GET /jobs com.lucidchart.piezo.admin.controllers.Jobs.getIndex
GET /jobs/new com.lucidchart.piezo.admin.controllers.Jobs.getNewJobForm(templateGroup: Option[String] ?= None, templateName: Option[String] ?= None)
GET /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJob(group: String, name: String)
POST /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.putJob(group: String, name: String)
DELETE /jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.deleteJob(group: String, name: String)
GET /jobs/:group/:name/editor com.lucidchart.piezo.admin.controllers.Jobs.getEditJobAction(group: String, name: String)
POST /jobs com.lucidchart.piezo.admin.controllers.Jobs.postJob

POST /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.postJobs
GET /data/jobs com.lucidchart.piezo.admin.controllers.Jobs.getJobsDetail
GET /data/jobs/:group/:name com.lucidchart.piezo.admin.controllers.Jobs.getJobDetail(group: String, name: String)

GET /typeahead/jobs/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobGroupTypeAhead(sofar: String)
GET /typeahead/jobs/:group/:sofar com.lucidchart.piezo.admin.controllers.Jobs.jobNameTypeAhead(group: String, sofar: String)

GET /triggers com.lucidchart.piezo.admin.controllers.Triggers.getIndex
GET /triggers/new/:triggerType com.lucidchart.piezo.admin.controllers.Triggers.getNewTriggerForm(triggerType, jobGroup: String ?= "", jobName: String ?= "", templateGroup: Option[String] ?= None, templateName: Option[String] ?= None)
GET /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.getTrigger(group: String, name: String)
POST /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.putTrigger(group: String, name: String)
DELETE /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.deleteTrigger(group: String, name: String)
GET /triggers/:group/:name/editor com.lucidchart.piezo.admin.controllers.Triggers.getEditTriggerAction(group: String, name: String)
POST /triggers/:group/:name/runner com.lucidchart.piezo.admin.controllers.Triggers.triggerJob(group: String, name: String)
POST /triggers/:group/:name/onetime/:id com.lucidchart.piezo.admin.controllers.Triggers.triggerJobOneTime(group: String, name: String, id: Long)
POST /triggers com.lucidchart.piezo.admin.controllers.Triggers.postTrigger()
PATCH /triggers/:group/:name com.lucidchart.piezo.admin.controllers.Triggers.patchTrigger(group: String, name: String)

GET /typeahead/triggers/:sofar com.lucidchart.piezo.admin.controllers.Triggers.triggerGroupTypeAhead(sofar: String)

GET /favicon.ico controllers.Assets.at(path="/public/img", file="favicon.ico")

# Worker Health Check
GET /health com.lucidchart.piezo.admin.controllers.HealthCheck.main()
GET /health com.lucidchart.piezo.admin.controllers.HealthCheck.main()

# Map static resources from the /public folder to the /assets URL path

GET /assets/*file controllers.Assets.at(path="/public", file)
GET /assets/*file controllers.Assets.at(path="/public", file)
3 changes: 3 additions & 0 deletions worker/src/main/resources/piezo_mysql_9.sql

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to manually apply this patch everywhere unfortunately (we probably don't care as much about dev/staging though)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should flatten these patches, so that there is just a single sql file you run to set up the tables (but keep the patches files so you can migrate existing DBs?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think thats a good idea, but it is out of scope for this PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE job_history
DROP INDEX start_key,
ADD INDEX start_key (start, trigger_group);
115 changes: 113 additions & 2 deletions worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package com.lucidchart.piezo

import java.sql.{ResultSet, Timestamp}
import java.util.Date
import org.quartz.{JobKey, TriggerKey}
import org.quartz.{JobDataMap, JobKey, TriggerKey}
import org.slf4j.LoggerFactory
import org.slf4j.Logger
import java.sql.Connection
import java.time.Instant

case class JobRecord(
name: String,
Expand All @@ -21,6 +22,20 @@ case class JobRecord(
class JobHistoryModel(getConnection: () => Connection) {
val logger: Logger = LoggerFactory.getLogger(this.getClass)

// Trigger Group for records that aren't deletable
private final val oneTimeJobTriggerGroup = "ONE_TIME_JOB"
final def oneTimeTriggerKey(fireInstanceId: Long): TriggerKey =
TriggerKey(fireInstanceId.toString, oneTimeJobTriggerGroup)

// Methods to store the one-time-job id in a job-data-map
final val jobDataMapOneTimeJobKey = "OneTimeJobId"
final def getOneTimeJobIdFromDataMap(jobDataMap: JobDataMap): Option[String] = Option(
jobDataMap.getString(jobDataMapOneTimeJobKey),
)
final def createJobDataMapForOneTimeJob(id: Long): JobDataMap = new JobDataMap(
java.util.Map.of(jobDataMapOneTimeJobKey, id),
)

def addJob(
fireInstanceId: String,
jobKey: JobKey,
Expand Down Expand Up @@ -66,10 +81,11 @@ class JobHistoryModel(getConnection: () => Connection) {
val connection = getConnection()
try {
val prepared = connection.prepareStatement(
"""
s"""
DELETE
FROM job_history
WHERE start < ?
AND trigger_group != '$oneTimeJobTriggerGroup'
""".stripMargin,
)
prepared.setTimestamp(1, new Timestamp(minStart))
Expand Down Expand Up @@ -190,4 +206,99 @@ class JobHistoryModel(getConnection: () => Connection) {
rs.getString("fire_instance_id"),
)
}

/**
* Check if we have already triggered a one-time-job with the given trigger key and fireInstanceId.
*
* This is useful for seeing if a one-time job has already been triggered, to ensure that triggering a one-time job
* with the same instance id is an idempotent operation. If the one-time job has not been triggered, the same
* transaction is used to add the one-time-job to the database, to avoid race conditions
*/
def addOneTimeJobIfNotExists(jobKey: JobKey, fireInstanceId: Long): Boolean = {
val connection = getConnection()
connection.setAutoCommit(false)

// Use a trigger key that the database won't clean up in "JobHistoryCleanup"
val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId)

try {
val prepared = connection.prepareStatement(
"""
SELECT EXISTS(
SELECT 1
FROM job_history
WHERE fire_instance_id=?
LIMIT 1
) as record_exists
""".stripMargin,
)
prepared.setString(1, fireInstanceId.toString)
val rs = prepared.executeQuery()
if (rs.next() && rs.getBoolean(1)) {
// Record already exists, don't add it again
false
} else {
// Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests
// trigger the same job
val triggerStartTime: Instant = Instant.now()
// Same as "addJob()" but without the finish
val prepared = connection.prepareStatement(
"""
INSERT INTO job_history(
fire_instance_id,
job_name,
job_group,
trigger_name,
trigger_group,
success,
start
)
VALUES(?, ?, ?, ?, ?, ?, ?)
""".stripMargin,
)
prepared.setString(1, fireInstanceId.toString)
prepared.setString(2, jobKey.getName)
prepared.setString(3, jobKey.getGroup)
prepared.setString(4, triggerKey.getName)
prepared.setString(5, triggerKey.getGroup)
prepared.setBoolean(6, true)
prepared.setObject(7, triggerStartTime)
prepared.executeUpdate()
connection.commit()
true
}
} finally {
connection.close()
}
}

def completeOneTimeJob(
fireInstanceId: String,
fireTime: Instant,
instanceDurationInMillis: Long,
success: Boolean,
): Unit = {
val connection = getConnection()
try {
val prepared = connection.prepareStatement(
"""
UPDATE job_history
SET
success=?,
start=?,
finish=?
WHERE fire_instance_id=?
""".stripMargin,
)
prepared.setBoolean(1, success)
prepared.setObject(2, fireTime)
prepared.setObject(3, fireTime.plusMillis(instanceDurationInMillis))
prepared.setString(4, fireInstanceId)
prepared.executeUpdate()
} catch {
case e: Exception => logger.error("error in recording completion of one-time-job", e)
} finally {
connection.close()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.lucidchart.piezo

import java.sql.Timestamp
import java.sql.{Connection, Timestamp}
import org.quartz.TriggerKey
import org.slf4j.LoggerFactory
import java.util.Date
import org.slf4j.Logger
import java.sql.Connection

case class TriggerRecord(
name: String,
Expand Down
27 changes: 19 additions & 8 deletions worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,25 @@ class WorkerJobListener(getConnection: () => Connection, statsd: StatsDClient, u
def jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException): Unit = {
try {
val success = jobException == null
jobHistoryModel.addJob(
context.getFireInstanceId,
context.getTrigger.getJobKey,
context.getTrigger.getKey,
context.getFireTime,
context.getJobRunTime,
success = success,
)
val oneTimeJobIdOption: Option[String] = jobHistoryModel.getOneTimeJobIdFromDataMap(context.getMergedJobDataMap)
oneTimeJobIdOption match {
case Some(oneTimeJobId) => // Update the existing record from the job_history table
jobHistoryModel.completeOneTimeJob(
oneTimeJobId,
context.getFireTime.toInstant,
context.getJobRunTime,
success = success,
)
case None =>
jobHistoryModel.addJob(
context.getFireInstanceId,
context.getTrigger.getJobKey,
context.getTrigger.getKey,
context.getFireTime,
context.getJobRunTime,
success = success,
)
}

val suffix = if (success) "succeeded" else "failed"
val jobKey = s"${context.getTrigger.getJobKey.getGroup}.${context.getTrigger.getJobKey.getName}"
Expand Down
Loading