Skip to content

Commit d9f17d1

Browse files
committed
fix: move SSH tunnel relay I/O off cooperative thread pool to prevent deadlock
1 parent 3588e9d commit d9f17d1

1 file changed

Lines changed: 120 additions & 67 deletions

File tree

TablePro/Core/SSH/LibSSH2Tunnel.swift

Lines changed: 120 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
3030
private let isAlive = OSAllocatedUnfairLock(initialState: true)
3131
private let relayTasks = OSAllocatedUnfairLock(initialState: [Task<Void, Never>]())
3232

33+
/// Dedicated queue for blocking I/O (poll, send, recv, libssh2 calls).
34+
/// Keeps blocking work off the Swift cooperative thread pool.
35+
private static let relayQueue = DispatchQueue(
36+
label: "com.TablePro.ssh.relay",
37+
qos: .utility,
38+
attributes: .concurrent
39+
)
40+
3341
/// Callback invoked when the tunnel dies (keep-alive failure, etc.)
3442
var onDeath: ((UUID) -> Void)?
3543

@@ -64,34 +72,41 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
6472

6573
forwardingTask = Task.detached { [weak self] in
6674
guard let self else { return }
67-
Self.logger.info("Forwarding started on port \(self.localPort) -> \(remoteHost):\(remotePort)")
6875

69-
while !Task.isCancelled && self.isRunning {
70-
let clientFD = self.acceptClient()
71-
guard clientFD >= 0 else {
72-
if !Task.isCancelled && self.isRunning {
73-
// accept timed out or was interrupted, retry
74-
continue
75-
}
76-
break
77-
}
76+
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
77+
Self.relayQueue.async { [weak self] in
78+
defer { continuation.resume() }
79+
guard let self else { return }
7880

79-
let channel = self.openDirectTcpipChannel(
80-
remoteHost: remoteHost,
81-
remotePort: remotePort
82-
)
81+
Self.logger.info("Forwarding started on port \(self.localPort) -> \(remoteHost):\(remotePort)")
8382

84-
guard let channel else {
85-
Self.logger.error("Failed to open direct-tcpip channel")
86-
Darwin.close(clientFD)
87-
continue
88-
}
83+
while self.isRunning {
84+
let clientFD = self.acceptClient()
85+
guard clientFD >= 0 else {
86+
if self.isRunning {
87+
continue
88+
}
89+
break
90+
}
8991

90-
Self.logger.debug("Client connected, relaying to \(remoteHost):\(remotePort)")
91-
self.spawnRelay(clientFD: clientFD, channel: channel)
92-
}
92+
let channel = self.openDirectTcpipChannel(
93+
remoteHost: remoteHost,
94+
remotePort: remotePort
95+
)
9396

94-
Self.logger.info("Forwarding loop ended for port \(self.localPort)")
97+
guard let channel else {
98+
Self.logger.error("Failed to open direct-tcpip channel")
99+
Darwin.close(clientFD)
100+
continue
101+
}
102+
103+
Self.logger.debug("Client connected, relaying to \(remoteHost):\(remotePort)")
104+
self.spawnRelay(clientFD: clientFD, channel: channel)
105+
}
106+
107+
Self.logger.info("Forwarding loop ended for port \(self.localPort)")
108+
}
109+
}
95110
}
96111
}
97112

@@ -271,36 +286,54 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
271286
}
272287

