From a90ba2e47a73c4945a24050947fc03a016e873c4 Mon Sep 17 00:00:00 2001 From: Sean Loiselle Date: Sun, 9 Apr 2023 15:53:11 -0400 Subject: [PATCH 1/2] pg sources: reconnect to PG database if encountering multiple successive timeouts --- src/storage/src/source/postgres.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/storage/src/source/postgres.rs b/src/storage/src/source/postgres.rs index 091949ec9885c..1094b231fc942 100644 --- a/src/storage/src/source/postgres.rs +++ b/src/storage/src/source/postgres.rs @@ -1036,6 +1036,7 @@ async fn produce_replication<'a>( let mut stream = Box::pin(LogicalReplicationStream::new(copy_stream)); let mut last_data_message = Instant::now(); + let mut err_counter = 0; // The inner loop loop { @@ -1051,6 +1052,10 @@ async fn produce_replication<'a>( ) .await; + if res.is_ok() { + err_counter = 0; + } + // The upstream will periodically request status updates by setting the keepalive's // reply field to 1. However, we cannot rely on these messages arriving on time. For // example, when the upstream is sending a big transaction its keepalive messages are @@ -1281,6 +1286,15 @@ async fn produce_replication<'a>( "if our request timed out, it must have been at least long enough to require \ a status update" ); + + // If we've timed out multiple times, try reconnecting. This should have + // given us plenty of time to receive a KeepAlive message from the server if + // we're going to. + if err_counter > 1 { + break; + } + + err_counter += 1; } } if needs_status_update { From 3075871a6fd363ee0ceb28cab2d825a092dd6cab Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 9 Apr 2023 21:50:32 +0000 Subject: [PATCH 2/2] build(deps): bump predicates from 2.1.4 to 3.0.2 Bumps [predicates](https://github.com/assert-rs/predicates-rs) from 2.1.4 to 3.0.2. - [Release notes](https://github.com/assert-rs/predicates-rs/releases) - [Changelog](https://github.com/assert-rs/predicates-rs/blob/master/CHANGELOG.md) - [Commits](https://github.com/assert-rs/predicates-rs/compare/v2.1.4...v3.0.2) --- updated-dependencies: - dependency-name: predicates dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- Cargo.lock | 26 ++++++++++++++++++++++---- src/environmentd/Cargo.toml | 2 +- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dabb983409793..401903f951d30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,6 +71,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstyle" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23ea9e81bd02e310c216d080f6223c179012256e5151c41db88d12c88a1684d2" + [[package]] name = "anyhow" version = "1.0.66" @@ -183,7 +189,7 @@ checksum = "d5c2ca00549910ec251e3bd15f87aeeb206c9456b9a77b43ff6c97c54042a472" dependencies = [ "bstr", "doc-comment", - "predicates", + "predicates 2.1.5", "predicates-core", "predicates-tree", "wait-timeout", @@ -3839,7 +3845,7 @@ dependencies = [ "postgres-openssl", "postgres-protocol", "postgres_array", - "predicates", + "predicates 3.0.2", "prometheus", "rand", "rdkafka-sys", @@ -5852,10 +5858,22 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "predicates" -version = "2.1.4" +version = "2.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +dependencies = [ + "difflib", + "itertools", + "predicates-core", +] + +[[package]] +name = "predicates" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f54fc5dc63ed3bbf19494623db4f3af16842c0d975818e469022d09e53f0aa05" +checksum = "c575290b64d24745b6c57a12a31465f0a66f3a4799686a6921526a33b0797965" dependencies = [ + "anstyle", "difflib", "float-cmp", "itertools", diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml index c6ba40e87534c..7238ded890830 100644 --- a/src/environmentd/Cargo.toml +++ b/src/environmentd/Cargo.toml @@ -104,7 +104,7 @@ postgres = { git = "https://github.com/MaterializeInc/rust-postgres", features = postgres-openssl = { git = "https://github.com/MaterializeInc/rust-postgres" } postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres" } postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" } -predicates = "2.1.4" +predicates = "3.0.2" regex = "1.7.0" reqwest = { version = "0.11.13", features = ["blocking"] } serde_json = "1.0.89"