Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 16 additions & 34 deletions worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -216,35 +216,18 @@ class JobHistoryModel(getConnection: () => Connection) {
*/
def addOneTimeJobIfNotExists(jobKey: JobKey, fireInstanceId: Long): Boolean = {
val connection = getConnection()
connection.setAutoCommit(false)

// Use a trigger key that the database won't clean up in "JobHistoryCleanup"
val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId)

try {
// Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests
// trigger the same job
val triggerStartTime: Instant = Instant.now()
// Same as "addJob()" but without the finish
val prepared = connection.prepareStatement(
"""
SELECT EXISTS(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hunterrees do you have an opinion on whether we should use INSERT IGNORE, or add a FOR UPDATE for this select?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the INSERT IGNORE is cleaner personally. Makes it easier to follow

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If INSERT IGNORE will work I think that will be better. Cleaner and I believe fewer locks to worry about.

SELECT 1
FROM job_history
WHERE fire_instance_id=?
LIMIT 1
) as record_exists
""".stripMargin,
)
prepared.setString(1, fireInstanceId.toString)
val rs = prepared.executeQuery()
if (rs.next() && rs.getBoolean(1)) {
// Record already exists, don't add it again
false
} else {
// Mark this trigger (as already run) in the same transaction, to prevent race conditions where two requests
// trigger the same job
val triggerStartTime: Instant = Instant.now()
// Same as "addJob()" but without the finish
val prepared = connection.prepareStatement(
"""
INSERT INTO job_history(
INSERT IGNORE INTO job_history(
fire_instance_id,
job_name,
job_group,
Expand All @@ -255,18 +238,17 @@ class JobHistoryModel(getConnection: () => Connection) {
)
VALUES(?, ?, ?, ?, ?, ?, ?)
""".stripMargin,
)
prepared.setString(1, fireInstanceId.toString)
prepared.setString(2, jobKey.getName)
prepared.setString(3, jobKey.getGroup)
prepared.setString(4, triggerKey.getName)
prepared.setString(5, triggerKey.getGroup)
prepared.setBoolean(6, true)
prepared.setObject(7, triggerStartTime)
prepared.executeUpdate()
connection.commit()
true
}
)
prepared.setString(1, fireInstanceId.toString)
prepared.setString(2, jobKey.getName)
prepared.setString(3, jobKey.getGroup)
prepared.setString(4, triggerKey.getName)
prepared.setString(5, triggerKey.getGroup)
prepared.setBoolean(6, true)
prepared.setObject(7, triggerStartTime)
// Check if we actually inserted the row,
// if we didn't, then the firs_instance_id was already in use
prepared.executeUpdate() > 0
} finally {
connection.close()
}
Expand Down
9 changes: 7 additions & 2 deletions worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.util.Using
import java.util.Date
import java.io.InputStream
import java.time.Instant
import java.time.temporal.ChronoUnit.SECONDS
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
Expand Down Expand Up @@ -187,8 +188,12 @@ class ModelTest extends Specification with BeforeAll with AfterAll {
// Doesn't matter which one inserted the record, as long as one did, and one didn't
Await.result(combinedFutures, Duration.Inf) mustEqual Set(true, false)

val fireTime = java.time.Instant.now()
val instanceDurationInMillis: Long = 1000
// Truncate to the second, so that we don't end up with a rounding
// error when we do the comparison.
// Otherwise, on insert, the second might be rounded up, and we end up a second
// of from adding one second to the actual date time, and truncating it.
val fireTime = java.time.Instant.now().truncatedTo(SECONDS)
val instanceDurationInMillis: Long = 3000
jobHistoryModel.completeOneTimeJob(
fireInstanceId.toString,
fireTime,
Expand Down