Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/builder/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ PyMySQL==1.0.2
pytest==7.2.1
pytest-split==0.8.0
pyyaml==6.0
requests==2.28.1
requests==2.31.0
scipy==1.10.0
semver==3.0.0
shtab==1.5.8
Expand Down
2 changes: 1 addition & 1 deletion doc/user/content/sql/system-catalog/mz_catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ Field | Type | Meaning
`oid` | [`oid`] | A [PostgreSQL-compatible OID][oid] for the source.
`schema_id` | [`uint8`] | The ID of the schema to which the source belongs. Corresponds to [`mz_schemas.id`](/sql/system-catalog/mz_catalog/#mz_schemas).
`name` | [`text`] | The name of the source.
`type` | [`text`] | The type of the source: `kafka`, `postgres`, `load-generator`, or `subsource`.
`type` | [`text`] | The type of the source: `kafka`, `postgres`, `load-generator`, `progress`, or `subsource`.
`connection_id` | [`text`] | The ID of the connection associated with the source, if any. Corresponds to [`mz_connections.id`](/sql/system-catalog/mz_catalog/#mz_connections).
`size` | [`text`] | The [size](/sql/create-source/#sizing-a-source) of the source.
`envelope_type` | [`text`] | The [envelope](/sql/create-source/#envelopes) of the source: `none`, `upsert`, or `debezium`.
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1930,7 +1930,8 @@ impl Source {
pub fn source_type(&self) -> &str {
match &self.data_source {
DataSourceDesc::Ingestion(ingestion) => ingestion.desc.connection.name(),
DataSourceDesc::Progress | DataSourceDesc::Source => "subsource",
DataSourceDesc::Progress => "progress",
DataSourceDesc::Source => "subsource",
DataSourceDesc::Introspection(_) => "source",
}
}
Expand Down
33 changes: 33 additions & 0 deletions src/storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,39 @@ pub enum StorageCommand<T = mz_repr::Timestamp> {
CreateSinks(Vec<CreateSinkCommand<T>>),
}

// /// A list of identifiers of traces, with new upper frontiers.
// ///
// /// TODO(teskje): Consider also reporting the previous upper frontier and using that
// /// information to assert the correct implementation of our protocols at various places.
// FrontierUppers(Vec<(GlobalId, Antichain<T>)>),
// /// Punctuation indicates that no more responses will be transmitted for the specified ids
// DroppedIds(BTreeSet<GlobalId>),

// /// A list of statistics updates, currently only for sources.
// StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),

impl<T> StorageCommand<T> {
fn expected_response(&self) -> Option<StorageResponse<T>> {
match self {
StorageCommand::AllowCompaction(compactions) => {
if compactions.iter().any(|(_, frontier)| frontier.is_empty()) {
Some(StorageResponse::DroppedIds(
compactions
.iter()
.filter_map(
|(id, frontier)| if frontier.is_empty() { Some(*id) } else { None },
)
.collect(),
))
} else {
None
}
}
_ => None,
}
}
}

/// A command that starts ingesting the given ingestion description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct CreateSourceCommand<T> {
Expand Down
6 changes: 3 additions & 3 deletions test/pg-cdc/pg-cdc.td
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ escaped_text_table subsource <null>
large_text subsource <null>
multipart_pk subsource <null>
mz_source postgres ${arg.default-storage-size}
mz_source_progress subsource <null>
mz_source_progress progress <null>
no_replica_identity subsource <null>
nonpk_table subsource <null>
nulls_table subsource <null>
Expand Down Expand Up @@ -643,7 +643,7 @@ contains: invalid TEXT COLUMNS option value: table utf8_table not found in sourc
> SELECT * FROM (SHOW SOURCES) WHERE name LIKE '%enum%';
another_enum_table subsource <null>
enum_source postgres ${arg.default-storage-size}
enum_source_progress subsource <null>
enum_source_progress progress <null>
enum_table subsource <null>

> SELECT * FROM enum_table
Expand Down Expand Up @@ -685,7 +685,7 @@ regex:Source error: .*: db error: ERROR: publication "mz_source" does not exist

> SHOW SOURCES
another_source postgres ${arg.default-storage-size}
another_source_progress subsource <null>
another_source_progress progress <null>
another_table subsource <null>

> DROP SOURCE another_source
Expand Down
2 changes: 1 addition & 1 deletion test/testdrive/get-started.td
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
name type size
----------------------------------
demo load-generator ${arg.default-storage-size}
demo_progress subsource <null>
demo_progress progress <null>
accounts subsource <null>
auctions subsource <null>
bids subsource <null>
Expand Down
8 changes: 4 additions & 4 deletions test/testdrive/load-generator.td
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ALTER SYSTEM SET enable_format_json = true
> SHOW SOURCES
accounts subsource <null>
auction_house load-generator ${arg.default-storage-size}
auction_house_progress subsource <null>
auction_house_progress progress <null>
auctions subsource <null>
bids subsource <null>
organizations subsource <null>
Expand All @@ -27,7 +27,7 @@ users subsource <null>

> SHOW SOURCES FROM a;
auction_bids load-generator ${arg.default-storage-size}
auction_bids_progress subsource <null>
auction_bids_progress progress <null>
bids subsource <null>

# For Tables with mentioned schema should work
Expand All @@ -37,7 +37,7 @@ bids subsource <null>
> SHOW SOURCES FROM another;
accounts subsource <null>
auction_house load-generator ${arg.default-storage-size}
auction_house_progress subsource <null>
auction_house_progress progress <null>
auctions subsource <null>
bids subsource <null>
organizations subsource <null>
Expand All @@ -52,7 +52,7 @@ users subsource <null>

> SHOW SOURCES FROM foo;
auction_subset load-generator ${arg.default-storage-size}
auction_subset_progress subsource <null>
auction_subset_progress progress <null>
foo_bids subsource <null>

> SHOW SOURCES FROM bar;
Expand Down
2 changes: 1 addition & 1 deletion test/testdrive/materializations.td
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ a b
name type size
---------------------
data kafka ${arg.default-storage-size}
data_progress subsource <null>
data_progress progress <null>
mat_data kafka ${arg.default-storage-size}
mat_data_progress subsource <null>

Expand Down
4 changes: 2 additions & 2 deletions test/testdrive/rename.td
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ renamed_sink
> SHOW SOURCES;
name type size
------------------------------
renamed_mz_data kafka ${arg.default-storage-size}
mz_data_progress subsource <null>
renamed_mz_data kafka ${arg.default-storage-size}
mz_data_progress progress <null>

# Sink was successfully renamed
> SHOW SINKS
Expand Down
2 changes: 1 addition & 1 deletion test/testdrive/tpch.td
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ name type size
--------------------------------
customer subsource <null>
gen load-generator ${source-size}
gen_progress subsource <null>
gen_progress progress <null>
lineitem subsource <null>
nation subsource <null>
orders subsource <null>
Expand Down