diff --git a/Cargo.lock b/Cargo.lock index 1b6281bae44b8..be83f2ccebcd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,9 +21,9 @@ dependencies = [ [[package]] name = "addr2line" -version = "0.17.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" dependencies = [ "gimli", ] @@ -705,15 +705,15 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.66" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cab84319d616cfb654d03394f38ab7e6f0919e181b1b57e1fd15e7fb4077d9a7" +checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" dependencies = [ "addr2line", "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.1", "object", "rustc-demangle", ] @@ -2057,7 +2057,7 @@ checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" dependencies = [ "crc32fast", "libz-sys", - "miniz_oxide", + "miniz_oxide 0.5.1", ] [[package]] @@ -2304,9 +2304,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.26.1" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" [[package]] name = "glob" @@ -3019,9 +3019,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.142" +version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a987beff54b60ffa6d51982e1aa1146bc42f19bd26be28b0586f252fccf5317" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "libloading" @@ -3225,6 +3225,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.5" @@ -5454,9 +5463,9 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "object" -version = "0.29.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21158b2c33aa6d4561f1c0a6ea283ca92bc54802a93b263e910746d679a7eb53" +checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" dependencies = [ "memchr", ] diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 5de5c3c83f2c1..eaa48f3af6d4e 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -7343,6 +7343,7 @@ impl Catalog { } } }, + finalize_shards: self.system_config().storage_client_finalize_shards(), } } diff --git a/src/prof/Cargo.toml b/src/prof/Cargo.toml index 953b3d498ef83..ae0d6ec74b1b2 100644 --- a/src/prof/Cargo.toml +++ b/src/prof/Cargo.toml @@ -10,7 +10,7 @@ publish = false askama = { version = "0.11.1", default-features = false, features = ["config", "serde-json"] } anyhow = "1.0.66" axum = { version = "0.6.7", features = ["headers"] } -backtrace = "0.3.66" +backtrace = "0.3.68" bytesize = "1.1.0" cfg-if = "1.0.0" headers = "0.3.8" diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index da662d7c8c210..0090d744c5c4f 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -959,6 +959,15 @@ const KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES: ServerVar = ServerVar { internal: true }; +/// Whether compute rendering should use Materialize's custom linear join implementation rather +/// than the one from Differential Dataflow. +const STORAGE_CLIENT_FINALIZE_SHARDS: ServerVar = ServerVar { + name: UncasedStr::new("storage_client_finalize_shards"), + value: &true, + description: "Whether to all the storage client to finalize shards (Materialize).", + internal: true, +}; + // Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the // availability of features. // @@ -2203,6 +2212,11 @@ impl SystemVars { pub fn enable_mz_join_core(&self) -> bool { *self.expect_value(&ENABLE_MZ_JOIN_CORE) } + + /// Returns the `pg_replication_tcp_user_timeout` configuration parameter. + pub fn storage_client_finalize_shards(&self) -> bool { + *self.expect_value(&STORAGE_CLIENT_FINALIZE_SHARDS) + } } /// A `Var` represents a configuration parameter of an arbitrary type. diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 5e541410e4de5..8473fe323c7bd 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -2295,7 +2295,9 @@ where .await .expect("stash operation must succeed"); - self.finalize_shards().await; + if self.state.config.finalize_shards { + self.finalize_shards().await; + } } Some(StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => { // Note we only hold the locks while moving some plain-old-data around here. diff --git a/src/storage-client/src/types/parameters.proto b/src/storage-client/src/types/parameters.proto index 1aa52c9202213..72fa4fcb3d47d 100644 --- a/src/storage-client/src/types/parameters.proto +++ b/src/storage-client/src/types/parameters.proto @@ -21,6 +21,7 @@ message ProtoStorageParameters { ProtoPgReplicationTimeouts pg_replication_timeouts = 3; uint64 keep_n_source_status_history_entries = 4; mz_rocksdb.config.ProtoRocksDbTuningParameters upsert_rocksdb_tuning_config = 5; + bool finalize_shards = 6; } diff --git a/src/storage-client/src/types/parameters.rs b/src/storage-client/src/types/parameters.rs index 52db6ef50b72f..c2d5244994f52 100644 --- a/src/storage-client/src/types/parameters.rs +++ b/src/storage-client/src/types/parameters.rs @@ -31,6 +31,10 @@ pub struct StorageParameters { pub keep_n_source_status_history_entries: usize, /// A set of parameters used to tune RocksDB when used with `UPSERT` sources. pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBTuningParameters, + /// Whether or not to allow shard finalization to occur. Note that this will + /// only disable the actual finalization of shards, not registering them for + /// finalization. + pub finalize_shards: bool, } impl StorageParameters { @@ -42,12 +46,14 @@ impl StorageParameters { pg_replication_timeouts, keep_n_source_status_history_entries, upsert_rocksdb_tuning_config, + finalize_shards, }: StorageParameters, ) { self.persist.update(persist); self.pg_replication_timeouts = pg_replication_timeouts; self.keep_n_source_status_history_entries = keep_n_source_status_history_entries; self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config; + self.finalize_shards = finalize_shards } } @@ -60,6 +66,7 @@ impl RustType for StorageParameters { self.keep_n_source_status_history_entries, ), upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()), + finalize_shards: self.finalize_shards, } } @@ -77,6 +84,7 @@ impl RustType for StorageParameters { upsert_rocksdb_tuning_config: proto .upsert_rocksdb_tuning_config .into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?, + finalize_shards: proto.finalize_shards, }) } }