diff --git a/src/client/transaction.rs b/src/client/transaction.rs index 3d2d19c9..6f391ff2 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -128,6 +128,29 @@ use crate::utils::debug_messages::{log_client_to_server, log_server_to_client}; /// When the buffer reaches this size, it will be flushed to avoid excessive memory usage. const BUFFER_FLUSH_THRESHOLD: usize = 8192; +/// RAII guard for CLIENTS_IN_TRANSACTIONS counter. +/// Increments on creation, decrements on drop. +struct TransactionGuard; + +impl TransactionGuard { + fn new() -> Self { + CLIENTS_IN_TRANSACTIONS.fetch_add(1, Ordering::Relaxed); + Self + } +} + +impl Drop for TransactionGuard { + fn drop(&mut self) { + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + } +} + +/// Result of waiting for the next client message while monitoring server liveness. +enum NextClientMessage { + Message(BytesMut), + ServerDead, +} + /// Action to take after processing a message in the transaction loop enum TransactionAction { /// Continue processing messages in the transaction loop @@ -176,6 +199,26 @@ where Ok(()) } + /// Wait for the next client message while monitoring server connection liveness. + /// Uses `select!` to race client read against server readability, detecting dead + /// server connections (e.g., `pg_terminate_backend`) while client is idle in transaction. + async fn wait_for_next_message(&mut self, server: &Server) -> Result<NextClientMessage, Error> { + loop { + tokio::select! { + biased; + result = read_message(&mut self.read, self.max_memory_usage) => { + return result.map(NextClientMessage::Message); + } + _ = server.server_readable() => { + if server.check_server_alive() { + continue; + } + return Ok(NextClientMessage::ServerDead); + } + } + } + } + /// Handle cancel mode - when client wants to cancel a previously issued query. /// Opens a new separate connection to the server, sends the backend_id /// and secret_key and then closes it for security reasons. 
@@ -590,8 +633,9 @@ where server.claim(self.process_id, self.secret_key); self.connected_to_server = true; - // Signal that client is now in transaction (has server connection) - CLIENTS_IN_TRANSACTIONS.fetch_add(1, Ordering::Relaxed); + // RAII guard: increments CLIENTS_IN_TRANSACTIONS now, + // decrements automatically when this block exits (normal or early return). + let _tx_guard = TransactionGuard::new(); // Update statistics self.stats.active_idle(); @@ -660,11 +704,32 @@ where let message = match initial_message { None => { self.stats.active_read(); - match read_message(&mut self.read, self.max_memory_usage).await { - Ok(message) => message, + match self.wait_for_next_message(server).await { + Ok(NextClientMessage::Message(msg)) => msg, + Ok(NextClientMessage::ServerDead) => { + warn!( + "Server {} connection died while client {} idle in transaction", + server, self.addr + ); + server.mark_bad( + "server closed while client idle in transaction", + ); + let _ = error_response( + &mut self.write, + "server closed the connection unexpectedly while client was idle in transaction", + "08006", + ) + .await; + self.stats.disconnect(); + self.connected_to_server = false; + self.release(); + return Ok(()); + } Err(err) => { self.stats.disconnect(); + self.connected_to_server = false; server.checkin_cleanup().await?; + self.release(); return self.process_error(err).await; } } @@ -694,9 +759,9 @@ where // Terminate 'X' => { - // принудительно закрываем чтобы не допустить длинную транзакцию server.checkin_cleanup().await?; self.stats.disconnect(); + self.connected_to_server = false; self.release(); return Ok(()); } @@ -790,8 +855,7 @@ where self.client_last_messages_in_tx.clear(); } - // Signal that client finished transaction (released server connection) - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + // TransactionGuard dropped at end of block above, counter already decremented. 
self.connected_to_server = false; // If shutdown is in progress, send error to client and exit diff --git a/src/server/server_backend.rs b/src/server/server_backend.rs index 973dcd09..ac763398 100644 --- a/src/server/server_backend.rs +++ b/src/server/server_backend.rs @@ -226,6 +226,25 @@ impl Server { self.bad = true; } + /// Returns a future that completes when the server socket becomes readable. + /// Between queries in a transaction, BufStream is empty (everything was read + /// up to ReadyForQuery), so readable on the underlying socket correctly + /// reflects new data from the server (e.g., FATAL after idle_in_transaction_session_timeout). + pub async fn server_readable(&self) { + let _ = self.stream.get_ref().readable().await; + } + + /// Verify that server_readable() readiness is genuine, not spurious. + /// Returns true if the connection is alive (WouldBlock = no real data). + /// Returns false if the server sent data or closed the connection (dead). + pub fn check_server_alive(&self) -> bool { + let mut buf = [0u8; 1]; + matches!( + self.stream.get_ref().try_read(&mut buf), + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock + ) + } + /// Server & client are out of sync, we must discard this connection. /// This happens with clients that misbehave. pub fn is_bad(&self) -> bool { diff --git a/src/server/stream.rs b/src/server/stream.rs index 14ef89f0..5531e006 100644 --- a/src/server/stream.rs +++ b/src/server/stream.rs @@ -77,6 +77,25 @@ impl StreamInner { StreamInner::UnixSocket { stream } => stream.try_write(buf), } } + + /// Waits until the server socket becomes readable (data or EOF/error). + /// Cancel-safe: no data is consumed, only readiness notification. + pub async fn readable(&self) -> std::io::Result<()> { + match self { + StreamInner::TCPPlain { stream } => stream.readable().await, + StreamInner::UnixSocket { stream } => stream.readable().await, + } + } + + /// Non-blocking read attempt on the raw socket (bypasses BufStream). 
+ /// Used to verify that `readable()` readiness is genuine, not spurious + /// from BufStream buffering. Returns WouldBlock if no data available. + pub fn try_read(&self, buf: &mut [u8]) -> std::io::Result<usize> { + match self { + StreamInner::TCPPlain { stream } => stream.try_read(buf), + StreamInner::UnixSocket { stream } => stream.try_read(buf), + } + } } pub(crate) async fn create_unix_stream_inner(host: &str, port: u16) -> Result { diff --git a/tests/bdd/features/stale-server-detection.feature b/tests/bdd/features/stale-server-detection.feature new file mode 100644 index 00000000..2bc24133 --- /dev/null +++ b/tests/bdd/features/stale-server-detection.feature @@ -0,0 +1,61 @@ +@rust @rust-4 @stale-server-detection +Feature: Detect stale server connections during client idle in transaction + When a client holds a server connection inside a transaction and the server + terminates (e.g., idle_in_transaction_session_timeout, pg_terminate_backend), + pg_doorman should detect this and release the pool slot. 
+ + Background: + Given PostgreSQL started with pg_hba.conf: + """ + local all all trust + host all all 127.0.0.1/32 trust + """ + And fixtures from "tests/fixture.sql" applied + + @stale-server-pg-terminate-backend + Scenario: Detect server killed by pg_terminate_backend + # Client holds transaction, then backend is killed via pg_terminate_backend + # pg_doorman should detect via server_readable() and release the slot + Given pg_doorman started with config: + """ + [general] + host = "127.0.0.1" + port = ${DOORMAN_PORT} + admin_username = "admin" + admin_password = "admin" + pg_hba.content = "host all all 127.0.0.1/32 trust" + server_lifetime = 60000 + server_idle_check_timeout = 0 + + [pools.example_db] + server_host = "127.0.0.1" + server_port = ${PG_PORT} + + [[pools.example_db.users]] + username = "example_user_1" + password = "" + pool_size = 1 + + [[pools.example_db.users]] + username = "postgres" + password = "" + pool_size = 2 + """ + # Client opens transaction and gets backend_pid + When we create session "victim" to pg_doorman as "example_user_1" with password "" and database "example_db" + When we send SimpleQuery "SELECT pg_backend_pid()" to session "victim" and store backend_pid as "victim_pid" + When we send SimpleQuery "BEGIN" to session "victim" without waiting + Then we read SimpleQuery response from session "victim" within 2000ms + When we send SimpleQuery "SELECT 1" to session "victim" without waiting + Then we read SimpleQuery response from session "victim" within 2000ms + # Kill the backend through a separate superuser connection + When we create session "killer" to pg_doorman as "postgres" with password "" and database "example_db" + When we terminate backend "victim_pid" from session "victim" via session "killer" + # Wait for pg_doorman to detect the dead server + When we sleep for 500 milliseconds + # Pool slot should be released — new client should succeed + When we create session "client2" to pg_doorman as "example_user_1" with password 
"" and database "example_db" + When we send SimpleQuery "SELECT 2" to session "client2" without waiting + Then we read SimpleQuery response from session "client2" within 5000ms + Then session "client2" should receive DataRow with "2" +