From bb90a7999cc3c4665bbe99b9c4cc4807349f6a85 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Tue, 25 Feb 2025 16:49:10 -0500 Subject: [PATCH 1/2] fix(fxci): Ensure schema mismatches cause an exception The docstring of `ensure_table` states that if there is a schema mismatch an exception will be raised. This isn't actually true: I was under the impression that `create_table(..., exists_ok=True)` would raise if the schemas were different, but instead it proceeds without error. --- .../fxci_etl/loaders/bigquery.py | 25 ++++-- .../test/test_loader_bigquery.py | 77 +++++++++++++++++++ 2 files changed, 94 insertions(+), 8 deletions(-) create mode 100644 jobs/fxci-taskcluster-export/test/test_loader_bigquery.py diff --git a/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py b/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py index 9e04ebb6..3c9a3f87 100644 --- a/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py +++ b/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py @@ -63,14 +63,23 @@ def ensure_table(self, table_type: str) -> Table: schema_cls = get_record_cls(table_type) schema = generate_schema(schema_cls) - partition = TimePartitioning( - type_=TimePartitioningType.DAY, - field="submission_date", - require_partition_filter=True, - ) - table = Table(self.table_name, schema=schema) - table.time_partitioning = partition - self.client.create_table(table, exists_ok=True) + try: + table = self.client.get_table(self.table_name) + + # Table exists, validate the schema + if schema != table.schema: + raise Exception(f"Schema mismatch detected for {self.table_name}!") + + except NotFound: + # Table doesn't exist, create it + table = Table(self.table_name, schema=schema) + table.time_partitioning = TimePartitioning( + type_=TimePartitioningType.DAY, + field="submission_date", + require_partition_filter=True, + ) + self.client.create_table(table) + return table def replace(self, submission_date: str, records: list[Record] | Record): diff 
--git a/jobs/fxci-taskcluster-export/test/test_loader_bigquery.py b/jobs/fxci-taskcluster-export/test/test_loader_bigquery.py new file mode 100644 index 00000000..6b8eb77e --- /dev/null +++ b/jobs/fxci-taskcluster-export/test/test_loader_bigquery.py @@ -0,0 +1,77 @@ +import pytest +from google.cloud.bigquery import SchemaField +from google.cloud.exceptions import NotFound +from google.cloud import storage + +from fxci_etl.loaders import bigquery +from fxci_etl.schemas import generate_schema, get_record_cls + + +@pytest.fixture(autouse=True) +def storage_mock(mocker): + storage_mock = mocker.MagicMock() + mocker.patch.object(storage, "Client", return_value=storage_mock) + + +@pytest.fixture +def client_mock(mocker): + client_mock = mocker.MagicMock() + mocker.patch.object(bigquery, "Client", return_value=client_mock) + return client_mock + + +@pytest.fixture +def run_ensure_table(make_config, mocker, client_mock): + table = "tasks" + config = make_config( + **{ + "bigquery": { + "project": "project", + "dataset": "dataset", + "tables": {table: "table_v1"}, + } + } + ) + + def inner(schema=None, exists=True): + if not schema: + schema = generate_schema(get_record_cls(table)) + + if exists: + table_mock = mocker.MagicMock() + table_mock.schema = schema + client_mock.get_table.return_value = table_mock + else: + client_mock.get_table.side_effect = NotFound("message") + + return bigquery.BigQueryLoader(config, table) + + return inner + + +def test_ensure_table_schemas_match(run_ensure_table): + loader = run_ensure_table() + loader.client.get_table.assert_called_once_with(loader.table_name) + assert not loader.client.create_table.called + + +def test_ensure_table_schemas_differ_value(run_ensure_table): + schema = generate_schema(get_record_cls("tasks")) + schema[0]._properties["mode"] = "NULLABLE" + + with pytest.raises(Exception): + run_ensure_table(schema) + + +def test_ensure_table_schemas_differ_extra_column(run_ensure_table): + schema = 
generate_schema(get_record_cls("tasks")) + schema.append(SchemaField("foo", "STRING", "NULLABLE")) + + with pytest.raises(Exception): + run_ensure_table(schema) + + +def test_ensure_table_missing(run_ensure_table): + loader = run_ensure_table(exists=False) + loader.client.get_table.assert_called_once_with(loader.table_name) + loader.client.create_table.assert_called_once() From 2aa5f787b3def2334155bb3adc16aee557f667e1 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Tue, 25 Feb 2025 16:54:27 -0500 Subject: [PATCH 2/2] fix(fxci): ensure bigquery tables before consuming pulse events By instantiating the BigQueryLoader in the on_processing_complete callback, we don't validate the tables until *after* we drain the pulse queue. This is bad because if there's an error (like a schema mismatch), we lose all those events forever. This way the pulse messages don't get consumed, so the data will still be there for consumption once the issue is fixed. --- jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py | 9 +++++---- jobs/fxci-taskcluster-export/test/test_pulse_handler.py | 9 ++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py b/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py index 87c73141..1ed91952 100644 --- a/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py +++ b/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py @@ -93,6 +93,9 @@ def __init__(self, config: Config, **kwargs: Any): self.task_records: list[Record] = [] self.run_records: list[Record] = [] + self.task_loader = BigQueryLoader(self.config, "tasks") + self.run_loader = BigQueryLoader(self.config, "runs") + self._convert_camel_case_re = re.compile(r"(?