diff --git a/Sources/MRP/Applications/MMRP/MMRPApplication.swift b/Sources/MRP/Applications/MMRP/MMRPApplication.swift index 97287b40..4c0c4abc 100644 --- a/Sources/MRP/Applications/MMRP/MMRPApplication.swift +++ b/Sources/MRP/Applications/MMRP/MMRPApplication.swift @@ -38,34 +38,32 @@ protocol MMRPAwareBridge
: Bridge where P: Port {
) async throws
}
-public final class MMRPApplication ? { _controller.object }
+ public nonisolated var controller: MRPController ? { _controller.object }
- let _participants =
- Mutex<[MAPContextIdentifier: Set ) async throws {
@@ -74,14 +72,13 @@ public final class MMRPApplication ? { _controller.object }
+ public nonisolated var controller: MRPController ? { _controller.object }
- let _participants =
- Mutex<[MAPContextIdentifier: Set ]>([:])
+ fileprivate var _portStates: [P.ID: MSRPPortState ] = [:]
fileprivate let _mmrp: MMRPApplication ?
fileprivate var _priorityMapNotificationTask: Task<(), Error>?
// Convenience accessors for flags
- fileprivate var _forceAvbCapable: Bool { _flags.contains(.forceAvbCapable) }
- fileprivate var _configureQueues: Bool { _flags.contains(.configureQueues) }
- var _ignoreAsCapable: Bool { _flags.contains(.ignoreAsCapable) }
- fileprivate var _talkerPruning: Bool { _flags.contains(.talkerPruning) }
+ fileprivate nonisolated var _forceAvbCapable: Bool { _flags.contains(.forceAvbCapable) }
+ fileprivate nonisolated var _configureQueues: Bool { _flags.contains(.configureQueues) }
+ nonisolated var _ignoreAsCapable: Bool { _flags.contains(.ignoreAsCapable) }
+ fileprivate nonisolated var _talkerPruning: Bool { _flags.contains(.talkerPruning) }
public init(
controller: MRPController ,
@@ -228,12 +227,10 @@ public final class MSRPApplication ) throws -> T
) throws -> T {
- try _portStates.withLock {
- if let index = $0.index(forKey: port.id) {
- return try body(&$0.values[index])
- } else {
- throw MRPError.portNotFound
- }
+ if let index = _portStates.index(forKey: port.id) {
+ return try body(&_portStates.values[index])
+ } else {
+ throw MRPError.portNotFound
}
}
@@ -276,85 +273,73 @@ public final class MSRPApplication
- ) throws {
+ ) async throws {
guard contextIdentifier == MAPBaseSpanningTreeContext else { return }
if !_forceAvbCapable {
- _portStates.withLock {
- for port in context {
- guard let index = $0.index(forKey: port.id) else { continue }
- if $0.values[index].msrpPortEnabledStatus != port.isAvbCapable {
- _logger.info("MSRP: port \(port) changed isAvbCapable, now \(port.isAvbCapable)")
- }
- $0.values[index].msrpPortEnabledStatus = port.isAvbCapable
+ for port in context {
+ guard let index = _portStates.index(forKey: port.id) else { continue }
+ if _portStates.values[index].msrpPortEnabledStatus != port.isAvbCapable {
+ _logger.info("MSRP: port \(port) changed isAvbCapable, now \(port.isAvbCapable)")
}
+ _portStates.values[index].msrpPortEnabledStatus = port.isAvbCapable
}
}
- Task {
- for port in context {
- _logger.debug("MSRP: re-declaring domains for port \(port)")
- try await _declareDomains(port: port)
- }
+ for port in context {
+ _logger.debug("MSRP: re-declaring domains for port \(port)")
+ try _declareDomains(port: port)
}
}
func onContextRemoved(
contextIdentifier: MAPContextIdentifier,
with context: MAPContext
- ) throws {
+ ) async throws {
guard contextIdentifier == MAPBaseSpanningTreeContext else { return }
if _configureQueues {
- Task {
- for port in context {
- guard port.isAvbCapable,
- let bridge = (controller?.bridge as? any MSRPAwareBridge ) else { continue }
- do {
- try await bridge.unconfigureQueues(port: port)
- } catch {
- _logger.error("MSRP: failed to unconfigure queues for port \(port): \(error)")
- }
+ for port in context {
+ guard port.isAvbCapable,
+ let bridge = (controller?.bridge as? any MSRPAwareBridge ) else { continue }
+ do {
+ try await bridge.unconfigureQueues(port: port)
+ } catch {
+ _logger.error("MSRP: failed to unconfigure queues for port \(port): \(error)")
}
}
}
- _portStates.withLock {
- for port in context {
- _logger.debug("MSRP: port \(port) disappeared, removing")
- $0.removeValue(forKey: port.id)
- }
+ for port in context {
+ _logger.debug("MSRP: port \(port) disappeared, removing")
+ _portStates.removeValue(forKey: port.id)
}
}
- public var description: String {
- let participants: String = _participants.withLock { String(describing: $0) }
- let portStates: String = _portStates.withLock { String(describing: $0) }
- return "MSRPApplication(controller: \(controller?.description ?? " ,
provisionalTalker: MSRPTalkerAdvertiseValue? = nil
- ) async throws -> [SRclassID: Int] {
+ ) throws -> [SRclassID: Int] {
var bandwidthUsed = [SRclassID: Int]()
// Find all active talkers (those with listeners in ready or readyFailed state)
- var talkers = await _findActiveTalkers(participant: participant)
+ var talkers = _findActiveTalkers(participant: participant)
// Add provisional talker if provided (for bandwidth admission control check)
if let provisionalTalker { talkers.insert(provisionalTalker) }
@@ -744,7 +729,7 @@ extension MSRPApplication {
dataFrameParameters: MSRPDataFrameParameters,
tSpec: MSRPTSpec,
priorityAndRank: MSRPPriorityAndRank
- ) async throws -> Bool {
+ ) throws -> Bool {
let port = participant.port
let provisionalTalker = MSRPTalkerAdvertiseValue(
streamID: streamID,
@@ -754,7 +739,7 @@ extension MSRPApplication {
accumulatedLatency: 0 // or this
)
- let bandwidthUsed = try await _calculateBandwidthUsed(
+ let bandwidthUsed = try _calculateBandwidthUsed(
participant: participant,
portState: portState,
provisionalTalker: provisionalTalker
@@ -789,7 +774,7 @@ extension MSRPApplication {
accumulatedLatency: UInt32,
isNew: Bool,
eventSource: EventSource
- ) async throws {
+ ) throws {
let port = participant.port
do {
@@ -802,7 +787,7 @@ extension MSRPApplication {
throw MSRPFailure(systemID: port.systemID, failureCode: .egressPortIsNotAvbCapable)
}
- if let existingTalkerRegistration = await _findTalkerRegistration(
+ if let existingTalkerRegistration = _findTalkerRegistration(
for: streamID,
participant: participant
), existingTalkerRegistration.dataFrameParameters != dataFrameParameters {
@@ -822,7 +807,7 @@ extension MSRPApplication {
throw MSRPFailure(systemID: port.systemID, failureCode: .egressPortIsNotAvbCapable)
}
- guard await !_isFanInPortLimitReached() else {
+ guard !_isFanInPortLimitReached() else {
_logger.error("MSRP: fan in port limit reached")
throw MSRPFailure(systemID: port.systemID, failureCode: .fanInPortLimitReached)
}
@@ -839,7 +824,7 @@ extension MSRPApplication {
throw MSRPFailure(systemID: port.systemID, failureCode: .maxFrameSizeTooLargeForMedia)
}
- guard try await _checkAvailableBandwidth(
+ guard try _checkAvailableBandwidth(
participant: participant,
portState: portState,
streamID: streamID,
@@ -890,7 +875,7 @@ extension MSRPApplication {
// Deregister the opposite talker type from the peer to ensure mutual exclusion
if eventSource == .peer {
let sourceParticipant = try findParticipant(for: contextIdentifier, port: port)
- try await _enforceTalkerMutualExclusion(
+ try _enforceTalkerMutualExclusion(
participant: sourceParticipant,
declarationType: declarationType,
streamID: talkerValue.streamID,
@@ -938,7 +923,7 @@ extension MSRPApplication {
// Leave the opposite talker declaration type to ensure mutual exclusion
// (per spec, only one talker declaration type should exist per stream)
- try await _enforceTalkerMutualExclusion(
+ try _enforceTalkerMutualExclusion(
participant: participant,
declarationType: declarationType,
streamID: talkerValue.streamID,
@@ -947,7 +932,7 @@ extension MSRPApplication {
if declarationType == .talkerAdvertise {
do {
- try await _canBridgeTalker(
+ try _canBridgeTalker(
participant: participant,
port: port,
streamID: talkerValue.streamID,
@@ -970,7 +955,7 @@ extension MSRPApplication {
.debug(
"MSRP: propagating talker advertise \(talkerAdvertise) to port \(port)"
)
- try await participant.join(
+ try participant.join(
attributeType: MSRPAttributeType.talkerAdvertise.rawValue,
attributeValue: talkerAdvertise,
isNew: false,
@@ -990,7 +975,7 @@ extension MSRPApplication {
.debug(
"MSRP: propagating talker failed \(talkerFailed) on port \(port), error \(error)"
)
- try await participant.join(
+ try participant.join(
attributeType: MSRPAttributeType.talkerFailed.rawValue,
attributeValue: talkerFailed,
isNew: true,
@@ -1012,7 +997,7 @@ extension MSRPApplication {
.debug(
"MSRP: propagating talker failed \(talkerFailed) to port \(port), transitive"
)
- try await participant.join(
+ try participant.join(
attributeType: MSRPAttributeType.talkerFailed.rawValue,
attributeValue: talkerFailed,
isNew: false,
@@ -1042,7 +1027,7 @@ extension MSRPApplication {
// _propagateListenerDeclarationToTalker() will examine all listeners and
// return the merged declaration type, so there is no need to do this
// within the apply() loop
- guard let mergedDeclarationType = try? await _propagateListenerDeclarationToTalker(
+ guard let mergedDeclarationType = try? _propagateListenerDeclarationToTalker(
contextIdentifier: contextIdentifier,
listenerPort: nil,
declarationType: nil,
@@ -1056,7 +1041,7 @@ extension MSRPApplication {
// that didn't exist previously, we do need to update port parameters on
// each talker port that matches the listener stream ID
await apply(for: contextIdentifier) { participant in
- guard let listenerRegistration = await _findListenerRegistration(
+ guard let listenerRegistration = _findListenerRegistration(
for: talkerValue.streamID,
participant: participant
) else {
@@ -1064,7 +1049,7 @@ extension MSRPApplication {
}
// verify talker still exists (guard against race with talker departure)
- guard let currentTalker = await _findTalkerRegistration(
+ guard let currentTalker = _findTalkerRegistration(
for: talkerValue.streamID,
participant: talkerParticipant
), currentTalker.streamID == talkerValue.streamID else {
@@ -1135,8 +1120,8 @@ extension MSRPApplication {
private func _findListenerRegistration(
for streamID: MSRPStreamID,
participant: Participant : Bridge where P: Port {
func deregister(vlan: VLAN, from port: P) async throws
}
-public final class MVRPApplication ? { _controller.object }
+ public nonisolated var controller: MRPController ? { _controller.object }
- let _participants =
- Mutex<[MAPContextIdentifier: Set ,
!bridge.hasLocalMVRPApplicant else { return }
- try await join(
+ try join(
attributeType: MVRPAttributeType.vid.rawValue,
attributeValue: VLAN(contextIdentifier: contextIdentifier),
isNew: true,
@@ -232,21 +229,19 @@ extension MVRPApplication {
func onContextUpdated(
contextIdentifier: MAPContextIdentifier,
with context: MAPContext
- ) throws {}
+ ) async throws {}
func onContextRemoved(
contextIdentifier: MAPContextIdentifier,
with context: MAPContext
- ) throws {
+ ) async throws {
guard let bridge = controller?.bridge as? any MVRPAwareBridge ,
!bridge.hasLocalMVRPApplicant else { return }
- Task {
- try await leave(
- attributeType: MVRPAttributeType.vid.rawValue,
- attributeValue: VLAN(contextIdentifier: contextIdentifier),
- for: MAPBaseSpanningTreeContext
- )
- }
+ try leave(
+ attributeType: MVRPAttributeType.vid.rawValue,
+ attributeValue: VLAN(contextIdentifier: contextIdentifier),
+ for: MAPBaseSpanningTreeContext
+ )
}
}
diff --git a/Sources/MRP/Base/MRPController.swift b/Sources/MRP/Base/MRPController.swift
index 595b2ae2..60b5fb06 100644
--- a/Sources/MRP/Base/MRPController.swift
+++ b/Sources/MRP/Base/MRPController.swift
@@ -116,7 +116,7 @@ public actor MRPController
- ) throws {
+ ) async throws {
for application in _applications.values {
- try application.didUpdate(contextIdentifier: contextIdentifier, with: context)
+ try await application.didUpdate(contextIdentifier: contextIdentifier, with: context)
}
}
private func _didRemove(
contextIdentifier: MAPContextIdentifier,
with context: MAPContext
- ) throws {
+ ) async throws {
for application in _applications.values {
- try application.didRemove(contextIdentifier: contextIdentifier, with: context)
+ try await application.didRemove(contextIdentifier: contextIdentifier, with: context)
}
}
diff --git a/Sources/MRP/Base/Utility.swift b/Sources/MRP/Base/Utility.swift
index 64f58780..7c7b67f4 100644
--- a/Sources/MRP/Base/Utility.swift
+++ b/Sources/MRP/Base/Utility.swift
@@ -30,14 +30,6 @@ extension Weak: Equatable where T: Equatable {
}
}
-// https://stackoverflow.com/questions/25329186/safe-bounds-checked-array-lookup-in-swift-through-optional-bindings
-extension Collection {
- /// Returns the element at the specified index if it is within bounds, otherwise nil.
- subscript(safe index: Index) -> Element? {
- indices.contains(index) ? self[index] : nil
- }
-}
-
extension Array {
/// Creates an array from a collection, padding to a multiple of the specified value.
init(_ collection: some Collection : AnyObject, Equatable, Hashable, Sendable {
+public protocol Application : Actor, Equatable, Hashable, Sendable {
associatedtype P: Port
typealias ApplyFunction ? { get }
+ nonisolated var controller: MRPController ? { get }
// notifications from controller when a port is added, didUpdated or removed
// if contextIdentifier is MAPBaseSpanningTreeContext, the ports are physical
// ports on the bridge; otherwise, they are virtual ports managed by MVRP.
func didAdd(contextIdentifier: MAPContextIdentifier, with context: MAPContext ) async throws
- func didUpdate(contextIdentifier: MAPContextIdentifier, with context: MAPContext ) throws
- func didRemove(contextIdentifier: MAPContextIdentifier, with context: MAPContext ) throws
+ func didUpdate(contextIdentifier: MAPContextIdentifier, with context: MAPContext ) async throws
+ func didRemove(contextIdentifier: MAPContextIdentifier, with context: MAPContext ) async throws
// apply for all participants. if contextIdentifier is nil, then all participants are called
// regardless of contextIdentifier.
@@ -62,12 +62,12 @@ public protocol Application : AnyObject, Equatable, Hashable, Sendable {
_ block: AsyncApplyFunction : AnyObject, Equatable, Hashable, Sendable {
}
public extension Application {
- func hash(into hasher: inout Hasher) {
+ nonisolated func hash(into hasher: inout Hasher) {
etherType.hash(into: &hasher)
}
- static func == (lhs: Self, rhs: Self) -> Bool {
+ nonisolated static func == (lhs: Self, rhs: Self) -> Bool {
lhs.etherType == rhs.etherType
}
}
@@ -118,16 +118,6 @@ extension Application {
}
}
- private func apply : BaseApplication {
@@ -29,8 +29,14 @@ protocol BaseApplicationContextObserver : BaseApplication {
contextIdentifier: MAPContextIdentifier,
with context: MAPContext
) async throws
- func onContextUpdated(contextIdentifier: MAPContextIdentifier, with context: MAPContext ) throws
- func onContextRemoved(contextIdentifier: MAPContextIdentifier, with context: MAPContext ) throws
+ func onContextUpdated(
+ contextIdentifier: MAPContextIdentifier,
+ with context: MAPContext
+ ) async throws
+ func onContextRemoved(
+ contextIdentifier: MAPContextIdentifier,
+ with context: MAPContext
+ ) async throws
}
protocol BaseApplicationEventObserver : BaseApplication {
@@ -61,12 +67,10 @@ extension BaseApplication {
nonBaseContextsSupported || participant
.contextIdentifier == MAPBaseSpanningTreeContext
)
- _participants.withLock {
- if let index = $0.index(forKey: participant.contextIdentifier) {
- $0.values[index].insert(participant)
- } else {
- $0[participant.contextIdentifier] = Set([participant])
- }
+ if let index = _participants.index(forKey: participant.contextIdentifier) {
+ _participants.values[index].insert(participant)
+ } else {
+ _participants[participant.contextIdentifier] = Set([participant])
}
}
@@ -77,9 +81,7 @@ extension BaseApplication {
nonBaseContextsSupported || participant
.contextIdentifier == MAPBaseSpanningTreeContext
)
- _ = _participants.withLock {
- $0[participant.contextIdentifier]?.remove(participant)
- }
+ _participants[participant.contextIdentifier]?.remove(participant)
}
@discardableResult
@@ -87,13 +89,10 @@ extension BaseApplication {
for contextIdentifier: MAPContextIdentifier? = nil,
_ block: AsyncApplyFunction
- ) throws {
+ ) async throws {
if _isParticipantValid(contextIdentifier: contextIdentifier) {
for port in context {
let participant = try findParticipant(
for: contextIdentifier,
port: port
)
- Task { try await participant.redeclare() }
+ try participant.redeclare()
}
}
// also call this regardless of the value of nonBaseContextsSupported, so that
// MVRP can be advised of VLAN changes on a port
if let observer = self as? any BaseApplicationContextObserver {
- try observer.onContextUpdated(contextIdentifier: contextIdentifier, with: context)
+ try await observer.onContextUpdated(contextIdentifier: contextIdentifier, with: context)
}
}
public func didRemove(
contextIdentifier: MAPContextIdentifier,
with context: MAPContext
- ) throws {
+ ) async throws {
// call observer _before_ removing participants so it can do any other cleanup
// also call this regardless of the value of nonBaseContextsSupported, so that
// MVRP can be advised of VLAN changes on a port
if let observer = self as? any BaseApplicationContextObserver {
- try observer.onContextRemoved(contextIdentifier: contextIdentifier, with: context)
+ try await observer.onContextRemoved(contextIdentifier: contextIdentifier, with: context)
}
if _isParticipantValid(contextIdentifier: contextIdentifier) {
for port in context {
@@ -197,7 +193,7 @@ extension BaseApplication {
for: contextIdentifier,
port: port
)
- Task { try await participant.flush() }
+ try participant.flush()
try remove(participant: participant)
}
}
@@ -236,11 +232,11 @@ extension BaseApplication {
attributeValue: some Value,
isNew: Bool,
eventSource: EventSource
- ) async throws {
+ ) throws {
guard shouldPropagate(eventSource: eventSource) else { return }
- try await apply(for: contextIdentifier) { participant in
+ try apply(for: contextIdentifier) { participant in
guard participant.port != port else { return }
- try await participant.join(
+ try participant.join(
attributeType: attributeType,
attributeSubtype: attributeSubtype,
attributeValue: attributeValue,
@@ -275,7 +271,7 @@ extension BaseApplication {
} catch MRPError.doNotPropagateAttribute {
return
}
- try await _propagateJoinIndicated(
+ try _propagateJoinIndicated(
contextIdentifier: contextIdentifier,
port: port,
attributeType: attributeType,
@@ -293,11 +289,11 @@ extension BaseApplication {
attributeSubtype: AttributeSubtype?,
attributeValue: some Value,
eventSource: EventSource
- ) async throws {
+ ) throws {
guard shouldPropagate(eventSource: eventSource) else { return }
- try await apply(for: contextIdentifier) { participant in
+ try apply(for: contextIdentifier) { participant in
guard participant.port != port else { return }
- try await participant.leave(
+ try participant.leave(
attributeType: attributeType,
attributeSubtype: attributeSubtype,
attributeValue: attributeValue,
@@ -329,7 +325,7 @@ extension BaseApplication {
} catch MRPError.doNotPropagateAttribute {
return
}
- try await _propagateLeaveIndicated(
+ try _propagateLeaveIndicated(
contextIdentifier: contextIdentifier,
port: port,
attributeType: attributeType,
diff --git a/Sources/MRP/Model/Participant.swift b/Sources/MRP/Model/Participant.swift
index 8c51b8eb..1297b5db 100644
--- a/Sources/MRP/Model/Participant.swift
+++ b/Sources/MRP/Model/Participant.swift
@@ -100,7 +100,9 @@ private enum EnqueuedEvent