From 6ac32fa41c239ad1dbb9387e2910d15a08ca3b3f Mon Sep 17 00:00:00 2001 From: Luke Howard Date: Wed, 12 Nov 2025 19:13:27 +1100 Subject: [PATCH 1/5] WIP: OCA broker channel manager --- .../OcaBrokerChannelManager.swift | 159 ++++++++++++++++++ .../FlutterSwiftOCA/OcaChannelManager.swift | 38 +++-- 2 files changed, 186 insertions(+), 11 deletions(-) create mode 100644 Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift diff --git a/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift new file mode 100644 index 0000000..7e46d11 --- /dev/null +++ b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift @@ -0,0 +1,159 @@ +// +// Copyright (c) 2025 PADL Software Pty Ltd +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import AsyncAlgorithms +import AsyncExtensions +@_spi(FlutterSwiftPrivate) +import FlutterSwift +import Foundation +import Logging +@_spi(SwiftOCAPrivate) +import SwiftOCA + +public let OcaBrokerChannelPrefix = "oca-broker/" + +public protocol OcaBrokerChannelManagerDelegate: AnyObject, Sendable {} + +public final class OcaBrokerChannelManager: Sendable { + private let broker: OcaConnectionBroker + private let binaryMessenger: FlutterBinaryMessenger + private let logger: Logger + private let flags: OcaChannelManager.Flags + + private let eventChannel: FlutterEventChannel + private let controlChannel: FlutterMethodChannel + private let channelManagers = + ManagedCriticalState<[OcaConnectionBroker.DeviceIdentifier: OcaChannelManager]>([:]) + + public typealias OnConnectionCallback = @Sendable ( + OcaConnectionBroker.DeviceIdentifier, + Ocp1Connection + ) async throws -> () + + private let onConnectionCallback: OnConnectionCallback? + + @FlutterPlatformThreadActor + public init( + connectionOptions: Ocp1ConnectionOptions, + binaryMessenger: FlutterBinaryMessenger, + logger: Logger, + flags: OcaChannelManager.Flags = [], + propertyEventChannelBufferSize: Int = 10, + onConnectionCallback: OnConnectionCallback? = nil + ) async throws { + broker = await OcaConnectionBroker(connectionOptions: connectionOptions) + self.binaryMessenger = binaryMessenger + self.logger = logger + self.flags = flags + self.onConnectionCallback = onConnectionCallback + + eventChannel = FlutterEventChannel( + name: "\(OcaBrokerChannelPrefix)events", + binaryMessenger: binaryMessenger + ) + controlChannel = FlutterMethodChannel( + name: "\(OcaBrokerChannelPrefix)control", + binaryMessenger: binaryMessenger + ) + + try eventChannel.setStreamHandler( + onListen: onEventListen, + onCancel: onEventCancel + ) + + try eventChannel.allowChannelBufferOverflow(true) + try controlChannel.setMethodCallHandler(onControl) + } + + @Sendable @FlutterPlatformThreadActor + private func onControl( + call: FlutterMethodCall + ) async throws -> Bool { + try await throwingFlutterError { + guard let deviceIdentifierString = call.arguments, + let deviceIdentifier = OcaConnectionBroker.DeviceIdentifier(deviceIdentifierString) + else { + throw Ocp1Error.status(.badFormat) + } + switch call.method { + case "connect": + try await broker.connect(device: deviceIdentifier) + let connection = try await broker.withDeviceConnection(deviceIdentifier) { connection in + try await onConnectionCallback?(deviceIdentifier, connection) + return connection + } + + let channelManager = try await FlutterPlatformThreadActor.run { + try OcaChannelManager( + connection: connection, + binaryMessenger: binaryMessenger, + logger: logger, + flags: flags, + channelSuffix: String(describing: deviceIdentifier) + ) + } + + channelManagers.withCriticalRegion { $0[deviceIdentifier] = channelManager } + case "disconnect": + channelManagers.withCriticalRegion { $0[deviceIdentifier] = nil } + try await broker.disconnect(device: deviceIdentifier) + default: + return false + } + return true + } + } + + @Sendable + private func onEventListen(_ target: String?) async throws + -> FlutterEventStream + { + try await throwingFlutterError { + await broker.events.compactMap { event in + let eventTypeString: String + + switch event.eventType { + case .deviceAdded: + eventTypeString = "added" + case .deviceRemoved: + eventTypeString = "removed" + case .connectionStateChanged: + return nil + } + + return try AnyFlutterStandardCodable([eventTypeString, event.deviceIdentifier.id]) + }.eraseToAnyAsyncSequence() + } + } + + @Sendable + private func onEventCancel(_ target: String?) async throws { + try await throwingFlutterError {} + } + + private func throwingFlutterError(_ block: @Sendable () async throws -> T) async throws -> T { + do { + return try await block() + } catch let error as Ocp1Error { + let flutterError = FlutterError( + error: error, + channelPrefix: OcaBrokerChannelPrefix + ) + logger.trace("throwing \(flutterError)") + throw flutterError + } + } +} diff --git a/Sources/FlutterSwiftOCA/OcaChannelManager.swift b/Sources/FlutterSwiftOCA/OcaChannelManager.swift index 83db899..ce0637d 100644 --- a/Sources/FlutterSwiftOCA/OcaChannelManager.swift +++ b/Sources/FlutterSwiftOCA/OcaChannelManager.swift @@ -48,6 +48,7 @@ Sendable { private let binaryMessenger: FlutterBinaryMessenger private let logger: Logger private let flags: Flags + private let channelSuffix: String? // method channels private let methodChannel: FlutterMethodChannel @@ -95,6 +96,14 @@ Sendable { private let subscriptions: ManagedCriticalState + private static func _makeChannelPrefix(with channelSuffix: String?) -> String { + if let channelSuffix { + return OcaChannelPrefix + channelSuffix + "/" + } else { + return OcaChannelPrefix + } + } + public struct Flags: OptionSet, Sendable { public typealias RawValue = UInt @@ -113,28 +122,31 @@ Sendable { binaryMessenger: FlutterBinaryMessenger, logger: Logger, flags: Flags = [], - propertyEventChannelBufferSize: Int = 10 + propertyEventChannelBufferSize: Int = 10, + channelSuffix: String? = nil ) throws { self.connection = connection self.binaryMessenger = binaryMessenger self.logger = logger self.flags = flags + self.channelSuffix = channelSuffix subscriptions = ManagedCriticalState(EventSubscriptions()) + let channelPrefix = Self._makeChannelPrefix(with: channelSuffix) methodChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)method", + name: "\(channelPrefix)method", binaryMessenger: binaryMessenger ) getPropertyChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)get_property", + name: "\(channelPrefix)get_property", binaryMessenger: binaryMessenger ) setPropertyChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)set_property", + name: "\(channelPrefix)set_property", binaryMessenger: binaryMessenger ) sampleRateChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)sample_rate", + name: "\(channelPrefix)sample_rate", binaryMessenger: binaryMessenger ) datasetBlobChannel = FlutterMethodChannel( @@ -146,15 +158,15 @@ Sendable { binaryMessenger: binaryMessenger ) propertyEventChannel = FlutterEventChannel( - name: "\(OcaChannelPrefix)property_event", + name: "\(channelPrefix)property_event", binaryMessenger: binaryMessenger ) meteringEventChannel = FlutterEventChannel( - name: "\(OcaChannelPrefix)metering_event", + name: "\(channelPrefix)metering_event", binaryMessenger: binaryMessenger ) connectionStateChannel = FlutterEventChannel( - name: "\(OcaChannelPrefix)connection_state", + name: "\(channelPrefix)connection_state", binaryMessenger: binaryMessenger ) @@ -192,7 +204,10 @@ Sendable { do { return try await block() } catch let error as Ocp1Error { - let flutterError = FlutterError(error: error) + let flutterError = FlutterError( + error: error, + channelPrefix: Self._makeChannelPrefix(with: channelSuffix) + ) logger.trace("throwing \(flutterError)") throw flutterError } @@ -690,10 +705,11 @@ extension FlutterError { init( error: Ocp1Error, message: String? = nil, - stacktrace: String? = nil + stacktrace: String? = nil, + channelPrefix: String ) { self.init( - code: "\(OcaChannelPrefix)" + String(describing: error), + code: "\(channelPrefix)" + String(describing: error), message: message, stacktrace: stacktrace ) From 5c8254d766d083630d61390d9b4a3e86e27792df Mon Sep 17 00:00:00 2001 From: Peer Espen Ursfjord Date: Sun, 7 Dec 2025 11:01:25 +0100 Subject: [PATCH 2/5] make platformStateChannel use channelPrefix --- Sources/FlutterSwiftOCA/OcaChannelManager.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/FlutterSwiftOCA/OcaChannelManager.swift b/Sources/FlutterSwiftOCA/OcaChannelManager.swift index bdd10ef..3cddef1 100644 --- a/Sources/FlutterSwiftOCA/OcaChannelManager.swift +++ b/Sources/FlutterSwiftOCA/OcaChannelManager.swift @@ -160,7 +160,7 @@ Sendable { binaryMessenger: binaryMessenger ) platformStateChannel = FlutterMethodChannel( - name: "\(OcaChannelPrefix)platform_state", + name: "\(channelPrefix)platform_state", binaryMessenger: binaryMessenger ) propertyEventChannel = FlutterEventChannel( @@ -201,7 +201,7 @@ Sendable { try meteringEventChannel.allowChannelBufferOverflow(true) try connectionStateChannel.allowChannelBufferOverflow(true) - logger.trace("OCA platform channels ready") + logger.trace("OCA platform channels ready (\(channelSuffix ?? "no suffix"))") Task{ // let Flutter code know it is safe to subsribe to the channels above From 61485caad268edf218b5afff431f38f2175abdea Mon Sep 17 00:00:00 2001 From: Peer Espen Ursfjord Date: Sun, 7 Dec 2025 11:02:08 +0100 Subject: [PATCH 3/5] provide name of discovered device to broker clients --- Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift index 7e46d11..2aa7440 100644 --- a/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift +++ b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift @@ -134,7 +134,7 @@ public final class OcaBrokerChannelManager: Sendable { return nil } - return try AnyFlutterStandardCodable([eventTypeString, event.deviceIdentifier.id]) + return try AnyFlutterStandardCodable([eventTypeString, event.deviceIdentifier.id, event.deviceIdentifier.name]) }.eraseToAnyAsyncSequence() } } From 4a0ecf2719b69ded1896ef1a0a2c06116c9d362c Mon Sep 17 00:00:00 2001 From: Luke Howard Date: Tue, 9 Dec 2025 09:50:15 +1100 Subject: [PATCH 4/5] formatting --- Sources/FlutterSwiftOCA/OcaChannelManager.swift | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Sources/FlutterSwiftOCA/OcaChannelManager.swift b/Sources/FlutterSwiftOCA/OcaChannelManager.swift index 2736d80..4e1da41 100644 --- a/Sources/FlutterSwiftOCA/OcaChannelManager.swift +++ b/Sources/FlutterSwiftOCA/OcaChannelManager.swift @@ -221,9 +221,12 @@ Sendable { logger.trace("OCA platform channels ready (\(channelSuffix ?? "no suffix"))") - Task{ + Task { // let Flutter code know it is safe to subsribe to the channels above - try await platformStateChannel.invoke(method: OcaPlatformStateReadyMethodName, arguments: true) + try await platformStateChannel.invoke( + method: OcaPlatformStateReadyMethodName, + arguments: true + ) } } From e027b8bfe52ae77f3c8751360bb69c8956ade8ed Mon Sep 17 00:00:00 2001 From: Luke Howard Date: Tue, 9 Dec 2025 09:50:34 +1100 Subject: [PATCH 5/5] add list method to connection broker --- .../FlutterSwiftOCA/OcaBrokerChannelManager.swift | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift index 2aa7440..27cdf86 100644 --- a/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift +++ b/Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift @@ -81,7 +81,7 @@ public final class OcaBrokerChannelManager: Sendable { @Sendable @FlutterPlatformThreadActor private func onControl( call: FlutterMethodCall - ) async throws -> Bool { + ) async throws -> [String] { try await throwingFlutterError { guard let deviceIdentifierString = call.arguments, let deviceIdentifier = OcaConnectionBroker.DeviceIdentifier(deviceIdentifierString) @@ -110,10 +110,12 @@ public final class OcaBrokerChannelManager: Sendable { case "disconnect": channelManagers.withCriticalRegion { $0[deviceIdentifier] = nil } try await broker.disconnect(device: deviceIdentifier) + case "list": + return await broker.registeredDevices.map { String(describing: $0) } default: - return false + break } - return true + return [] } } @@ -134,7 +136,11 @@ public final class OcaBrokerChannelManager: Sendable { return nil } - return try AnyFlutterStandardCodable([eventTypeString, event.deviceIdentifier.id, event.deviceIdentifier.name]) + return try AnyFlutterStandardCodable([ + eventTypeString, + event.deviceIdentifier.id, + event.deviceIdentifier.name, + ]) }.eraseToAnyAsyncSequence() } }