Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions Sources/FlutterSwiftOCA/OcaBrokerChannelManager.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
//
// Copyright (c) 2025 PADL Software Pty Ltd
//
// Licensed under the Apache License, Version 2.0 (the License);
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an 'AS IS' BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

import AsyncAlgorithms
import AsyncExtensions
@_spi(FlutterSwiftPrivate)
import FlutterSwift
import Foundation
import Logging
@_spi(SwiftOCAPrivate)
import SwiftOCA

public let OcaBrokerChannelPrefix = "oca-broker/"

public protocol OcaBrokerChannelManagerDelegate: AnyObject, Sendable {}

public final class OcaBrokerChannelManager: Sendable {
private let broker: OcaConnectionBroker
private let binaryMessenger: FlutterBinaryMessenger
private let logger: Logger
private let flags: OcaChannelManager.Flags

private let eventChannel: FlutterEventChannel
private let controlChannel: FlutterMethodChannel
private let channelManagers =
ManagedCriticalState<[OcaConnectionBroker.DeviceIdentifier: OcaChannelManager]>([:])

public typealias OnConnectionCallback = @Sendable (
OcaConnectionBroker.DeviceIdentifier,
Ocp1Connection
) async throws -> ()

private let onConnectionCallback: OnConnectionCallback?

@FlutterPlatformThreadActor
public init(
connectionOptions: Ocp1ConnectionOptions,
binaryMessenger: FlutterBinaryMessenger,
logger: Logger,
flags: OcaChannelManager.Flags = [],
propertyEventChannelBufferSize: Int = 10,
onConnectionCallback: OnConnectionCallback? = nil
) async throws {
broker = await OcaConnectionBroker(connectionOptions: connectionOptions)
self.binaryMessenger = binaryMessenger
self.logger = logger
self.flags = flags
self.onConnectionCallback = onConnectionCallback

eventChannel = FlutterEventChannel(
name: "\(OcaBrokerChannelPrefix)events",
binaryMessenger: binaryMessenger
)
controlChannel = FlutterMethodChannel(
name: "\(OcaBrokerChannelPrefix)control",
binaryMessenger: binaryMessenger
)

try eventChannel.setStreamHandler(
onListen: onEventListen,
onCancel: onEventCancel
)

try eventChannel.allowChannelBufferOverflow(true)
try controlChannel.setMethodCallHandler(onControl)
}

@Sendable @FlutterPlatformThreadActor
private func onControl(
call: FlutterMethodCall<String>
) async throws -> [String] {
try await throwingFlutterError {
guard let deviceIdentifierString = call.arguments,
let deviceIdentifier = OcaConnectionBroker.DeviceIdentifier(deviceIdentifierString)
else {
throw Ocp1Error.status(.badFormat)
}
switch call.method {
case "connect":
try await broker.connect(device: deviceIdentifier)
let connection = try await broker.withDeviceConnection(deviceIdentifier) { connection in
try await onConnectionCallback?(deviceIdentifier, connection)
return connection
}

let channelManager = try await FlutterPlatformThreadActor.run {
try OcaChannelManager(
connection: connection,
binaryMessenger: binaryMessenger,
logger: logger,
flags: flags,
channelSuffix: String(describing: deviceIdentifier)
)
}

channelManagers.withCriticalRegion { $0[deviceIdentifier] = channelManager }
case "disconnect":
channelManagers.withCriticalRegion { $0[deviceIdentifier] = nil }
try await broker.disconnect(device: deviceIdentifier)
case "list":
return await broker.registeredDevices.map { String(describing: $0) }
default:
break
}
return []
}
}

@Sendable
private func onEventListen(_ target: String?) async throws
-> FlutterEventStream<AnyFlutterStandardCodable>
{
try await throwingFlutterError {
await broker.events.compactMap { event in
let eventTypeString: String

switch event.eventType {
case .deviceAdded:
eventTypeString = "added"
case .deviceRemoved:
eventTypeString = "removed"
case .connectionStateChanged:
return nil
}

return try AnyFlutterStandardCodable([
eventTypeString,
event.deviceIdentifier.id,
event.deviceIdentifier.name,
])
}.eraseToAnyAsyncSequence()
}
}

@Sendable
private func onEventCancel(_ target: String?) async throws {
try await throwingFlutterError {}
}

private func throwingFlutterError<T>(_ block: @Sendable () async throws -> T) async throws -> T {
do {
return try await block()
} catch let error as Ocp1Error {
let flutterError = FlutterError(
error: error,
channelPrefix: OcaBrokerChannelPrefix
)
logger.trace("throwing \(flutterError)")
throw flutterError
}
}
}
40 changes: 28 additions & 12 deletions Sources/FlutterSwiftOCA/OcaChannelManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Sendable {
private let binaryMessenger: FlutterBinaryMessenger
private let logger: Logger
private let flags: Flags
private let channelSuffix: String?

// method channels
private let methodChannel: FlutterMethodChannel
Expand Down Expand Up @@ -100,6 +101,14 @@ Sendable {

private let subscriptions: ManagedCriticalState<EventSubscriptions>

private static func _makeChannelPrefix(with channelSuffix: String?) -> String {
if let channelSuffix {
return OcaChannelPrefix + channelSuffix + "/"
} else {
return OcaChannelPrefix
}
}

public struct Flags: OptionSet, Sendable {
public typealias RawValue = UInt

Expand All @@ -119,28 +128,31 @@ Sendable {
logger: Logger,
flags: Flags = [],
propertyEventChannelBufferSize: Int = 10,
channelSuffix: String? = nil,
identificationSensorONo: OcaONo = OcaInvalidONo
) throws {
self.connection = connection
self.binaryMessenger = binaryMessenger
self.logger = logger
self.flags = flags
self.channelSuffix = channelSuffix
subscriptions = ManagedCriticalState(EventSubscriptions())
let channelPrefix = Self._makeChannelPrefix(with: channelSuffix)

methodChannel = FlutterMethodChannel(
name: "\(OcaChannelPrefix)method",
name: "\(channelPrefix)method",
binaryMessenger: binaryMessenger
)
getPropertyChannel = FlutterMethodChannel(
name: "\(OcaChannelPrefix)get_property",
name: "\(channelPrefix)get_property",
binaryMessenger: binaryMessenger
)
setPropertyChannel = FlutterMethodChannel(
name: "\(OcaChannelPrefix)set_property",
name: "\(channelPrefix)set_property",
binaryMessenger: binaryMessenger
)
sampleRateChannel = FlutterMethodChannel(
name: "\(OcaChannelPrefix)sample_rate",
name: "\(channelPrefix)sample_rate",
binaryMessenger: binaryMessenger
)
datasetBlobChannel = FlutterMethodChannel(
Expand All @@ -152,19 +164,19 @@ Sendable {
binaryMessenger: binaryMessenger
)
platformStateChannel = FlutterMethodChannel(
name: "\(OcaChannelPrefix)platform_state",
name: "\(channelPrefix)platform_state",
binaryMessenger: binaryMessenger
)
propertyEventChannel = FlutterEventChannel(
name: "\(OcaChannelPrefix)property_event",
name: "\(channelPrefix)property_event",
binaryMessenger: binaryMessenger
)
meteringEventChannel = FlutterEventChannel(
name: "\(OcaChannelPrefix)metering_event",
name: "\(channelPrefix)metering_event",
binaryMessenger: binaryMessenger
)
connectionStateChannel = FlutterEventChannel(
name: "\(OcaChannelPrefix)connection_state",
name: "\(channelPrefix)connection_state",
binaryMessenger: binaryMessenger
)

Expand Down Expand Up @@ -207,7 +219,7 @@ Sendable {
try meteringEventChannel.allowChannelBufferOverflow(true)
try connectionStateChannel.allowChannelBufferOverflow(true)

logger.trace("OCA platform channels ready")
logger.trace("OCA platform channels ready (\(channelSuffix ?? "no suffix"))")

Task {
// let Flutter code know it is safe to subsribe to the channels above
Expand All @@ -226,7 +238,10 @@ Sendable {
do {
return try await block()
} catch let error as Ocp1Error {
let flutterError = FlutterError(error: error)
let flutterError = FlutterError(
error: error,
channelPrefix: Self._makeChannelPrefix(with: channelSuffix)
)
logger.trace("throwing \(flutterError)")
throw flutterError
}
Expand Down Expand Up @@ -741,10 +756,11 @@ extension FlutterError {
init(
error: Ocp1Error,
message: String? = nil,
stacktrace: String? = nil
stacktrace: String? = nil,
channelPrefix: String
) {
self.init(
code: "\(OcaChannelPrefix)" + String(describing: error),
code: "\(channelPrefix)" + String(describing: error),
message: message,
stacktrace: stacktrace
)
Expand Down