@@ -30,6 +30,13 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
3030 private let isAlive = OSAllocatedUnfairLock ( initialState: true )
3131 private let relayTasks = OSAllocatedUnfairLock ( initialState: [ Task < Void , Never > ] ( ) )
3232
33+ /// Dedicated queue for blocking I/O (poll, send, recv, libssh2 calls).
34+ /// Keeps blocking work off the Swift cooperative thread pool.
35+ private static let relayQueue = DispatchQueue (
36+ label: " com.TablePro.ssh.relay " ,
37+ qos: . utility
38+ )
39+
3340 /// Callback invoked when the tunnel dies (keep-alive failure, etc.)
3441 var onDeath : ( ( UUID ) -> Void ) ?
3542
@@ -64,34 +71,41 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
6471
6572 forwardingTask = Task . detached { [ weak self] in
6673 guard let self else { return }
67- Self . logger. info ( " Forwarding started on port \( self . localPort) -> \( remoteHost) : \( remotePort) " )
6874
69- while !Task. isCancelled && self . isRunning {
70- let clientFD = self . acceptClient ( )
71- guard clientFD >= 0 else {
72- if !Task. isCancelled && self . isRunning {
73- // accept timed out or was interrupted, retry
74- continue
75- }
76- break
77- }
75+ await withCheckedContinuation { ( continuation: CheckedContinuation < Void , Never > ) in
76+ Self . relayQueue. async { [ weak self] in
77+ defer { continuation. resume ( ) }
78+ guard let self else { return }
7879
79- let channel = self . openDirectTcpipChannel (
80- remoteHost: remoteHost,
81- remotePort: remotePort
82- )
80+ Self . logger. info ( " Forwarding started on port \( self . localPort) -> \( remoteHost) : \( remotePort) " )
8381
84- guard let channel else {
85- Self . logger. error ( " Failed to open direct-tcpip channel " )
86- Darwin . close ( clientFD)
87- continue
88- }
82+ while self . isRunning {
83+ let clientFD = self . acceptClient ( )
84+ guard clientFD >= 0 else {
85+ if self . isRunning {
86+ continue
87+ }
88+ break
89+ }
8990
90- Self . logger. debug ( " Client connected, relaying to \( remoteHost) : \( remotePort) " )
91- self . spawnRelay ( clientFD: clientFD, channel: channel)
92- }
91+ let channel = self . openDirectTcpipChannel (
92+ remoteHost: remoteHost,
93+ remotePort: remotePort
94+ )
9395
94- Self . logger. info ( " Forwarding loop ended for port \( self . localPort) " )
96+ guard let channel else {
97+ Self . logger. error ( " Failed to open direct-tcpip channel " )
98+ Darwin . close ( clientFD)
99+ continue
100+ }
101+
102+ Self . logger. debug ( " Client connected, relaying to \( remoteHost) : \( remotePort) " )
103+ self . spawnRelay ( clientFD: clientFD, channel: channel)
104+ }
105+
106+ Self . logger. info ( " Forwarding loop ended for port \( self . localPort) " )
107+ }
108+ }
95109 }
96110 }
97111
@@ -271,36 +285,57 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
271285 }
272286
273287 /// Bidirectional relay between a client socket and an SSH channel.
288+ /// Runs on a dedicated dispatch queue to avoid blocking Swift's cooperative thread pool.
274289 private func spawnRelay( clientFD: Int32 , channel: OpaquePointer ) {
290+ // Wrap the blocking relay in a Task so close() can cancel/await it,
291+ // but immediately hop to the dedicated dispatch queue for the actual I/O.
275292 let task = Task . detached { [ weak self] in
276293 guard let self else {
277294 Darwin . close ( clientFD)
278295 return
279296 }
280297
281- let buffer = UnsafeMutablePointer< CChar> . allocate( capacity: Self . relayBufferSize)
282- defer {
283- buffer. deallocate ( )
284- Darwin . close ( clientFD)
285- // Only clean up libssh2 channel if the tunnel is still running.
286- // When close() tears down the tunnel, the session is freed first,
287- // making channel calls invalid (use-after-free).
288- if self . isRunning {
289- libssh2_channel_close ( channel)
290- libssh2_channel_free ( channel)
298+ await withCheckedContinuation { ( continuation: CheckedContinuation < Void , Never > ) in
299+ Self . relayQueue. async { [ weak self] in
300+ defer { continuation. resume ( ) }
301+ guard let self else {
302+ Darwin . close ( clientFD)
303+ return
304+ }
305+ self . runRelay ( clientFD: clientFD, channel: channel)
291306 }
292307 }
308+ }
293309
294- while !Task. isCancelled && self . isRunning {
295- var pollFDs = [
296- pollfd ( fd: clientFD, events: Int16 ( POLLIN) , revents: 0 ) ,
297- pollfd ( fd: self . socketFD, events: Int16 ( POLLIN) , revents: 0 ) ,
298- ]
310+ relayTasks. withLock { tasks in
311+ tasks. removeAll { $0. isCancelled }
312+ tasks. append ( task)
313+ }
314+ }
315+
316+ /// Blocking relay loop — must only be called on `relayQueue`, never the cooperative pool.
317+ private func runRelay( clientFD: Int32 , channel: OpaquePointer ) {
318+ let buffer = UnsafeMutablePointer< CChar> . allocate( capacity: Self . relayBufferSize)
319+ defer {
320+ buffer. deallocate ( )
321+ Darwin . close ( clientFD)
322+ if self . isRunning {
323+ libssh2_channel_close ( channel)
324+ libssh2_channel_free ( channel)
325+ }
326+ }
327+
328+ while self . isRunning {
329+ var pollFDs = [
330+ pollfd ( fd: clientFD, events: Int16 ( POLLIN) , revents: 0 ) ,
331+ pollfd ( fd: self . socketFD, events: Int16 ( POLLIN) , revents: 0 ) ,
332+ ]
299333
300- let pollResult = poll ( & pollFDs, 2 , 100 ) // 100ms timeout
301- if pollResult < 0 { break }
334+ let pollResult = poll ( & pollFDs, 2 , 100 ) // 100ms timeout
335+ if pollResult < 0 { break }
302336
303- // Read from SSH channel -> write to client
337+ // Only read from SSH channel when the SSH socket has data ready
338+ if pollFDs [ 1 ] . revents & Int16 ( POLLIN) != 0 {
304339 let channelRead = tablepro_libssh2_channel_read (
305340 channel, buffer, Self . relayBufferSize
306341 )
@@ -317,42 +352,62 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
317352 totalSent += sent
318353 }
319354 } else if channelRead == 0 || libssh2_channel_eof ( channel) != 0 {
320- // Channel EOF
321355 return
322356 } else if channelRead != Int ( LIBSSH2_ERROR_EAGAIN) {
323- // Real error
324357 return
325358 }
359+ }
326360
327- // Read from client -> write to SSH channel
328- if pollFDs [ 0 ] . revents & Int16 ( POLLIN) != 0 {
329- let clientRead = recv ( clientFD, buffer, Self . relayBufferSize, 0 )
330- if clientRead <= 0 { return }
331-
332- var totalWritten = 0
333- while totalWritten < Int ( clientRead) {
334- let written = tablepro_libssh2_channel_write (
335- channel,
336- buffer. advanced ( by: totalWritten) ,
337- Int ( clientRead) - totalWritten
361+ // Also attempt a non-blocking channel read when poll timed out,
362+ // because libssh2 may have buffered data internally
363+ if pollResult == 0 {
364+ let channelRead = tablepro_libssh2_channel_read (
365+ channel, buffer, Self . relayBufferSize
366+ )
367+ if channelRead > 0 {
368+ var totalSent = 0
369+ while totalSent < Int ( channelRead) {
370+ let sent = send (
371+ clientFD,
372+ buffer. advanced ( by: totalSent) ,
373+ Int ( channelRead) - totalSent,
374+ 0
338375 )
339- if written > 0 {
340- totalWritten += Int ( written)
341- } else if written == Int ( LIBSSH2_ERROR_EAGAIN) {
342- _ = self . waitForSocket (
343- session: self . session,
344- socketFD: self . socketFD,
345- timeoutMs: 1_000
346- )
347- } else {
348- return
349- }
376+ if sent <= 0 { return }
377+ totalSent += sent
378+ }
379+ } else if channelRead == 0 || libssh2_channel_eof ( channel) != 0 {
380+ return
381+ }
382+ // Ignore EAGAIN on timeout read — no data buffered
383+ }
384+
385+ // Read from client -> write to SSH channel
386+ if pollFDs [ 0 ] . revents & Int16 ( POLLIN) != 0 {
387+ let clientRead = recv ( clientFD, buffer, Self . relayBufferSize, 0 )
388+ if clientRead <= 0 { return }
389+
390+ var totalWritten = 0
391+ while totalWritten < Int ( clientRead) {
392+ let written = tablepro_libssh2_channel_write (
393+ channel,
394+ buffer. advanced ( by: totalWritten) ,
395+ Int ( clientRead) - totalWritten
396+ )
397+ if written > 0 {
398+ totalWritten += Int ( written)
399+ } else if written == Int ( LIBSSH2_ERROR_EAGAIN) {
400+ _ = self . waitForSocket (
401+ session: self . session,
402+ socketFD: self . socketFD,
403+ timeoutMs: 1_000
404+ )
405+ } else {
406+ return
350407 }
351408 }
352409 }
353410 }
354-
355- relayTasks. withLock { $0. append ( task) }
356411 }
357412
358413 /// Wait for the SSH socket to become ready, based on libssh2's block directions.
0 commit comments