@@ -30,13 +30,12 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
3030 private let isAlive = OSAllocatedUnfairLock ( initialState: true )
3131 private let relayTasks = OSAllocatedUnfairLock ( initialState: [ Task < Void , Never > ] ( ) )
3232
33- /// Dedicated queue for blocking I/O (poll, send, recv, libssh2 calls).
34- /// Keeps blocking work off the Swift cooperative thread pool.
35- private static let relayQueue = DispatchQueue (
36- label: " com.TablePro.ssh.relay " ,
37- qos: . utility,
38- attributes: . concurrent
39- )
33+ /// Serial queue for all libssh2 calls on this tunnel's session.
34+ /// libssh2 is not thread-safe per session, so every call must be serialized.
35+ private let sessionQueue : DispatchQueue
36+
37+ /// Dedicated queue for the accept loop (poll + accept only, no libssh2 calls).
38+ private let acceptQueue : DispatchQueue
4039
4140 /// Callback invoked when the tunnel dies (keep-alive failure, etc.)
4241 var onDeath : ( ( UUID ) -> Void ) ?
@@ -59,6 +58,14 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
5958 self . listenFD = listenFD
6059 self . jumpChain = jumpChain
6160 self . createdAt = Date ( )
61+ self . sessionQueue = DispatchQueue (
62+ label: " com.TablePro.ssh.session. \( connectionId. uuidString) " ,
63+ qos: . utility
64+ )
65+ self . acceptQueue = DispatchQueue (
66+ label: " com.TablePro.ssh.accept. \( connectionId. uuidString) " ,
67+ qos: . utility
68+ )
6269 }
6370
6471 var isRunning : Bool {
@@ -74,11 +81,13 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
7481 guard let self else { return }
7582
7683 await withCheckedContinuation { ( continuation: CheckedContinuation < Void , Never > ) in
77- Self . relayQueue . async { [ weak self] in
84+ self . acceptQueue . async { [ weak self] in
7885 defer { continuation. resume ( ) }
7986 guard let self else { return }
8087
81- Self . logger. info ( " Forwarding started on port \( self . localPort) -> \( remoteHost) : \( remotePort) " )
88+ Self . logger. info (
89+ " Forwarding started on port \( self . localPort) -> \( remoteHost) : \( remotePort) "
90+ )
8291
8392 while self . isRunning {
8493 let clientFD = self . acceptClient ( )
@@ -89,19 +98,29 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
8998 break
9099 }
91100
92- let channel = self . openDirectTcpipChannel (
93- remoteHost: remoteHost,
94- remotePort: remotePort
95- )
101+ // Open channel and spawn relay on the session queue
102+ // to serialize libssh2 calls
103+ let semaphore = DispatchSemaphore ( value: 0 )
104+ self . sessionQueue. async {
105+ let channel = self . openDirectTcpipChannel (
106+ remoteHost: remoteHost,
107+ remotePort: remotePort
108+ )
109+
110+ guard let channel else {
111+ Self . logger. error ( " Failed to open direct-tcpip channel " )
112+ Darwin . close ( clientFD)
113+ semaphore. signal ( )
114+ return
115+ }
96116
97- guard let channel else {
98- Self . logger. error ( " Failed to open direct-tcpip channel " )
99- Darwin . close ( clientFD)
100- continue
117+ Self . logger. debug (
118+ " Client connected, relaying to \( remoteHost) : \( remotePort) "
119+ )
120+ self . spawnRelay ( clientFD: clientFD, channel: channel)
121+ semaphore. signal ( )
101122 }
102-
103- Self . logger. debug ( " Client connected, relaying to \( remoteHost) : \( remotePort) " )
104- self . spawnRelay ( clientFD: clientFD, channel: channel)
123+ semaphore. wait ( )
105124 }
106125
107126 Self . logger. info ( " Forwarding loop ended for port \( self . localPort) " )
@@ -119,17 +138,21 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
119138 guard let self else { return }
120139
121140 while !Task. isCancelled && self . isRunning {
122- var secondsToNext : Int32 = 0
123- let rc = libssh2_keepalive_send ( self . session, & secondsToNext)
141+ let failed = await withCheckedContinuation { ( continuation: CheckedContinuation < Bool , Never > ) in
142+ self . sessionQueue. async {
143+ var secondsToNext : Int32 = 0
144+ let rc = libssh2_keepalive_send ( self . session, & secondsToNext)
145+ continuation. resume ( returning: rc != 0 )
146+ }
147+ }
124148
125- if rc != 0 {
126- Self . logger. warning ( " Keep-alive failed with error \( rc ) , marking tunnel dead " )
149+ if failed {
150+ Self . logger. warning ( " Keep-alive failed, marking tunnel dead " )
127151 self . markDead ( )
128152 break
129153 }
130154
131- let sleepInterval = max ( Int ( secondsToNext) , 10 )
132- try ? await Task . sleep ( for: . seconds( sleepInterval) )
155+ try ? await Task . sleep ( for: . seconds( 10 ) )
133156 }
134157 }
135158 }
@@ -154,24 +177,27 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
154177 return copy
155178 }
156179
157- // Close listenFD first to stop accepting new connections
180+ // Shutdown socketFD to unblock any blocking reads in relay tasks
181+ // without closing the fd (which could be reused by another thread)
182+ shutdown ( socketFD, SHUT_RDWR)
183+ // Close listenFD to stop accepting new connections
158184 Darwin . close ( listenFD)
159- // Close socketFD to unblock any channel reads in relay tasks
160- Darwin . close ( socketFD)
161185
162186 // Defer session teardown to a detached task that waits for relays to exit.
163- // Relay tasks will see isRunning=false and skip libssh2 channel cleanup.
164187 let session = self . session
188+ let socketFD = self . socketFD
165189 let jumpChain = self . jumpChain
166190 let connectionId = self . connectionId
167191 Task . detached {
168192 // Wait for all relay tasks to finish (they'll exit quickly since
169- // socketFD is closed and isRunning is false)
193+ // socketFD is shut down and isRunning is false)
170194 for task in currentRelayTasks {
171195 await task. value
172196 }
173197
174- // Now safe to tear down the session — no relay is using it
198+ // Now safe to close the socket and tear down the session
199+ Darwin . close ( socketFD)
200+
175201 libssh2_session_set_blocking ( session, 1 )
176202 tablepro_libssh2_session_disconnect ( session, " Closing tunnel " )
177203 libssh2_session_free ( session)
@@ -190,7 +216,8 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
190216
191217 /// Synchronous cleanup for app termination.
192218 /// At termination the process is exiting imminently, so we cancel relay tasks
193- /// and tear down immediately — any in-flight relays will be killed with the process.
219+ /// and tear down immediately. We avoid closing socketFD or freeing the session
220+ /// since relay tasks may still reference them; the OS reclaims all resources.
194221 func closeSync( ) {
195222 let wasAlive = isAlive. withLock { alive -> Bool in
196223 let was = alive
@@ -206,22 +233,14 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
206233 tasks. removeAll ( )
207234 }
208235
209- // Close sockets first so any blocking relay reads fail immediately
236+ // Shutdown sockets to unblock reads, close listenFD (accept loop only)
237+ shutdown ( socketFD, SHUT_RDWR)
210238 Darwin . close ( listenFD)
211- Darwin . close ( socketFD)
212-
213- // At app termination we free the session synchronously.
214- // Relay tasks are cancelled and won't touch libssh2 (isRunning is false).
215- libssh2_session_set_blocking ( session, 1 )
216- tablepro_libssh2_session_disconnect ( session, " Closing tunnel " )
217- libssh2_session_free ( session)
218239
240+ // At app termination, skip session teardown and fd close.
241+ // Relay tasks may still be using them, and the OS reclaims everything.
219242 for hop in jumpChain. reversed ( ) {
220243 hop. relayTask? . cancel ( )
221- libssh2_channel_free ( hop. channel)
222- tablepro_libssh2_session_disconnect ( hop. session, " Closing " )
223- libssh2_session_free ( hop. session)
224- Darwin . close ( hop. socket)
225244 }
226245 }
227246
@@ -260,6 +279,7 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
260279 }
261280
262281 /// Open a direct-tcpip channel, handling EAGAIN with select().
282+ /// Must be called on `sessionQueue`.
263283 private func openDirectTcpipChannel( remoteHost: String , remotePort: Int ) -> OpaquePointer ? {
264284 while true {
265285 let channel = libssh2_channel_direct_tcpip_ex (
@@ -286,18 +306,16 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
286306 }
287307
288308 /// Bidirectional relay between a client socket and an SSH channel.
289- /// Runs on a dedicated dispatch queue to avoid blocking Swift's cooperative thread pool .
309+ /// The relay loop runs on `sessionQueue` to serialize all libssh2 calls .
290310 private func spawnRelay( clientFD: Int32 , channel: OpaquePointer ) {
291- // Wrap the blocking relay in a Task so close() can cancel/await it,
292- // but immediately hop to the dedicated dispatch queue for the actual I/O.
293311 let task = Task . detached { [ weak self] in
294312 guard let self else {
295313 Darwin . close ( clientFD)
296314 return
297315 }
298316
299317 await withCheckedContinuation { ( continuation: CheckedContinuation < Void , Never > ) in
300- Self . relayQueue . async { [ weak self] in
318+ self . sessionQueue . async { [ weak self] in
301319 defer { continuation. resume ( ) }
302320 guard let self else {
303321 Darwin . close ( clientFD)
@@ -314,7 +332,7 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
314332 }
315333 }
316334
317- /// Blocking relay loop — must only be called on `relayQueue`, never the cooperative pool .
335+ /// Blocking relay loop. Must only be called on `sessionQueue` .
318336 private func runRelay( clientFD: Int32 , channel: OpaquePointer ) {
319337 let buffer = UnsafeMutablePointer< CChar> . allocate( capacity: Self . relayBufferSize)
320338 defer {
0 commit comments