From ad0ac2964f788f0feb185ad1655f5c77ae88c2ab Mon Sep 17 00:00:00 2001 From: Luke Howard Date: Thu, 12 Feb 2026 16:50:19 +1100 Subject: [PATCH] close socket immediately on controller close, not on deinit --- .../OCP.1/Backend/CF/Ocp1CFController.swift | 32 +++++++++++----- .../Backend/IORing/Ocp1IORingController.swift | 37 +++++++++++++------ 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/Sources/SwiftOCADevice/OCP.1/Backend/CF/Ocp1CFController.swift b/Sources/SwiftOCADevice/OCP.1/Backend/CF/Ocp1CFController.swift index ccc42ff9..4bf2f012 100644 --- a/Sources/SwiftOCADevice/OCP.1/Backend/CF/Ocp1CFController.swift +++ b/Sources/SwiftOCADevice/OCP.1/Backend/CF/Ocp1CFController.swift @@ -31,6 +31,7 @@ import Foundation import SocketAddress @_spi(SwiftOCAPrivate) import SwiftOCA +import Synchronization import SystemPackage #if canImport(Darwin) import Darwin @@ -91,11 +92,16 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible { private let _messages: AsyncThrowingStream private let _messagesContinuation: AsyncThrowingStream.Continuation - private let socket: _CFSocketWrapper + private let _socket: Mutex<_CFSocketWrapper?> let notificationSocket: _CFSocketWrapper nonisolated var description: String { - "\(type(of: self))(socket: \(socket))" + let socket = socket + return "\(type(of: self))(socket: \(socket != nil ? String(describing: socket!) : ""))" + } + + private nonisolated var socket: _CFSocketWrapper? { + _socket.withLock { $0 } } init( @@ -103,7 +109,7 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible { socket: _CFSocketWrapper, notificationSocket: _CFSocketWrapper ) async { - self.socket = socket + _socket = .init(socket) self.notificationSocket = notificationSocket peerAddress = socket.peerAddress! @@ -118,10 +124,10 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible { connectionPrefix = OcaTcpConnectionPrefix } - receiveMessageTask = Task { [weak self, socket] in + receiveMessageTask = Task { [weak self] in do { repeat { - guard !Task.isCancelled else { break } + guard !Task.isCancelled, let socket = self?.socket else { break } let messages = try await OcaDevice .receiveMessages { try await Array(socket.read(count: $0)) } self?._messagesContinuation.yield(messages) @@ -132,16 +138,23 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible { } } - func close() async throws { - // don't close the socket, it will be closed when last reference is released + private func closeSocket() { + _socket.withLock { $0 = nil } + } + func close() async throws { keepAliveTask?.cancel() keepAliveTask = nil - receiveMessageTask?.cancel() - receiveMessageTask = nil + if let receiveMessageTask { + receiveMessageTask.cancel() + _ = await receiveMessageTask.result + self.receiveMessageTask = nil + } _messagesContinuation.finish() + + closeSocket() } deinit { @@ -157,6 +170,7 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible { } func sendOcp1EncodedData(_ data: Data) async throws { + guard let socket else { throw Errno.badFileDescriptor } _ = try await socket.write(data: data) } diff --git a/Sources/SwiftOCADevice/OCP.1/Backend/IORing/Ocp1IORingController.swift b/Sources/SwiftOCADevice/OCP.1/Backend/IORing/Ocp1IORingController.swift index 97f7fc89..8aa1f501 100644 --- a/Sources/SwiftOCADevice/OCP.1/Backend/IORing/Ocp1IORingController.swift +++ b/Sources/SwiftOCADevice/OCP.1/Backend/IORing/Ocp1IORingController.swift @@ -35,6 +35,8 @@ public import IORing import SocketAddress import SwiftOCA +import Synchronization +import struct SystemPackage.Errno protocol Ocp1IORingControllerPrivate: Ocp1ControllerInternal, Ocp1ControllerInternalLightweightNotifyingInternal, Actor, @@ -82,11 +84,16 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve private let _messages: AsyncThrowingStream private let _messagesContinuation: AsyncThrowingStream.Continuation - private let socket: Socket + private let _socket: Mutex let notificationSocket: Socket nonisolated var description: String { - "\(type(of: self))(socket: \(socket))" + let socket = socket + return "\(type(of: self))(socket: \(socket != nil ? String(describing: socket!) : ""))" + } + + private nonisolated var socket: Socket? { + _socket.withLock { $0 } } init( @@ -94,7 +101,7 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve socket: Socket, notificationSocket: Socket ) async throws { - self.socket = socket + _socket = .init(socket) self.notificationSocket = notificationSocket self.endpoint = endpoint @@ -103,17 +110,17 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve throwing: Error.self ) - peerAddress = try AnySocketAddress(self.socket.peerAddress) + peerAddress = try AnySocketAddress(socket.peerAddress) if peerAddress.family == AF_LOCAL { connectionPrefix = OcaLocalConnectionPrefix } else { connectionPrefix = OcaTcpConnectionPrefix } - receiveMessageTask = Task { [weak self, socket] in + receiveMessageTask = Task { [weak self] in do { repeat { - guard !Task.isCancelled else { break } + guard !Task.isCancelled, let socket = self?.socket else { break } let messages = try await OcaDevice.receiveMessages { try await socket.read( count: $0, awaitingAllRead: true @@ -126,16 +133,23 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve } } - func close() { - // don't close the socket, it will be closed when last reference is released + private func closeSocket() { + _socket.withLock { $0 = nil } + } + func close() async { keepAliveTask?.cancel() keepAliveTask = nil - receiveMessageTask?.cancel() - receiveMessageTask = nil + if let receiveMessageTask { + receiveMessageTask.cancel() + _ = await receiveMessageTask.result + self.receiveMessageTask = nil + } _messagesContinuation.finish() + + closeSocket() } deinit { @@ -151,6 +165,7 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve } func sendOcp1EncodedData(_ data: Data) async throws { + guard let socket else { throw Errno.badFileDescriptor } _ = try await socket.write( [UInt8](data), count: data.count, @@ -163,7 +178,7 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve } nonisolated var identifier: String { - (try? socket.peerName) ?? "unknown" + (try? socket?.peerName) ?? "unknown" } }