From a2a16dc4f4eab3699b26a5e98023de251592ed0d Mon Sep 17 00:00:00 2001 From: Hadi Dbouk Date: Mon, 19 Jan 2026 15:55:42 +0200 Subject: [PATCH] feat(swift-concurrency): add swift concurrency target --- Package.resolved | 29 +- Package.swift | 50 +- .../ObservableObject+Extensions.swift | 2 +- .../Extensions/Publisher+Extensions.swift | 2 +- .../Combine/Operators/Combine+Operators.swift | 4 +- .../SwiftConcurrency/AsyncCache.swift | 58 +++ .../SwiftConcurrency/AsyncSemaphore.swift | 168 +++++++ .../AsyncSequence+TakeUntil.swift | 65 +++ .../AsyncSequence/AsyncSequence+Timeout.swift | 114 +++++ .../AsyncStream+FlatMapOptional.swift | 21 + .../AsyncStream+PromoteOptional.swift | 16 + .../AsyncStream/AsyncStream+Single.swift | 14 + .../AsyncStream/AsyncStream+Timeout.swift | 60 +++ .../SwiftConcurrency/BroadcastStream.swift | 81 ++++ .../SwiftConcurrency/BufferedStream.swift | 112 +++++ .../CurrentValueAsyncSubject.swift | 103 ++++ Tests/FueledUtils-Package.xctestplan | 11 +- .../AnyCurrentValuePublisherTests.swift | 2 +- .../CombineTests/CombineLatestManyTests.swift | 2 +- .../CombineTests/SinkForLifetimeTests.swift | 2 +- .../AsyncCacheTests.swift | 92 ++++ .../AsyncSemaphoreTests.swift | 447 ++++++++++++++++++ .../TakeUntilAsyncSequenceTests.swift | 64 +++ .../TimeoutAsyncSequenceTests.swift | 120 +++++ .../AsyncStreamFlatMapOptionalTests.swift | 135 ++++++ .../AsyncStreamPromoteOptionalTests.swift | 77 +++ .../AsyncStreamSingleTests.swift | 30 ++ .../AsyncStreamTimeoutTests.swift | 73 +++ .../BroadcastStreamTests.swift | 140 ++++++ .../BufferedStreamTests.swift | 202 ++++++++ .../CurrentValueAsyncSubjectTests.swift | 107 +++++ 31 files changed, 2378 insertions(+), 25 deletions(-) create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncCache.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncSemaphore.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+TakeUntil.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+Timeout.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+FlatMapOptional.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+PromoteOptional.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Single.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Timeout.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/BroadcastStream.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/BufferedStream.swift create mode 100644 Sources/FueledUtils/SwiftConcurrency/CurrentValueAsyncSubject.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncCacheTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncSemaphoreTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TakeUntilAsyncSequenceTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TimeoutAsyncSequenceTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamFlatMapOptionalTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamPromoteOptionalTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamSingleTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamTimeoutTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/BroadcastStreamTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/BufferedStreamTests.swift create mode 100644 Tests/FueledUtils/SwiftConcurrencyTests/CurrentValueAsyncSubjectTests.swift diff --git a/Package.resolved b/Package.resolved index 07f58325..8ee743f9 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,6 +1,33 @@ { - "originHash" : "21ed4123ddbef390cf9db62f8ca1d97008bd0d3abf52f7ab337c97733f79948a", + "originHash" : "e0b46ae6f466744ffbe3659eeeb1b349125b8d634a79e1ee360b6db1b7371410", "pins" : [ + { + "identity" : "swift-async-algorithms", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-async-algorithms", + "state" : { + "revision" : "6c050d5ef8e1aa6342528460db614e9770d7f804", + "version" : "1.1.1" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "7b847a3b7008b2dc2f47ca3110d8c782fb2e5c7e", + "version" : "1.3.0" + } + }, + { + "identity" : "swift-concurrency-extras", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/swift-concurrency-extras", + "state" : { + "revision" : "5a3825302b1a0d744183200915a47b508c828e6f", + "version" : "1.3.2" + } + }, { "identity" : "swift-docc-plugin", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index 7db1f8ef..38e0a0fb 100644 --- a/Package.swift +++ b/Package.swift @@ -1,5 +1,4 @@ -// swift-tools-version:6.0 -// The swift-tools-version declares the minimum version of Swift required to build this package. +// swift-tools-version:6.2 import PackageDescription @@ -10,50 +9,71 @@ let package = Package( ], products: [ .library( - name: "FueledUtilsCore", - targets: ["FueledUtilsCore"] + name: "FueledCore", + targets: ["FueledCore"] ), .library( - name: "FueledUtilsCombine", - targets: ["FueledUtilsCombine"] + name: "FueledCombine", + targets: ["FueledCombine"] ), .library( - name: "FueledUtilsSwiftUI", - targets: ["FueledUtilsSwiftUI"] + name: "FueledSwiftUI", + targets: ["FueledSwiftUI"] + ), + .library( + name: "FueledSwiftConcurrency", + targets: ["FueledSwiftConcurrency"] ), ], dependencies: [ .package(url: "https://github.com/swiftlang/swift-docc-plugin", from: "1.4.5"), + .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.1.1"), + .package(url: "https://github.com/pointfreeco/swift-concurrency-extras", from: "1.3.2"), ], targets: [ .target( - name: "FueledUtilsCore", + name: "FueledCore", path: "Sources/FueledUtils/Core", linkerSettings: [ .linkedFramework("Foundation") ] ), .target( - name: "FueledUtilsCombine", + name: "FueledCombine", dependencies: [ - "FueledUtilsCore" + "FueledCore" ], path: "Sources/FueledUtils/Combine" ), .target( - name: "FueledUtilsSwiftUI", - dependencies: ["FueledUtilsCombine", "FueledUtilsCore"], + name: "FueledSwiftUI", + dependencies: ["FueledCombine", "FueledCore"], path: "Sources/FueledUtils/SwiftUI", linkerSettings: [ .linkedFramework("SwiftUI", .when(platforms: [.iOS, .tvOS, .macOS])), ] ), + .target( + name: "FueledSwiftConcurrency", + dependencies: [ + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), + .product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"), + ], + path: "Sources/FueledUtils/SwiftConcurrency" + ), .testTarget( - name: "FueledUtilsCombineTests", + name: "FueledCombineTests", dependencies: [ - "FueledUtilsCombine", + "FueledCombine", ], path: "Tests/FueledUtils/CombineTests" ), + .testTarget( + name: "FueledSwiftConcurrencyTests", + dependencies: [ + "FueledSwiftConcurrency" + ], + path: "Tests/FueledUtils/SwiftConcurrencyTests" + ), ] ) diff --git a/Sources/FueledUtils/Combine/Extensions/ObservableObject+Extensions.swift b/Sources/FueledUtils/Combine/Extensions/ObservableObject+Extensions.swift index 5eaa9206..ab52b6bf 100644 --- a/Sources/FueledUtils/Combine/Extensions/ObservableObject+Extensions.swift +++ b/Sources/FueledUtils/Combine/Extensions/ObservableObject+Extensions.swift @@ -14,7 +14,7 @@ import Combine import Foundation -import FueledUtilsCore +import FueledCore extension ObservableObject where Self.ObjectWillChangePublisher == ObservableObjectPublisher { // Perform a one-way link, where the receiver will listen for changes on the object and automatically trigger its `objectWillChange` publisher diff --git a/Sources/FueledUtils/Combine/Extensions/Publisher+Extensions.swift b/Sources/FueledUtils/Combine/Extensions/Publisher+Extensions.swift index 9765ae9a..71390964 100644 --- a/Sources/FueledUtils/Combine/Extensions/Publisher+Extensions.swift +++ b/Sources/FueledUtils/Combine/Extensions/Publisher+Extensions.swift @@ -13,7 +13,7 @@ // limitations under the License. import Combine -import FueledUtilsCore +import FueledCore // MARK: - Helpers Functions public extension Publisher { diff --git a/Sources/FueledUtils/Combine/Operators/Combine+Operators.swift b/Sources/FueledUtils/Combine/Operators/Combine+Operators.swift index fecc38ae..31f11c47 100644 --- a/Sources/FueledUtils/Combine/Operators/Combine+Operators.swift +++ b/Sources/FueledUtils/Combine/Operators/Combine+Operators.swift @@ -13,9 +13,9 @@ // limitations under the License. import Combine -import FueledUtilsCore +import FueledCore -public typealias OptionalProtocol = FueledUtilsCore.OptionalProtocol +public typealias OptionalProtocol = FueledCore.OptionalProtocol // swiftlint:disable generic_type_name diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncCache.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncCache.swift new file mode 100644 index 00000000..09abb69e --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncCache.swift @@ -0,0 +1,58 @@ +/// A concurrency-safe asynchronous cache for storing key-value pairs. +/// +/// `AsyncCache` provides thread-safe caching with support for async value providers. +/// Values are computed lazily on first access and cached for subsequent retrievals. +/// +/// Example usage: +/// ```swift +/// let cache = AsyncCache() +/// let data = try await cache.getOrAdd(key: "user-123") { key in +/// try await fetchUserData(id: key) +/// } +/// ``` +public actor AsyncCache { + private var cachedValues: [Key: Value] = [:] + + /// Creates an empty cache. + public init() { + } + + /// Retrieves a value from the cache if available, or computes and caches it using the provided async provider. + /// + /// - Parameters: + /// - key: The key to look up or associate with a new value. + /// - provider: An asynchronous closure that computes the value if not already cached. + /// - Returns: The cached or newly computed value. + /// - Throws: Rethrows any error thrown by the `provider` closure. + public func getOrAdd( + key: Key, + provider: @escaping @Sendable (Key) async throws -> Value + ) async throws -> Value { + if let cachedValue = cachedValues[key] { + return cachedValue + } + + let value = try await provider(key) + cachedValues[key] = value + return value + } + + /// Removes all cached key-value pairs. + public func clear() { + cachedValues.removeAll() + } + + /// Removes a specific key-value pair from the cache. + /// + /// - Parameter key: The key to remove. + /// - Returns: The removed value, or `nil` if the key was not present. + @discardableResult + public func remove(key: Key) -> Value? { + cachedValues.removeValue(forKey: key) + } + + /// Returns the number of cached items. + public var count: Int { + cachedValues.count + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncSemaphore.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncSemaphore.swift new file mode 100644 index 00000000..49a1b805 --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncSemaphore.swift @@ -0,0 +1,168 @@ +import Foundation + +/// An async-compatible counting semaphore that controls access to a resource across multiple execution contexts. +/// +/// Use `AsyncSemaphore` when you need to limit concurrent access to a shared resource in async/await code. +/// Unlike `DispatchSemaphore`, this semaphore suspends tasks without blocking the underlying thread. +/// +/// Example usage: +/// ```swift +/// let semaphore = AsyncSemaphore(value: 3) // Allow 3 concurrent accesses +/// +/// await semaphore.wait() +/// defer { semaphore.signal() } +/// // Access shared resource +/// ``` +/// +/// ## Topics +/// +/// ### Creating a Semaphore +/// +/// - ``init(value:)`` +/// +/// ### Signaling the Semaphore +/// +/// - ``signal()`` +/// +/// ### Waiting for the Semaphore +/// +/// - ``wait()`` +/// - ``waitUnlessCancelled()`` +public final class AsyncSemaphore: @unchecked Sendable { + private class Suspension: @unchecked Sendable { + enum State { + case pending + case suspendedUnlessCancelled(UnsafeContinuation) + case suspended(UnsafeContinuation) + case cancelled + } + + var state: State + + init(state: State) { + self.state = state + } + } + + private var value: Int + private var suspensions: [Suspension] = [] + private let _lock = NSRecursiveLock() + + /// Creates a semaphore with the specified initial value. + /// + /// - Parameter value: The starting value for the semaphore. Must be greater than or equal to zero. + /// - Precondition: `value` must be >= 0. + public init(value: Int) { + precondition(value >= 0, "AsyncSemaphore requires a value equal or greater than zero") + self.value = value + } + + deinit { + precondition(suspensions.isEmpty, "AsyncSemaphore is deallocated while some task(s) are suspended waiting for a signal.") + } + + private func lock() { _lock.lock() } + private func unlock() { _lock.unlock() } + + /// Waits for, or decrements, the semaphore. + /// + /// Decrements the semaphore count. If the resulting value is less than zero, + /// this method suspends the current task until ``signal()`` is called. + /// This suspension does not block the underlying thread. + public func wait() async { + lock() + + value -= 1 + if value >= 0 { + unlock() + return + } + + await withUnsafeContinuation { continuation in + let suspension = Suspension(state: .suspended(continuation)) + suspensions.insert(suspension, at: 0) + unlock() + } + } + + /// Waits for, or decrements, the semaphore with cancellation support. + /// + /// Decrements the semaphore count. If the resulting value is less than zero, + /// this method suspends the current task until ``signal()`` is called. + /// + /// - Throws: `CancellationError` if the task is cancelled before a signal is received. + public func waitUnlessCancelled() async throws { + lock() + + value -= 1 + if value >= 0 { + defer { unlock() } + + do { + try Task.checkCancellation() + } catch { + value += 1 + throw error + } + + return + } + + let suspension = Suspension(state: .pending) + + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + if case .cancelled = suspension.state { + unlock() + continuation.resume(throwing: CancellationError()) + } else { + suspension.state = .suspendedUnlessCancelled(continuation) + suspensions.insert(suspension, at: 0) + unlock() + } + } + } onCancel: { + lock() + + value += 1 + if let index = suspensions.firstIndex(where: { $0 === suspension }) { + suspensions.remove(at: index) + } + + if case let .suspendedUnlessCancelled(continuation) = suspension.state { + unlock() + continuation.resume(throwing: CancellationError()) + } else { + suspension.state = .cancelled + unlock() + } + } + } + + /// Signals (increments) the semaphore. + /// + /// Increments the semaphore count. If there are tasks suspended in ``wait()`` + /// or ``waitUnlessCancelled()``, one of them will be resumed. + /// + /// - Returns: `true` if a suspended task was resumed, `false` otherwise. + @discardableResult + public func signal() -> Bool { + lock() + + value += 1 + + switch suspensions.popLast()?.state { + case let .suspendedUnlessCancelled(continuation): + unlock() + continuation.resume() + return true + case let .suspended(continuation): + unlock() + continuation.resume() + return true + default: + unlock() + return false + } + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+TakeUntil.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+TakeUntil.swift new file mode 100644 index 00000000..ba8c29e0 --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+TakeUntil.swift @@ -0,0 +1,65 @@ +public extension AsyncSequence { + /// Takes elements from the sequence until a condition is met, including the element that satisfies it. + /// + /// - Parameter condition: A closure that evaluates each element. Returns `true` to stop taking elements. + /// - Returns: A `TakeUntilAsyncSequence` that emits elements until the condition is satisfied. + func takeUntil(_ condition: @escaping @Sendable (Element) -> Bool) -> TakeUntilAsyncSequence { + TakeUntilAsyncSequence(self, condition: condition) + } +} + +/// An async sequence that emits elements until a condition is met. +/// +/// This sequence stops emitting after the first element that satisfies the condition, +/// including that element in the output. +public struct TakeUntilAsyncSequence: AsyncSequence { + public typealias Element = Base.Element + + private let base: Base + private let condition: @Sendable (Element) -> Bool + + /// Creates a take-until sequence. + /// + /// - Parameters: + /// - base: The underlying async sequence. + /// - condition: A closure that determines when to stop. Returns `true` on the last element to emit. + public init(_ base: Base, condition: @escaping @Sendable (Element) -> Bool) { + self.base = base + self.condition = condition + } + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(base: base, condition: condition) + } + + /// The iterator for `TakeUntilAsyncSequence`. + public struct AsyncIterator: AsyncIteratorProtocol { + private var baseIterator: Base.AsyncIterator + private let condition: @Sendable (Element) -> Bool + private var isFinished = false + + fileprivate init(base: Base, condition: @escaping @Sendable (Element) -> Bool) { + baseIterator = base.makeAsyncIterator() + self.condition = condition + } + + /// Advances to the next element, or returns `nil` if finished or condition is met. + public mutating func next() async throws -> Element? { + guard !isFinished else { + return nil + } + + guard let value = try await baseIterator.next() else { + isFinished = true + return nil + } + + if condition(value) { + isFinished = true + return value + } + + return value + } + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+Timeout.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+Timeout.swift new file mode 100644 index 00000000..2b936e3c --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncSequence/AsyncSequence+Timeout.swift @@ -0,0 +1,114 @@ +import AsyncAlgorithms + +public extension AsyncSequence where Self: Sendable, Element: Sendable { + /// Creates a sequence that times out after the specified interval. + /// + /// - Parameters: + /// - interval: The duration to wait before timing out. + /// - clock: The clock to use for timing. + /// - tolerance: Optional tolerance for the timer. + /// - throwing: Optional error to throw on timeout. If `nil`, the sequence finishes normally. + /// - alwaysFinishAfterTimeout: If `true`, always finishes after timeout. If `false`, + /// only finishes if no value has been received yet. Defaults to `true`. + /// - Returns: A `TimeoutAsyncSequence` that applies the timeout behavior. + func timeout( + for interval: C.Instant.Duration, + clock: C, + tolerance: C.Instant.Duration? = nil, + throwing: (any Error)? = nil, + alwaysFinishAfterTimeout: Bool = true + ) -> TimeoutAsyncSequence { + TimeoutAsyncSequence( + base: self, + for: interval, + clock: clock, + tolerance: tolerance, + throwing: throwing, + alwaysFinishAfterTimeout: alwaysFinishAfterTimeout + ) + } +} + +/// An async sequence that applies a timeout to another sequence. +/// +/// If no element is received within the specified interval, the sequence +/// either finishes or throws an error based on configuration. +public struct TimeoutAsyncSequence: AsyncSequence where Base: AsyncSequence, Base.Element: Sendable, Base: Sendable { + public typealias Element = Base.Element + + let base: Base + let interval: C.Instant.Duration + let tolerance: C.Instant.Duration? + let clock: C + let alwaysFinishAfterTimeout: Bool + let throwing: (any Error)? + + init( + base: Base, + for interval: C.Instant.Duration, + clock: C, + tolerance: C.Instant.Duration? = nil, + throwing: (any Error)? = nil, + alwaysFinishAfterTimeout: Bool + ) { + self.base = base + self.interval = interval + self.tolerance = tolerance + self.clock = clock + self.throwing = throwing + self.alwaysFinishAfterTimeout = alwaysFinishAfterTimeout + } + + public func makeAsyncIterator() -> TimeoutAsyncIterator { + TimeoutAsyncIterator(sequence: self) + } + + /// The iterator for `TimeoutAsyncSequence`. + public struct TimeoutAsyncIterator: AsyncIteratorProtocol { + private enum Event { + case element(Element) + case timeout + } + + private let alwaysFinishAfterTimeout: Bool + private let throwing: (any Error)? + private var isValueReceived = false + private var mergedIterator: AsyncMerge2Sequence>, AsyncStream>>.AsyncIterator + + init(sequence: TimeoutAsyncSequence) { + throwing = sequence.throwing + alwaysFinishAfterTimeout = sequence.alwaysFinishAfterTimeout + let timerSequence = AsyncStream> { continuation in + Task { [clock = sequence.clock, interval = sequence.interval, tolerance = sequence.tolerance] in + try? await clock.sleep(for: interval, tolerance: tolerance) + continuation.yield(.timeout) + continuation.finish() + } + } + let elements = sequence.base.map { Event.element($0) } + let mergedSequence = merge(elements, timerSequence) + mergedIterator = mergedSequence.makeAsyncIterator() + } + + /// Advances to the next element, or returns `nil` if the sequence finishes or times out. + public mutating func next() async throws -> Element? { + switch try await mergedIterator.next() { + case let .element(value): + isValueReceived = true + return value + case .timeout: + if alwaysFinishAfterTimeout || !isValueReceived { + if let throwing { + throw throwing + } else { + return nil + } + } else { + return try await next() + } + case .none: + return nil + } + } + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+FlatMapOptional.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+FlatMapOptional.swift new file mode 100644 index 00000000..81085944 --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+FlatMapOptional.swift @@ -0,0 +1,21 @@ +import ConcurrencyExtras + +public extension AsyncStream where Element: Sendable { + /// Transforms each element into an optional stream, flattening the result. + /// + /// For each element, the transform closure returns either a stream or `nil`. + /// If `nil` is returned, a single `nil` value is emitted. Otherwise, the returned + /// stream is flattened with values wrapped as optionals. + /// + /// - Parameter transform: A closure that takes an element and returns an optional `AsyncStream`. + /// - Returns: A new `AsyncStream` containing the flattened results. + func flatMapOptional(_ transform: @Sendable @escaping (Element) -> AsyncStream?) -> AsyncStream { + flatMap { value -> AsyncStream in + guard let stream = transform(value) else { + return .single(value: nil) + } + return stream.promoteOptional() + } + .eraseToStream() + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+PromoteOptional.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+PromoteOptional.swift new file mode 100644 index 00000000..d07cff5f --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+PromoteOptional.swift @@ -0,0 +1,16 @@ +import ConcurrencyExtras + +public extension AsyncStream where Element: Sendable { + /// Wraps each emitted value in an optional, converting `AsyncStream` to `AsyncStream`. + /// + /// This is useful when you need to unify streams that may or may not have values + /// into a common optional type. + /// + /// - Returns: A new `AsyncStream` where every emitted value is wrapped as `Optional`. + func promoteOptional() -> AsyncStream { + map { + Optional($0) + } + .eraseToStream() + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Single.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Single.swift new file mode 100644 index 00000000..1e12375d --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Single.swift @@ -0,0 +1,14 @@ +public extension AsyncStream { + /// Creates an `AsyncStream` that emits a single value and then completes. + /// + /// Use this when you need to wrap a single value as an async stream. + /// + /// - Parameter value: The value to emit. + /// - Returns: An `AsyncStream` that emits the value once and finishes. + static func single(value: T) -> AsyncStream { + AsyncStream { continuation in + continuation.yield(value) + continuation.finish() + } + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Timeout.swift b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Timeout.swift new file mode 100644 index 00000000..98f44d85 --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/AsyncStream/AsyncStream+Timeout.swift @@ -0,0 +1,60 @@ +import AsyncAlgorithms +import ConcurrencyExtras +import Foundation + +public extension AsyncSequence where Element: Sendable { + /// Creates a stream that completes after the specified duration. + /// + /// - Parameter duration: The time interval in seconds after which the stream finishes. + /// - Returns: An `AsyncStream` that emits elements until the timeout expires. + func timeout(after duration: TimeInterval) -> AsyncStream { + let sequence = UncheckedSendable(self).eraseToStream() + return sequence.timeout(after: duration) + } +} + +private enum TimeoutEvent: Sendable { + case value(Element) + case timeout +} + +public extension AsyncStream where Element: Sendable { + /// Creates a stream that completes after the specified duration. + /// + /// Uses `AsyncTimerSequence` from Swift Async Algorithms to create a timeout. + /// When the duration elapses, the stream finishes and stops emitting values. + /// + /// Example usage: + /// ```swift + /// let stream = AsyncStream { ... } + /// for await value in stream.timeout(after: 5.0) { + /// print(value) + /// } + /// // Stream finishes after 5 seconds + /// ``` + /// + /// - Parameter duration: The time interval in seconds after which the stream finishes. + /// - Returns: An `AsyncStream` that emits elements until the timeout expires. + func timeout(after duration: TimeInterval) -> AsyncStream { + AsyncStream { continuation in + let timer = AsyncTimerSequence(interval: .seconds(duration), clock: .continuous) + let merged = merge( + map { TimeoutEvent.value($0) }, + timer.map { _ in TimeoutEvent.timeout } + ) + Task { + for await event in merged { + switch event { + case .value(let value): + continuation.yield(value) + case .timeout: + continuation.finish() + return + } + } + + continuation.finish() + } + } + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/BroadcastStream.swift b/Sources/FueledUtils/SwiftConcurrency/BroadcastStream.swift new file mode 100644 index 00000000..060698a5 --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/BroadcastStream.swift @@ -0,0 +1,81 @@ +import Foundation + +/// A thread-safe broadcast stream that emits values to multiple subscribers. +/// +/// `BroadcastStream` allows multiple consumers to subscribe to the same stream of values. +/// When a value is emitted, all active subscribers receive it. New subscribers can optionally +/// receive the last emitted value upon subscription. +/// +/// Example usage: +/// ```swift +/// let broadcast = BroadcastStream() +/// +/// // Subscribe to receive values +/// Task { +/// for await value in broadcast.stream(emitLastValue: true) { +/// print("Received: \(value)") +/// } +/// } +/// +/// // Emit values to all subscribers +/// broadcast.emit(value: "Hello") +/// ``` +public final class BroadcastStream: @unchecked Sendable { + private var currentValue: Value? + private var continuations: [UUID: AsyncStream.Continuation] = [:] + private let queue: DispatchQueue + + /// Creates an empty broadcast stream. + /// + /// - Parameter queueLabel: Optional custom label for the internal dispatch queue. + /// If `nil`, uses the default label `"com.fueled.broadcast-stream"`. + public init(queueLabel: String? = nil) { + queue = DispatchQueue(label: queueLabel ?? "com.fueled.broadcast-stream") + } + + /// Emits a value to all active subscribers. + /// + /// - Parameter value: The value to broadcast to all subscribers. + public func emit(value: Value) { + queue.sync { + currentValue = value + for continuation in continuations.values { + continuation.yield(value) + } + } + } + + /// Creates a new subscriber stream. + /// + /// - Parameter emitLastValue: If `true` and a value has been previously emitted, + /// the subscriber immediately receives that value. Defaults to `false`. + /// - Returns: An `AsyncStream` that receives all future emitted values. + public func stream(emitLastValue: Bool = false) -> AsyncStream { + let id = UUID() + + return AsyncStream { continuation in + queue.sync { + if let currentValue, emitLastValue { + continuation.yield(currentValue) + } + continuations[id] = continuation + } + + continuation.onTermination = { [weak self] _ in + self?.queue.sync { + _ = self?.continuations.removeValue(forKey: id) + } + } + } + } + + /// Finishes all active subscriber streams. + public func finish() { + queue.sync { + for continuation in continuations.values { + continuation.finish() + } + continuations.removeAll() + } + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/BufferedStream.swift b/Sources/FueledUtils/SwiftConcurrency/BufferedStream.swift new file mode 100644 index 00000000..ca2fd6d9 --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/BufferedStream.swift @@ -0,0 +1,112 @@ +import ConcurrencyExtras + +/// Converts any `AsyncSequence` into a `BufferedStream` for value storage and replay. +public extension AsyncSequence { + /// Converts the async sequence into a `BufferedStream`, allowing values to be stored and replayed. + /// + /// - Returns: A `BufferedStream` that buffers all emitted values for later consumption. + func eraseToBufferedStream() -> BufferedStream where Element: Sendable { + let stream = UncheckedSendable(self).eraseToStream() + return BufferedStream(stream) + } +} + +/// A buffered async stream that stores emitted values and replays them to new consumers. +/// +/// `BufferedStream` maintains an internal buffer of all emitted values. When a consumer +/// starts iterating, they receive all previously buffered values before receiving new ones. +/// +/// Example usage: +/// ```swift +/// let buffer = BufferedStream() +/// buffer.yield(1) +/// buffer.yield(2) +/// +/// // New consumer receives 1, 2, then any future values +/// for await value in buffer { +/// print(value) +/// } +/// ``` +public final class BufferedStream: AsyncSequence, Sendable { + public typealias AsyncIterator = BufferedStreamIterator + public typealias Element = T + + private let buffer = LockIsolated<[T]>([]) + private let continuation = LockIsolated.Continuation?>(nil) + private let isFinished = LockIsolated(false) + + /// Creates an empty buffered stream. + public init() { + } + + fileprivate init(_ stream: AsyncStream) { + Task { + for await value in stream { + yield(value) + } + + finish() + } + } + + private var stream: AsyncStream { + AsyncStream { continuation in + self.continuation.withValue { + $0 = continuation + } + for value in buffer.value { + continuation.yield(value) + } + if isFinished.value { + continuation.finish() + } + } + } + + /// Emits a value into the buffered stream. + /// + /// The value is stored in the buffer and delivered to any active consumer. + /// Future consumers will receive this value when they start iterating. + /// + /// - Parameter value: The value to emit. + public func yield(_ value: T) { + if let continuation = continuation.value { + continuation.yield(value) + } else { + buffer.withValue { + $0.append(value) + } + } + } + + /// Marks the stream as finished, preventing further emissions. + /// + /// After calling this method, any active consumer will complete iteration + /// and new consumers will only receive previously buffered values. + public func finish() { + isFinished.withValue { + $0 = true + } + continuation.withValue { + $0?.finish() + } + } + + public func makeAsyncIterator() -> BufferedStreamIterator { + BufferedStreamIterator(stream: stream) + } +} + +/// An async iterator for `BufferedStream`. +public struct BufferedStreamIterator: AsyncIteratorProtocol { + private var streamIterator: AsyncStream.AsyncIterator + + init(stream: AsyncStream) { + streamIterator = stream.makeAsyncIterator() + } + + /// Advances to and returns the next element, or `nil` if no more elements exist. + public mutating func next() async throws -> T? { + await streamIterator.next() + } +} diff --git a/Sources/FueledUtils/SwiftConcurrency/CurrentValueAsyncSubject.swift b/Sources/FueledUtils/SwiftConcurrency/CurrentValueAsyncSubject.swift new file mode 100644 index 00000000..4dad096b --- /dev/null +++ b/Sources/FueledUtils/SwiftConcurrency/CurrentValueAsyncSubject.swift @@ -0,0 +1,103 @@ +import Foundation + +/// A thread-safe subject that holds a current value and broadcasts changes to subscribers. +/// +/// `CurrentValueAsyncSubject` is similar to Combine's `CurrentValueSubject` but designed +/// for async/await usage. It always has a current value that subscribers receive immediately +/// upon subscription, followed by any subsequent updates. +/// +/// Example usage: +/// ```swift +/// let subject = CurrentValueAsyncSubject(0) +/// +/// // Read current value synchronously +/// print(subject.value) // 0 +/// +/// // Subscribe to receive updates +/// Task { +/// for await value in subject.values() { +/// print("Received: \(value)") +/// } +/// } +/// +/// // Send new values +/// subject.send(42) +/// ``` +public final class CurrentValueAsyncSubject: @unchecked Sendable { + private let queue: DispatchQueue + private var _value: T + private var continuations: [UUID: AsyncStream.Continuation] = [:] + + /// Creates a subject with the specified initial value. + /// + /// - Parameters: + /// - initialValue: The initial value held by the subject. + /// - queueLabel: Optional custom label for the internal dispatch queue. + /// If `nil`, uses the default label `"com.fueled.current-value-async-subject"`. + public init(_ initialValue: T, queueLabel: String? = nil) { + _value = initialValue + queue = DispatchQueue(label: queueLabel ?? "com.fueled.current-value-async-subject") + } + + /// The current value held by the subject. + /// + /// Reading this property is thread-safe and returns the most recently sent value. + public var value: T { + queue.sync { + _value + } + } + + /// Sends a new value to the subject. + /// + /// The new value becomes the current value and is delivered to all active subscribers. + /// + /// - Parameter newValue: The value to send. + public func send(_ newValue: T) { + queue.async { [weak self] in + guard let self else { + return + } + _value = newValue + for (_, continuation) in continuations { + continuation.yield(newValue) + } + } + } + + /// Creates an async stream that emits the current value immediately, then all subsequent values. + /// + /// - Returns: An `AsyncStream` that starts with the current value and continues with future updates. + public func values() -> AsyncStream { + let id = UUID() + + return AsyncStream { continuation in + queue.async { [weak self] in + guard let self else { + return + } + continuation.yield(_value) + continuations[id] = continuation + + continuation.onTermination = { @Sendable _ in + self.queue.async { + self.continuations.removeValue(forKey: id) + } + } + } + } + } + + /// Finishes all active subscriber streams. + /// + /// After calling this method, all active subscribers complete their iteration. + /// New subscribers created via ``values()`` will still receive the current value. + public func finishContinuations() { + queue.sync { + for (_, continuation) in continuations { + continuation.finish() + } + continuations.removeAll() + } + } +} diff --git a/Tests/FueledUtils-Package.xctestplan b/Tests/FueledUtils-Package.xctestplan index 68bc7d82..7578c436 100644 --- a/Tests/FueledUtils-Package.xctestplan +++ b/Tests/FueledUtils-Package.xctestplan @@ -12,12 +12,19 @@ }, "testTargets" : [ + { + "target" : { + "containerPath" : "container:", + "identifier" : "FueledSwiftConcurrencyTests", + "name" : "FueledSwiftConcurrencyTests" + } + }, { "parallelizable" : false, "target" : { "containerPath" : "container:", - "identifier" : "FueledUtilsCombineTests", - "name" : "FueledUtilsCombineTests" + "identifier" : "FueledCombineTests", + "name" : "FueledCombineTests" } } ], diff --git a/Tests/FueledUtils/CombineTests/AnyCurrentValuePublisherTests.swift b/Tests/FueledUtils/CombineTests/AnyCurrentValuePublisherTests.swift index 5e81ffc8..375000a1 100644 --- a/Tests/FueledUtils/CombineTests/AnyCurrentValuePublisherTests.swift +++ b/Tests/FueledUtils/CombineTests/AnyCurrentValuePublisherTests.swift @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -@testable import FueledUtilsCombine +@testable import FueledCombine import Combine import Testing diff --git a/Tests/FueledUtils/CombineTests/CombineLatestManyTests.swift b/Tests/FueledUtils/CombineTests/CombineLatestManyTests.swift index 614a30dc..adea21a0 100644 --- a/Tests/FueledUtils/CombineTests/CombineLatestManyTests.swift +++ b/Tests/FueledUtils/CombineTests/CombineLatestManyTests.swift @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -@testable import FueledUtilsCombine +@testable import FueledCombine import Combine import Foundation diff --git a/Tests/FueledUtils/CombineTests/SinkForLifetimeTests.swift b/Tests/FueledUtils/CombineTests/SinkForLifetimeTests.swift index 1deb7c3c..b69d29f3 100644 --- a/Tests/FueledUtils/CombineTests/SinkForLifetimeTests.swift +++ b/Tests/FueledUtils/CombineTests/SinkForLifetimeTests.swift @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -@testable import FueledUtilsCombine +@testable import FueledCombine import Combine import Foundation diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncCacheTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncCacheTests.swift new file mode 100644 index 00000000..5ac86a14 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncCacheTests.swift @@ -0,0 +1,92 @@ +import ConcurrencyExtras +@testable import FueledSwiftConcurrency +import Testing + +private actor CountActor { + private(set) var count = 0 + + func increment() { + count += 1 + } +} + +@Suite("Async Cache Tests") +struct AsyncCacheTests { + @Test("Retrieves and caches value for a key") + func retrievesAndCachesValue() async throws { + let cache = AsyncCache() + let countActor = CountActor() + + let result1 = try await cache.getOrAdd(key: "test") { key in + await countActor.increment() + return key.count + } + + #expect(result1 == 4) + #expect(await countActor.count == 1) + + // Second call should return cached value + let result2 = try await cache.getOrAdd(key: "test") { key in + await countActor.increment() + return key.count + } + + #expect(result2 == 4) + #expect(await countActor.count == 1) + } + + @Test("Handles different keys independently") + func handlesDifferentKeys() async throws { + let cache = AsyncCache() + + let result1 = try await cache.getOrAdd(key: "test1") { key in + key.count + } + + let result2 = try await cache.getOrAdd(key: "test2") { key in + key.count * 2 + } + + #expect(result1 == 5) + #expect(result2 == 10) + } + + @Test("Clears cache correctly") + func clearsCacheCorrectly() async throws { + let cache = AsyncCache() + + let countActor = CountActor() + let result1 = try await cache.getOrAdd(key: "test") { key in + await countActor.increment() + return key.count + } + + #expect(result1 == 4) + #expect(await countActor.count == 1) + + // Clear the cache + await cache.clear() + + // Subsequent call should invoke provider again + let result2 = try await cache.getOrAdd(key: "test") { key in + await countActor.increment() + return key.count + } + + #expect(result2 == 4) + #expect(await countActor.count == 2) + } + + @Test("Handles provider throwing error") + func handlesProviderError() async throws { + let cache = AsyncCache() + + struct TestError: Error {} + + await #expect(throws: TestError.self) { + _ = try await cache.getOrAdd(key: "test") { _ in + throw TestError() + } + } + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSemaphoreTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSemaphoreTests.swift new file mode 100644 index 00000000..ef008668 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSemaphoreTests.swift @@ -0,0 +1,447 @@ +import ConcurrencyExtras +import Foundation +@testable import FueledSwiftConcurrency +import Testing + +@Suite("AsyncSemaphore Tests") +// swiftlint:disable type_body_length +struct AsyncSemaphoreTests { + @Test("Signal without suspended tasks") + func signalWithoutSuspendedTasks() { + // Check DispatchSemaphore behavior + do { + let sem = DispatchSemaphore(value: 0) + #expect(sem.signal() == 0) + } + do { + let sem = DispatchSemaphore(value: 1) + #expect(sem.signal() == 0) + } + do { + let sem = DispatchSemaphore(value: 2) + #expect(sem.signal() == 0) + } + + // Test that AsyncSemaphore behaves identically + do { + let sem = AsyncSemaphore(value: 0) + let woken = sem.signal() + #expect(!woken) + } + do { + let sem = AsyncSemaphore(value: 1) + let woken = sem.signal() + #expect(!woken) + } + do { + let sem = AsyncSemaphore(value: 2) + let woken = sem.signal() + #expect(!woken) + } + } + + @Test("Signal returns whether it resumes a suspended task") + func signalReturnsWhetherItResumesASuspendedTask() async throws { + let delay: UInt64 = 500_000_000 + + // Check DispatchSemaphore behavior + do { + // Given a thread waiting for the semaphore + let sem = DispatchSemaphore(value: 0) + Thread { sem.wait() }.start() + try await Task.sleep(nanoseconds: delay) + + // First signal wakes the waiting thread + #expect(sem.signal() != 0) + // Second signal does not wake any thread + #expect(sem.signal() == 0) + } + + // Test that AsyncSemaphore behaves identically + do { + // Given a task suspended on the semaphore + let sem = AsyncSemaphore(value: 0) + Task { await sem.wait() } + try await Task.sleep(nanoseconds: delay) + + // First signal resumes the suspended task + #expect(sem.signal()) + // Second signal does not resume any task + #expect(!sem.signal()) + } + } + + @Test("Wait suspends on zero semaphore until signal") + func waitSuspendsOnZeroSemaphoreUntilSignal() async throws { + // Test that AsyncSemaphore behaves correctly + // Given a zero semaphore + let sem = AsyncSemaphore(value: 0) + + // Create a task that will wait on the semaphore + let completed = LockIsolated(false) + let task = Task { + await sem.wait() + completed.setValue(true) + } + + // Give task time to start and suspend + try await Task.sleep(nanoseconds: 500_000_000) + + // Task should be suspended, not completed + #expect(!completed.value) + + // Signal the semaphore + sem.signal() + + // Give task time to resume and complete + try await Task.sleep(nanoseconds: 500_000_000) + + // Task should now be completed + #expect(completed.value) + + // Clean up + task.cancel() + } + + @Test("Cancellation while suspended throws CancellationError") + func cancellationWhileSuspendedThrowsCancellationError() async throws { + let sem = AsyncSemaphore(value: 0) + let errorType = LockIsolated(nil) + let taskCompleted = LockIsolated(false) + + let task = Task { + do { + try await sem.waitUnlessCancelled() + #expect(Bool(false), "Expected CancellationError") + } catch let error as CancellationError { + errorType.setValue(type(of: error)) + } catch { + #expect(Bool(false), "Unexpected error: \(error)") + } + taskCompleted.setValue(true) + } + + try await Task.sleep(nanoseconds: 100_000_000) + task.cancel() + + // Give task time to handle cancellation + try await Task.sleep(nanoseconds: 500_000_000) + + #expect(taskCompleted.value) + #expect(errorType.value == CancellationError.self) + } + + @Test("Cancellation before suspension throws CancellationError") + func cancellationBeforeSuspensionThrowsCancellationError() async throws { + let sem = AsyncSemaphore(value: 0) + let errorType = LockIsolated(nil) + let taskCompleted = LockIsolated(false) + + let task = Task { + // Uncancellable delay + await withUnsafeContinuation { continuation in + DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { + continuation.resume() + } + } + + do { + try await sem.waitUnlessCancelled() + #expect(Bool(false), "Expected CancellationError") + } catch let error as CancellationError { + errorType.setValue(type(of: error)) + } catch { + #expect(Bool(false), "Unexpected error: \(error)") + } + taskCompleted.setValue(true) + } + + task.cancel() + + // Give task time to complete + try await Task.sleep(nanoseconds: 500_000_000) + + #expect(taskCompleted.value) + #expect(errorType.value == CancellationError.self) + } + + @Test("Cancellation while suspended increments the semaphore") + func cancellationWhileSuspendedIncrementsTheSemaphore() async throws { + // Given a task cancelled while suspended on a semaphore + let sem = AsyncSemaphore(value: 0) + let task = Task { + try await sem.waitUnlessCancelled() + } + try await Task.sleep(nanoseconds: 100_000_000) + task.cancel() + + // Create a second task that waits on the semaphore + let completed = LockIsolated(false) + let task2 = Task { + await sem.wait() + completed.setValue(true) + } + + // Give second task time to start and suspend + try await Task.sleep(nanoseconds: 500_000_000) + + // Task should be suspended, not completed + #expect(!completed.value) + + // Signal the semaphore + sem.signal() + + // Give task time to resume and complete + try await Task.sleep(nanoseconds: 500_000_000) + + // Task should now be completed + #expect(completed.value) + + // Clean up + task2.cancel() + } + + @Test("Cancellation before suspension increments the semaphore") + func cancellationBeforeSuspensionIncrementsTheSemaphore() async throws { + // Given a task cancelled before it waits on a semaphore + let sem = AsyncSemaphore(value: 0) + let task = Task { + // Uncancellable delay + await withUnsafeContinuation { continuation in + DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { + continuation.resume() + } + } + try await sem.waitUnlessCancelled() + } + task.cancel() + + // Create a second task that waits on the semaphore + let completed = LockIsolated(false) + let task2 = Task { + await sem.wait() + completed.setValue(true) + } + + // Give second task time to start and suspend + try await Task.sleep(nanoseconds: 500_000_000) + + // Task should be suspended, not completed + #expect(!completed.value) + + // Signal the semaphore + sem.signal() + + // Give task time to resume and complete + try await Task.sleep(nanoseconds: 500_000_000) + + // Task should now be completed + #expect(completed.value) + + // Clean up + task2.cancel() + } + + @Test("Cancellation before suspension increments the semaphore (variant)") + func cancellationBeforeSuspensionIncrementsTheSemaphoreVariant() async throws { + // Given a task that waits for a semaphore with value 1 after the + // task has been cancelled + let sem = AsyncSemaphore(value: 1) + let task = Task { + while !Task.isCancelled { + await Task.yield() + } + try await sem.waitUnlessCancelled() + } + task.cancel() + try? await task.value + + // Create a second task that waits on the semaphore + let completed = LockIsolated(false) + let task2 = Task { + await sem.wait() + completed.setValue(true) + } + + // Give second task time to execute + try await Task.sleep(nanoseconds: 500_000_000) + + // Second task should complete without being suspended + #expect(completed.value) + + // Clean up + task2.cancel() + } + + @Test("Semaphore limits concurrent executions of actor method") + func semaphoreLimitsConcurrentExecutionsOfActorMethod() async { + /// An actor that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + actor Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async { + await semaphore.wait() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try! await Task.sleep(nanoseconds: 100_000_000) + count -= 1 + } + } + + for maxConcurrentRuns in 1...10 { + let runner = Runner(maxConcurrentRuns: maxConcurrentRuns) + + // Spawn many concurrent tasks + await withTaskGroup(of: Void.self) { group in + for _ in 0..<20 { + group.addTask { + await runner.run() + } + } + } + + let effectiveMaxConcurrentRuns = await runner.effectiveMaxConcurrentRuns + #expect(effectiveMaxConcurrentRuns == maxConcurrentRuns) + } + } + + @Test("Semaphore limits concurrent executions of async method") + func semaphoreLimitsConcurrentExecutionsOfAsyncMethod() async { + /// A class that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + @MainActor + class Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async { + await semaphore.wait() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try! await Task.sleep(nanoseconds: 100_000_000) + count -= 1 + } + } + + for maxConcurrentRuns in 1...10 { + let runner = await Runner(maxConcurrentRuns: maxConcurrentRuns) + + // Spawn many concurrent tasks + await withTaskGroup(of: Void.self) { group in + for _ in 0..<20 { + group.addTask { + await runner.run() + } + } + } + + let effectiveMaxConcurrentRuns = await runner.effectiveMaxConcurrentRuns + #expect(effectiveMaxConcurrentRuns == maxConcurrentRuns) + } + } + + @Test("Semaphore limits concurrent executions on single thread") + // swiftlint:disable identifier_name + func semaphoreLimitsConcurrentExecutionsOnSingleThread() async { + /// A class that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + @MainActor + class Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async { + await semaphore.wait() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try! await Task.sleep(nanoseconds: 100_000_000) + count -= 1 + } + } + + await Task { @MainActor in + let runner = Runner(maxConcurrentRuns: 3) + async let x0: Void = runner.run() + async let x1: Void = runner.run() + async let x2: Void = runner.run() + async let x3: Void = runner.run() + async let x4: Void = runner.run() + async let x5: Void = runner.run() + async let x6: Void = runner.run() + async let x7: Void = runner.run() + async let x8: Void = runner.run() + async let x9: Void = runner.run() + _ = await (x0, x1, x2, x3, x4, x5, x6, x7, x8, x9) + let effectiveMaxConcurrentRuns = runner.effectiveMaxConcurrentRuns + #expect(effectiveMaxConcurrentRuns == 3) + }.value + } + + @Test("Semaphore limits concurrent executions with cancellation support") + func semaphoreLimitsConcurrentExecutionsWithCancellationSupport() async { + /// An actor that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + actor Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async throws { + try await semaphore.waitUnlessCancelled() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try! await Task.sleep(nanoseconds: 100_000_000) + count -= 1 + } + } + + for maxConcurrentRuns in 1...10 { + let runner = Runner(maxConcurrentRuns: maxConcurrentRuns) + + // Spawn many concurrent tasks + await withThrowingTaskGroup(of: Void.self) { group in + for _ in 0..<20 { + group.addTask { + try await runner.run() + } + } + } + + let effectiveMaxConcurrentRuns = await runner.effectiveMaxConcurrentRuns + #expect(effectiveMaxConcurrentRuns == maxConcurrentRuns) + } + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TakeUntilAsyncSequenceTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TakeUntilAsyncSequenceTests.swift new file mode 100644 index 00000000..dfed2f73 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TakeUntilAsyncSequenceTests.swift @@ -0,0 +1,64 @@ +import ConcurrencyExtras +import Foundation +import Testing + +@Suite("TakeUntilAsyncSequence Tests") +struct TakeUntilAsyncSequenceTests { + @Test("TakeUntil stops emitting after condition is met") + func takeUntilStopsAfterCondition() async throws { + let sequence = AsyncStream { continuation in + continuation.yield(1) + continuation.yield(2) + continuation.yield(3) + continuation.yield(4) + continuation.finish() + } + + let condition: @Sendable (Int) -> Bool = { $0 == 3 } + let takeUntilSequence = sequence.takeUntil(condition) + + var receivedValues: [Int] = [] + for try await value in takeUntilSequence { + receivedValues.append(value) + } + + #expect(receivedValues == [1, 2, 3]) + } + + @Test("TakeUntil handles empty sequence") + func takeUntilHandlesEmptySequence() async throws { + let sequence = AsyncStream { continuation in + continuation.finish() + } + + let condition: @Sendable (Int) -> Bool = { $0 == 3 } + let takeUntilSequence = sequence.takeUntil(condition) + + var receivedValues: [Int] = [] + for try await value in takeUntilSequence { + receivedValues.append(value) + } + + #expect(receivedValues.isEmpty) + } + + @Test("TakeUntil handles condition never being met") + func takeUntilHandlesConditionNeverMet() async throws { + let sequence = AsyncStream { continuation in + continuation.yield(1) + continuation.yield(2) + continuation.yield(3) + continuation.finish() + } + + let condition: @Sendable (Int) -> Bool = { $0 == 5 } + let takeUntilSequence = sequence.takeUntil(condition) + + var receivedValues: [Int] = [] + for try await value in takeUntilSequence { + receivedValues.append(value) + } + + #expect(receivedValues == [1, 2, 3]) + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TimeoutAsyncSequenceTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TimeoutAsyncSequenceTests.swift new file mode 100644 index 00000000..c3e097d6 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncSequenceTests/TimeoutAsyncSequenceTests.swift @@ -0,0 +1,120 @@ +import FueledSwiftConcurrency +import Testing + +struct TimeoutError: Error {} + +@Suite("TimeoutAsyncSequence Tests") +struct TimeoutAsyncSequenceTests { + @Test("Stream emits values until timeout") + func streamEmitsUntilTimeout() async throws { + let sequence1 = AsyncStream { continuation in + Task { + try? await Task.sleep(for: .milliseconds(100)) + continuation.yield(1) + try? await Task.sleep(for: .milliseconds(500)) + continuation.yield(2) + continuation.finish() + } + } + + let sut = sequence1.timeout(for: .milliseconds(200), clock: .continuous, alwaysFinishAfterTimeout: true) + + var receivedValues: [Int] = [] + for try await value in sut { + receivedValues.append(value) + } + + #expect(receivedValues == [1]) + } + + @Test("Stream emits after timeout when receive value from main stream") + func streamEmitsAfterTimeout() async throws { + let sequence1 = AsyncStream { continuation in + Task { + try? await Task.sleep(for: .milliseconds(100)) + continuation.yield(1) + try? await Task.sleep(for: .milliseconds(500)) + continuation.yield(2) + continuation.finish() + } + } + + let sut = sequence1.timeout(for: .milliseconds(200), clock: .continuous, alwaysFinishAfterTimeout: false) + + var receivedValues: [Int] = [] + for try await value in sut { + receivedValues.append(value) + } + + #expect(receivedValues == [1, 2]) + } + + @Test("Stream throw error after timeout") + func streamThrowAfterTimeout() async throws { + let sequence1 = AsyncStream { continuation in + Task { + try? await Task.sleep(for: .milliseconds(100)) + continuation.yield(1) + try? await Task.sleep(for: .milliseconds(500)) + continuation.yield(2) + continuation.finish() + } + } + + let sut = sequence1.timeout( + for: .milliseconds(200), + clock: .continuous, + throwing: TimeoutError(), + alwaysFinishAfterTimeout: true + ) + + var receivedValues: [Int] = [] + + await #expect(throws: TimeoutError.self) { + for try await value in sut { + receivedValues.append(value) + } + } + } + + @Test("Stream not throwing error after timeout") + func streamNotThrowAfterTimeout() async throws { + let sequence1 = AsyncStream { continuation in + Task { + try? await Task.sleep(for: .milliseconds(100)) + continuation.yield(1) + try? await Task.sleep(for: .milliseconds(500)) + continuation.yield(2) + continuation.finish() + } + } + + let sut = sequence1.timeout( + for: .milliseconds(200), + clock: .continuous, + throwing: TimeoutError(), + alwaysFinishAfterTimeout: false + ) + + var receivedValues: [Int] = [] + for try await value in sut { + receivedValues.append(value) + } + + #expect(receivedValues == [1, 2]) + } + + @Test("Stream handles empty source correctly") + func streamHandlesEmptySource() async throws { + var valueReceived = false + let stream = AsyncStream { continuation in + continuation.finish() + } + + for try await _ in stream.timeout(for: .seconds(0.1), clock: .continuous) { + valueReceived = true + } + + #expect(valueReceived == false) + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamFlatMapOptionalTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamFlatMapOptionalTests.swift new file mode 100644 index 00000000..da232171 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamFlatMapOptionalTests.swift @@ -0,0 +1,135 @@ +@testable import FueledSwiftConcurrency +import Testing + +@Suite("AsyncStream FlatMapOptional Tests") +struct AsyncStreamFlatMapOptionalTests { + @Test("Transforms values through streams") + func transformsValuesTest() async { + let originalStream = AsyncStream { continuation in + continuation.yield(1) + continuation.yield(2) + continuation.finish() + } + + let transformedStream = originalStream.flatMapOptional { value -> AsyncStream? in + AsyncStream { continuation in + continuation.yield("\(value) transformed") + continuation.finish() + } + } + + var receivedValues: [String?] = [] + for await value in transformedStream { + receivedValues.append(value) + } + + #expect(receivedValues.count == 2) + #expect(receivedValues[0] == "1 transformed") + #expect(receivedValues[1] == "2 transformed") + } + + @Test("Handles nil streams correctly") + func handlesNilStreamsTest() async { + let originalStream = AsyncStream { continuation in + continuation.yield(1) + continuation.yield(2) + continuation.yield(3) + continuation.finish() + } + + let transformedStream = originalStream.flatMapOptional { value -> AsyncStream? in + if value.isMultiple(of: 2) { + return AsyncStream { continuation in + continuation.yield("Even: \(value)") + continuation.finish() + } + } else { + return nil + } + } + + var receivedValues: [String?] = [] + for await value in transformedStream { + receivedValues.append(value) + } + + #expect(receivedValues.count == 3) + #expect(receivedValues[0] == nil) + #expect(receivedValues[1] == "Even: 2") + #expect(receivedValues[2] == nil) + } + + @Test("Propagates upstream termination") + func propagatesTerminationTest() async { + let originalStream = AsyncStream { continuation in + continuation.yield(1) + continuation.finish() + } + + let transformedStream = originalStream.flatMapOptional { value -> AsyncStream? in + AsyncStream { continuation in + continuation.yield("\(value) transformed") + continuation.yield("additional value") + continuation.finish() + } + } + + var receivedValues: [String?] = [] + for await value in transformedStream { + receivedValues.append(value) + } + + #expect(receivedValues.count == 2) + #expect(receivedValues[0] == "1 transformed") + #expect(receivedValues[1] == "additional value") + } + + @Test("Handles empty source stream") + func handlesEmptySourceTest() async { + let emptyStream = AsyncStream { continuation in + continuation.finish() + } + + let transformedStream = emptyStream.flatMapOptional { value -> AsyncStream? in + AsyncStream { continuation in + continuation.yield("\(value) transformed") + continuation.finish() + } + } + + var receivedValues: [String?] = [] + for await value in transformedStream { + receivedValues.append(value) + } + + #expect(receivedValues.isEmpty) + } + + @Test("Multiple values from inner streams are propagated") + func multipleInnerValuesTest() async { + let originalStream = AsyncStream { continuation in + continuation.yield(1) + continuation.yield(2) + continuation.finish() + } + + let transformedStream = originalStream.flatMapOptional { value -> AsyncStream? in + AsyncStream { continuation in + for i in 1...value { + continuation.yield("\(value).\(i)") + } + continuation.finish() + } + } + + var receivedValues: [String?] = [] + for await value in transformedStream { + receivedValues.append(value) + } + + #expect(receivedValues.count == 3) + #expect(receivedValues[0] == "1.1") + #expect(receivedValues[1] == "2.1") + #expect(receivedValues[2] == "2.2") + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamPromoteOptionalTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamPromoteOptionalTests.swift new file mode 100644 index 00000000..14569d47 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamPromoteOptionalTests.swift @@ -0,0 +1,77 @@ +@testable import FueledSwiftConcurrency +import Testing + +@Suite("AsyncStream PromoteOptional Tests") +struct AsyncStreamPromoteOptionalTests { + @Test("Promotes values to optionals") + func promotesToOptionalsTest() async { + let originalStream = AsyncStream { continuation in + continuation.yield(1) + continuation.finish() + } + + let optionalStream = originalStream.promoteOptional() + + var receivedValue: Int? + for await value in optionalStream { + receivedValue = value + } + + #expect(receivedValue == 1) + } + + @Test("Empty stream is still empty after promotion") + func emptyStreamTest() async { + let emptyStream = AsyncStream { continuation in + continuation.finish() + } + + let optionalStream = emptyStream.promoteOptional() + + var receivedValues: [String?] = [] + for await value in optionalStream { + receivedValues.append(value) + } + + #expect(receivedValues.isEmpty) + } + + @Test("Works with already optional values") + func worksWithOptionalValuesTest() async { + let originalStream = AsyncStream { continuation in + continuation.yield(1) + continuation.yield(nil) + continuation.yield(3) + continuation.finish() + } + + let doubleOptionalStream = originalStream.promoteOptional() + + var receivedValues: [Int??] = [] + for await value in doubleOptionalStream { + receivedValues.append(value) + } + + #expect(receivedValues.count == 3) + #expect(receivedValues[0] == 1) + #expect(receivedValues[1]! == nil) + #expect(receivedValues[2] == 3) + } + + @Test("Stream is properly finished after promotion") + func streamFinishesTest() async { + let originalStream = AsyncStream { continuation in + continuation.yield(42) + continuation.finish() + } + + let optionalStream = originalStream.promoteOptional() + + var iterator = optionalStream.makeAsyncIterator() + let firstValue = await iterator.next() + let secondValue = await iterator.next() + + #expect(firstValue == 42) + #expect(secondValue == nil) + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamSingleTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamSingleTests.swift new file mode 100644 index 00000000..c8d3371a --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamSingleTests.swift @@ -0,0 +1,30 @@ +@testable import FueledSwiftConcurrency +import Testing + +@Suite("AsyncStream Single Tests") +struct AsyncStreamSingleTests { + @Test("Single value is emitted correctly") + func singleValueTest() async { + let stream = AsyncStream.single(value: 42) + + var receivedValues: [Int] = [] + for await value in stream { + receivedValues.append(value) + } + + #expect(receivedValues.count == 1) + #expect(receivedValues.first == 42) + } + + @Test("Stream finishes after single value") + func streamFinishesTest() async { + let stream = AsyncStream.single(value: "test") + + var iterator = stream.makeAsyncIterator() + let firstValue = await iterator.next() + let secondValue = await iterator.next() + + #expect(firstValue == "test") + #expect(secondValue == nil) + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamTimeoutTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamTimeoutTests.swift new file mode 100644 index 00000000..ad2ffa12 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/AsyncStreamTests/AsyncStreamTimeoutTests.swift @@ -0,0 +1,73 @@ +import ConcurrencyExtras +@testable import FueledSwiftConcurrency +import Testing + +@Suite("AsyncStream Timeout Tests") +struct AsyncStreamTimeoutTests { + @Test("Stream emits values until timeout") + func streamEmitsUntilTimeout() async throws { + let streamTuple = AsyncStream.makeStream(of: Int.self) + let receivedValues = LockIsolated([Int]()) + + let timeoutStream = streamTuple + .stream + .eraseToBufferedStream() + + Task { + for i in 0...10 { + streamTuple.continuation.yield(i) + try await Task.sleep(for: .milliseconds(100)) + } + } + + for try await value in timeoutStream.timeout(after: 0.3) { + receivedValues.withValue { + $0.append(value) + } + } + + #expect([3, 4].contains(receivedValues.value.count)) + #expect([[0, 1, 2], [0, 1, 2, 3]].contains(receivedValues.value)) + } + + @Test("Stream finishes immediately on timeout") + func streamFinishesOnTimeout() async throws { + let valueReceived = LockIsolated(false) + + let streamTuple = AsyncStream.makeStream(of: Void.self) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask(priority: .low) { + try await Task.sleep(for: .milliseconds(200)) + streamTuple.continuation.yield() + } + + group.addTask(priority: .high) { + for try await _ in streamTuple.stream.timeout(after: 0.1) { + valueReceived.withValue { + $0 = true + } + } + } + + try await group.next() + group.cancelAll() + } + + #expect(valueReceived.value == false) + } + + @Test("Stream handles empty source correctly") + func streamHandlesEmptySource() async throws { + var valueReceived = false + let stream = AsyncStream { continuation in + continuation.finish() + } + + for try await _ in stream.timeout(after: 0.1) { + valueReceived = true + } + + #expect(valueReceived == false) + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/BroadcastStreamTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/BroadcastStreamTests.swift new file mode 100644 index 00000000..f993d9ed --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/BroadcastStreamTests.swift @@ -0,0 +1,140 @@ +import ConcurrencyExtras +import Foundation +@testable import FueledSwiftConcurrency +import Testing + +@Suite("BroadcastStream Tests") +struct BroadcastStreamTests { + @Test("Single listener receives emitted value") + func emitValueToSingleListener() async throws { + let broadcast = BroadcastStream() + let expectedValue = 42 + + let task = Task { + let stream = broadcast.stream(emitLastValue: true) + for await value in stream { + #expect(value == expectedValue) + break + } + } + + broadcast.emit(value: expectedValue) + await task.value + } + + @Test("Multiple listeners receive same emitted value") + func emitValueToMultipleListeners() async throws { + let broadcast = BroadcastStream() + let expectedValue = 42 + let listenerCount = 3 + let counter = LockIsolated(0) + + let tasks = (0..() + let expectedValue = 42 + + broadcast.emit(value: expectedValue) + + let task = Task { + let stream = broadcast.stream(emitLastValue: true) + for await value in stream { + #expect(value == expectedValue) + break + } + } + + await task.value + } + + @Test("New listener shouldn't receive last emitted value if not enabled by the stream") + func newListenerShouldNowReceiveLastEmittedValue() async throws { + let broadcast = BroadcastStream() + let expectedValue = 42 + + broadcast.emit(value: expectedValue) + + let task = Task { + let stream = broadcast.stream(emitLastValue: false) + for await _ in stream { + Issue.record("Shouldn't receive value!") + } + } + + try await Task.sleep(for: .seconds(0.1)) + task.cancel() + } + + @Test("Multiple emissions are received in order") + func multipleEmissions() async throws { + let broadcast = BroadcastStream() + let values = [1, 2, 3, 4, 5] + let receivedValues = LockIsolated<[Int]>([]) + + let task = Task { + let stream = broadcast.stream() + for await value in stream { + receivedValues.withValue { + $0.append( value) + } + if receivedValues.count == values.count { + break + } + } + } + + for value in values { + try? await Task.sleep(for: .milliseconds(100)) + broadcast.emit(value: value) + } + + await task.value + #expect(receivedValues.value == values) + } + + @Test("Listeners are cleaned up after cancellation") + func testListenerCleanupOnCancellation() async throws { + let broadcast = BroadcastStream() + + let task = Task { + let stream = broadcast.stream() + for await _ in stream { + break + } + } + + task.cancel() + try? await Task.sleep(for: .milliseconds(100)) + + // Access the private property for testing + let mirror = Mirror(reflecting: broadcast) + if let continuations = mirror.children.first(where: { $0.label == "continuations" })?.value as? [UUID: AsyncStream.Continuation] { + #expect(continuations.isEmpty) + } + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/BufferedStreamTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/BufferedStreamTests.swift new file mode 100644 index 00000000..691d5380 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/BufferedStreamTests.swift @@ -0,0 +1,202 @@ +import ConcurrencyExtras +@testable import FueledSwiftConcurrency +import Testing + +@Suite("BufferedStream Tests") +struct BufferedStreamTests { + @Test("Empty buffer initially") + func emptyBufferInitiallyTest() async throws { + let stream = BufferedStream() + let task = Task { + var values: [Int] = [] + for try await value in stream { + values.append(value) + if values.count == 1 { + break + } + } + return values + } + + // No values should be received yet + try await Task.sleep(for: .milliseconds(50)) + stream.finish() + let values = try await task.value + #expect(values.isEmpty) + } + + @Test("Yield adds values to buffer before iteration") + func yieldToBufferBeforeIterationTest() async throws { + let stream = BufferedStream() + + stream.yield(1) + stream.yield(2) + stream.yield(3) + + let receivedValues = LockIsolated<[Int]>([]) + let task = Task { + for try await value in stream { + receivedValues.withValue { + $0.append(value) + } + if receivedValues.count == 3 { + break + } + } + } + + try await Task.sleep(for: .milliseconds(100)) + stream.finish() + try await task.value + + #expect(receivedValues.count == 3) + #expect(receivedValues.value == [1, 2, 3]) + } + + @Test("Yield sends values directly to continuation after iteration starts") + func yieldToContinuationAfterIterationTest() async throws { + let stream = BufferedStream() + let receivedValues = LockIsolated<[Int]>([]) + + let task = Task { + for try await value in stream { + receivedValues.withValue { $0.append(value) } + if receivedValues.value.count == 5 { + break + } + } + } + + try await Task.sleep(for: .milliseconds(50)) + + stream.yield(1) + stream.yield(2) + stream.yield(3) + stream.yield(4) + stream.yield(5) + + try await Task.sleep(for: .milliseconds(100)) + stream.finish() + try await task.value + + #expect(receivedValues.value.count == 5) + #expect(receivedValues.value == [1, 2, 3, 4, 5]) + } + + @Test("Finish terminates all subscriptions") + func finishTerminatesTest() async throws { + let stream = BufferedStream() + let receivedValues = LockIsolated<[Int]>([]) + + let task = Task { + for try await value in stream { + receivedValues.withValue { $0.append(value) } + } + } + + try await Task.sleep(for: .milliseconds(50)) + + stream.yield(1) + stream.yield(2) + + try await Task.sleep(for: .milliseconds(50)) + + // Finish should terminate the stream + stream.finish() + + try await Task.sleep(for: .milliseconds(50)) + + // These should not be received + stream.yield(3) + stream.yield(4) + + try await Task.sleep(for: .milliseconds(100)) + task.cancel() + + #expect(receivedValues.value.count == 2) + #expect(receivedValues.value == [1, 2]) + } + + @Test("Can create from an existing AsyncStream") + func createFromAsyncStreamTest() async throws { + let originalStream = AsyncStream { continuation in + continuation.yield(1) + continuation.yield(2) + continuation.yield(3) + continuation.finish() + } + + let bufferedStream = originalStream.eraseToBufferedStream() + + var receivedValues: [Int] = [] + for try await value in bufferedStream { + receivedValues.append(value) + } + + #expect(receivedValues.count == 3) + #expect(receivedValues == [1, 2, 3]) + } + + @Test("Iterator returns nil after finish") + func iteratorReturnsNilAfterFinishTest() async throws { + let stream = BufferedStream() + + stream.yield(1) + stream.finish() + + var iterator = stream.makeAsyncIterator() + let firstValue = try await iterator.next() + let secondValue = try await iterator.next() + + #expect(firstValue == 1) + #expect(secondValue == nil) + } + + @Test("eraseToBufferedStream preserves values emitted before reading starts") + func eraseToBufferedStreamRaceConditionTest() async throws { + // Create an AsyncStream that emits values immediately + let (originalStream, continuation) = AsyncStream.makeStream() + + // Convert to buffered stream + let bufferedStream = originalStream.eraseToBufferedStream() + + // Emit values BEFORE anyone starts reading (simulating race condition) + continuation.yield("fast_value_1") + continuation.yield("fast_value_2") + continuation.yield("fast_value_3") + + // Small delay to simulate processing time + try await Task.sleep(for: .milliseconds(10)) + + // Now start reading - should get all values that were emitted earlier + let receivedValues = LockIsolated<[String]>([]) + let readingTask = Task { + for try await value in bufferedStream { + receivedValues.withValue { + $0.append(value) + } + if receivedValues.count >= 5 { + break + } + } + } + + // Emit more values after reading started + try await Task.sleep(for: .milliseconds(5)) + continuation.yield("after_reading_1") + continuation.yield("after_reading_2") + + // Wait for reading to complete + try await readingTask.value + + // Verify we got ALL values in correct order + #expect(receivedValues.count == 5) + #expect(receivedValues.value == [ + "fast_value_1", + "fast_value_2", + "fast_value_3", + "after_reading_1", + "after_reading_2", + ]) + } +} diff --git a/Tests/FueledUtils/SwiftConcurrencyTests/CurrentValueAsyncSubjectTests.swift b/Tests/FueledUtils/SwiftConcurrencyTests/CurrentValueAsyncSubjectTests.swift new file mode 100644 index 00000000..c4008c73 --- /dev/null +++ b/Tests/FueledUtils/SwiftConcurrencyTests/CurrentValueAsyncSubjectTests.swift @@ -0,0 +1,107 @@ +import ConcurrencyExtras +@testable import FueledSwiftConcurrency +import Testing + +@Suite("CurrentValueAsyncSubject Tests") +struct CurrentValueAsyncSubjectTests { + @Test("Initial value is set correctly") + func initialValueTest() { + let subject = CurrentValueAsyncSubject(42) + #expect(subject.value == 42) + } + + @Test("Send updates current value") + func sendUpdatesValueTest() async { + let subject = CurrentValueAsyncSubject(0) + subject.send(99) + + try? await Task.sleep(for: .milliseconds(50)) + #expect(subject.value == 99) + } + + @Test("Multiple subscribers receive updates") + func multipleSubscribersTest() async throws { + let subject = CurrentValueAsyncSubject(0) + let receivedValues1 = LockIsolated<[Int]>([]) + let receivedValues2 = LockIsolated<[Int]>([]) + + let task1 = Task { + for await value in subject.values() { + receivedValues1.withValue { + $0.append(value) + } + } + } + + let task2 = Task { + for await value in subject.values() { + receivedValues2.withValue { + $0.append(value) + } + } + } + + try await Task.sleep(for: .milliseconds(50)) + + // Send some values + subject.send(1) + subject.send(2) + subject.send(3) + + try await Task.sleep(for: .milliseconds(100)) + + task1.cancel() + task2.cancel() + + #expect(receivedValues1.value.count == 4) + #expect(receivedValues2.value.count == 4) + #expect(receivedValues1.value == [0, 1, 2, 3]) + #expect(receivedValues2.value == [0, 1, 2, 3]) + } + + @Test("Subscribers receive initial value immediately") + func immediateInitialValueTest() async throws { + let subject = CurrentValueAsyncSubject(42) + let received = LockIsolated(nil) + + let task = Task { + for await value in subject.values() { + received.setValue(value) + break + } + } + + try await Task.sleep(for: .milliseconds(100)) + task.cancel() + + #expect(received.value == 42) + } + + @Test("Finish terminates all subscriptions") + func finishTerminatesTest() async throws { + let subject = CurrentValueAsyncSubject(0) + let receivedValues = LockIsolated<[Int]>([]) + + let task = Task { + for await value in subject.values() { + receivedValues.withValue { + $0.append(value) + } + } + } + + try await Task.sleep(for: .milliseconds(50)) + subject.send(1) + try await Task.sleep(for: .milliseconds(50)) + + subject.finishContinuations() + try await Task.sleep(for: .milliseconds(50)) + + subject.send(2) // This should not be received + try await Task.sleep(for: .milliseconds(50)) + + task.cancel() + + #expect(receivedValues.value == [0, 1]) + } +}