52 changes: 52 additions & 0 deletions OptimizelySwiftSDK.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

160 changes: 107 additions & 53 deletions Sources/Customization/DefaultEventDispatcher.swift
@@ -53,6 +53,11 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
// network reachability
let reachability = NetworkReachability(maxContiguousFails: 1)

// sync group used to ensure that flushEvents() is synchronous for close()
let notify = DispatchGroup()
// track if flush is currently in progress
private var isFlushing = false

public init(batchSize: Int = DefaultValues.batchSize,
backingStore: DataStoreType = .file,
dataStoreName: String = "OPTEventQueue",
@@ -107,69 +112,111 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {
completionHandler?(.success(event.body))
}

// notify group used to ensure that the sendEvent is synchronous.
// used in flushEvents
let notify = DispatchGroup()

// Per-batch retry: Each batch gets up to 3 attempts with exponential backoff
// Global failure counter stops processing after 3 consecutive batch failures

open func flushEvents() {
queueLock.async {
// we don't remove anything off of the queue unless it is successfully sent.
var failureCount = 0
guard !self.isFlushing else { return }

func removeStoredEvents(num: Int) {
if let removedItem = self.eventQueue.removeFirstItems(count: num), removedItem.count > 0 {
// avoid event-log-message preparation overheads with closure-logging
self.logger.d({ "Removed stored \(num) events starting with \(removedItem.first!)" })
} else {
self.logger.e("Failed to removed \(num) events")
}
}
self.isFlushing = true
self.notify.enter()

while let eventsToSend: [EventForDispatch] = self.eventQueue.getFirstItems(count: self.batchSize) {
let (numEvents, batched) = eventsToSend.batch()

guard numEvents > 0 else { break }

guard let batchEvent = batched else {
// discard an invalid event that causes batching failure
// - if an invalid event is found while batching, it batches all the valid ones before the invalid one and sends them out.
// - when trying to batch next, it finds the invalid one at the head. It discards that specific invalid one and continues batching the next ones.

removeStoredEvents(num: 1)
continue
}

// we've exhausted our failure count. Give up and try the next time an event
// is queued or someone calls flush (changed to >= so that it retries exactly "maxFailureCount" times).
if failureCount >= DefaultValues.maxFailureCount {
self.logger.e(.eventSendRetyFailed(failureCount))
break
}

// make the send event synchronous. enter our notify
self.notify.enter()
self.sendEvent(event: batchEvent) { (result) -> Void in
switch result {
case .failure(let error):
self.logger.e(error.reason)
failureCount += 1
case .success:
// we succeeded. remove the batch size sent.
removeStoredEvents(num: numEvents)

// reset failureCount
failureCount = 0
self.processNextBatch(failureCount: 0)
}
}

private func processNextBatch(failureCount: Int) {
// Global failure counter across all batches in this flush
if failureCount >= DefaultValues.maxFailureCount {
self.logger.e(.eventSendRetyFailed(failureCount))
self.finishFlush()
return
}

// Check reachability
if self.reachability.shouldBlockNetworkAccess() {
self.logger.e("NetworkReachability down")
self.finishFlush()
return
}

guard let eventsToSend: [EventForDispatch] = self.eventQueue.getFirstItems(count: self.batchSize) else {
self.finishFlush()
return
}

let (numEvents, batchedEvent) = eventsToSend.batch()

guard numEvents > 0 else {
self.finishFlush()
return
}

guard let batchEvent = batchedEvent else {
// discard an invalid event that causes batching failure
// - if an invalid event is found while batching, it batches all the valid ones before the invalid one and sends them out.
// - when batching the next time, it finds the invalid one at the head of the queue, discards it, and continues with the next batch
self.removeStoredEvents(num: 1)
self.processNextBatch(failureCount: failureCount)
return
}

self.sendBatch(event: batchEvent, numEvents: numEvents) { success in
if success {
self.removeStoredEvents(num: numEvents)
self.processNextBatch(failureCount: 0)
} else {
// Retry with backoff
let attempt = failureCount + 1
if attempt < DefaultValues.maxFailureCount {
let delay = self.calculateRetryDelay(attempt: attempt)
self.queueLock.asyncAfter(deadline: .now() + delay) {
self.processNextBatch(failureCount: attempt)
}
// our send is done.
self.notify.leave()

} else {
self.logger.e(.eventSendRetyFailed(attempt))
self.finishFlush()
}
// wait for send
self.notify.wait()
}
}
}

private func sendBatch(event: EventForDispatch, numEvents: Int, completion: @escaping (Bool) -> Void) {
self.sendEvent(event: event) { result in
switch result {
case .success:
completion(true)
case .failure(let error):
self.logger.e(error.reason)
completion(false)
}
}
}

private func finishFlush() {
self.isFlushing = false
self.notify.leave()
}

private func removeStoredEvents(num: Int) {
if let removedItem = self.eventQueue.removeFirstItems(count: num), removedItem.count > 0 {
self.logger.d({ "Removed \(num) event(s) from queue starting with \(removedItem.first!)" })
} else {
self.logger.e("Failed to remove \(num) event(s) from queue")
}
}

/// Calculate retry delay using exponential backoff
/// - Parameter attempt: Current attempt number (1, 2, 3)
/// - Returns: Delay in seconds (200ms, 400ms, 800ms, capped at 1s)
private func calculateRetryDelay(attempt: Int) -> TimeInterval {
let retryStrategy = RetryStrategy(maxRetries: 2,
initialInterval: 0.2,
maxInterval: 1.0)
return retryStrategy.delayForRetry(attempt: attempt)
}

open func sendEvent(event: EventForDispatch, completionHandler: @escaping DispatchCompletionHandler) {

if self.reachability.shouldBlockNetworkAccess() {
Expand Down Expand Up @@ -211,7 +258,14 @@ open class DefaultEventDispatcher: BackgroundingCallbacks, OPTEventDispatcher {

open func close() {
self.flushEvents()
// Ensure flush async block has started
self.queueLock.sync {}
// Wait for the flush to complete with a safety timeout.
// We use a 10-second timeout to prevent the app from hanging indefinitely during shutdown.
// If the flush takes longer (e.g. due to slow network or large queue), we proceed to close
// to avoid the OS watchdog killing the app for blocking the main thread for too long.
// This ensures a "best effort" flush while prioritizing a safe and graceful exit.
_ = self.notify.wait(timeout: .now() + 10.0)
}

}
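
The per-batch retry described in the comments and in the calculateRetryDelay doc comment above uses exponential backoff (200ms, 400ms, 800ms, capped at 1s). Below is a minimal, self-contained sketch of that delay computation, assuming a plain doubling formula; backoffDelay and its parameter names are illustrative stand-ins, not the SDK's RetryStrategy API.

import Foundation

// Minimal sketch of an exponential-backoff delay between batch retries.
// backoffDelay and its parameters are illustrative assumptions, not SDK API.
func backoffDelay(attempt: Int,
                  initialInterval: TimeInterval = 0.2,
                  maxInterval: TimeInterval = 1.0) -> TimeInterval {
    // attempt 1 -> 0.2s, attempt 2 -> 0.4s, attempt 3 -> 0.8s, then capped at maxInterval
    let uncapped = initialInterval * pow(2.0, Double(attempt - 1))
    return min(uncapped, maxInterval)
}

let delays = (1...3).map { backoffDelay(attempt: $0) }   // [0.2, 0.4, 0.8]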
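
The close() path relies on a DispatchGroup handshake: flushEvents() enters the group, finishFlush() leaves it, and close() waits with a 10-second cap so shutdown cannot hang indefinitely. A stripped-down sketch of that pattern follows; the queue and function names here are stand-ins for illustration only.

import Foundation

let notify = DispatchGroup()
let queueLock = DispatchQueue(label: "eventDispatchQueue")

func flush() {
    notify.enter()                                    // mirrors flushEvents() entering the group
    queueLock.async {
        // simulate an asynchronous network send that finishes later
        DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
            notify.leave()                            // mirrors finishFlush()
        }
    }
}

func close() {
    flush()
    queueLock.sync {}                                 // returns once the enqueued flush block has run
    // Best-effort wait: give up after 10 seconds rather than blocking shutdown forever.
    _ = notify.wait(timeout: .now() + 10.0)
}

close()
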
4 changes: 2 additions & 2 deletions Sources/Extensions/ArrayEventForDispatch+Extension.swift
@@ -98,7 +98,7 @@ extension Array where Element == EventForDispatch {
// no batched event since the first event is invalid. notify so that it can be removed.
return (1, nil)
}

return (eventsBatched.count, makeBatchEvent(base: eventsBatched.first!, visitors: visitors, url: url))
}

@@ -112,7 +112,7 @@ extension Array where Element == EventForDispatch {
anonymizeIP: base.anonymizeIP,
enrichDecisions: true,
region: base.region)

guard let data = try? JSONEncoder().encode(batchEvent) else {
return nil
}
2 changes: 1 addition & 1 deletion Sources/ODP/OdpConfig.swift
@@ -48,7 +48,7 @@ class OdpConfig {
// disable future event queueing if datafile has no ODP integrations.
self.odpServiceIntegrated = .notIntegrated
}

if self.apiKey == apiKey, self.apiHost == apiHost, self.segmentsToCheck == segmentsToCheck {
return false
} else {