273288
/// Bidirectional relay between a client socket and an SSH channel.
289+
/// Runs on a dedicated dispatch queue to avoid blocking Swift's cooperative thread pool.
274290
private func spawnRelay(clientFD: Int32, channel: OpaquePointer) {
291+
// Wrap the blocking relay in a Task so close() can cancel/await it,
292+
// but immediately hop to the dedicated dispatch queue for the actual I/O.
275293
let task = Task.detached { [weak self] in
276294
guard let self else {
277295
Darwin.close(clientFD)
278296
return
279297
}
280298

281-
let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: Self.relayBufferSize)
282-
defer {
283-
buffer.deallocate()
284-
Darwin.close(clientFD)
285-
// Only clean up libssh2 channel if the tunnel is still running.
286-
// When close() tears down the tunnel, the session is freed first,
287-
// making channel calls invalid (use-after-free).
288-
if self.isRunning {
289-
libssh2_channel_close(channel)
290-
libssh2_channel_free(channel)
299+
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
300+
Self.relayQueue.async { [weak self] in
301+
defer { continuation.resume() }
302+
guard let self else {
303+
Darwin.close(clientFD)
304+
return
305+
}
306+
self.runRelay(clientFD: clientFD, channel: channel)
291307
}
292308
}
309+
}
293310

294-
while !Task.isCancelled && self.isRunning {
295-
var pollFDs = [
296-
pollfd(fd: clientFD, events: Int16(POLLIN), revents: 0),
297-
pollfd(fd: self.socketFD, events: Int16(POLLIN), revents: 0),
298-
]
311+
relayTasks.withLock { $0.append(task) }
312+
}
313+
314+
/// Blocking relay loop — must only be called on `relayQueue`, never the cooperative pool.
315+
private func runRelay(clientFD: Int32, channel: OpaquePointer) {
316+
let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: Self.relayBufferSize)
317+
defer {
318+
buffer.deallocate()
319+
Darwin.close(clientFD)
320+
if self.isRunning {
321+
libssh2_channel_close(channel)
322+
libssh2_channel_free(channel)
323+
}
324+
}
299325

300-
let pollResult = poll(&pollFDs, 2, 100) // 100ms timeout
301-
if pollResult < 0 { break }
326+
while self.isRunning {
327+
var pollFDs = [
328+
pollfd(fd: clientFD, events: Int16(POLLIN), revents: 0),
329+
pollfd(fd: self.socketFD, events: Int16(POLLIN), revents: 0),
330+
]
302331

303-
// Read from SSH channel -> write to client
332+
let pollResult = poll(&pollFDs, 2, 100) // 100ms timeout
333+
if pollResult < 0 { break }
334+
335+
// Only read from SSH channel when the SSH socket has data ready
336+
if pollFDs[1].revents & Int16(POLLIN) != 0 {
304337
let channelRead = tablepro_libssh2_channel_read(
305338
channel, buffer, Self.relayBufferSize
306339
)
@@ -317,42 +350,62 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
317350
totalSent += sent
318351
}
319352
} else if channelRead == 0 || libssh2_channel_eof(channel) != 0 {
320-
// Channel EOF
321353
return
322354
} else if channelRead != Int(LIBSSH2_ERROR_EAGAIN) {
323-
// Real error
324355
return
325356
}
357+
}
326358

327-
// Read from client -> write to SSH channel
328-
if pollFDs[0].revents & Int16(POLLIN) != 0 {
329-
let clientRead = recv(clientFD, buffer, Self.relayBufferSize, 0)
330-
if clientRead <= 0 { return }
331-
332-
var totalWritten = 0
333-
while totalWritten < Int(clientRead) {
334-
let written = tablepro_libssh2_channel_write(
335-
channel,
336-
buffer.advanced(by: totalWritten),
337-
Int(clientRead) - totalWritten
359+
// Also attempt a non-blocking channel read when poll timed out,
360+
// because libssh2 may have buffered data internally
361+
if pollResult == 0 {
362+
let channelRead = tablepro_libssh2_channel_read(
363+
channel, buffer, Self.relayBufferSize
364+
)
365+
if channelRead > 0 {
366+
var totalSent = 0
367+
while totalSent < Int(channelRead) {
368+
let sent = send(
369+
clientFD,
370+
buffer.advanced(by: totalSent),
371+
Int(channelRead) - totalSent,
372+
0
338373
)
339-
if written > 0 {
340-
totalWritten += Int(written)
341-
} else if written == Int(LIBSSH2_ERROR_EAGAIN) {
342-
_ = self.waitForSocket(
343-
session: self.session,
344-
socketFD: self.socketFD,
345-
timeoutMs: 1_000
346-
)
347-
} else {
348-
return
349-
}
374+
if sent <= 0 { return }
375+
totalSent += sent
350376
}
377+
} else if channelRead == 0 || libssh2_channel_eof(channel) != 0 {
378+
return
351379
}
380+
// Ignore EAGAIN on timeout read — no data buffered
352381
}
353-
}
354382

355-
relayTasks.withLock { $0.append(task) }
383+
// Read from client -> write to SSH channel
384+
if pollFDs[0].revents & Int16(POLLIN) != 0 {
385+
let clientRead = recv(clientFD, buffer, Self.relayBufferSize, 0)
386+
if clientRead <= 0 { return }
387+
388+
var totalWritten = 0
389+
while totalWritten < Int(clientRead) {
390+
let written = tablepro_libssh2_channel_write(
391+
channel,
392+
buffer.advanced(by: totalWritten),
393+
Int(clientRead) - totalWritten
394+
)
395+
if written > 0 {
396+
totalWritten += Int(written)
397+
} else if written == Int(LIBSSH2_ERROR_EAGAIN) {
398+
_ = self.waitForSocket(
399+
session: self.session,
400+
socketFD: self.socketFD,
401+
timeoutMs: 1_000
402+
)
403+
} else {
404+
return
405+
}
406+
}
407+
}
408+
}
356409
}
357410

358411
/// Wait for the SSH socket to become ready, based on libssh2's block directions.

0 commit comments

Comments
 (0)