Skip to content

Commit a60565f

Browse files
committed
fix(redis): store EXAT/PXAT options and fix cancellation race across pipelines
1 parent 60d6124 commit a60565f

3 files changed

Lines changed: 10 additions & 6 deletions

File tree

Plugins/RedisDriverPlugin/RedisCommandParser.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ enum RedisOperation {
7373
struct RedisSetOptions {
7474
var ex: Int?
7575
var px: Int?
76+
var exat: Int?
77+
var pxat: Int?
7678
var nx: Bool = false
7779
var xx: Bool = false
7880
}
@@ -998,18 +1000,20 @@ struct RedisCommandParser {
9981000
guard i + 1 < args.count else {
9991001
throw RedisParseError.missingArgument("EXAT requires a value")
10001002
}
1001-
guard Int(args[i + 1]) != nil else {
1003+
guard let timestamp = Int(args[i + 1]) else {
10021004
throw RedisParseError.invalidArgument("EXAT value must be a positive integer")
10031005
}
1006+
options.exat = timestamp
10041007
hasOption = true
10051008
i += 1
10061009
case "PXAT":
10071010
guard i + 1 < args.count else {
10081011
throw RedisParseError.missingArgument("PXAT requires a value")
10091012
}
1010-
guard Int(args[i + 1]) != nil else {
1013+
guard let timestamp = Int(args[i + 1]) else {
10111014
throw RedisParseError.invalidArgument("PXAT value must be a positive integer")
10121015
}
1016+
options.pxat = timestamp
10131017
hasOption = true
10141018
i += 1
10151019
case "NX":

Plugins/RedisDriverPlugin/RedisPluginConnection.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ final class RedisPluginConnection: @unchecked Sendable {
331331
}
332332
}
333333

334-
private func resetCancellation() {
334+
func resetCancellation() {
335335
stateLock.lock()
336336
_isCancelled = false
337337
stateLock.unlock()
@@ -356,7 +356,6 @@ final class RedisPluginConnection: @unchecked Sendable {
356356
func executeCommand(_ args: [String]) async throws -> RedisReply {
357357
#if canImport(CRedis)
358358
return try await pluginDispatchAsync(on: queue) { [self] in
359-
resetCancellation()
360359
guard !isShuttingDown else {
361360
throw RedisPluginError.notConnected
362361
}
@@ -379,7 +378,6 @@ final class RedisPluginConnection: @unchecked Sendable {
379378
func executePipeline(_ commands: [[String]]) async throws -> [RedisReply] {
380379
#if canImport(CRedis)
381380
return try await pluginDispatchAsync(on: queue) { [self] in
382-
resetCancellation()
383381
guard !isShuttingDown else {
384382
throw RedisPluginError.notConnected
385383
}
@@ -404,7 +402,6 @@ final class RedisPluginConnection: @unchecked Sendable {
404402
func selectDatabase(_ index: Int) async throws {
405403
#if canImport(CRedis)
406404
try await pluginDispatchAsync(on: queue) { [self] in
407-
resetCancellation()
408405
guard !isShuttingDown else {
409406
throw RedisPluginError.notConnected
410407
}

Plugins/RedisDriverPlugin/RedisPluginDriver.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ final class RedisPluginDriver: PluginDatabaseDriver, @unchecked Sendable {
7878
let startTime = Date()
7979
cachedScanPattern = nil
8080
cachedScanKeys = nil
81+
redisConnection?.resetCancellation()
8182

8283
guard let conn = redisConnection else {
8384
throw RedisPluginError.notConnected
@@ -610,6 +611,8 @@ private extension RedisPluginDriver {
610611
if let opts = options {
611612
if let ex = opts.ex { args += ["EX", String(ex)] }
612613
if let px = opts.px { args += ["PX", String(px)] }
614+
if let exat = opts.exat { args += ["EXAT", String(exat)] }
615+
if let pxat = opts.pxat { args += ["PXAT", String(pxat)] }
613616
if opts.nx { args.append("NX") }
614617
if opts.xx { args.append("XX") }
615618
}

0 commit comments

Comments
 (0)