diff --git a/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift new file mode 100644 index 0000000..27cdf86 --- /dev/null +++ b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift @@ -0,0 +1,165 @@ +// +// Copyright (c) 2025 PADL Software Pty Ltd +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import AsyncAlgorithms +import AsyncExtensions +@_spi(FlutterSwiftPrivate) +import FlutterSwift +import Foundation +import Logging +@_spi(SwiftOCAPrivate) +import SwiftOCA + +public let OcaBrokerChannelPrefix = "oca-broker/" + +public protocol OcaBrokerChannelManagerDelegate: AnyObject, Sendable {} + +public final class OcaBrokerChannelManager: Sendable { + private let broker: OcaConnectionBroker + private let binaryMessenger: FlutterBinaryMessenger + private let logger: Logger + private let flags: OcaChannelManager.Flags + + private let eventChannel: FlutterEventChannel + private let controlChannel: FlutterMethodChannel + private let channelManagers = + ManagedCriticalState<[OcaConnectionBroker.DeviceIdentifier: OcaChannelManager]>([:]) + + public typealias OnConnectionCallback = @Sendable ( + OcaConnectionBroker.DeviceIdentifier, + Ocp1Connection + ) async throws -> () + + private let onConnectionCallback: OnConnectionCallback? + + @FlutterPlatformThreadActor + public init( + connectionOptions: Ocp1ConnectionOptions, + binaryMessenger: FlutterBinaryMessenger, + logger: Logger, + flags: OcaChannelManager.Flags = [], + propertyEventChannelBufferSize: Int = 10, + onConnectionCallback: OnConnectionCallback? = nil + ) async throws { + broker = await OcaConnectionBroker(connectionOptions: connectionOptions) + self.binaryMessenger = binaryMessenger + self.logger = logger + self.flags = flags + self.onConnectionCallback = onConnectionCallback + + eventChannel = FlutterEventChannel( + name: "\(OcaBrokerChannelPrefix)events", + binaryMessenger: binaryMessenger + ) + controlChannel = FlutterMethodChannel( + name: "\(OcaBrokerChannelPrefix)control", + binaryMessenger: binaryMessenger + ) + + try eventChannel.setStreamHandler( + onListen: onEventListen, + onCancel: onEventCancel + ) + + try eventChannel.allowChannelBufferOverflow(true) + try controlChannel.setMethodCallHandler(onControl) + } + + @Sendable @FlutterPlatformThreadActor + private func onControl( + call: FlutterMethodCall + ) async throws -> [String] { + try await throwingFlutterError { + guard let deviceIdentifierString = call.arguments, + let deviceIdentifier = OcaConnectionBroker.DeviceIdentifier(deviceIdentifierString) + else { + throw Ocp1Error.status(.badFormat) + } + switch call.method { + case "connect": + try await broker.connect(device: deviceIdentifier) + let connection = try await broker.withDeviceConnection(deviceIdentifier) { connection in + try await onConnectionCallback?(deviceIdentifier, connection) + return connection + } + + let channelManager = try await FlutterPlatformThreadActor.run { + try OcaChannelManager( + connection: connection, + binaryMessenger: binaryMessenger, + logger: logger, + flags: flags, + channelSuffix: String(describing: deviceIdentifier) + ) + } + + channelManagers.withCriticalRegion { $0[deviceIdentifier] = channelManager } + case "disconnect": + channelManagers.withCriticalRegion { $0[deviceIdentifier] = nil } + try await broker.disconnect(device: deviceIdentifier) + case "list": + return await broker.registeredDevices.map { String(describing: $0) } + default: + break + } + return [] + } + } + + @Sendable + private func onEventListen(_ target: String?) async throws + -> FlutterEventStream + { + try await throwingFlutterError { + await broker.events.compactMap { event in + let eventTypeString: String + + switch event.eventType { + case .deviceAdded: + eventTypeString = "added" + case .deviceRemoved: + eventTypeString = "removed" + case .connectionStateChanged: + return nil + } + + return try AnyFlutterStandardCodable([ + eventTypeString, + event.deviceIdentifier.id, + event.deviceIdentifier.name, + ]) + }.eraseToAnyAsyncSequence() + } + } + + @Sendable + private func onEventCancel(_ target: String?) async throws { + try await throwingFlutterError {} + } + + private func throwingFlutterError(_ block: @Sendable () async throws -> T) async throws -> T { + do { + return try await block() + } catch let error as Ocp1Error { + let flutterError = FlutterError( + error: error, + channelPrefix: OcaBrokerChannelPrefix + ) + logger.trace("throwing \(flutterError)") + throw flutterError + } + } +} diff --git a/Sources/FlutterSwiftOCA/OcaChannelManager.swift b/Sources/FlutterSwiftOCA/OcaChannelManager.swift index 0132a8f..4e1da41 100644 --- a/Sources/FlutterSwiftOCA/OcaChannelManager.swift +++ b/Sources/FlutterSwiftOCA/OcaChannelManager.swift @@ -49,6 +49,7 @@ Sendable { private let binaryMessenger: FlutterBinaryMessenger private let logger: Logger private let flags: Flags + private let channelSuffix: String? // method channels private let methodChannel: FlutterMethodChannel @@ -100,6 +101,14 @@ Sendable { private let subscriptions: ManagedCriticalState + private static func _makeChannelPrefix(with channelSuffix: String?) -> String { + if let channelSuffix { + return OcaChannelPrefix + channelSuffix + "/" + } else { + return OcaChannelPrefix + } + } + public struct Flags: OptionSet, Sendable { public typealias RawValue = UInt @@ -119,28 +128,31 @@ Sendable { logger: Logger, flags: Flags = [], propertyEventChannelBufferSize: Int = 10, + channelSuffix: String? = nil, identificationSensorONo: OcaONo = OcaInvalidONo ) throws { self.connection = connection self.binaryMessenger = binaryMessenger self.logger = logger self.flags = flags + self.channelSuffix = channelSuffix subscriptions = ManagedCriticalState(EventSubscriptions()) + let channelPrefix = Self._makeChannelPrefix(with: channelSuffix) methodChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)method", + name: "\(channelPrefix)method", binaryMessenger: binaryMessenger ) getPropertyChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)get_property", + name: "\(channelPrefix)get_property", binaryMessenger: binaryMessenger ) setPropertyChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)set_property", + name: "\(channelPrefix)set_property", binaryMessenger: binaryMessenger ) sampleRateChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)sample_rate", + name: "\(channelPrefix)sample_rate", binaryMessenger: binaryMessenger ) datasetBlobChannel = FlutterMethodChannel( @@ -152,19 +164,19 @@ Sendable { binaryMessenger: binaryMessenger ) platformStateChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)platform_state", + name: "\(channelPrefix)platform_state", binaryMessenger: binaryMessenger ) propertyEventChannel = FlutterEventChannel( - name: "\(OcaChannelPrefix)property_event", + name: "\(channelPrefix)property_event", binaryMessenger: binaryMessenger ) meteringEventChannel = FlutterEventChannel( - name: "\(OcaChannelPrefix)metering_event", + name: "\(channelPrefix)metering_event", binaryMessenger: binaryMessenger ) connectionStateChannel = FlutterEventChannel( - name: "\(OcaChannelPrefix)connection_state", + name: "\(channelPrefix)connection_state", binaryMessenger: binaryMessenger ) @@ -207,7 +219,7 @@ Sendable { try meteringEventChannel.allowChannelBufferOverflow(true) try connectionStateChannel.allowChannelBufferOverflow(true) - logger.trace("OCA platform channels ready") + logger.trace("OCA platform channels ready (\(channelSuffix ?? "no suffix"))") Task { // let Flutter code know it is safe to subsribe to the channels above @@ -226,7 +238,10 @@ Sendable { do { return try await block() } catch let error as Ocp1Error { - let flutterError = FlutterError(error: error) + let flutterError = FlutterError( + error: error, + channelPrefix: Self._makeChannelPrefix(with: channelSuffix) + ) logger.trace("throwing \(flutterError)") throw flutterError } @@ -741,10 +756,11 @@ extension FlutterError { init( error: Ocp1Error, message: String? = nil, - stacktrace: String? = nil + stacktrace: String? = nil, + channelPrefix: String ) { self.init( - code: "\(OcaChannelPrefix)" + String(describing: error), + code: "\(channelPrefix)" + String(describing: error), message: message, stacktrace: stacktrace )