Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7343,6 +7343,7 @@ impl Catalog {
}
}
},
finalize_shards: self.system_config().storage_client_finalize_shards(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/prof/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish = false
askama = { version = "0.11.1", default-features = false, features = ["config", "serde-json"] }
anyhow = "1.0.66"
axum = { version = "0.6.7", features = ["headers"] }
backtrace = "0.3.66"
backtrace = "0.3.68"
bytesize = "1.1.0"
cfg-if = "1.0.0"
headers = "0.3.8"
Expand Down
14 changes: 14 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,15 @@ const KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES: ServerVar<usize> = ServerVar {
internal: true
};

/// Whether compute rendering should use Materialize's custom linear join implementation rather
/// than the one from Differential Dataflow.
const STORAGE_CLIENT_FINALIZE_SHARDS: ServerVar<bool> = ServerVar {
name: UncasedStr::new("storage_client_finalize_shards"),
value: &true,
description: "Whether to all the storage client to finalize shards (Materialize).",
internal: true,
};

// Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the
// availability of features.
//
Expand Down Expand Up @@ -2203,6 +2212,11 @@ impl SystemVars {
pub fn enable_mz_join_core(&self) -> bool {
*self.expect_value(&ENABLE_MZ_JOIN_CORE)
}

/// Returns the `pg_replication_tcp_user_timeout` configuration parameter.
pub fn storage_client_finalize_shards(&self) -> bool {
*self.expect_value(&STORAGE_CLIENT_FINALIZE_SHARDS)
}
}

/// A `Var` represents a configuration parameter of an arbitrary type.
Expand Down
4 changes: 3 additions & 1 deletion src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2295,7 +2295,9 @@ where
.await
.expect("stash operation must succeed");

self.finalize_shards().await;
if self.state.config.finalize_shards {
self.finalize_shards().await;
}
}
Some(StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
// Note we only hold the locks while moving some plain-old-data around here.
Expand Down
1 change: 1 addition & 0 deletions src/storage-client/src/types/parameters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ message ProtoStorageParameters {
ProtoPgReplicationTimeouts pg_replication_timeouts = 3;
uint64 keep_n_source_status_history_entries = 4;
mz_rocksdb.config.ProtoRocksDbTuningParameters upsert_rocksdb_tuning_config = 5;
bool finalize_shards = 6;
}


Expand Down
8 changes: 8 additions & 0 deletions src/storage-client/src/types/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub struct StorageParameters {
pub keep_n_source_status_history_entries: usize,
/// A set of parameters used to tune RocksDB when used with `UPSERT` sources.
pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBTuningParameters,
/// Whether or not to allow shard finalization to occur. Note that this will
/// only disable the actual finalization of shards, not registering them for
/// finalization.
pub finalize_shards: bool,
}

impl StorageParameters {
Expand All @@ -42,12 +46,14 @@ impl StorageParameters {
pg_replication_timeouts,
keep_n_source_status_history_entries,
upsert_rocksdb_tuning_config,
finalize_shards,
}: StorageParameters,
) {
self.persist.update(persist);
self.pg_replication_timeouts = pg_replication_timeouts;
self.keep_n_source_status_history_entries = keep_n_source_status_history_entries;
self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config;
self.finalize_shards = finalize_shards
}
}

Expand All @@ -60,6 +66,7 @@ impl RustType<ProtoStorageParameters> for StorageParameters {
self.keep_n_source_status_history_entries,
),
upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()),
finalize_shards: self.finalize_shards,
}
}

Expand All @@ -77,6 +84,7 @@ impl RustType<ProtoStorageParameters> for StorageParameters {
upsert_rocksdb_tuning_config: proto
.upsert_rocksdb_tuning_config
.into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?,
finalize_shards: proto.finalize_shards,
})
}
}
Expand Down