Skip to content

Commit 1a2c348

Browse files
committed
fix: address review findings — relay serialization, pagination state, query counter, dialog guard
1 parent 0d472c0 commit 1a2c348

5 files changed

Lines changed: 127 additions & 128 deletions

File tree

TablePro/Core/Database/DatabaseManager.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ final class DatabaseManager {
4444
/// Tracks connections with user queries currently in-flight.
4545
/// The health monitor skips pings while a query is running to avoid
4646
/// racing on non-thread-safe driver connections.
47-
private var queriesInFlight: Set<UUID> = []
47+
private var queriesInFlight: [UUID: Int] = [:]
4848

4949
/// Current session (computed from currentSessionId)
5050
var currentSession: ConnectionSession? {
@@ -320,8 +320,14 @@ final class DatabaseManager {
320320
throw DatabaseError.notConnected
321321
}
322322

323-
queriesInFlight.insert(sessionId)
324-
defer { queriesInFlight.remove(sessionId) }
323+
queriesInFlight[sessionId, default: 0] += 1
324+
defer {
325+
if let count = queriesInFlight[sessionId], count > 1 {
326+
queriesInFlight[sessionId] = count - 1
327+
} else {
328+
queriesInFlight.removeValue(forKey: sessionId)
329+
}
330+
}
325331
return try await driver.execute(query: query)
326332
}
327333

@@ -461,7 +467,7 @@ final class DatabaseManager {
461467
guard let self else { return false }
462468
// Skip ping while a user query is in-flight to avoid racing
463469
// on the same non-thread-safe driver connection.
464-
guard await !self.queriesInFlight.contains(connectionId) else { return true }
470+
guard await self.queriesInFlight[connectionId] == nil else { return true }
465471
guard let mainDriver = await self.activeSessions[connectionId]?.driver else {
466472
return false
467473
}

TablePro/Core/SSH/LibSSH2Tunnel.swift

Lines changed: 50 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
3434
/// libssh2 is not thread-safe per session, so every call must be serialized.
3535
private let sessionQueue: DispatchQueue
3636

37+
/// Concurrent queue for relay I/O (poll, send, recv — no libssh2 calls).
38+
/// Individual libssh2 calls within each relay are dispatched to `sessionQueue`.
39+
private let relayQueue: DispatchQueue
40+
3741
/// Dedicated queue for the accept loop (poll + accept only, no libssh2 calls).
3842
private let acceptQueue: DispatchQueue
3943

@@ -62,6 +66,11 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
6266
label: "com.TablePro.ssh.session.\(connectionId.uuidString)",
6367
qos: .utility
6468
)
69+
self.relayQueue = DispatchQueue(
70+
label: "com.TablePro.ssh.relay.\(connectionId.uuidString)",
71+
qos: .utility,
72+
attributes: .concurrent
73+
)
6574
self.acceptQueue = DispatchQueue(
6675
label: "com.TablePro.ssh.accept.\(connectionId.uuidString)",
6776
qos: .utility
@@ -98,29 +107,25 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
98107
break
99108
}
100109

101-
// Open channel and spawn relay on the session queue
102-
// to serialize libssh2 calls
103-
let semaphore = DispatchSemaphore(value: 0)
104-
self.sessionQueue.async {
105-
let channel = self.openDirectTcpipChannel(
110+
// Open channel on sessionQueue (serialized libssh2 call),
111+
// then hand off relay to relayQueue (concurrent I/O).
112+
let channel: OpaquePointer? = self.sessionQueue.sync {
113+
self.openDirectTcpipChannel(
106114
remoteHost: remoteHost,
107115
remotePort: remotePort
108116
)
117+
}
109118

110-
guard let channel else {
111-
Self.logger.error("Failed to open direct-tcpip channel")
112-
Darwin.close(clientFD)
113-
semaphore.signal()
114-
return
115-
}
116-
117-
Self.logger.debug(
118-
"Client connected, relaying to \(remoteHost):\(remotePort)"
119-
)
120-
self.spawnRelay(clientFD: clientFD, channel: channel)
121-
semaphore.signal()
119+
guard let channel else {
120+
Self.logger.error("Failed to open direct-tcpip channel")
121+
Darwin.close(clientFD)
122+
continue
122123
}
123-
semaphore.wait()
124+
125+
Self.logger.debug(
126+
"Client connected, relaying to \(remoteHost):\(remotePort)"
127+
)
128+
self.spawnRelay(clientFD: clientFD, channel: channel)
124129
}
125130

126131
Self.logger.info("Forwarding loop ended for port \(self.localPort)")
@@ -306,7 +311,8 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
306311
}
307312

308313
/// Bidirectional relay between a client socket and an SSH channel.
309-
/// The relay loop runs on `sessionQueue` to serialize all libssh2 calls.
314+
/// The relay loop runs on `relayQueue` (concurrent). Individual libssh2 calls
315+
/// are dispatched to `sessionQueue` (serial) for thread safety.
310316
private func spawnRelay(clientFD: Int32, channel: OpaquePointer) {
311317
let task = Task.detached { [weak self] in
312318
guard let self else {
@@ -315,7 +321,7 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
315321
}
316322

317323
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
318-
self.sessionQueue.async { [weak self] in
324+
self.relayQueue.async { [weak self] in
319325
defer { continuation.resume() }
320326
guard let self else {
321327
Darwin.close(clientFD)
@@ -332,15 +338,17 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
332338
}
333339
}
334340

335-
/// Blocking relay loop. Must only be called on `sessionQueue`.
341+
/// Blocking relay loop. Runs on `relayQueue`; libssh2 calls go through `sessionQueue`.
336342
private func runRelay(clientFD: Int32, channel: OpaquePointer) {
337343
let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: Self.relayBufferSize)
338344
defer {
339345
buffer.deallocate()
340346
Darwin.close(clientFD)
341347
if self.isRunning {
342-
libssh2_channel_close(channel)
343-
libssh2_channel_free(channel)
348+
sessionQueue.sync {
349+
libssh2_channel_close(channel)
350+
libssh2_channel_free(channel)
351+
}
344352
}
345353
}
346354

@@ -353,52 +361,29 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
353361
let pollResult = poll(&pollFDs, 2, 100) // 100ms timeout
354362
if pollResult < 0 { break }
355363

356-
// Only read from SSH channel when the SSH socket has data ready
357-
if pollFDs[1].revents & Int16(POLLIN) != 0 {
358-
let channelRead = tablepro_libssh2_channel_read(
359-
channel, buffer, Self.relayBufferSize
360-
)
361-
if channelRead > 0 {
362-
var totalSent = 0
363-
while totalSent < Int(channelRead) {
364-
let sent = send(
365-
clientFD,
366-
buffer.advanced(by: totalSent),
367-
Int(channelRead) - totalSent,
368-
0
369-
)
370-
if sent <= 0 { return }
371-
totalSent += sent
372-
}
373-
} else if channelRead == 0 || libssh2_channel_eof(channel) != 0 {
374-
return
375-
} else if channelRead != Int(LIBSSH2_ERROR_EAGAIN) {
376-
return
364+
// Read from SSH channel when the SSH socket has data or on timeout
365+
// (libssh2 may have internally buffered data)
366+
if pollFDs[1].revents & Int16(POLLIN) != 0 || pollResult == 0 {
367+
let readResult: Int = sessionQueue.sync {
368+
Int(tablepro_libssh2_channel_read(channel, buffer, Self.relayBufferSize))
377369
}
378-
}
379-
380-
// Also attempt a non-blocking channel read when poll timed out,
381-
// because libssh2 may have buffered data internally
382-
if pollResult == 0 {
383-
let channelRead = tablepro_libssh2_channel_read(
384-
channel, buffer, Self.relayBufferSize
385-
)
386-
if channelRead > 0 {
370+
if readResult > 0 {
387371
var totalSent = 0
388-
while totalSent < Int(channelRead) {
372+
while totalSent < readResult {
389373
let sent = send(
390374
clientFD,
391375
buffer.advanced(by: totalSent),
392-
Int(channelRead) - totalSent,
376+
readResult - totalSent,
393377
0
394378
)
395379
if sent <= 0 { return }
396380
totalSent += sent
397381
}
398-
} else if channelRead == 0 || libssh2_channel_eof(channel) != 0 {
382+
} else if readResult == 0 || sessionQueue.sync({ libssh2_channel_eof(channel) }) != 0 {
383+
return
384+
} else if readResult != Int(LIBSSH2_ERROR_EAGAIN) {
399385
return
400386
}
401-
// Ignore EAGAIN on timeout read — no data buffered
402387
}
403388

404389
// Read from client -> write to SSH channel
@@ -408,13 +393,15 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {
408393

409394
var totalWritten = 0
410395
while totalWritten < Int(clientRead) {
411-
let written = tablepro_libssh2_channel_write(
412-
channel,
413-
buffer.advanced(by: totalWritten),
414-
Int(clientRead) - totalWritten
415-
)
396+
let written: Int = sessionQueue.sync {
397+
Int(tablepro_libssh2_channel_write(
398+
channel,
399+
buffer.advanced(by: totalWritten),
400+
Int(clientRead) - totalWritten
401+
))
402+
}
416403
if written > 0 {
417-
totalWritten += Int(written)
404+
totalWritten += written
418405
} else if written == Int(LIBSSH2_ERROR_EAGAIN) {
419406
_ = self.waitForSocket(
420407
session: self.session,

TablePro/Views/Main/Extensions/MainContentCoordinator+ChangeGuard.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ extension MainContentCoordinator {
2121
return
2222
}
2323

24+
guard !isShowingConfirmAlert else {
25+
completion(false)
26+
return
27+
}
28+
2429
Task { @MainActor in
2530
let window = NSApp.keyWindow
2631
let confirmed = await confirmDiscardChanges(action: action, window: window)

TablePro/Views/Main/Extensions/MainContentCoordinator+Pagination.swift

Lines changed: 54 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,53 +13,47 @@ extension MainContentCoordinator {
1313
/// Navigate to next page
1414
func goToNextPage() {
1515
guard let tabIndex = tabManager.selectedTabIndex,
16-
tabIndex < tabManager.tabs.count else { return }
17-
18-
var tab = tabManager.tabs[tabIndex]
19-
guard tab.pagination.hasNextPage else { return }
16+
tabIndex < tabManager.tabs.count,
17+
tabManager.tabs[tabIndex].pagination.hasNextPage else { return }
2018

21-
tab.pagination.goToNextPage()
22-
tabManager.tabs[tabIndex] = tab
23-
reloadCurrentPage()
19+
paginateAfterConfirmation(tabIndex: tabIndex) { pagination in
20+
pagination.goToNextPage()
21+
}
2422
}
2523

2624
/// Navigate to previous page
2725
func goToPreviousPage() {
2826
guard let tabIndex = tabManager.selectedTabIndex,
29-
tabIndex < tabManager.tabs.count else { return }
30-
31-
var tab = tabManager.tabs[tabIndex]
32-
guard tab.pagination.hasPreviousPage else { return }
27+
tabIndex < tabManager.tabs.count,
28+
tabManager.tabs[tabIndex].pagination.hasPreviousPage else { return }
3329

34-
tab.pagination.goToPreviousPage()
35-
tabManager.tabs[tabIndex] = tab
36-
reloadCurrentPage()
30+
paginateAfterConfirmation(tabIndex: tabIndex) { pagination in
31+
pagination.goToPreviousPage()
32+
}
3733
}
3834

3935
/// Navigate to first page
4036
func goToFirstPage() {
4137
guard let tabIndex = tabManager.selectedTabIndex,
42-
tabIndex < tabManager.tabs.count else { return }
43-
44-
var tab = tabManager.tabs[tabIndex]
45-
guard tab.pagination.currentPage != 1 else { return }
38+
tabIndex < tabManager.tabs.count,
39+
tabManager.tabs[tabIndex].pagination.currentPage != 1 else { return }
4640

47-
tab.pagination.goToFirstPage()
48-
tabManager.tabs[tabIndex] = tab
49-
reloadCurrentPage()
41+
paginateAfterConfirmation(tabIndex: tabIndex) { pagination in
42+
pagination.goToFirstPage()
43+
}
5044
}
5145

5246
/// Navigate to last page
5347
func goToLastPage() {
5448
guard let tabIndex = tabManager.selectedTabIndex,
5549
tabIndex < tabManager.tabs.count else { return }
5650

57-
var tab = tabManager.tabs[tabIndex]
51+
let tab = tabManager.tabs[tabIndex]
5852
guard tab.pagination.currentPage != tab.pagination.totalPages else { return }
5953

60-
tab.pagination.goToLastPage()
61-
tabManager.tabs[tabIndex] = tab
62-
reloadCurrentPage()
54+
paginateAfterConfirmation(tabIndex: tabIndex) { pagination in
55+
pagination.goToLastPage()
56+
}
6357
}
6458

6559
/// Update page size (limit) and reload
@@ -68,8 +62,9 @@ extension MainContentCoordinator {
6862
tabIndex < tabManager.tabs.count,
6963
newSize > 0 else { return }
7064

71-
tabManager.tabs[tabIndex].pagination.updatePageSize(newSize)
72-
reloadCurrentPage()
65+
paginateAfterConfirmation(tabIndex: tabIndex) { pagination in
66+
pagination.updatePageSize(newSize)
67+
}
7368
}
7469

7570
/// Update offset and reload
@@ -78,40 +73,50 @@ extension MainContentCoordinator {
7873
tabIndex < tabManager.tabs.count,
7974
newOffset >= 0 else { return }
8075

81-
tabManager.tabs[tabIndex].pagination.updateOffset(newOffset)
82-
reloadCurrentPage()
76+
paginateAfterConfirmation(tabIndex: tabIndex) { pagination in
77+
pagination.updateOffset(newOffset)
78+
}
8379
}
8480

8581
/// Apply both limit and offset changes and reload
8682
func applyPaginationSettings() {
8783
reloadCurrentPage()
8884
}
8985

90-
/// Reload current page data, guarding against unsaved changes
91-
func reloadCurrentPage() {
92-
guard let tabIndex = tabManager.selectedTabIndex,
93-
tabIndex < tabManager.tabs.count,
94-
let tableName = tabManager.tabs[tabIndex].tableName else { return }
86+
// MARK: - Private
9587

96-
let capturedTabIndex = tabIndex
97-
let capturedTableName = tableName
88+
/// Confirm discard if needed, then mutate pagination state and reload.
89+
private func paginateAfterConfirmation(
90+
tabIndex: Int,
91+
mutate: @escaping (inout PaginationState) -> Void
92+
) {
9893
confirmDiscardChangesIfNeeded(action: .pagination) { [weak self] confirmed in
9994
guard let self, confirmed else { return }
100-
guard capturedTabIndex < self.tabManager.tabs.count else { return }
95+
guard tabIndex < self.tabManager.tabs.count else { return }
10196

102-
let tab = self.tabManager.tabs[capturedTabIndex]
103-
let pagination = tab.pagination
97+
mutate(&self.tabManager.tabs[tabIndex].pagination)
98+
self.reloadCurrentPage()
99+
}
100+
}
101+
102+
/// Reload current page data
103+
private func reloadCurrentPage() {
104+
guard let tabIndex = tabManager.selectedTabIndex,
105+
tabIndex < tabManager.tabs.count,
106+
let tableName = tabManager.tabs[tabIndex].tableName else { return }
104107

105-
let newQuery = self.queryBuilder.buildBaseQuery(
106-
tableName: capturedTableName,
107-
sortState: tab.sortState,
108-
columns: tab.resultColumns,
109-
limit: pagination.pageSize,
110-
offset: pagination.currentOffset
111-
)
108+
let tab = tabManager.tabs[tabIndex]
109+
let pagination = tab.pagination
112110

113-
self.tabManager.tabs[capturedTabIndex].query = newQuery
114-
self.runQuery()
115-
}
111+
let newQuery = queryBuilder.buildBaseQuery(
112+
tableName: tableName,
113+
sortState: tab.sortState,
114+
columns: tab.resultColumns,
115+
limit: pagination.pageSize,
116+
offset: pagination.currentOffset
117+
)
118+
119+
tabManager.tabs[tabIndex].query = newQuery
120+
runQuery()
116121
}
117122
}

0 commit comments

Comments
 (0)