Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 30 additions & 1 deletion Package@swift-5.9.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,38 @@ let package = Package(
],
Product.allTests()
] as [[Product]]).flatMap { $0 },
dependencies: [
// NOTE: Currently used for WASI targets only.
// Avoid using this dependency for any other targets.
.package(
url: "https://github.com/apple/swift-atomics.git",
from: "1.2.0"
),
.package(url: "https://github.com/PassiveLogic/swift-dispatch-async.git", from: "1.0.0")
],
targets: ([
[
.rxTarget(name: "RxSwift", dependencies: []),
.rxTarget(
name: "RxSwift",
dependencies: [
// WASI targets can't use CoreFoundation, but WASI does support
// compiling Swift Atomics.
//
// This dependency is added ONLY for WASI targets, and should NOT
// be added for any other platforms.
.product(
name: "Atomics",
package: "swift-atomics",
condition: .when(platforms: [.wasi])
),

.product(
name: "DispatchAsync",
package: "swift-dispatch-async",
condition: .when(platforms: [.wasi])
),
]
),
],
Target.rxCocoa(),
Target.rxCocoaRuntime(),
Expand Down
33 changes: 33 additions & 0 deletions Platform/AtomicInt.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,37 @@
// Copyright © 2018 Krunoslav Zaher. All rights reserved.
//

#if canImport(Atomics)

import Atomics

typealias AtomicInt = ManagedAtomic<Int32>

@discardableResult
@inline(__always)
func add(_ this: AtomicInt, _ value: Int32) -> Int32 {
this.loadThenWrappingIncrement(by: value, ordering: .sequentiallyConsistent)
}

@discardableResult
@inline(__always)
func sub(_ this: AtomicInt, _ value: Int32) -> Int32 {
this.loadThenWrappingDecrement(by: value, ordering: .sequentiallyConsistent)
}

@discardableResult
@inline(__always)
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
this.loadThenBitwiseOr(with: mask, ordering: .sequentiallyConsistent)
}

@inline(__always)
func load(_ this: AtomicInt) -> Int32 {
this.load(ordering: .sequentiallyConsistent)
}

#else

import CoreFoundation
// This CoreFoundation import can be dropped when this issue is resolved:
// https://github.com/swiftlang/swift-corelibs-foundation/pull/5122
Expand Down Expand Up @@ -56,6 +87,8 @@ func load(_ this: AtomicInt) -> Int32 {
return oldValue
}

#endif // end of #else condition

@discardableResult
@inline(__always)
func increment(_ this: AtomicInt) -> Int32 {
Expand Down
4 changes: 4 additions & 0 deletions Platform/DispatchQueue+Extensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// Copyright © 2016 Krunoslav Zaher. All rights reserved.
//

#if !os(WASI)

import Dispatch

extension DispatchQueue {
Expand All @@ -19,3 +21,5 @@ extension DispatchQueue {
DispatchQueue.getSpecific(key: token) != nil
}
}

#endif // !os(WASI)
5 changes: 4 additions & 1 deletion RxExample/RxExample/RxExample.xcdatamodeld/.xccurrentversion
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict/>
<dict>
<key>_XCCurrentVersionName</key>
<string>RxExample.xcdatamodel</string>
</dict>
</plist>
23 changes: 23 additions & 0 deletions RxSwift/Binder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,27 @@ public struct Binder<Value>: ObserverType {
/// - parameter target: Target object.
/// - parameter scheduler: Scheduler used to bind the events.
/// - parameter binding: Binding logic.
#if os(WASI)
public init<Target: AnyObject>(_ target: Target, scheduler: ImmediateSchedulerType, binding: @escaping (Target, Value) -> Void) {
weak var weakTarget = target

self.binding = { event in
switch event {
case .next(let element):
_ = scheduler.schedule(element) { element in
if let target = weakTarget {
binding(target, element)
}
return Disposables.create()
}
case .error(let error):
rxFatalErrorInDebug("Binding error: \(error)")
case .completed:
break
}
}
}
#else
public init<Target: AnyObject>(_ target: Target, scheduler: ImmediateSchedulerType = MainScheduler(), binding: @escaping (Target, Value) -> Void) {
weak var weakTarget = target

Expand All @@ -44,6 +65,8 @@ public struct Binder<Value>: ObserverType {
}
}
}
#endif


/// Binds next element to owner view as described in `binding`.
public func on(_ event: Event<Value>) {
Expand Down
4 changes: 4 additions & 0 deletions RxSwift/Date+Dispatch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// Copyright © 2019 Krunoslav Zaher. All rights reserved.
//

#if !os(WASI)

import Dispatch
import Foundation

Expand Down Expand Up @@ -62,3 +64,5 @@ extension Date {
}

}

#endif // !os(WASI)
6 changes: 3 additions & 3 deletions RxSwift/ObservableType+Extensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ extension ObservableType {
disposable = Disposables.create()
}

#if DEBUG
#if DEBUG && !os(WASI)
let synchronizationTracker = SynchronizationTracker()
#endif

let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []

let observer = AnonymousObserver<Element> { event in

#if DEBUG
#if DEBUG && !os(WASI)
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
Expand Down Expand Up @@ -144,7 +144,7 @@ extension Hooks {
#endif
}
private static var _customCaptureSubscriptionCallstack: CustomCaptureSubscriptionCallstack = {
#if DEBUG
#if DEBUG && !os(WASI)
return Thread.callStackSymbols
#else
return []
Expand Down
4 changes: 2 additions & 2 deletions RxSwift/Observables/Create.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observ
// state
private let isStopped = AtomicInt(0)

#if DEBUG
#if DEBUG && !os(WASI)
private let synchronizationTracker = SynchronizationTracker()
#endif

Expand All @@ -38,7 +38,7 @@ final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observ
}

func on(_ event: Event<Element>) {
#if DEBUG
#if DEBUG && !os(WASI)
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
Expand Down
4 changes: 4 additions & 0 deletions RxSwift/Observables/Delay.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// Copyright © 2016 Krunoslav Zaher. All rights reserved.
//

#if !os(WASI)

import Foundation

extension ObservableType {
Expand Down Expand Up @@ -172,3 +174,5 @@ final private class Delay<Element>: Producer<Element> {
return (sink: sink, subscription: subscription)
}
}

#endif // !os(WASI)
2 changes: 2 additions & 0 deletions RxSwift/Observables/Generate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

#if !os(WASI)
extension ObservableType {
/**
Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler
Expand All @@ -23,6 +24,7 @@ extension ObservableType {
Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
}
#endif // !os(WASI)

final private class GenerateSink<Sequence, Observer: ObserverType>: Sink<Observer> {
typealias Parent = Generate<Sequence, Observer.Element>
Expand Down
26 changes: 26 additions & 0 deletions RxSwift/Observables/Merge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ extension ObservableType where Element: ObservableConvertibleType {
Merge(source: self.asObservable())
}

#if !os(WASI)
/**
Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences.

Expand All @@ -65,8 +66,10 @@ extension ObservableType where Element: ObservableConvertibleType {
-> Observable<Element.Element> {
MergeLimited(source: self.asObservable(), maxConcurrent: maxConcurrent)
}
#endif // !os(WASI)
}

#if !os(WASI)
extension ObservableType where Element: ObservableConvertibleType {

/**
Expand All @@ -80,6 +83,7 @@ extension ObservableType where Element: ObservableConvertibleType {
self.merge(maxConcurrent: 1)
}
}
#endif // !os(WASI)

extension ObservableType {
/**
Expand Down Expand Up @@ -243,6 +247,27 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti

func dequeueNextAndSubscribe() {
if let next = queue.dequeue() {
// HACK: concatMap depends on MergeLimitedSink, which depends on CurrentThreadScheduler.
// But CurrentThreadScheduler is challenging to implement, since SwiftWasm doesn't have
// a Thread.current API.
//
// Two options:
//
// 1) Add overload to concatMap that requires a scheduler pipe that to here.
//
// 2) Since Wasm is main-thread only for now, assume main thread and queue a simple Task
// to the main thread.
//
// Option 2 is more of hack, but it's easier. Going with the hack for now.
#if os(WASI)
// subscribing immediately can produce values immediately which can re-enter and cause stack overflows
Task { @MainActor [] in
// lock again
self.lock.performLocked {
self.subscribe(next, group: self.group)
}
}
#else
// subscribing immediately can produce values immediately which can re-enter and cause stack overflows
let disposable = CurrentThreadScheduler.instance.schedule(()) { _ in
// lock again
Expand All @@ -251,6 +276,7 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
}
}
_ = group.insert(disposable)
#endif
}
else {
activeCount -= 1
Expand Down
10 changes: 8 additions & 2 deletions RxSwift/Observables/ObserveOn.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ extension ObservableType {
- parameter scheduler: Scheduler to notify observers on.
- returns: The source sequence whose observations happen on the specified scheduler.
*/
public func observe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
public func observe(on scheduler: ImmediateSchedulerType) -> Observable<Element> {
#if os(WASI)
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
#else
guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}

return ObserveOnSerialDispatchQueue(source: self.asObservable(),
scheduler: serialScheduler)
#endif
}

/**
Expand Down Expand Up @@ -176,6 +179,8 @@ final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer
}
#endif

#if !os(WASI)

final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
let scheduler: SerialDispatchQueueScheduler
let observer: Observer
Expand Down Expand Up @@ -241,3 +246,4 @@ final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
}
#endif
}
#endif // !os(WASI)
9 changes: 9 additions & 0 deletions RxSwift/Observables/Producer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ class Producer<Element>: Observable<Element> {
}

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
#if os(WASI) // TODO: Is this simplification ok? Also, if CurrentThreadScheduler compiled, it would be better to use that.
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

return disposer
#else
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
Expand All @@ -29,6 +37,7 @@ class Producer<Element>: Observable<Element> {
return disposer
}
}
#endif // !os(WASI)
}

func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
Expand Down
Loading
Loading