heydamianc/AsyncSequencePublisher

AsyncSequencePublisher

A Swift package that bridges AsyncSequence with Combine's Publisher protocol, enabling seamless integration between Swift's async/await concurrency model and Combine's reactive programming paradigm.

Features

  • 🔄 Convert any AsyncSequence to a Combine Publisher
  • 🌍 Cross-platform support - Works with both Apple's Combine and OpenCombine
  • 🎯 Full backpressure support - Respects Combine's demand system
  • 🔒 Thread-safe - Safe to use from any thread or actor context
  • ❌ Proper cancellation handling - Cleans up resources when cancelled
  • 📦 Zero dependencies on Apple platforms
  • ✅ Comprehensive test coverage

Requirements

  • Swift 5.10+
  • iOS 13.0+ / macOS 10.15+ / tvOS 13.0+ / watchOS 6.0+ / visionOS 1.0+
  • Linux / Windows (via OpenCombine)

Installation

Swift Package Manager

Add the following to your Package.swift file:

dependencies: [
    .package(url: "https://github.com/heydamianc/AsyncSequencePublisher.git", from: "1.0.0")
]

Then add AsyncSequencePublisher as a dependency to your target:

targets: [
    .target(
        name: "YourTarget",
        dependencies: ["AsyncSequencePublisher"]
    )
]
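
For orientation, the two fragments above could sit together in a manifest roughly like the following. This is a minimal sketch, not the package's own manifest: the package name `YourPackage` and the chosen `platforms` entries are placeholders, the repository path is the one shown in this page's header, and the by-name dependency assumes the library product is called `AsyncSequencePublisher`.

```swift
// swift-tools-version: 5.10
import PackageDescription

let package = Package(
    name: "YourPackage", // placeholder name for your own package
    platforms: [
        // Minimum versions taken from the Requirements list above
        .iOS(.v13),
        .macOS(.v10_15)
    ],
    dependencies: [
        .package(url: "https://github.com/heydamianc/AsyncSequencePublisher.git", from: "1.0.0")
    ],
    targets: [
        .target(
            name: "YourTarget",
            // Assumes the product name matches the package name
            dependencies: ["AsyncSequencePublisher"]
        )
    ]
)
```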

Usage

Basic Example

import AsyncSequencePublisher
import Combine

// Create an AsyncSequence
let asyncSequence = AsyncStream<Int> { continuation in
    Task {
        for i in 1...5 {
            continuation.yield(i)
            try? await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds
        }
        continuation.finish()
    }
}

// Convert to Publisher
let publisher = AsyncSequencePublisher(sequence: asyncSequence)

// Use with Combine
let cancellable = publisher
    .map { $0 * 2 }
    .sink { value in
        print("Received: \(value)")
    }

Working with AsyncThrowingStream

let throwingSequence = AsyncThrowingStream<String, Error> { continuation in
    continuation.yield("Hello")
    continuation.yield("World")
    continuation.finish()
}

let publisher = AsyncSequencePublisher(sequence: throwingSequence)

// Errors are handled gracefully - the publisher completes on error
// Keep the returned AnyCancellable alive; dropping it cancels the subscription
let cancellable = publisher
    .sink(
        receiveCompletion: { completion in
            print("Completed: \(completion)")
        },
        receiveValue: { value in
            print("Value: \(value)")
        }
    )
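
The stream above never actually fails. As a rough sketch of the failure path, using only the standard library (no package API is assumed here), the snippet below ends a stream with a hypothetical `DemoError` and shows that buffered values are delivered before the error surfaces. According to the comment above, a publisher wrapping such a sequence would complete at that point.

```swift
// DemoError is a hypothetical error type used only for illustration.
struct DemoError: Error {}

// Yields one value, then terminates the stream with an error.
let failingSequence = AsyncThrowingStream<String, Error> { continuation in
    continuation.yield("Hello")
    continuation.finish(throwing: DemoError())
}

var received: [String] = []
var sawError = false

do {
    // Buffered values arrive first; the error is thrown afterwards.
    for try await value in failingSequence {
        received.append(value)
    }
} catch {
    sawError = true
}

print(received, sawError)
```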

Custom AsyncSequence

struct CountdownSequence: AsyncSequence {
    typealias Element = Int
    let from: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var current: Int

        mutating func next() async -> Int? {
            guard current > 0 else { return nil }
            defer { current -= 1 }
            try? await Task.sleep(nanoseconds: 1_000_000_000)
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(current: from)
    }
}

// Use with the publisher
let countdown = CountdownSequence(from: 5)
let publisher = AsyncSequencePublisher(sequence: countdown)

Thread Safety

The publisher is safe to use from any thread or actor context:

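import AsyncSequencePublisher
import Combine
import Foundation

@MainActor
class ViewModel: ObservableObject {
    private var cancellables = Set<AnyCancellable>()

    func startListening() {
        let sequence = AsyncStream<String> { continuation in
            // Simulate an event produced off the main actor
            Task.detached {
                continuation.yield("Event 1")
                continuation.finish()
            }
        }

        AsyncSequencePublisher(sequence: sequence)
            .receive(on: DispatchQueue.main)
            .sink { [weak self] value in
                // Delivered on the main queue, so UI updates are safe;
                // the weak capture avoids a retain cycle through `cancellables`
                self?.updateUI(with: value)
            }
            .store(in: &cancellables)
    }

    // Placeholder for whatever UI update the app performs
    private func updateUI(with value: String) {
        print("UI updated with \(value)")
    }
}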

Using the Extension Method

The package provides a convenient extension on AsyncSequence:

// Any AsyncSequence can be converted to a publisher
let publisher = myAsyncSequence.publisher()

// Works with AsyncStream
// (keep each returned AnyCancellable alive for as long as values are wanted)
let streamSubscription = AsyncStream<String> { continuation in
    continuation.yield("Hello")
    continuation.finish()
}
.publisher()
.sink { print($0) }

// Works with AsyncThrowingStream
let throwingSubscription = AsyncThrowingStream<Int, Error> { continuation in
    continuation.yield(42)
    continuation.finish()
}
.publisher()
.sink { print($0) }

// Works with custom AsyncSequences
let customSubscription = myCustomSequence
    .publisher()
    .map { transform($0) }
    .sink { process($0) }

This extension method provides a more idiomatic Swift API that feels natural alongside other Combine operators.

Advanced Usage

Multicast Sequences

If you need multiple subscribers to receive all values from a sequence (rather than competing for values), create a sequence that supports multiple iterations:

// Element must be Sendable for the struct's Sendable conformance to compile
struct MulticastAsyncSequence<Element: Sendable>: AsyncSequence, Sendable {
    let elements: [Element]

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(elements: elements)
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        let elements: [Element]
        var index = 0

        mutating func next() async -> Element? {
            guard index < elements.count else { return nil }
            defer { index += 1 }
            return elements[index]
        }
    }
}
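
The property that matters here is that every call to `makeAsyncIterator()` starts again from the first element. A minimal, standard-library-only sketch of that behavior follows; `ReplaySequence` is a hypothetical, condensed stand-in for the struct above so that the snippet runs on its own.

```swift
// Condensed stand-in: each iterator walks its own copy of the elements.
struct ReplaySequence<Element: Sendable>: AsyncSequence, Sendable {
    let elements: [Element]

    struct AsyncIterator: AsyncIteratorProtocol {
        let elements: [Element]
        var index = 0

        mutating func next() async -> Element? {
            guard index < elements.count else { return nil }
            defer { index += 1 }
            return elements[index]
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(elements: elements)
    }
}

let letters = ReplaySequence(elements: ["a", "b"])

// Two independent passes over the same sequence value.
var firstPass: [String] = []
for await letter in letters { firstPass.append(letter) }

var secondPass: [String] = []
for await letter in letters { secondPass.append(letter) }

print(firstPass, secondPass) // ["a", "b"] ["a", "b"]
```

Each subscriber that iterates such a sequence therefore sees the full list, rather than splitting values with other subscribers.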

Handling Backpressure

The publisher properly implements Combine's demand system:

let subscriber = AnySubscriber<Int, Never>(
    receiveSubscription: { subscription in
        // Request an initial batch of 3 values
        subscription.request(.max(3))
    },
    receiveValue: { value in
        print("Received: \(value)")
        // Add one more value to the outstanding demand
        return .max(1)
    },
    receiveCompletion: { _ in
        print("Completed")
    }
)

publisher.subscribe(subscriber)
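
Demand in Combine is additive: the subscriber above keeps the stream flowing because it returns `.max(1)` for every value. To see the opposite case, the sketch below returns `.none`, so delivery stops once the initial request is used up. It uses Combine's built-in sequence publisher as a stand-in so it runs without this package (on Apple platforms); `LimitedSubscriber` is a hypothetical helper, and how AsyncSequencePublisher behaves under the same subscriber is assumed from the description above rather than shown.

```swift
import Combine

// Hypothetical subscriber that asks for a fixed number of values and no more.
final class LimitedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private let limit: Int

    init(limit: Int) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        // Initial demand: at most `limit` values.
        subscription.request(.max(limit))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // .none adds nothing to the outstanding demand.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let limited = LimitedSubscriber(limit: 3)
(1...10).publisher.subscribe(limited)

print(limited.received) // [1, 2, 3]
```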

Cross-Platform Support

This package automatically uses the appropriate framework:

  • Apple Platforms: Uses the built-in Combine framework
  • Linux/Windows: Uses OpenCombine

No code changes needed - just import and use:

#if canImport(Combine)
import Combine
#else
import OpenCombine
#endif
import AsyncSequencePublisher

// Your code works the same on all platforms

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • Thanks to the Swift community for the ongoing evolution of Swift concurrency
  • The OpenCombine project for enabling Combine on non-Apple platforms
