diff --git a/ci/builder/Dockerfile b/ci/builder/Dockerfile index 123c2411dfea4..52ca961d07941 100644 --- a/ci/builder/Dockerfile +++ b/ci/builder/Dockerfile @@ -10,7 +10,7 @@ # Build a cross-compiling toolchain that targets the oldest version of Linux # that we support. -FROM ubuntu:jammy-20221130 AS crosstool +FROM ubuntu:jammy-20230605 AS crosstool ARG ARCH_GCC ARG ARCH_GO @@ -68,7 +68,7 @@ RUN DEFCONFIG=crosstool-$ARCH_GCC.defconfig ct-ng defconfig \ # Import the cross-compiling toolchain into a fresh image, omitting the # dependencies that we needed to actually build the toolchain. -FROM ubuntu:jammy-20221130 +FROM ubuntu:jammy-20230605 ARG ARCH_GCC ARG ARCH_GO diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 5de5c3c83f2c1..eaa48f3af6d4e 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -7343,6 +7343,7 @@ impl Catalog { } } }, + finalize_shards: self.system_config().storage_client_finalize_shards(), } } diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index da662d7c8c210..0090d744c5c4f 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -959,6 +959,15 @@ const KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES: ServerVar = ServerVar { internal: true }; +/// Whether compute rendering should use Materialize's custom linear join implementation rather +/// than the one from Differential Dataflow. +const STORAGE_CLIENT_FINALIZE_SHARDS: ServerVar = ServerVar { + name: UncasedStr::new("storage_client_finalize_shards"), + value: &true, + description: "Whether to all the storage client to finalize shards (Materialize).", + internal: true, +}; + // Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the // availability of features. // @@ -2203,6 +2212,11 @@ impl SystemVars { pub fn enable_mz_join_core(&self) -> bool { *self.expect_value(&ENABLE_MZ_JOIN_CORE) } + + /// Returns the `pg_replication_tcp_user_timeout` configuration parameter. + pub fn storage_client_finalize_shards(&self) -> bool { + *self.expect_value(&STORAGE_CLIENT_FINALIZE_SHARDS) + } } /// A `Var` represents a configuration parameter of an arbitrary type. diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 5e541410e4de5..8473fe323c7bd 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -2295,7 +2295,9 @@ where .await .expect("stash operation must succeed"); - self.finalize_shards().await; + if self.state.config.finalize_shards { + self.finalize_shards().await; + } } Some(StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => { // Note we only hold the locks while moving some plain-old-data around here. diff --git a/src/storage-client/src/types/parameters.proto b/src/storage-client/src/types/parameters.proto index 1aa52c9202213..72fa4fcb3d47d 100644 --- a/src/storage-client/src/types/parameters.proto +++ b/src/storage-client/src/types/parameters.proto @@ -21,6 +21,7 @@ message ProtoStorageParameters { ProtoPgReplicationTimeouts pg_replication_timeouts = 3; uint64 keep_n_source_status_history_entries = 4; mz_rocksdb.config.ProtoRocksDbTuningParameters upsert_rocksdb_tuning_config = 5; + bool finalize_shards = 6; } diff --git a/src/storage-client/src/types/parameters.rs b/src/storage-client/src/types/parameters.rs index 52db6ef50b72f..c2d5244994f52 100644 --- a/src/storage-client/src/types/parameters.rs +++ b/src/storage-client/src/types/parameters.rs @@ -31,6 +31,10 @@ pub struct StorageParameters { pub keep_n_source_status_history_entries: usize, /// A set of parameters used to tune RocksDB when used with `UPSERT` sources. pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBTuningParameters, + /// Whether or not to allow shard finalization to occur. Note that this will + /// only disable the actual finalization of shards, not registering them for + /// finalization. + pub finalize_shards: bool, } impl StorageParameters { @@ -42,12 +46,14 @@ impl StorageParameters { pg_replication_timeouts, keep_n_source_status_history_entries, upsert_rocksdb_tuning_config, + finalize_shards, }: StorageParameters, ) { self.persist.update(persist); self.pg_replication_timeouts = pg_replication_timeouts; self.keep_n_source_status_history_entries = keep_n_source_status_history_entries; self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config; + self.finalize_shards = finalize_shards } } @@ -60,6 +66,7 @@ impl RustType for StorageParameters { self.keep_n_source_status_history_entries, ), upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()), + finalize_shards: self.finalize_shards, } } @@ -77,6 +84,7 @@ impl RustType for StorageParameters { upsert_rocksdb_tuning_config: proto .upsert_rocksdb_tuning_config .into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?, + finalize_shards: proto.finalize_shards, }) } }