From 1f4158c7c9b7f6be57d3fdf1cddb87c6d330d0a6 Mon Sep 17 00:00:00 2001 From: dmitrivasilyev Date: Tue, 3 Mar 2026 14:59:30 +0300 Subject: [PATCH 1/6] Detect stale server connections during client idle in transaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add server socket monitoring and client idle timeout to prevent pool slot exhaustion when clients abandon transactions or servers terminate backends. Three protection mechanisms: 1. Server socket monitoring via tokio::select! in transaction loop: when PostgreSQL kills a backend (idle_in_transaction_session_timeout, pg_terminate_backend), pg_doorman detects it immediately through server_readable() and releases the pool slot. 2. New config option client_idle_in_transaction_timeout (default: 0/disabled): if a client holds a server connection in a transaction without sending data for longer than this timeout, pg_doorman closes the connection and frees the slot. Uses 3-branch select! only when timeout > 0, 2-branch select! otherwise (no timer wheel overhead in default config). 3. Fix CLIENTS_IN_TRANSACTIONS counter leak on early returns from transaction loop (Terminate handler, client read errors, new select branches) — all paths now properly decrement the counter. Performance: select! runs only when client is idle between queries in a transaction (already waiting on I/O), so overhead is negligible. biased select ensures client data is always checked first. --- pg_doorman.toml | 8 + pg_doorman.yaml | 9 ++ src/app/generate/annotated.rs | 12 ++ src/app/generate/docs.rs | 1 + src/app/generate/fields.yaml | 29 ++++ src/client/transaction.rs | 116 +++++++++++++- src/config/general.rs | 13 ++ src/server/server_backend.rs | 8 + src/server/stream.rs | 9 ++ .../features/stale-server-detection.feature | 150 ++++++++++++++++++ 10 files changed, 349 insertions(+), 6 deletions(-) create mode 100644 tests/bdd/features/stale-server-detection.feature diff --git a/pg_doorman.toml b/pg_doorman.toml index f1092289..ce64407d 100644 --- a/pg_doorman.toml +++ b/pg_doorman.toml @@ -69,6 +69,14 @@ retain_connections_max = 3 # Default: "60s" server_idle_check_timeout = 60000 + +# Maximum time a client can remain idle while holding a server connection +# inside a transaction. If exceeded, pg_doorman closes the client connection +# and releases the server slot. +# 0 means disabled (no timeout). +# Default: "0" +client_idle_in_transaction_timeout = 0 + # Graceful shutdown timeout. # Default: 10000 (10000 ms) shutdown_timeout = 10000 diff --git a/pg_doorman.yaml b/pg_doorman.yaml index 7538f527..0feaf3eb 100644 --- a/pg_doorman.yaml +++ b/pg_doorman.yaml @@ -103,6 +103,15 @@ general: # Default: "60s" server_idle_check_timeout: "60s" + + # Maximum time a client can remain idle while holding a server connection + # inside a transaction. If exceeded, pg_doorman closes the client connection + # and releases the server slot. + # 0 means disabled (no timeout). + # Supports human-readable format: "0", "0ms", or 0 (milliseconds) + # Default: "0" + client_idle_in_transaction_timeout: "0" + # Graceful shutdown timeout. # Supports human-readable format: "10s", "10000ms", or 10000 (milliseconds) # Default: "10s" (10000 ms) diff --git a/src/app/generate/annotated.rs b/src/app/generate/annotated.rs index dc8f1785..443246e0 100644 --- a/src/app/generate/annotated.rs +++ b/src/app/generate/annotated.rs @@ -525,6 +525,18 @@ fn write_general_section(w: &mut ConfigWriter, config: &Config) { "", ); + w.blank(); + + write_field_desc(w, fi, "general", "client_idle_in_transaction_timeout"); + write_duration_value( + w, + fi, + "client_idle_in_transaction_timeout", + g.client_idle_in_transaction_timeout.as_millis(), + "0", + "", + ); + write_field_desc(w, fi, "general", "shutdown_timeout"); write_duration_value( w, diff --git a/src/app/generate/docs.rs b/src/app/generate/docs.rs index 7b661f43..cca4d9c7 100644 --- a/src/app/generate/docs.rs +++ b/src/app/generate/docs.rs @@ -228,6 +228,7 @@ fn write_general_fields(out: &mut String, f: &FieldsData) { "retain_connections_time", "retain_connections_max", "server_idle_check_timeout", + "client_idle_in_transaction_timeout", "server_round_robin", "sync_server_parameters", "tcp_so_linger", diff --git a/src/app/generate/fields.yaml b/src/app/generate/fields.yaml index 9a42c628..541cf483 100644 --- a/src/app/generate/fields.yaml +++ b/src/app/generate/fields.yaml @@ -364,6 +364,35 @@ fields: or PostgreSQL restarts). default: "60s (60 seconds)" + client_idle_in_transaction_timeout: + config: + en: | + Maximum time a client can remain idle while holding a server connection + inside a transaction. If exceeded, pg_doorman closes the client connection + and releases the server slot. + 0 means disabled (no timeout). + ru: | + Максимальное время простоя клиента, удерживающего серверное соединение + внутри транзакции. При превышении pg_doorman закрывает клиентское соединение + и освобождает серверный слот. + 0 — таймаут отключён. + doc: | + Maximum time a client can remain idle while holding a server connection inside a transaction. + When a client opens a transaction (BEGIN) and then stops sending queries, the server connection + is held exclusively and cannot be used by other clients. This timeout prevents indefinite + resource exhaustion caused by abandoned or stalled client connections. + + When the timeout fires, pg_doorman sends an ErrorResponse (SQLSTATE 57014) to the client, + marks the server connection as bad, and releases the pool slot for reuse. + + Additionally, pg_doorman monitors the server socket for unexpected events (e.g., PostgreSQL + terminating the backend via `idle_in_transaction_session_timeout` or `pg_terminate_backend`). + If the server closes the connection, pg_doorman immediately detects it and releases the slot, + regardless of this timeout setting. + + Set to `0` to disable the timeout (default, backward compatible). + default: "0 (disabled)" + shutdown_timeout: config: en: "Graceful shutdown timeout." diff --git a/src/client/transaction.rs b/src/client/transaction.rs index 3d2d19c9..47f42c16 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -1,6 +1,7 @@ use bytes::{BufMut, BytesMut}; use log::{debug, error, warn}; use std::ops::DerefMut; +use std::pin::pin; use std::sync::atomic::Ordering; use std::time::Duration; @@ -11,6 +12,7 @@ use crate::app::server::{CLIENTS_IN_TRANSACTIONS, SHUTDOWN_IN_PROGRESS}; use crate::client::batch_handling::PARSE_COMPLETE_MSG; use crate::client::core::{BatchOperation, Client, PreparedStatementKey}; use crate::client::util::{is_standalone_begin, QUERY_DEALLOCATE}; +use crate::config::get_config; use crate::errors::Error; use crate::messages::{ check_query_response, deallocate_response, error_response, error_response_terminal, @@ -650,6 +652,12 @@ where let mut initial_message = Some(message); + // Read client_idle_in_transaction_timeout from config (0 = disabled) + let client_idle_in_transaction_timeout_ms = get_config() + .general + .client_idle_in_transaction_timeout + .as_millis(); + // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, // or until the client disconnects if we are in session mode. @@ -660,12 +668,106 @@ where let message = match initial_message { None => { self.stats.active_read(); - match read_message(&mut self.read, self.max_memory_usage).await { - Ok(message) => message, - Err(err) => { - self.stats.disconnect(); - server.checkin_cleanup().await?; - return self.process_error(err).await; + + if client_idle_in_transaction_timeout_ms > 0 { + // Three-branch select: client read + server monitor + idle timeout + let idle_timeout_fut = pin!(tokio::time::sleep( + Duration::from_millis(client_idle_in_transaction_timeout_ms,) + )); + + tokio::select! { + biased; + + result = read_message(&mut self.read, self.max_memory_usage) => { + match result { + Ok(message) => message, + Err(err) => { + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + server.checkin_cleanup().await?; + self.release(); + return self.process_error(err).await; + } + } + } + + _ = server.server_readable() => { + warn!( + "Server {} connection event while waiting for client {} — \ + server likely terminated (idle_in_transaction_session_timeout, \ + pg_terminate_backend)", + server, self.addr + ); + server.mark_bad("server closed while client idle in transaction"); + let _ = error_response( + &mut self.write, + "server closed the connection, possibly due to idle_in_transaction_session_timeout", + "08006", + ).await; + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + self.release(); + return Ok(()); + } + + _ = idle_timeout_fut => { + warn!( + "Client {} idle in transaction timeout ({}ms) exceeded", + self.addr, client_idle_in_transaction_timeout_ms + ); + server.mark_bad("client idle in transaction timeout"); + let _ = error_response( + &mut self.write, + "client idle in transaction timeout exceeded", + "57014", + ).await; + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + self.release(); + return Ok(()); + } + } + } else { + // Two-branch select: client read + server monitor (no idle timeout) + tokio::select! { + biased; + + result = read_message(&mut self.read, self.max_memory_usage) => { + match result { + Ok(message) => message, + Err(err) => { + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + server.checkin_cleanup().await?; + self.release(); + return self.process_error(err).await; + } + } + } + + _ = server.server_readable() => { + warn!( + "Server {} connection event while waiting for client {} — \ + server likely terminated (idle_in_transaction_session_timeout, \ + pg_terminate_backend)", + server, self.addr + ); + server.mark_bad("server closed while client idle in transaction"); + let _ = error_response( + &mut self.write, + "server closed the connection, possibly due to idle_in_transaction_session_timeout", + "08006", + ).await; + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + self.release(); + return Ok(()); + } } } } @@ -697,6 +799,8 @@ where // принудительно закрываем чтобы не допустить длинную транзакцию server.checkin_cleanup().await?; self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; self.release(); return Ok(()); } diff --git a/src/config/general.rs b/src/config/general.rs index 856c28df..0474ecc9 100644 --- a/src/config/general.rs +++ b/src/config/general.rs @@ -188,6 +188,13 @@ pub struct General { )] pub hba: Vec, + /// Client idle in transaction timeout (in milliseconds). + /// If a client holds a server connection inside a transaction and sends no data + /// for longer than this timeout, pg_doorman closes the connection and releases + /// the server slot. 0 means disabled (default, backward compatible). + #[serde(default = "General::default_client_idle_in_transaction_timeout")] + pub client_idle_in_transaction_timeout: Duration, + // New pg_hba rules: either inline content or a file path (see `PgHba` deserialization). #[serde(default, skip_serializing)] pub pg_hba: Option, @@ -337,6 +344,11 @@ impl General { 0 } + /// Default: 0 (disabled) + pub fn default_client_idle_in_transaction_timeout() -> Duration { + Duration::from_millis(0) + } + pub fn default_daemon_pid_file() -> String { "/tmp/pg_doorman.pid".to_string() } @@ -437,6 +449,7 @@ impl Default for General { prepared_statements_cache_size: Self::default_prepared_statements_cache_size(), client_prepared_statements_cache_size: Self::default_client_prepared_statements_cache_size(), + client_idle_in_transaction_timeout: Self::default_client_idle_in_transaction_timeout(), hba: Self::default_hba(), pg_hba: None, daemon_pid_file: Self::default_daemon_pid_file(), diff --git a/src/server/server_backend.rs b/src/server/server_backend.rs index 973dcd09..e0b4f746 100644 --- a/src/server/server_backend.rs +++ b/src/server/server_backend.rs @@ -226,6 +226,14 @@ impl Server { self.bad = true; } + /// Returns a future that completes when the server socket becomes readable. + /// Between queries in a transaction, BufStream is empty (everything was read + /// up to ReadyForQuery), so readable on the underlying socket correctly + /// reflects new data from the server (e.g., FATAL after idle_in_transaction_session_timeout). + pub async fn server_readable(&self) -> std::io::Result<()> { + self.stream.get_ref().readable().await + } + /// Server & client are out of sync, we must discard this connection. /// This happens with clients that misbehave. pub fn is_bad(&self) -> bool { diff --git a/src/server/stream.rs b/src/server/stream.rs index 14ef89f0..a073ffd1 100644 --- a/src/server/stream.rs +++ b/src/server/stream.rs @@ -77,6 +77,15 @@ impl StreamInner { StreamInner::UnixSocket { stream } => stream.try_write(buf), } } + + /// Waits until the server socket becomes readable (data or EOF/error). + /// Cancel-safe: no data is consumed, only readiness notification. + pub async fn readable(&self) -> std::io::Result<()> { + match self { + StreamInner::TCPPlain { stream } => stream.readable().await, + StreamInner::UnixSocket { stream } => stream.readable().await, + } + } } pub(crate) async fn create_unix_stream_inner(host: &str, port: u16) -> Result { diff --git a/tests/bdd/features/stale-server-detection.feature b/tests/bdd/features/stale-server-detection.feature new file mode 100644 index 00000000..1c0bad14 --- /dev/null +++ b/tests/bdd/features/stale-server-detection.feature @@ -0,0 +1,150 @@ +@rust @rust-4 @stale-server-detection +Feature: Detect stale server connections during client idle in transaction + When a client holds a server connection inside a transaction and the server + terminates (e.g., idle_in_transaction_session_timeout, pg_terminate_backend), + pg_doorman should detect this and release the pool slot. + + Background: + Given PostgreSQL started with pg_hba.conf: + """ + local all all trust + host all all 127.0.0.1/32 trust + """ + And fixtures from "tests/fixture.sql" applied + + @stale-server-idle-in-transaction-timeout + Scenario: Detect server killed by idle_in_transaction_session_timeout + # PostgreSQL idle_in_transaction_session_timeout=2s kills backends + # pg_doorman should detect the dead server via server_readable() and release the slot + Given pg_doorman started with config: + """ + [general] + host = "127.0.0.1" + port = ${DOORMAN_PORT} + admin_username = "admin" + admin_password = "admin" + pg_hba.content = "host all all 127.0.0.1/32 trust" + server_lifetime = 60000 + server_idle_check_timeout = 0 + + [pools.example_db] + server_host = "127.0.0.1" + server_port = ${PG_PORT} + + [[pools.example_db.users]] + username = "example_user_1" + password = "" + pool_size = 1 + + [[pools.example_db.users]] + username = "postgres" + password = "" + pool_size = 2 + """ + # Set idle_in_transaction_session_timeout on the database + When we create session "setup" to postgres as "postgres" with password "" and database "example_db" + And we send SimpleQuery "ALTER DATABASE example_db SET idle_in_transaction_session_timeout = '2s'" to session "setup" without waiting + And we sleep for 200 milliseconds + # Client opens transaction through pg_doorman + When we create session "client1" to pg_doorman as "example_user_1" with password "" and database "example_db" + When we send SimpleQuery "BEGIN" to session "client1" without waiting + Then we read SimpleQuery response from session "client1" within 2000ms + When we send SimpleQuery "SELECT 1" to session "client1" without waiting + Then we read SimpleQuery response from session "client1" within 2000ms + # Wait for PostgreSQL to kill the backend (idle_in_transaction_session_timeout=2s) + When we sleep for 3000 milliseconds + # pg_doorman should have detected the dead server and released the slot + # New client should be able to connect successfully + When we create session "client2" to pg_doorman as "example_user_1" with password "" and database "example_db" + When we send SimpleQuery "SELECT 2" to session "client2" without waiting + Then we read SimpleQuery response from session "client2" within 5000ms + Then session "client2" should receive DataRow with "2" + # Reset the setting + When we send SimpleQuery "ALTER DATABASE example_db RESET idle_in_transaction_session_timeout" to session "setup" without waiting + + @stale-server-pg-terminate-backend + Scenario: Detect server killed by pg_terminate_backend + # Client holds transaction, then backend is killed via pg_terminate_backend + # pg_doorman should detect via server_readable() and release the slot + Given pg_doorman started with config: + """ + [general] + host = "127.0.0.1" + port = ${DOORMAN_PORT} + admin_username = "admin" + admin_password = "admin" + pg_hba.content = "host all all 127.0.0.1/32 trust" + server_lifetime = 60000 + server_idle_check_timeout = 0 + + [pools.example_db] + server_host = "127.0.0.1" + server_port = ${PG_PORT} + + [[pools.example_db.users]] + username = "example_user_1" + password = "" + pool_size = 1 + + [[pools.example_db.users]] + username = "postgres" + password = "" + pool_size = 2 + """ + # Client opens transaction and gets backend_pid + When we create session "victim" to pg_doorman as "example_user_1" with password "" and database "example_db" + When we send SimpleQuery "SELECT pg_backend_pid()" to session "victim" and store backend_pid as "victim_pid" + When we send SimpleQuery "BEGIN" to session "victim" without waiting + Then we read SimpleQuery response from session "victim" within 2000ms + When we send SimpleQuery "SELECT 1" to session "victim" without waiting + Then we read SimpleQuery response from session "victim" within 2000ms + # Kill the backend through a separate superuser connection + When we create session "killer" to pg_doorman as "postgres" with password "" and database "example_db" + When we terminate backend "victim_pid" from session "victim" via session "killer" + # Wait for pg_doorman to detect the dead server + When we sleep for 500 milliseconds + # Pool slot should be released — new client should succeed + When we create session "client2" to pg_doorman as "example_user_1" with password "" and database "example_db" + When we send SimpleQuery "SELECT 2" to session "client2" without waiting + Then we read SimpleQuery response from session "client2" within 5000ms + Then session "client2" should receive DataRow with "2" + + @stale-server-client-idle-timeout + Scenario: Client idle in transaction timeout releases pool slot + # pg_doorman with client_idle_in_transaction_timeout=2000ms + # Client opens transaction, then goes silent + # pg_doorman should close by timeout and release the slot + Given pg_doorman started with config: + """ + [general] + host = "127.0.0.1" + port = ${DOORMAN_PORT} + admin_username = "admin" + admin_password = "admin" + pg_hba.content = "host all all 127.0.0.1/32 trust" + server_lifetime = 60000 + server_idle_check_timeout = 0 + client_idle_in_transaction_timeout = 2000 + + [pools.example_db] + server_host = "127.0.0.1" + server_port = ${PG_PORT} + + [[pools.example_db.users]] + username = "example_user_1" + password = "" + pool_size = 1 + """ + # Client opens transaction through pg_doorman + When we create session "slow_client" to pg_doorman as "example_user_1" with password "" and database "example_db" + When we send SimpleQuery "BEGIN" to session "slow_client" without waiting + Then we read SimpleQuery response from session "slow_client" within 2000ms + When we send SimpleQuery "SELECT 1" to session "slow_client" without waiting + Then we read SimpleQuery response from session "slow_client" within 2000ms + # Wait for client_idle_in_transaction_timeout to fire (2s + margin) + When we sleep for 3000 milliseconds + # Pool slot should be released — new client should succeed + When we create session "new_client" to pg_doorman as "example_user_1" with password "" and database "example_db" + When we send SimpleQuery "SELECT 2" to session "new_client" without waiting + Then we read SimpleQuery response from session "new_client" within 5000ms + Then session "new_client" should receive DataRow with "2" From 5cdbaa79ae394251030a66eddddff738c07c3973 Mon Sep 17 00:00:00 2001 From: dmitrivasilyev Date: Tue, 3 Mar 2026 15:30:38 +0300 Subject: [PATCH 2/6] Fix spurious server_readable() triggering after BufStream operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After execute_server_roundtrip, BufStream's BufReader may have drained all protocol data without reading the underlying socket until WouldBlock. This leaves a stale readiness flag on the raw socket, causing server_readable() to fire immediately in the select! — falsely detecting a "dead server" and resetting the client connection. Fix: when server_readable() fires, verify with try_read() on the raw socket. If WouldBlock — spurious readiness, continue the loop. If EOF/data/error — genuine server event, handle as before. Add StreamInner::try_read() for non-blocking socket verification. --- src/client/transaction.rs | 91 ++++++++++++++++++++++++--------------- src/server/stream.rs | 10 +++++ 2 files changed, 67 insertions(+), 34 deletions(-) diff --git a/src/client/transaction.rs b/src/client/transaction.rs index 47f42c16..f968bb23 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -693,23 +693,37 @@ where } _ = server.server_readable() => { - warn!( - "Server {} connection event while waiting for client {} — \ - server likely terminated (idle_in_transaction_session_timeout, \ - pg_terminate_backend)", - server, self.addr - ); - server.mark_bad("server closed while client idle in transaction"); - let _ = error_response( - &mut self.write, - "server closed the connection, possibly due to idle_in_transaction_session_timeout", - "08006", - ).await; - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - self.release(); - return Ok(()); + // Verify readiness is genuine, not spurious from BufStream buffering. + // After execute_server_roundtrip, BufReader may have drained the protocol + // data without reading the underlying socket until WouldBlock, leaving + // a stale readiness flag. + let mut verify_buf = [0u8; 1]; + match server.stream.get_ref().try_read(&mut verify_buf) { + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // Spurious readiness — no actual data, continue waiting + continue; + } + _ => { + // Genuine: EOF (Ok(0)), data (Ok(n)), or socket error + warn!( + "Server {} connection event while waiting for client {} — \ + server likely terminated (idle_in_transaction_session_timeout, \ + pg_terminate_backend)", + server, self.addr + ); + server.mark_bad("server closed while client idle in transaction"); + let _ = error_response( + &mut self.write, + "server closed the connection, possibly due to idle_in_transaction_session_timeout", + "08006", + ).await; + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + self.release(); + return Ok(()); + } + } } _ = idle_timeout_fut => { @@ -750,23 +764,32 @@ where } _ = server.server_readable() => { - warn!( - "Server {} connection event while waiting for client {} — \ - server likely terminated (idle_in_transaction_session_timeout, \ - pg_terminate_backend)", - server, self.addr - ); - server.mark_bad("server closed while client idle in transaction"); - let _ = error_response( - &mut self.write, - "server closed the connection, possibly due to idle_in_transaction_session_timeout", - "08006", - ).await; - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - self.release(); - return Ok(()); + // Verify readiness is genuine (see 3-branch select comment above) + let mut verify_buf = [0u8; 1]; + match server.stream.get_ref().try_read(&mut verify_buf) { + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + _ => { + warn!( + "Server {} connection event while waiting for client {} — \ + server likely terminated (idle_in_transaction_session_timeout, \ + pg_terminate_backend)", + server, self.addr + ); + server.mark_bad("server closed while client idle in transaction"); + let _ = error_response( + &mut self.write, + "server closed the connection, possibly due to idle_in_transaction_session_timeout", + "08006", + ).await; + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + self.release(); + return Ok(()); + } + } } } } diff --git a/src/server/stream.rs b/src/server/stream.rs index a073ffd1..5531e006 100644 --- a/src/server/stream.rs +++ b/src/server/stream.rs @@ -86,6 +86,16 @@ impl StreamInner { StreamInner::UnixSocket { stream } => stream.readable().await, } } + + /// Non-blocking read attempt on the raw socket (bypasses BufStream). + /// Used to verify that `readable()` readiness is genuine, not spurious + /// from BufStream buffering. Returns WouldBlock if no data available. + pub fn try_read(&self, buf: &mut [u8]) -> std::io::Result { + match self { + StreamInner::TCPPlain { stream } => stream.try_read(buf), + StreamInner::UnixSocket { stream } => stream.try_read(buf), + } + } } pub(crate) async fn create_unix_stream_inner(host: &str, port: u16) -> Result { From 113af6dde4cc3fc36fec66973ec3fc61493a9bf5 Mon Sep 17 00:00:00 2001 From: dmitrivasilyev Date: Tue, 3 Mar 2026 16:11:18 +0300 Subject: [PATCH 3/6] Remove client_idle_in_transaction_timeout to simplify the PR Keep only server socket monitoring via tokio::select! + server_readable(). The idle timeout feature can be added separately if needed. --- pg_doorman.toml | 8 - pg_doorman.yaml | 9 - src/app/generate/annotated.rs | 12 -- src/app/generate/docs.rs | 1 - src/app/generate/fields.yaml | 29 ---- src/client/transaction.rs | 162 ++++-------------- src/config/general.rs | 13 -- .../features/stale-server-detection.feature | 39 ----- 8 files changed, 38 insertions(+), 235 deletions(-) diff --git a/pg_doorman.toml b/pg_doorman.toml index ce64407d..f1092289 100644 --- a/pg_doorman.toml +++ b/pg_doorman.toml @@ -69,14 +69,6 @@ retain_connections_max = 3 # Default: "60s" server_idle_check_timeout = 60000 - -# Maximum time a client can remain idle while holding a server connection -# inside a transaction. If exceeded, pg_doorman closes the client connection -# and releases the server slot. -# 0 means disabled (no timeout). -# Default: "0" -client_idle_in_transaction_timeout = 0 - # Graceful shutdown timeout. # Default: 10000 (10000 ms) shutdown_timeout = 10000 diff --git a/pg_doorman.yaml b/pg_doorman.yaml index 0feaf3eb..7538f527 100644 --- a/pg_doorman.yaml +++ b/pg_doorman.yaml @@ -103,15 +103,6 @@ general: # Default: "60s" server_idle_check_timeout: "60s" - - # Maximum time a client can remain idle while holding a server connection - # inside a transaction. If exceeded, pg_doorman closes the client connection - # and releases the server slot. - # 0 means disabled (no timeout). - # Supports human-readable format: "0", "0ms", or 0 (milliseconds) - # Default: "0" - client_idle_in_transaction_timeout: "0" - # Graceful shutdown timeout. # Supports human-readable format: "10s", "10000ms", or 10000 (milliseconds) # Default: "10s" (10000 ms) diff --git a/src/app/generate/annotated.rs b/src/app/generate/annotated.rs index 443246e0..dc8f1785 100644 --- a/src/app/generate/annotated.rs +++ b/src/app/generate/annotated.rs @@ -525,18 +525,6 @@ fn write_general_section(w: &mut ConfigWriter, config: &Config) { "", ); - w.blank(); - - write_field_desc(w, fi, "general", "client_idle_in_transaction_timeout"); - write_duration_value( - w, - fi, - "client_idle_in_transaction_timeout", - g.client_idle_in_transaction_timeout.as_millis(), - "0", - "", - ); - write_field_desc(w, fi, "general", "shutdown_timeout"); write_duration_value( w, diff --git a/src/app/generate/docs.rs b/src/app/generate/docs.rs index cca4d9c7..7b661f43 100644 --- a/src/app/generate/docs.rs +++ b/src/app/generate/docs.rs @@ -228,7 +228,6 @@ fn write_general_fields(out: &mut String, f: &FieldsData) { "retain_connections_time", "retain_connections_max", "server_idle_check_timeout", - "client_idle_in_transaction_timeout", "server_round_robin", "sync_server_parameters", "tcp_so_linger", diff --git a/src/app/generate/fields.yaml b/src/app/generate/fields.yaml index 541cf483..9a42c628 100644 --- a/src/app/generate/fields.yaml +++ b/src/app/generate/fields.yaml @@ -364,35 +364,6 @@ fields: or PostgreSQL restarts). default: "60s (60 seconds)" - client_idle_in_transaction_timeout: - config: - en: | - Maximum time a client can remain idle while holding a server connection - inside a transaction. If exceeded, pg_doorman closes the client connection - and releases the server slot. - 0 means disabled (no timeout). - ru: | - Максимальное время простоя клиента, удерживающего серверное соединение - внутри транзакции. При превышении pg_doorman закрывает клиентское соединение - и освобождает серверный слот. - 0 — таймаут отключён. - doc: | - Maximum time a client can remain idle while holding a server connection inside a transaction. - When a client opens a transaction (BEGIN) and then stops sending queries, the server connection - is held exclusively and cannot be used by other clients. This timeout prevents indefinite - resource exhaustion caused by abandoned or stalled client connections. - - When the timeout fires, pg_doorman sends an ErrorResponse (SQLSTATE 57014) to the client, - marks the server connection as bad, and releases the pool slot for reuse. - - Additionally, pg_doorman monitors the server socket for unexpected events (e.g., PostgreSQL - terminating the backend via `idle_in_transaction_session_timeout` or `pg_terminate_backend`). - If the server closes the connection, pg_doorman immediately detects it and releases the slot, - regardless of this timeout setting. - - Set to `0` to disable the timeout (default, backward compatible). - default: "0 (disabled)" - shutdown_timeout: config: en: "Graceful shutdown timeout." diff --git a/src/client/transaction.rs b/src/client/transaction.rs index f968bb23..9541930b 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -1,7 +1,6 @@ use bytes::{BufMut, BytesMut}; use log::{debug, error, warn}; use std::ops::DerefMut; -use std::pin::pin; use std::sync::atomic::Ordering; use std::time::Duration; @@ -12,7 +11,6 @@ use crate::app::server::{CLIENTS_IN_TRANSACTIONS, SHUTDOWN_IN_PROGRESS}; use crate::client::batch_handling::PARSE_COMPLETE_MSG; use crate::client::core::{BatchOperation, Client, PreparedStatementKey}; use crate::client::util::{is_standalone_begin, QUERY_DEALLOCATE}; -use crate::config::get_config; use crate::errors::Error; use crate::messages::{ check_query_response, deallocate_response, error_response, error_response_terminal, @@ -652,12 +650,6 @@ where let mut initial_message = Some(message); - // Read client_idle_in_transaction_timeout from config (0 = disabled) - let client_idle_in_transaction_timeout_ms = get_config() - .general - .client_idle_in_transaction_timeout - .as_millis(); - // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, // or until the client disconnects if we are in session mode. @@ -669,126 +661,48 @@ where None => { self.stats.active_read(); - if client_idle_in_transaction_timeout_ms > 0 { - // Three-branch select: client read + server monitor + idle timeout - let idle_timeout_fut = pin!(tokio::time::sleep( - Duration::from_millis(client_idle_in_transaction_timeout_ms,) - )); - - tokio::select! { - biased; - - result = read_message(&mut self.read, self.max_memory_usage) => { - match result { - Ok(message) => message, - Err(err) => { - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - server.checkin_cleanup().await?; - self.release(); - return self.process_error(err).await; - } + tokio::select! { + biased; + + result = read_message(&mut self.read, self.max_memory_usage) => { + match result { + Ok(message) => message, + Err(err) => { + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + server.checkin_cleanup().await?; + self.release(); + return self.process_error(err).await; } } - - _ = server.server_readable() => { - // Verify readiness is genuine, not spurious from BufStream buffering. - // After execute_server_roundtrip, BufReader may have drained the protocol - // data without reading the underlying socket until WouldBlock, leaving - // a stale readiness flag. - let mut verify_buf = [0u8; 1]; - match server.stream.get_ref().try_read(&mut verify_buf) { - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // Spurious readiness — no actual data, continue waiting - continue; - } - _ => { - // Genuine: EOF (Ok(0)), data (Ok(n)), or socket error - warn!( - "Server {} connection event while waiting for client {} — \ - server likely terminated (idle_in_transaction_session_timeout, \ - pg_terminate_backend)", - server, self.addr - ); - server.mark_bad("server closed while client idle in transaction"); - let _ = error_response( - &mut self.write, - "server closed the connection, possibly due to idle_in_transaction_session_timeout", - "08006", - ).await; - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - self.release(); - return Ok(()); - } - } - } - - _ = idle_timeout_fut => { - warn!( - "Client {} idle in transaction timeout ({}ms) exceeded", - self.addr, client_idle_in_transaction_timeout_ms - ); - server.mark_bad("client idle in transaction timeout"); - let _ = error_response( - &mut self.write, - "client idle in transaction timeout exceeded", - "57014", - ).await; - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - self.release(); - return Ok(()); - } } - } else { - // Two-branch select: client read + server monitor (no idle timeout) - tokio::select! { - biased; - - result = read_message(&mut self.read, self.max_memory_usage) => { - match result { - Ok(message) => message, - Err(err) => { - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - server.checkin_cleanup().await?; - self.release(); - return self.process_error(err).await; - } - } - } - _ = server.server_readable() => { - // Verify readiness is genuine (see 3-branch select comment above) - let mut verify_buf = [0u8; 1]; - match server.stream.get_ref().try_read(&mut verify_buf) { - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; - } - _ => { - warn!( - "Server {} connection event while waiting for client {} — \ - server likely terminated (idle_in_transaction_session_timeout, \ - pg_terminate_backend)", - server, self.addr - ); - server.mark_bad("server closed while client idle in transaction"); - let _ = error_response( - &mut self.write, - "server closed the connection, possibly due to idle_in_transaction_session_timeout", - "08006", - ).await; - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - self.release(); - return Ok(()); - } + _ = server.server_readable() => { + // Verify readiness is genuine (see 3-branch select comment above) + let mut verify_buf = [0u8; 1]; + match server.stream.get_ref().try_read(&mut verify_buf) { + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + _ => { + warn!( + "Server {} connection event while waiting for client {} — \ + server likely terminated (idle_in_transaction_session_timeout, \ + pg_terminate_backend)", + server, self.addr + ); + server.mark_bad("server closed while client idle in transaction"); + let _ = error_response( + &mut self.write, + "server closed the connection, possibly due to idle_in_transaction_session_timeout", + "08006", + ).await; + self.stats.disconnect(); + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + self.connected_to_server = false; + self.release(); + return Ok(()); } } } diff --git a/src/config/general.rs b/src/config/general.rs index 0474ecc9..856c28df 100644 --- a/src/config/general.rs +++ b/src/config/general.rs @@ -188,13 +188,6 @@ pub struct General { )] pub hba: Vec, - /// Client idle in transaction timeout (in milliseconds). - /// If a client holds a server connection inside a transaction and sends no data - /// for longer than this timeout, pg_doorman closes the connection and releases - /// the server slot. 0 means disabled (default, backward compatible). - #[serde(default = "General::default_client_idle_in_transaction_timeout")] - pub client_idle_in_transaction_timeout: Duration, - // New pg_hba rules: either inline content or a file path (see `PgHba` deserialization). #[serde(default, skip_serializing)] pub pg_hba: Option, @@ -344,11 +337,6 @@ impl General { 0 } - /// Default: 0 (disabled) - pub fn default_client_idle_in_transaction_timeout() -> Duration { - Duration::from_millis(0) - } - pub fn default_daemon_pid_file() -> String { "/tmp/pg_doorman.pid".to_string() } @@ -449,7 +437,6 @@ impl Default for General { prepared_statements_cache_size: Self::default_prepared_statements_cache_size(), client_prepared_statements_cache_size: Self::default_client_prepared_statements_cache_size(), - client_idle_in_transaction_timeout: Self::default_client_idle_in_transaction_timeout(), hba: Self::default_hba(), pg_hba: None, daemon_pid_file: Self::default_daemon_pid_file(), diff --git a/tests/bdd/features/stale-server-detection.feature b/tests/bdd/features/stale-server-detection.feature index 1c0bad14..ef8d49e8 100644 --- a/tests/bdd/features/stale-server-detection.feature +++ b/tests/bdd/features/stale-server-detection.feature @@ -109,42 +109,3 @@ Feature: Detect stale server connections during client idle in transaction Then we read SimpleQuery response from session "client2" within 5000ms Then session "client2" should receive DataRow with "2" - @stale-server-client-idle-timeout - Scenario: Client idle in transaction timeout releases pool slot - # pg_doorman with client_idle_in_transaction_timeout=2000ms - # Client opens transaction, then goes silent - # pg_doorman should close by timeout and release the slot - Given pg_doorman started with config: - """ - [general] - host = "127.0.0.1" - port = ${DOORMAN_PORT} - admin_username = "admin" - admin_password = "admin" - pg_hba.content = "host all all 127.0.0.1/32 trust" - server_lifetime = 60000 - server_idle_check_timeout = 0 - client_idle_in_transaction_timeout = 2000 - - [pools.example_db] - server_host = "127.0.0.1" - server_port = ${PG_PORT} - - [[pools.example_db.users]] - username = "example_user_1" - password = "" - pool_size = 1 - """ - # Client opens transaction through pg_doorman - When we create session "slow_client" to pg_doorman as "example_user_1" with password "" and database "example_db" - When we send SimpleQuery "BEGIN" to session "slow_client" without waiting - Then we read SimpleQuery response from session "slow_client" within 2000ms - When we send SimpleQuery "SELECT 1" to session "slow_client" without waiting - Then we read SimpleQuery response from session "slow_client" within 2000ms - # Wait for client_idle_in_transaction_timeout to fire (2s + margin) - When we sleep for 3000 milliseconds - # Pool slot should be released — new client should succeed - When we create session "new_client" to pg_doorman as "example_user_1" with password "" and database "example_db" - When we send SimpleQuery "SELECT 2" to session "new_client" without waiting - Then we read SimpleQuery response from session "new_client" within 5000ms - Then session "new_client" should receive DataRow with "2" From 3868f19d8dc368e21d3c7a8a74df2f6a93f97fe2 Mon Sep 17 00:00:00 2001 From: dmitrivasilyev Date: Tue, 3 Mar 2026 16:18:04 +0300 Subject: [PATCH 4/6] Fix stale comment referencing removed 3-branch select --- src/client/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/transaction.rs b/src/client/transaction.rs index 9541930b..cc2666fa 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -679,7 +679,7 @@ where } _ = server.server_readable() => { - // Verify readiness is genuine (see 3-branch select comment above) + // Verify readiness is genuine, not spurious from BufStream buffering let mut verify_buf = [0u8; 1]; match server.stream.get_ref().try_read(&mut verify_buf) { Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { From afec3974855d6620574d2d54ddce413b1bf3ae28 Mon Sep 17 00:00:00 2001 From: dmitrivasilyev Date: Tue, 3 Mar 2026 16:21:21 +0300 Subject: [PATCH 5/6] Remove idle_in_transaction_session_timeout BDD scenario The pg_terminate_backend scenario covers the same pg_doorman behavior more directly and without the 3s wait. --- .../features/stale-server-detection.feature | 50 ------------------- 1 file changed, 50 deletions(-) diff --git a/tests/bdd/features/stale-server-detection.feature b/tests/bdd/features/stale-server-detection.feature index ef8d49e8..2bc24133 100644 --- a/tests/bdd/features/stale-server-detection.feature +++ b/tests/bdd/features/stale-server-detection.feature @@ -12,56 +12,6 @@ Feature: Detect stale server connections during client idle in transaction """ And fixtures from "tests/fixture.sql" applied - @stale-server-idle-in-transaction-timeout - Scenario: Detect server killed by idle_in_transaction_session_timeout - # PostgreSQL idle_in_transaction_session_timeout=2s kills backends - # pg_doorman should detect the dead server via server_readable() and release the slot - Given pg_doorman started with config: - """ - [general] - host = "127.0.0.1" - port = ${DOORMAN_PORT} - admin_username = "admin" - admin_password = "admin" - pg_hba.content = "host all all 127.0.0.1/32 trust" - server_lifetime = 60000 - server_idle_check_timeout = 0 - - [pools.example_db] - server_host = "127.0.0.1" - server_port = ${PG_PORT} - - [[pools.example_db.users]] - username = "example_user_1" - password = "" - pool_size = 1 - - [[pools.example_db.users]] - username = "postgres" - password = "" - pool_size = 2 - """ - # Set idle_in_transaction_session_timeout on the database - When we create session "setup" to postgres as "postgres" with password "" and database "example_db" - And we send SimpleQuery "ALTER DATABASE example_db SET idle_in_transaction_session_timeout = '2s'" to session "setup" without waiting - And we sleep for 200 milliseconds - # Client opens transaction through pg_doorman - When we create session "client1" to pg_doorman as "example_user_1" with password "" and database "example_db" - When we send SimpleQuery "BEGIN" to session "client1" without waiting - Then we read SimpleQuery response from session "client1" within 2000ms - When we send SimpleQuery "SELECT 1" to session "client1" without waiting - Then we read SimpleQuery response from session "client1" within 2000ms - # Wait for PostgreSQL to kill the backend (idle_in_transaction_session_timeout=2s) - When we sleep for 3000 milliseconds - # pg_doorman should have detected the dead server and released the slot - # New client should be able to connect successfully - When we create session "client2" to pg_doorman as "example_user_1" with password "" and database "example_db" - When we send SimpleQuery "SELECT 2" to session "client2" without waiting - Then we read SimpleQuery response from session "client2" within 5000ms - Then session "client2" should receive DataRow with "2" - # Reset the setting - When we send SimpleQuery "ALTER DATABASE example_db RESET idle_in_transaction_session_timeout" to session "setup" without waiting - @stale-server-pg-terminate-backend Scenario: Detect server killed by pg_terminate_backend # Client holds transaction, then backend is killed via pg_terminate_backend From 5b3d884e606e258c2d105886fb132c2b73cc7e7d Mon Sep 17 00:00:00 2001 From: dmitrivasilyev Date: Thu, 12 Mar 2026 15:56:05 +0300 Subject: [PATCH 6/6] Refactor: RAII guard for transaction counter, extract wait_for_next_message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace manual CLIENTS_IN_TRANSACTIONS fetch_add/fetch_sub (separated by ~200 lines with multiple early return paths) with TransactionGuard that increments on creation and decrements on drop. Fixes counter leaks on early returns from sync_parameters, deferred BEGIN, and write_all_flush. Extract inlined select! into wait_for_next_message method returning NextClientMessage enum. Encapsulate server liveness verification in Server::check_server_alive() — no more server.stream.get_ref().try_read() from transaction.rs. Co-Authored-By: Claude Opus 4.6 --- src/client/transaction.rs | 123 +++++++++++++++++++++-------------- src/server/server_backend.rs | 15 ++++- 2 files changed, 86 insertions(+), 52 deletions(-) diff --git a/src/client/transaction.rs b/src/client/transaction.rs index cc2666fa..6f391ff2 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -128,6 +128,29 @@ use crate::utils::debug_messages::{log_client_to_server, log_server_to_client}; /// When the buffer reaches this size, it will be flushed to avoid excessive memory usage. const BUFFER_FLUSH_THRESHOLD: usize = 8192; +/// RAII guard for CLIENTS_IN_TRANSACTIONS counter. +/// Increments on creation, decrements on drop. +struct TransactionGuard; + +impl TransactionGuard { + fn new() -> Self { + CLIENTS_IN_TRANSACTIONS.fetch_add(1, Ordering::Relaxed); + Self + } +} + +impl Drop for TransactionGuard { + fn drop(&mut self) { + CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + } +} + +/// Result of waiting for the next client message while monitoring server liveness. +enum NextClientMessage { + Message(BytesMut), + ServerDead, +} + /// Action to take after processing a message in the transaction loop enum TransactionAction { /// Continue processing messages in the transaction loop @@ -176,6 +199,26 @@ where Ok(()) } + /// Wait for the next client message while monitoring server connection liveness. + /// Uses `select!` to race client read against server readability, detecting dead + /// server connections (e.g., `pg_terminate_backend`) while client is idle in transaction. + async fn wait_for_next_message(&mut self, server: &Server) -> Result { + loop { + tokio::select! { + biased; + result = read_message(&mut self.read, self.max_memory_usage) => { + return result.map(NextClientMessage::Message); + } + _ = server.server_readable() => { + if server.check_server_alive() { + continue; + } + return Ok(NextClientMessage::ServerDead); + } + } + } + } + /// Handle cancel mode - when client wants to cancel a previously issued query. /// Opens a new separate connection to the server, sends the backend_id /// and secret_key and then closes it for security reasons. @@ -590,8 +633,9 @@ where server.claim(self.process_id, self.secret_key); self.connected_to_server = true; - // Signal that client is now in transaction (has server connection) - CLIENTS_IN_TRANSACTIONS.fetch_add(1, Ordering::Relaxed); + // RAII guard: increments CLIENTS_IN_TRANSACTIONS now, + // decrements automatically when this block exits (normal or early return). + let _tx_guard = TransactionGuard::new(); // Update statistics self.stats.active_idle(); @@ -660,51 +704,33 @@ where let message = match initial_message { None => { self.stats.active_read(); - - tokio::select! { - biased; - - result = read_message(&mut self.read, self.max_memory_usage) => { - match result { - Ok(message) => message, - Err(err) => { - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - server.checkin_cleanup().await?; - self.release(); - return self.process_error(err).await; - } - } + match self.wait_for_next_message(server).await { + Ok(NextClientMessage::Message(msg)) => msg, + Ok(NextClientMessage::ServerDead) => { + warn!( + "Server {} connection died while client {} idle in transaction", + server, self.addr + ); + server.mark_bad( + "server closed while client idle in transaction", + ); + let _ = error_response( + &mut self.write, + "server closed the connection unexpectedly while client was idle in transaction", + "08006", + ) + .await; + self.stats.disconnect(); + self.connected_to_server = false; + self.release(); + return Ok(()); } - - _ = server.server_readable() => { - // Verify readiness is genuine, not spurious from BufStream buffering - let mut verify_buf = [0u8; 1]; - match server.stream.get_ref().try_read(&mut verify_buf) { - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; - } - _ => { - warn!( - "Server {} connection event while waiting for client {} — \ - server likely terminated (idle_in_transaction_session_timeout, \ - pg_terminate_backend)", - server, self.addr - ); - server.mark_bad("server closed while client idle in transaction"); - let _ = error_response( - &mut self.write, - "server closed the connection, possibly due to idle_in_transaction_session_timeout", - "08006", - ).await; - self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); - self.connected_to_server = false; - self.release(); - return Ok(()); - } - } + Err(err) => { + self.stats.disconnect(); + self.connected_to_server = false; + server.checkin_cleanup().await?; + self.release(); + return self.process_error(err).await; } } } @@ -733,10 +759,8 @@ where // Terminate 'X' => { - // принудительно закрываем чтобы не допустить длинную транзакцию server.checkin_cleanup().await?; self.stats.disconnect(); - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); self.connected_to_server = false; self.release(); return Ok(()); @@ -831,8 +855,7 @@ where self.client_last_messages_in_tx.clear(); } - // Signal that client finished transaction (released server connection) - CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed); + // TransactionGuard dropped at end of block above, counter already decremented. self.connected_to_server = false; // If shutdown is in progress, send error to client and exit diff --git a/src/server/server_backend.rs b/src/server/server_backend.rs index e0b4f746..ac763398 100644 --- a/src/server/server_backend.rs +++ b/src/server/server_backend.rs @@ -230,8 +230,19 @@ impl Server { /// Between queries in a transaction, BufStream is empty (everything was read /// up to ReadyForQuery), so readable on the underlying socket correctly /// reflects new data from the server (e.g., FATAL after idle_in_transaction_session_timeout). - pub async fn server_readable(&self) -> std::io::Result<()> { - self.stream.get_ref().readable().await + pub async fn server_readable(&self) { + let _ = self.stream.get_ref().readable().await; + } + + /// Verify that server_readable() readiness is genuine, not spurious. + /// Returns true if the connection is alive (WouldBlock = no real data). + /// Returns false if the server sent data or closed the connection (dead). + pub fn check_server_alive(&self) -> bool { + let mut buf = [0u8; 1]; + matches!( + self.stream.get_ref().try_read(&mut buf), + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock + ) } /// Server & client are out of sync, we must discard this connection.