diff --git a/ci/builder/requirements.txt b/ci/builder/requirements.txt index 89d92bed1b787..cc38017cfee61 100644 --- a/ci/builder/requirements.txt +++ b/ci/builder/requirements.txt @@ -37,7 +37,7 @@ PyMySQL==1.0.2 pytest==7.2.1 pytest-split==0.8.0 pyyaml==6.0 -requests==2.28.1 +requests==2.31.0 scipy==1.10.0 semver==3.0.0 shtab==1.5.8 diff --git a/doc/user/content/sql/system-catalog/mz_catalog.md b/doc/user/content/sql/system-catalog/mz_catalog.md index 3935e63d19359..6061cce7247a7 100644 --- a/doc/user/content/sql/system-catalog/mz_catalog.md +++ b/doc/user/content/sql/system-catalog/mz_catalog.md @@ -360,7 +360,7 @@ Field | Type | Meaning `oid` | [`oid`] | A [PostgreSQL-compatible OID][oid] for the source. `schema_id` | [`uint8`] | The ID of the schema to which the source belongs. Corresponds to [`mz_schemas.id`](/sql/system-catalog/mz_catalog/#mz_schemas). `name` | [`text`] | The name of the source. -`type` | [`text`] | The type of the source: `kafka`, `postgres`, `load-generator`, or `subsource`. +`type` | [`text`] | The type of the source: `kafka`, `postgres`, `load-generator`, `progress`, or `subsource`. `connection_id` | [`text`] | The ID of the connection associated with the source, if any. Corresponds to [`mz_connections.id`](/sql/system-catalog/mz_catalog/#mz_connections). `size` | [`text`] | The [size](/sql/create-source/#sizing-a-source) of the source. `envelope_type` | [`text`] | The [envelope](/sql/create-source/#envelopes) of the source: `none`, `upsert`, or `debezium`. diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 7dc5d83cf7ff1..a10936db643a4 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -1930,7 +1930,8 @@ impl Source { pub fn source_type(&self) -> &str { match &self.data_source { DataSourceDesc::Ingestion(ingestion) => ingestion.desc.connection.name(), - DataSourceDesc::Progress | DataSourceDesc::Source => "subsource", + DataSourceDesc::Progress => "progress", + DataSourceDesc::Source => "subsource", DataSourceDesc::Introspection(_) => "source", } } diff --git a/src/storage-client/src/client.rs b/src/storage-client/src/client.rs index 6f7cb2b3c5873..3c55d007294f6 100644 --- a/src/storage-client/src/client.rs +++ b/src/storage-client/src/client.rs @@ -114,6 +114,39 @@ pub enum StorageCommand { CreateSinks(Vec>), } +// /// A list of identifiers of traces, with new upper frontiers. +// /// +// /// TODO(teskje): Consider also reporting the previous upper frontier and using that +// /// information to assert the correct implementation of our protocols at various places. +// FrontierUppers(Vec<(GlobalId, Antichain)>), +// /// Punctuation indicates that no more responses will be transmitted for the specified ids +// DroppedIds(BTreeSet), + +// /// A list of statistics updates, currently only for sources. +// StatisticsUpdates(Vec, Vec), + +impl StorageCommand { + fn expected_response(&self) -> Option> { + match self { + StorageCommand::AllowCompaction(compactions) => { + if compactions.iter().any(|(_, frontier)| frontier.is_empty()) { + Some(StorageResponse::DroppedIds( + compactions + .iter() + .filter_map( + |(id, frontier)| if frontier.is_empty() { Some(*id) } else { None }, + ) + .collect(), + )) + } else { + None + } + } + _ => None, + } + } +} + /// A command that starts ingesting the given ingestion description #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct CreateSourceCommand { diff --git a/test/pg-cdc/pg-cdc.td b/test/pg-cdc/pg-cdc.td index 0d49e71612c96..a722fd2f8e1b6 100644 --- a/test/pg-cdc/pg-cdc.td +++ b/test/pg-cdc/pg-cdc.td @@ -267,7 +267,7 @@ escaped_text_table subsource large_text subsource multipart_pk subsource mz_source postgres ${arg.default-storage-size} -mz_source_progress subsource +mz_source_progress progress no_replica_identity subsource nonpk_table subsource nulls_table subsource @@ -643,7 +643,7 @@ contains: invalid TEXT COLUMNS option value: table utf8_table not found in sourc > SELECT * FROM (SHOW SOURCES) WHERE name LIKE '%enum%'; another_enum_table subsource enum_source postgres ${arg.default-storage-size} -enum_source_progress subsource +enum_source_progress progress enum_table subsource > SELECT * FROM enum_table @@ -685,7 +685,7 @@ regex:Source error: .*: db error: ERROR: publication "mz_source" does not exist > SHOW SOURCES another_source postgres ${arg.default-storage-size} -another_source_progress subsource +another_source_progress progress another_table subsource > DROP SOURCE another_source diff --git a/test/testdrive/get-started.td b/test/testdrive/get-started.td index aa4ec9186a4a9..b68f2f15a8213 100644 --- a/test/testdrive/get-started.td +++ b/test/testdrive/get-started.td @@ -15,7 +15,7 @@ name type size ---------------------------------- demo load-generator ${arg.default-storage-size} -demo_progress subsource +demo_progress progress accounts subsource auctions subsource bids subsource diff --git a/test/testdrive/load-generator.td b/test/testdrive/load-generator.td index c9b1d972b60cf..a686c56743883 100644 --- a/test/testdrive/load-generator.td +++ b/test/testdrive/load-generator.td @@ -15,7 +15,7 @@ ALTER SYSTEM SET enable_format_json = true > SHOW SOURCES accounts subsource auction_house load-generator ${arg.default-storage-size} -auction_house_progress subsource +auction_house_progress progress auctions subsource bids subsource organizations subsource @@ -27,7 +27,7 @@ users subsource > SHOW SOURCES FROM a; auction_bids load-generator ${arg.default-storage-size} -auction_bids_progress subsource +auction_bids_progress progress bids subsource # For Tables with mentioned schema should work @@ -37,7 +37,7 @@ bids subsource > SHOW SOURCES FROM another; accounts subsource auction_house load-generator ${arg.default-storage-size} -auction_house_progress subsource +auction_house_progress progress auctions subsource bids subsource organizations subsource @@ -52,7 +52,7 @@ users subsource > SHOW SOURCES FROM foo; auction_subset load-generator ${arg.default-storage-size} -auction_subset_progress subsource +auction_subset_progress progress foo_bids subsource > SHOW SOURCES FROM bar; diff --git a/test/testdrive/materializations.td b/test/testdrive/materializations.td index 509c9b3014b63..c721228d92f62 100644 --- a/test/testdrive/materializations.td +++ b/test/testdrive/materializations.td @@ -338,7 +338,7 @@ a b name type size --------------------- data kafka ${arg.default-storage-size} -data_progress subsource +data_progress progress mat_data kafka ${arg.default-storage-size} mat_data_progress subsource diff --git a/test/testdrive/rename.td b/test/testdrive/rename.td index f117e3c08f935..3e443f81d1187 100644 --- a/test/testdrive/rename.td +++ b/test/testdrive/rename.td @@ -190,8 +190,8 @@ renamed_sink > SHOW SOURCES; name type size ------------------------------ -renamed_mz_data kafka ${arg.default-storage-size} -mz_data_progress subsource +renamed_mz_data kafka ${arg.default-storage-size} +mz_data_progress progress # Sink was successfully renamed > SHOW SINKS diff --git a/test/testdrive/tpch.td b/test/testdrive/tpch.td index 374aa08e1a619..2500d475daf93 100644 --- a/test/testdrive/tpch.td +++ b/test/testdrive/tpch.td @@ -30,7 +30,7 @@ name type size -------------------------------- customer subsource gen load-generator ${source-size} - gen_progress subsource + gen_progress progress lineitem subsource nation subsource orders subsource