From 9a0d002d0c092859d1d25f51a1746a171307c6c2 Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Thu, 8 Jan 2026 01:50:19 -0300 Subject: [PATCH 01/13] feat(bigquery): add DATE type support to BigQuery I/O --- .../io/gcp/bigquery_schema_tools.py | 1 + .../io/gcp/bigquery_schema_tools_test.py | 99 +++++++++++++++++++ .../apache_beam/io/gcp/bigquery_tools.py | 1 + .../apache_beam/io/gcp/bigquery_tools_test.py | 5 +- 4 files changed, 105 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 54c7ca90f011..7c8e02dac0e6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -49,6 +49,7 @@ "BYTES": bytes, "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, "GEOGRAPHY": str, + "DATE": str, #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 0eb3351ee84c..51f31f2dc1a1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -336,6 +336,105 @@ def test_geography_with_complex_wkt(self): self.assertEqual(usertype.__annotations__, expected_annotations) + def test_date_type_support(self): + """Test that DATE type is properly supported in schema conversion.""" + fields = [ + bigquery.TableFieldSchema( + name='birth_date', type='DATE', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='dates', type='DATE', mode="REPEATED"), + bigquery.TableFieldSchema( + name='required_date', type='DATE', mode="REQUIRED") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + expected_annotations = { + 'birth_date': typing.Optional[str], + 'dates': typing.Sequence[str], + 
'required_date': str + } + + self.assertEqual(usertype.__annotations__, expected_annotations) + + def test_date_in_bq_to_python_types_mapping(self): + """Test that DATE is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" + from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES + + self.assertIn("DATE", BIG_QUERY_TO_PYTHON_TYPES) + self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["DATE"], str) + + def test_date_field_type_conversion(self): + """Test bq_field_to_type function with DATE fields.""" + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + # Test required DATE field + result = bq_field_to_type("DATE", "REQUIRED") + self.assertEqual(result, str) + + # Test nullable DATE field + result = bq_field_to_type("DATE", "NULLABLE") + self.assertEqual(result, typing.Optional[str]) + + # Test repeated DATE field + result = bq_field_to_type("DATE", "REPEATED") + self.assertEqual(result, typing.Sequence[str]) + + # Test DATE field with None mode (should default to nullable) + result = bq_field_to_type("DATE", None) + self.assertEqual(result, typing.Optional[str]) + + # Test DATE field with empty mode (should default to nullable) + result = bq_field_to_type("DATE", "") + self.assertEqual(result, typing.Optional[str]) + + def test_convert_to_usertype_with_date(self): + """Test convert_to_usertype function with DATE fields.""" + schema = bigquery.TableSchema( + fields=[ + bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='birth_date', type='DATE', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='name', type='STRING', mode="REQUIRED") + ]) + + conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) + + # Verify the transform is created successfully + self.assertIsNotNone(conversion_transform) + + # The transform should be a ParDo with BeamSchemaConversionDoFn + self.assertIsInstance(conversion_transform, beam.ParDo) + + def 
test_beam_schema_conversion_dofn_with_date(self): + """Test BeamSchemaConversionDoFn with DATE data.""" + from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn + + # Create a user type with DATE field + fields = [ + bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='birth_date', type='DATE', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # Create the DoFn + dofn = BeamSchemaConversionDoFn(usertype) + + # Test processing a dictionary with DATE data + input_dict = {'id': 1, 'birth_date': '2021-01-15'} + + results = list(dofn.process(input_dict)) + self.assertEqual(len(results), 1) + + result = results[0] + self.assertEqual(result.id, 1) + self.assertEqual(result.birth_date, '2021-01-15') + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index ddab941f9278..4e0ef2e83e18 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -124,6 +124,7 @@ "NUMERIC": decimal.Decimal, "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, "GEOGRAPHY": str, + "DATE": str, } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 2594e6728e0e..93ee9ee70754 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1016,7 +1016,8 @@ def test_dict_to_beam_row_repeated_nested_record(self): class TestBeamTypehintFromSchema(unittest.TestCase): EXPECTED_TYPEHINTS = [("str", str), ("bool", bool), ("bytes", bytes), ("int", np.int64), ("float", np.float64), - ("numeric", decimal.Decimal), ("timestamp", Timestamp)] + ("numeric", decimal.Decimal), ("timestamp", Timestamp), + 
("date", str)] def get_schema_fields_with_mode(self, mode): return [{ @@ -1033,6 +1034,8 @@ def get_schema_fields_with_mode(self, mode): "name": "numeric", "type": "NUMERIC", "mode": mode }, { "name": "timestamp", "type": "TIMESTAMP", "mode": mode + }, { + "name": "date", "type": "DATE", "mode": mode }] def test_typehints_from_required_schema(self): From b9e5ddb42256b307b5b0eb2a004486390dfee4ba Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Thu, 8 Jan 2026 01:51:21 -0300 Subject: [PATCH 02/13] feat(bigquery): add DATETIME type support to BigQuery I/O --- .../io/gcp/bigquery_schema_tools.py | 1 + .../io/gcp/bigquery_schema_tools_test.py | 99 +++++++++++++++++++ .../apache_beam/io/gcp/bigquery_tools.py | 1 + .../apache_beam/io/gcp/bigquery_tools_test.py | 4 +- 4 files changed, 104 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 7c8e02dac0e6..9f206e798697 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -50,6 +50,7 @@ "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, "GEOGRAPHY": str, "DATE": str, + "DATETIME": str, #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 51f31f2dc1a1..675ac016435f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -435,6 +435,105 @@ def test_beam_schema_conversion_dofn_with_date(self): self.assertEqual(result.id, 1) self.assertEqual(result.birth_date, '2021-01-15') + def test_datetime_type_support(self): + """Test that DATETIME type is properly supported in schema conversion.""" + fields = [ + bigquery.TableFieldSchema( + name='event_time', type='DATETIME', 
mode="NULLABLE"), + bigquery.TableFieldSchema( + name='timestamps', type='DATETIME', mode="REPEATED"), + bigquery.TableFieldSchema( + name='required_time', type='DATETIME', mode="REQUIRED") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + expected_annotations = { + 'event_time': typing.Optional[str], + 'timestamps': typing.Sequence[str], + 'required_time': str + } + + self.assertEqual(usertype.__annotations__, expected_annotations) + + def test_datetime_in_bq_to_python_types_mapping(self): + """Test that DATETIME is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" + from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES + + self.assertIn("DATETIME", BIG_QUERY_TO_PYTHON_TYPES) + self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["DATETIME"], str) + + def test_datetime_field_type_conversion(self): + """Test bq_field_to_type function with DATETIME fields.""" + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + # Test required DATETIME field + result = bq_field_to_type("DATETIME", "REQUIRED") + self.assertEqual(result, str) + + # Test nullable DATETIME field + result = bq_field_to_type("DATETIME", "NULLABLE") + self.assertEqual(result, typing.Optional[str]) + + # Test repeated DATETIME field + result = bq_field_to_type("DATETIME", "REPEATED") + self.assertEqual(result, typing.Sequence[str]) + + # Test DATETIME field with None mode (should default to nullable) + result = bq_field_to_type("DATETIME", None) + self.assertEqual(result, typing.Optional[str]) + + # Test DATETIME field with empty mode (should default to nullable) + result = bq_field_to_type("DATETIME", "") + self.assertEqual(result, typing.Optional[str]) + + def test_convert_to_usertype_with_datetime(self): + """Test convert_to_usertype function with DATETIME fields.""" + schema = bigquery.TableSchema( + fields=[ + bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), 
+ bigquery.TableFieldSchema( + name='event_time', type='DATETIME', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='name', type='STRING', mode="REQUIRED") + ]) + + conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) + + # Verify the transform is created successfully + self.assertIsNotNone(conversion_transform) + + # The transform should be a ParDo with BeamSchemaConversionDoFn + self.assertIsInstance(conversion_transform, beam.ParDo) + + def test_beam_schema_conversion_dofn_with_datetime(self): + """Test BeamSchemaConversionDoFn with DATETIME data.""" + from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn + + # Create a user type with DATETIME field + fields = [ + bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='event_time', type='DATETIME', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # Create the DoFn + dofn = BeamSchemaConversionDoFn(usertype) + + # Test processing a dictionary with DATETIME data + input_dict = {'id': 1, 'event_time': '2021-01-15T10:30:00'} + + results = list(dofn.process(input_dict)) + self.assertEqual(len(results), 1) + + result = results[0] + self.assertEqual(result.id, 1) + self.assertEqual(result.event_time, '2021-01-15T10:30:00') + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 4e0ef2e83e18..e44f5bac7597 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -125,6 +125,7 @@ "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, "GEOGRAPHY": str, "DATE": str, + "DATETIME": str, } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 
93ee9ee70754..9ea7e188152f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1017,7 +1017,7 @@ class TestBeamTypehintFromSchema(unittest.TestCase): EXPECTED_TYPEHINTS = [("str", str), ("bool", bool), ("bytes", bytes), ("int", np.int64), ("float", np.float64), ("numeric", decimal.Decimal), ("timestamp", Timestamp), - ("date", str)] + ("date", str), ("datetime", str)] def get_schema_fields_with_mode(self, mode): return [{ @@ -1036,6 +1036,8 @@ def get_schema_fields_with_mode(self, mode): "name": "timestamp", "type": "TIMESTAMP", "mode": mode }, { "name": "date", "type": "DATE", "mode": mode + }, { + "name": "datetime", "type": "DATETIME", "mode": mode }] def test_typehints_from_required_schema(self): From d305d3465ddb7f0aaa35822888ad93ffe60db343 Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Thu, 8 Jan 2026 01:52:20 -0300 Subject: [PATCH 03/13] feat(bigquery): add JSON type support to BigQuery I/O --- .../io/gcp/bigquery_schema_tools.py | 1 + .../io/gcp/bigquery_schema_tools_test.py | 99 +++++++++++++++++++ .../apache_beam/io/gcp/bigquery_tools.py | 1 + .../apache_beam/io/gcp/bigquery_tools_test.py | 4 +- 4 files changed, 104 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 9f206e798697..b464ad375bc0 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -51,6 +51,7 @@ "GEOGRAPHY": str, "DATE": str, "DATETIME": str, + "JSON": str, #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 675ac016435f..a61101d37b09 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ 
b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -534,6 +534,105 @@ def test_beam_schema_conversion_dofn_with_datetime(self): self.assertEqual(result.id, 1) self.assertEqual(result.event_time, '2021-01-15T10:30:00') + def test_json_type_support(self): + """Test that JSON type is properly supported in schema conversion.""" + fields = [ + bigquery.TableFieldSchema( + name='data', type='JSON', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='items', type='JSON', mode="REPEATED"), + bigquery.TableFieldSchema( + name='required_data', type='JSON', mode="REQUIRED") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + expected_annotations = { + 'data': typing.Optional[str], + 'items': typing.Sequence[str], + 'required_data': str + } + + self.assertEqual(usertype.__annotations__, expected_annotations) + + def test_json_in_bq_to_python_types_mapping(self): + """Test that JSON is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" + from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES + + self.assertIn("JSON", BIG_QUERY_TO_PYTHON_TYPES) + self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["JSON"], str) + + def test_json_field_type_conversion(self): + """Test bq_field_to_type function with JSON fields.""" + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + # Test required JSON field + result = bq_field_to_type("JSON", "REQUIRED") + self.assertEqual(result, str) + + # Test nullable JSON field + result = bq_field_to_type("JSON", "NULLABLE") + self.assertEqual(result, typing.Optional[str]) + + # Test repeated JSON field + result = bq_field_to_type("JSON", "REPEATED") + self.assertEqual(result, typing.Sequence[str]) + + # Test JSON field with None mode (should default to nullable) + result = bq_field_to_type("JSON", None) + self.assertEqual(result, typing.Optional[str]) + + # Test JSON field with empty mode (should default to nullable) + 
result = bq_field_to_type("JSON", "") + self.assertEqual(result, typing.Optional[str]) + + def test_convert_to_usertype_with_json(self): + """Test convert_to_usertype function with JSON fields.""" + schema = bigquery.TableSchema( + fields=[ + bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='data', type='JSON', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='name', type='STRING', mode="REQUIRED") + ]) + + conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) + + # Verify the transform is created successfully + self.assertIsNotNone(conversion_transform) + + # The transform should be a ParDo with BeamSchemaConversionDoFn + self.assertIsInstance(conversion_transform, beam.ParDo) + + def test_beam_schema_conversion_dofn_with_json(self): + """Test BeamSchemaConversionDoFn with JSON data.""" + from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn + + # Create a user type with JSON field + fields = [ + bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='data', type='JSON', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # Create the DoFn + dofn = BeamSchemaConversionDoFn(usertype) + + # Test processing a dictionary with JSON data + input_dict = {'id': 1, 'data': '{"key": "value", "count": 42}'} + + results = list(dofn.process(input_dict)) + self.assertEqual(len(results), 1) + + result = results[0] + self.assertEqual(result.id, 1) + self.assertEqual(result.data, '{"key": "value", "count": 42}') + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index e44f5bac7597..4dd0ab10cf81 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -126,6 +126,7 @@ "GEOGRAPHY": str, "DATE": str, "DATETIME": str, + "JSON": str, } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 9ea7e188152f..26785498ea5e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1017,7 +1017,7 @@ class TestBeamTypehintFromSchema(unittest.TestCase): EXPECTED_TYPEHINTS = [("str", str), ("bool", bool), ("bytes", bytes), ("int", np.int64), ("float", np.float64), ("numeric", decimal.Decimal), ("timestamp", Timestamp), - ("date", str), ("datetime", str)] + ("date", str), ("datetime", str), ("json", str)] def get_schema_fields_with_mode(self, mode): return [{ @@ -1038,6 +1038,8 @@ def get_schema_fields_with_mode(self, mode): "name": "date", "type": "DATE", "mode": mode }, { "name": "datetime", "type": "DATETIME", "mode": mode + }, { + "name": "json", "type": "JSON", "mode": mode }] def test_typehints_from_required_schema(self): From e0cd5be72823cbf47283706462ecd244a593c11b Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Thu, 8 Jan 2026 03:09:03 -0300 Subject: [PATCH 04/13] style: fix yapf formatting and add issue link to CHANGES.md --- .../apache_beam/io/gcp/bigquery_schema_tools_test.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index a61101d37b09..48df50462c94 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -341,8 +341,7 @@ def test_date_type_support(self): fields = [ bigquery.TableFieldSchema( name='birth_date', type='DATE', mode="NULLABLE"), - bigquery.TableFieldSchema( - name='dates', type='DATE', mode="REPEATED"), + bigquery.TableFieldSchema(name='dates', type='DATE', 
mode="REPEATED"), bigquery.TableFieldSchema( name='required_date', type='DATE', mode="REQUIRED") ] @@ -537,10 +536,8 @@ def test_beam_schema_conversion_dofn_with_datetime(self): def test_json_type_support(self): """Test that JSON type is properly supported in schema conversion.""" fields = [ - bigquery.TableFieldSchema( - name='data', type='JSON', mode="NULLABLE"), - bigquery.TableFieldSchema( - name='items', type='JSON', mode="REPEATED"), + bigquery.TableFieldSchema(name='data', type='JSON', mode="NULLABLE"), + bigquery.TableFieldSchema(name='items', type='JSON', mode="REPEATED"), bigquery.TableFieldSchema( name='required_data', type='JSON', mode="REQUIRED") ] @@ -614,8 +611,7 @@ def test_beam_schema_conversion_dofn_with_json(self): # Create a user type with JSON field fields = [ bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), - bigquery.TableFieldSchema( - name='data', type='JSON', mode="NULLABLE") + bigquery.TableFieldSchema(name='data', type='JSON', mode="NULLABLE") ] schema = bigquery.TableSchema(fields=fields) usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) From 167762adcd0c0e2e9ef502420cf41e8ea95cd01b Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Tue, 27 Jan 2026 15:19:21 -0300 Subject: [PATCH 05/13] feat(bigquery): add type_overrides to get_beam_typehints_from_tableschema --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 4dd0ab10cf81..cf3bedd828b3 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -1777,18 +1777,23 @@ def get_avro_schema_from_table_schema(schema): "root", dict_table_schema) -def get_beam_typehints_from_tableschema(schema): +def get_beam_typehints_from_tableschema(schema, type_overrides=None): """Extracts Beam Python type hints from 
the schema. Args: schema (~apache_beam.io.gcp.internal.clients.bigquery.\ bigquery_v2_messages.TableSchema): The TableSchema to extract type hints from. + type_overrides (dict): Optional mapping of BigQuery type names (uppercase) + to Python types. These override the default mappings in + BIGQUERY_TYPE_TO_PYTHON_TYPE. For example: + ``{'DATE': datetime.date, 'JSON': dict}`` Returns: List[Tuple[str, Any]]: A list of type hints that describe the input schema. Nested and repeated fields are supported. """ + effective_types = {**BIGQUERY_TYPE_TO_PYTHON_TYPE, **(type_overrides or {})} if not isinstance(schema, (bigquery.TableSchema, bigquery.TableFieldSchema)): schema = get_bq_tableschema(schema) typehints = [] @@ -1798,9 +1803,9 @@ def get_beam_typehints_from_tableschema(schema): if field_type in ["STRUCT", "RECORD"]: # Structs can be represented as Beam Rows. typehint = RowTypeConstraint.from_fields( - get_beam_typehints_from_tableschema(field)) - elif field_type in BIGQUERY_TYPE_TO_PYTHON_TYPE: - typehint = BIGQUERY_TYPE_TO_PYTHON_TYPE[field_type] + get_beam_typehints_from_tableschema(field, type_overrides)) + elif field_type in effective_types: + typehint = effective_types[field_type] else: raise ValueError( f"Converting BigQuery type [{field_type}] to " From 8cdfb77d258a5dedf817e5ad2725ecf383844189 Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Tue, 27 Jan 2026 15:19:30 -0300 Subject: [PATCH 06/13] feat(bigquery): add type_overrides to schema tools functions --- .../io/gcp/bigquery_schema_tools.py | 51 +++++++++++++++---- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index b464ad375bc0..e28b12986e5e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -58,15 +58,21 @@ def generate_user_type_from_bq_schema( - the_table_schema, selected_fields: 
'bigquery.TableSchema' = None) -> type: + the_table_schema, + selected_fields: 'bigquery.TableSchema' = None, + type_overrides=None) -> type: """Convert a schema of type TableSchema into a pcollection element. Args: the_table_schema: A BQ schema of type TableSchema selected_fields: if not None, the subset of fields to consider + type_overrides: Optional mapping of BigQuery type names (uppercase) + to Python types. These override the default mappings in + BIG_QUERY_TO_PYTHON_TYPES. For example: + ``{'DATE': datetime.date, 'JSON': dict}`` Returns: type: type that can be used to work with pCollections. """ - + effective_types = {**BIG_QUERY_TO_PYTHON_TYPES, **(type_overrides or {})} the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema( the_table_schema) if the_schema == {}: @@ -75,8 +81,8 @@ def generate_user_type_from_bq_schema( for field in the_schema['fields']: if selected_fields is not None and field['name'] not in selected_fields: continue - if field['type'] in BIG_QUERY_TO_PYTHON_TYPES: - typ = bq_field_to_type(field['type'], field['mode']) + if field['type'] in effective_types: + typ = bq_field_to_type(field['type'], field['mode'], type_overrides) else: raise ValueError( f"Encountered " @@ -88,19 +94,44 @@ def generate_user_type_from_bq_schema( return usertype -def bq_field_to_type(field, mode): +def bq_field_to_type(field, mode, type_overrides=None): + """Convert a BigQuery field type and mode to a Python type hint. + + Args: + field: The BigQuery type name (e.g., 'STRING', 'DATE'). + mode: The field mode ('NULLABLE', 'REPEATED', 'REQUIRED'). + type_overrides: Optional mapping of BigQuery type names (uppercase) + to Python types. These override the default mappings. + + Returns: + The corresponding Python type hint. 
+ """ + effective_types = {**BIG_QUERY_TO_PYTHON_TYPES, **(type_overrides or {})} if mode == 'NULLABLE' or mode is None or mode == '': - return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] + return Optional[effective_types[field]] elif mode == 'REPEATED': - return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] + return Sequence[effective_types[field]] elif mode == 'REQUIRED': - return BIG_QUERY_TO_PYTHON_TYPES[field] + return effective_types[field] else: raise ValueError(f"Encountered an unsupported mode: {mode!r}") -def convert_to_usertype(table_schema, selected_fields=None): - usertype = generate_user_type_from_bq_schema(table_schema, selected_fields) +def convert_to_usertype( + table_schema, selected_fields=None, type_overrides=None): + """Convert a BigQuery table schema to a user type. + + Args: + table_schema: A BQ schema of type TableSchema + selected_fields: if not None, the subset of fields to consider + type_overrides: Optional mapping of BigQuery type names (uppercase) + to Python types. + + Returns: + A ParDo transform that converts dictionaries to the user type. 
+ """ + usertype = generate_user_type_from_bq_schema( + table_schema, selected_fields, type_overrides) return beam.ParDo(BeamSchemaConversionDoFn(usertype)) From f6a3f630d906125645a2fc9447d39d2a79862832 Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Tue, 27 Jan 2026 15:19:35 -0300 Subject: [PATCH 07/13] feat(bigquery): add type_overrides parameter to WriteToBigQuery --- sdks/python/apache_beam/io/gcp/bigquery.py | 25 ++++++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 181c891c1b65..949e307baf01 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2009,7 +2009,8 @@ def __init__( use_cdc_writes: bool = False, primary_key: List[str] = None, expansion_service=None, - big_lake_configuration=None): + big_lake_configuration=None, + type_overrides=None): """Initialize a WriteToBigQuery transform. Args: @@ -2186,6 +2187,11 @@ def __init__( CREATE_IF_NEEDED mode for the underlying tables a list of column names is required to be configured as the primary key. Used for STORAGE_WRITE_API, working on 'at least once' mode. + type_overrides (dict): Optional mapping of BigQuery type names (uppercase) + to Python types. These override the default type mappings when + converting BigQuery schemas to Python types for STORAGE_WRITE_API. + For example: ``{'DATE': datetime.date, 'JSON': dict}``. + Default mappings include STRING->str, INT64->np.int64, etc. """ self._table = table self._dataset = dataset @@ -2231,6 +2237,7 @@ def __init__( self._use_cdc_writes = use_cdc_writes self._primary_key = primary_key self._big_lake_configuration = big_lake_configuration + self._type_overrides = type_overrides # Dict/schema methods were moved to bigquery_tools, but keep references # here for backward compatibility. 
@@ -2395,7 +2402,8 @@ def find_in_nested_dict(schema): use_cdc_writes=self._use_cdc_writes, primary_key=self._primary_key, big_lake_configuration=self._big_lake_configuration, - expansion_service=self.expansion_service) + expansion_service=self.expansion_service, + type_overrides=self._type_overrides) else: raise ValueError(f"Unsupported method {method_to_use}") @@ -2644,7 +2652,8 @@ def __init__( use_cdc_writes: bool = False, primary_key: List[str] = None, big_lake_configuration=None, - expansion_service=None): + expansion_service=None, + type_overrides=None): self._table = table self._table_side_inputs = table_side_inputs self._schema = schema @@ -2658,6 +2667,7 @@ def __init__( self._use_cdc_writes = use_cdc_writes self._primary_key = primary_key self._big_lake_configuration = big_lake_configuration + self._type_overrides = type_overrides self._expansion_service = expansion_service or BeamJarExpansionService( 'sdks:java:io:google-cloud-platform:expansion-service:build') @@ -2691,7 +2701,7 @@ def expand(self, input): input_beam_rows = ( input | "Convert dict to Beam Row" >> self.ConvertToBeamRows( - schema, False).with_output_types()) + schema, False, self._type_overrides).with_output_types()) # For dynamic destinations, we first figure out where each row is going. # Then we send (destination, record) rows over to Java SchemaTransform. 
@@ -2723,7 +2733,7 @@ def expand(self, input): input_beam_rows = ( input_rows | "Convert dict to Beam Row" >> self.ConvertToBeamRows( - schema, True).with_output_types()) + schema, True, self._type_overrides).with_output_types()) # communicate to Java that this write should use dynamic destinations table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS @@ -2791,9 +2801,10 @@ def __exit__(self, *args): pass class ConvertToBeamRows(PTransform): - def __init__(self, schema, dynamic_destinations): + def __init__(self, schema, dynamic_destinations, type_overrides=None): self.schema = schema self.dynamic_destinations = dynamic_destinations + self.type_overrides = type_overrides def expand(self, input_dicts): if self.dynamic_destinations: @@ -2819,7 +2830,7 @@ def expand(self, input_dicts): def with_output_types(self): row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema( - self.schema) + self.schema, self.type_overrides) if self.dynamic_destinations: type_hint = RowTypeConstraint.from_fields([ (StorageWriteToBigQuery.DESTINATION, str), From 7d251cd4f6b05a25aae30b31cd141397b5d1d27c Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Tue, 27 Jan 2026 15:19:43 -0300 Subject: [PATCH 08/13] test(bigquery): add tests for type_overrides functionality --- .../io/gcp/bigquery_schema_tools_test.py | 71 ++++++++++ .../apache_beam/io/gcp/bigquery_tools_test.py | 133 ++++++++++++++++++ 2 files changed, 204 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 48df50462c94..19906c066b7b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -630,6 +630,77 @@ def test_beam_schema_conversion_dofn_with_json(self): self.assertEqual(result.data, '{"key": "value", "count": 42}') +class TypeOverridesSchemaToolsTest(unittest.TestCase): + """Tests for type_overrides parameter in 
bigquery_schema_tools.""" + def test_bq_field_to_type_with_overrides(self): + """Test bq_field_to_type function with type_overrides.""" + import datetime + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + # Without overrides + self.assertEqual(bq_field_to_type("DATE", "REQUIRED"), str) + + # With overrides + overrides = {"DATE": datetime.date} + self.assertEqual( + bq_field_to_type("DATE", "REQUIRED", overrides), datetime.date) + self.assertEqual( + bq_field_to_type("DATE", "NULLABLE", overrides), + typing.Optional[datetime.date]) + self.assertEqual( + bq_field_to_type("DATE", "REPEATED", overrides), + typing.Sequence[datetime.date]) + + def test_generate_user_type_with_overrides(self): + """Test generate_user_type_from_bq_schema with type_overrides.""" + import datetime + + schema = bigquery.TableSchema( + fields=[ + bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='event_date', type='DATE', mode="NULLABLE") + ]) + + # Without overrides + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + self.assertEqual( + usertype.__annotations__, { + 'id': np.int64, 'event_date': typing.Optional[str] + }) + + # With overrides + overrides = {"DATE": datetime.date} + usertype_with_override = \ + bigquery_schema_tools.generate_user_type_from_bq_schema( + schema, type_overrides=overrides) + self.assertEqual( + usertype_with_override.__annotations__, { + 'id': np.int64, 'event_date': typing.Optional[datetime.date] + }) + + def test_convert_to_usertype_with_overrides(self): + """Test convert_to_usertype function with type_overrides.""" + import datetime + + schema = bigquery.TableSchema( + fields=[ + bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='event_date', type='DATE', mode="NULLABLE") + ]) + + overrides = {"DATE": datetime.date} + transform = bigquery_schema_tools.convert_to_usertype( + schema, 
type_overrides=overrides) + + # The transform should be created successfully + self.assertIsNotNone(transform) + self.assertIsInstance(transform, beam.ParDo) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 26785498ea5e..6d82e093ae07 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1255,6 +1255,139 @@ def test_geography_with_special_characters(self): self.assertIsInstance(result, str) +class TestTypeOverrides(unittest.TestCase): + """Tests for type_overrides parameter in BigQuery type mappings.""" + def test_type_overrides_basic(self): + """Test that type_overrides overrides default type mappings.""" + import datetime + schema = { + "fields": [{ + "name": "date_field", "type": "DATE", "mode": "REQUIRED" + }, + { + "name": "datetime_field", + "type": "DATETIME", + "mode": "REQUIRED" + }] + } + + # Without overrides, DATE and DATETIME map to str + typehints = get_beam_typehints_from_tableschema(schema) + self.assertEqual(typehints, [("date_field", str), ("datetime_field", str)]) + + # With overrides, use custom types + type_overrides = {"DATE": datetime.date, "DATETIME": datetime.datetime} + typehints_with_override = get_beam_typehints_from_tableschema( + schema, type_overrides) + self.assertEqual( + typehints_with_override, [("date_field", datetime.date), + ("datetime_field", datetime.datetime)]) + + def test_type_overrides_with_modes(self): + """Test that type_overrides works with NULLABLE and REPEATED modes.""" + import datetime + schema = { + "fields": [{ + "name": "required_date", "type": "DATE", "mode": "REQUIRED" + }, { + "name": "optional_date", "type": "DATE", "mode": "NULLABLE" + }, { + "name": "repeated_dates", "type": "DATE", "mode": "REPEATED" + }] + } + + type_overrides = {"DATE": datetime.date} + 
typehints = get_beam_typehints_from_tableschema(schema, type_overrides) + + expected = [("required_date", datetime.date), + ("optional_date", Optional[datetime.date]), + ("repeated_dates", Sequence[datetime.date])] + self.assertEqual(typehints, expected) + + def test_type_overrides_partial(self): + """Test that type_overrides only affects specified types.""" + import datetime + schema = { + "fields": [{ + "name": "date_field", "type": "DATE", "mode": "REQUIRED" + }, { + "name": "string_field", "type": "STRING", "mode": "REQUIRED" + }, { + "name": "int_field", "type": "INTEGER", "mode": "REQUIRED" + }] + } + + # Only override DATE + type_overrides = {"DATE": datetime.date} + typehints = get_beam_typehints_from_tableschema(schema, type_overrides) + + expected = [("date_field", datetime.date), ("string_field", str), + ("int_field", np.int64)] + self.assertEqual(typehints, expected) + + def test_type_overrides_with_nested_struct(self): + """Test that type_overrides is propagated to nested STRUCT fields.""" + import datetime + schema = bigquery.TableSchema() + + # Root field + date_field = bigquery.TableFieldSchema() + date_field.name = "date_field" + date_field.type = "DATE" + date_field.mode = "REQUIRED" + + # Nested struct with DATE field + struct_field = bigquery.TableFieldSchema() + struct_field.name = "nested" + struct_field.type = "RECORD" + struct_field.mode = "REQUIRED" + + nested_date = bigquery.TableFieldSchema() + nested_date.name = "nested_date" + nested_date.type = "DATE" + nested_date.mode = "REQUIRED" + struct_field.fields.append(nested_date) + + schema.fields.append(date_field) + schema.fields.append(struct_field) + + type_overrides = {"DATE": datetime.date} + typehints = get_beam_typehints_from_tableschema(schema, type_overrides) + + self.assertEqual(len(typehints), 2) + self.assertEqual(typehints[0], ("date_field", datetime.date)) + # The nested field's DATE should also be overridden + nested_constraint = typehints[1][1] + nested_fields = 
nested_constraint._fields + self.assertEqual(nested_fields[0], ("nested_date", datetime.date)) + + def test_type_overrides_empty_dict(self): + """Test that empty type_overrides dict uses default mappings.""" + schema = { + "fields": [{ + "name": "date_field", "type": "DATE", "mode": "REQUIRED" + }] + } + + typehints_none = get_beam_typehints_from_tableschema(schema, None) + typehints_empty = get_beam_typehints_from_tableschema(schema, {}) + + self.assertEqual(typehints_none, typehints_empty) + self.assertEqual(typehints_none, [("date_field", str)]) + + def test_type_overrides_json_to_dict(self): + """Test overriding JSON type to dict.""" + schema = {"fields": [{"name": "data", "type": "JSON", "mode": "NULLABLE"}]} + + # Default: JSON -> str + typehints = get_beam_typehints_from_tableschema(schema) + self.assertEqual(typehints, [("data", Optional[str])]) + + # Override: JSON -> dict + typehints_dict = get_beam_typehints_from_tableschema(schema, {"JSON": dict}) + self.assertEqual(typehints_dict, [("data", Optional[dict])]) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 69ea79b904834762f0c999514905a64ea51ff975 Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Tue, 27 Jan 2026 15:20:06 -0300 Subject: [PATCH 09/13] docs: add type_overrides feature to CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index f4a04320d66c..2a70f475f993 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,6 +71,7 @@ * (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)). * (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#X](https://github.com/apache/beam/issues/37429)). 
+* Added `type_overrides` parameter to `WriteToBigQuery` allowing users to specify custom BigQuery to Python type mappings when using Storage Write API (Python) ([#25946](https://github.com/apache/beam/issues/25946)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes From 4633fdfcbe0a8567a9b259b809bbe4207ce9d64b Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Tue, 27 Jan 2026 15:29:54 -0300 Subject: [PATCH 10/13] refactor(bigquery): remove hardcoded DATE/DATETIME/JSON, require type_overrides --- CHANGES.md | 2 +- .../io/gcp/bigquery_schema_tools.py | 3 - .../io/gcp/bigquery_schema_tools_test.py | 355 +++--------------- .../apache_beam/io/gcp/bigquery_tools.py | 3 - .../apache_beam/io/gcp/bigquery_tools_test.py | 86 +++-- 5 files changed, 103 insertions(+), 346 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2a70f475f993..9f1fe2a95931 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,7 +71,7 @@ * (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)). * (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#X](https://github.com/apache/beam/issues/37429)). -* Added `type_overrides` parameter to `WriteToBigQuery` allowing users to specify custom BigQuery to Python type mappings when using Storage Write API (Python) ([#25946](https://github.com/apache/beam/issues/25946)). +* Added `type_overrides` parameter to `WriteToBigQuery` allowing users to specify custom BigQuery to Python type mappings when using Storage Write API. This enables support for types like DATE, DATETIME, and JSON (Python) ([#25946](https://github.com/apache/beam/issues/25946)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). 
## Breaking Changes diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index e28b12986e5e..792d6ffbd90c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -49,9 +49,6 @@ "BYTES": bytes, "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, "GEOGRAPHY": str, - "DATE": str, - "DATETIME": str, - "JSON": str, #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 19906c066b7b..0625fe21809c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -336,299 +336,6 @@ def test_geography_with_complex_wkt(self): self.assertEqual(usertype.__annotations__, expected_annotations) - def test_date_type_support(self): - """Test that DATE type is properly supported in schema conversion.""" - fields = [ - bigquery.TableFieldSchema( - name='birth_date', type='DATE', mode="NULLABLE"), - bigquery.TableFieldSchema(name='dates', type='DATE', mode="REPEATED"), - bigquery.TableFieldSchema( - name='required_date', type='DATE', mode="REQUIRED") - ] - schema = bigquery.TableSchema(fields=fields) - - usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) - - expected_annotations = { - 'birth_date': typing.Optional[str], - 'dates': typing.Sequence[str], - 'required_date': str - } - - self.assertEqual(usertype.__annotations__, expected_annotations) - - def test_date_in_bq_to_python_types_mapping(self): - """Test that DATE is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" - from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES - - self.assertIn("DATE", BIG_QUERY_TO_PYTHON_TYPES) - self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["DATE"], str) 
- - def test_date_field_type_conversion(self): - """Test bq_field_to_type function with DATE fields.""" - from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type - - # Test required DATE field - result = bq_field_to_type("DATE", "REQUIRED") - self.assertEqual(result, str) - - # Test nullable DATE field - result = bq_field_to_type("DATE", "NULLABLE") - self.assertEqual(result, typing.Optional[str]) - - # Test repeated DATE field - result = bq_field_to_type("DATE", "REPEATED") - self.assertEqual(result, typing.Sequence[str]) - - # Test DATE field with None mode (should default to nullable) - result = bq_field_to_type("DATE", None) - self.assertEqual(result, typing.Optional[str]) - - # Test DATE field with empty mode (should default to nullable) - result = bq_field_to_type("DATE", "") - self.assertEqual(result, typing.Optional[str]) - - def test_convert_to_usertype_with_date(self): - """Test convert_to_usertype function with DATE fields.""" - schema = bigquery.TableSchema( - fields=[ - bigquery.TableFieldSchema( - name='id', type='INTEGER', mode="REQUIRED"), - bigquery.TableFieldSchema( - name='birth_date', type='DATE', mode="NULLABLE"), - bigquery.TableFieldSchema( - name='name', type='STRING', mode="REQUIRED") - ]) - - conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) - - # Verify the transform is created successfully - self.assertIsNotNone(conversion_transform) - - # The transform should be a ParDo with BeamSchemaConversionDoFn - self.assertIsInstance(conversion_transform, beam.ParDo) - - def test_beam_schema_conversion_dofn_with_date(self): - """Test BeamSchemaConversionDoFn with DATE data.""" - from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn - - # Create a user type with DATE field - fields = [ - bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), - bigquery.TableFieldSchema( - name='birth_date', type='DATE', mode="NULLABLE") - ] - schema = bigquery.TableSchema(fields=fields) - 
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) - - # Create the DoFn - dofn = BeamSchemaConversionDoFn(usertype) - - # Test processing a dictionary with DATE data - input_dict = {'id': 1, 'birth_date': '2021-01-15'} - - results = list(dofn.process(input_dict)) - self.assertEqual(len(results), 1) - - result = results[0] - self.assertEqual(result.id, 1) - self.assertEqual(result.birth_date, '2021-01-15') - - def test_datetime_type_support(self): - """Test that DATETIME type is properly supported in schema conversion.""" - fields = [ - bigquery.TableFieldSchema( - name='event_time', type='DATETIME', mode="NULLABLE"), - bigquery.TableFieldSchema( - name='timestamps', type='DATETIME', mode="REPEATED"), - bigquery.TableFieldSchema( - name='required_time', type='DATETIME', mode="REQUIRED") - ] - schema = bigquery.TableSchema(fields=fields) - - usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) - - expected_annotations = { - 'event_time': typing.Optional[str], - 'timestamps': typing.Sequence[str], - 'required_time': str - } - - self.assertEqual(usertype.__annotations__, expected_annotations) - - def test_datetime_in_bq_to_python_types_mapping(self): - """Test that DATETIME is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" - from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES - - self.assertIn("DATETIME", BIG_QUERY_TO_PYTHON_TYPES) - self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["DATETIME"], str) - - def test_datetime_field_type_conversion(self): - """Test bq_field_to_type function with DATETIME fields.""" - from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type - - # Test required DATETIME field - result = bq_field_to_type("DATETIME", "REQUIRED") - self.assertEqual(result, str) - - # Test nullable DATETIME field - result = bq_field_to_type("DATETIME", "NULLABLE") - self.assertEqual(result, typing.Optional[str]) - - # Test repeated DATETIME field - result = 
bq_field_to_type("DATETIME", "REPEATED") - self.assertEqual(result, typing.Sequence[str]) - - # Test DATETIME field with None mode (should default to nullable) - result = bq_field_to_type("DATETIME", None) - self.assertEqual(result, typing.Optional[str]) - - # Test DATETIME field with empty mode (should default to nullable) - result = bq_field_to_type("DATETIME", "") - self.assertEqual(result, typing.Optional[str]) - - def test_convert_to_usertype_with_datetime(self): - """Test convert_to_usertype function with DATETIME fields.""" - schema = bigquery.TableSchema( - fields=[ - bigquery.TableFieldSchema( - name='id', type='INTEGER', mode="REQUIRED"), - bigquery.TableFieldSchema( - name='event_time', type='DATETIME', mode="NULLABLE"), - bigquery.TableFieldSchema( - name='name', type='STRING', mode="REQUIRED") - ]) - - conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) - - # Verify the transform is created successfully - self.assertIsNotNone(conversion_transform) - - # The transform should be a ParDo with BeamSchemaConversionDoFn - self.assertIsInstance(conversion_transform, beam.ParDo) - - def test_beam_schema_conversion_dofn_with_datetime(self): - """Test BeamSchemaConversionDoFn with DATETIME data.""" - from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn - - # Create a user type with DATETIME field - fields = [ - bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), - bigquery.TableFieldSchema( - name='event_time', type='DATETIME', mode="NULLABLE") - ] - schema = bigquery.TableSchema(fields=fields) - usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) - - # Create the DoFn - dofn = BeamSchemaConversionDoFn(usertype) - - # Test processing a dictionary with DATETIME data - input_dict = {'id': 1, 'event_time': '2021-01-15T10:30:00'} - - results = list(dofn.process(input_dict)) - self.assertEqual(len(results), 1) - - result = results[0] - self.assertEqual(result.id, 1) - 
self.assertEqual(result.event_time, '2021-01-15T10:30:00') - - def test_json_type_support(self): - """Test that JSON type is properly supported in schema conversion.""" - fields = [ - bigquery.TableFieldSchema(name='data', type='JSON', mode="NULLABLE"), - bigquery.TableFieldSchema(name='items', type='JSON', mode="REPEATED"), - bigquery.TableFieldSchema( - name='required_data', type='JSON', mode="REQUIRED") - ] - schema = bigquery.TableSchema(fields=fields) - - usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) - - expected_annotations = { - 'data': typing.Optional[str], - 'items': typing.Sequence[str], - 'required_data': str - } - - self.assertEqual(usertype.__annotations__, expected_annotations) - - def test_json_in_bq_to_python_types_mapping(self): - """Test that JSON is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" - from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES - - self.assertIn("JSON", BIG_QUERY_TO_PYTHON_TYPES) - self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["JSON"], str) - - def test_json_field_type_conversion(self): - """Test bq_field_to_type function with JSON fields.""" - from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type - - # Test required JSON field - result = bq_field_to_type("JSON", "REQUIRED") - self.assertEqual(result, str) - - # Test nullable JSON field - result = bq_field_to_type("JSON", "NULLABLE") - self.assertEqual(result, typing.Optional[str]) - - # Test repeated JSON field - result = bq_field_to_type("JSON", "REPEATED") - self.assertEqual(result, typing.Sequence[str]) - - # Test JSON field with None mode (should default to nullable) - result = bq_field_to_type("JSON", None) - self.assertEqual(result, typing.Optional[str]) - - # Test JSON field with empty mode (should default to nullable) - result = bq_field_to_type("JSON", "") - self.assertEqual(result, typing.Optional[str]) - - def test_convert_to_usertype_with_json(self): - """Test convert_to_usertype function 
with JSON fields.""" - schema = bigquery.TableSchema( - fields=[ - bigquery.TableFieldSchema( - name='id', type='INTEGER', mode="REQUIRED"), - bigquery.TableFieldSchema( - name='data', type='JSON', mode="NULLABLE"), - bigquery.TableFieldSchema( - name='name', type='STRING', mode="REQUIRED") - ]) - - conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) - - # Verify the transform is created successfully - self.assertIsNotNone(conversion_transform) - - # The transform should be a ParDo with BeamSchemaConversionDoFn - self.assertIsInstance(conversion_transform, beam.ParDo) - - def test_beam_schema_conversion_dofn_with_json(self): - """Test BeamSchemaConversionDoFn with JSON data.""" - from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn - - # Create a user type with JSON field - fields = [ - bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), - bigquery.TableFieldSchema(name='data', type='JSON', mode="NULLABLE") - ] - schema = bigquery.TableSchema(fields=fields) - usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) - - # Create the DoFn - dofn = BeamSchemaConversionDoFn(usertype) - - # Test processing a dictionary with JSON data - input_dict = {'id': 1, 'data': '{"key": "value", "count": 42}'} - - results = list(dofn.process(input_dict)) - self.assertEqual(len(results), 1) - - result = results[0] - self.assertEqual(result.id, 1) - self.assertEqual(result.data, '{"key": "value", "count": 42}') - class TypeOverridesSchemaToolsTest(unittest.TestCase): """Tests for type_overrides parameter in bigquery_schema_tools.""" @@ -637,10 +344,11 @@ def test_bq_field_to_type_with_overrides(self): import datetime from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type - # Without overrides - self.assertEqual(bq_field_to_type("DATE", "REQUIRED"), str) + # Without overrides, DATE is not supported + with self.assertRaises(KeyError): + bq_field_to_type("DATE", "REQUIRED") - # With 
overrides + # With overrides, DATE works overrides = {"DATE": datetime.date} self.assertEqual( bq_field_to_type("DATE", "REQUIRED", overrides), datetime.date) @@ -651,6 +359,15 @@ def test_bq_field_to_type_with_overrides(self): bq_field_to_type("DATE", "REPEATED", overrides), typing.Sequence[datetime.date]) + def test_bq_field_to_type_overrides_can_use_str(self): + """Test that type_overrides can map DATE/DATETIME/JSON to str.""" + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + overrides = {"DATE": str, "DATETIME": str, "JSON": str} + self.assertEqual(bq_field_to_type("DATE", "REQUIRED", overrides), str) + self.assertEqual(bq_field_to_type("DATETIME", "REQUIRED", overrides), str) + self.assertEqual(bq_field_to_type("JSON", "REQUIRED", overrides), str) + def test_generate_user_type_with_overrides(self): """Test generate_user_type_from_bq_schema with type_overrides.""" import datetime @@ -663,21 +380,35 @@ def test_generate_user_type_with_overrides(self): name='event_date', type='DATE', mode="NULLABLE") ]) - # Without overrides - usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + # Without overrides, DATE is not supported + with self.assertRaises(ValueError): + bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # With overrides, DATE works + overrides = {"DATE": datetime.date} + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema( + schema, type_overrides=overrides) self.assertEqual( usertype.__annotations__, { - 'id': np.int64, 'event_date': typing.Optional[str] + 'id': np.int64, 'event_date': typing.Optional[datetime.date] }) - # With overrides - overrides = {"DATE": datetime.date} - usertype_with_override = \ - bigquery_schema_tools.generate_user_type_from_bq_schema( - schema, type_overrides=overrides) + def test_generate_user_type_overrides_with_str(self): + """Test that type_overrides can map DATE to str.""" + schema = bigquery.TableSchema( + fields=[ + 
bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='event_date', type='DATE', mode="NULLABLE") + ]) + + overrides = {"DATE": str} + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema( + schema, type_overrides=overrides) self.assertEqual( - usertype_with_override.__annotations__, { - 'id': np.int64, 'event_date': typing.Optional[datetime.date] + usertype.__annotations__, { + 'id': np.int64, 'event_date': typing.Optional[str] }) def test_convert_to_usertype_with_overrides(self): @@ -700,6 +431,18 @@ def test_convert_to_usertype_with_overrides(self): self.assertIsNotNone(transform) self.assertIsInstance(transform, beam.ParDo) + def test_type_overrides_can_override_default_types(self): + """Test that type_overrides can override default type mappings.""" + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + # GEOGRAPHY is in the default mapping as str + self.assertEqual(bq_field_to_type("GEOGRAPHY", "REQUIRED"), str) + + # We can override it + overrides = {"GEOGRAPHY": bytes} + self.assertEqual( + bq_field_to_type("GEOGRAPHY", "REQUIRED", overrides), bytes) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index cf3bedd828b3..b254ee2fa1f2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -124,9 +124,6 @@ "NUMERIC": decimal.Decimal, "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, "GEOGRAPHY": str, - "DATE": str, - "DATETIME": str, - "JSON": str, } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 6d82e093ae07..c0bbfc0ce59b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1016,8 +1016,7 @@ def 
test_dict_to_beam_row_repeated_nested_record(self): class TestBeamTypehintFromSchema(unittest.TestCase): EXPECTED_TYPEHINTS = [("str", str), ("bool", bool), ("bytes", bytes), ("int", np.int64), ("float", np.float64), - ("numeric", decimal.Decimal), ("timestamp", Timestamp), - ("date", str), ("datetime", str), ("json", str)] + ("numeric", decimal.Decimal), ("timestamp", Timestamp)] def get_schema_fields_with_mode(self, mode): return [{ @@ -1034,12 +1033,6 @@ def get_schema_fields_with_mode(self, mode): "name": "numeric", "type": "NUMERIC", "mode": mode }, { "name": "timestamp", "type": "TIMESTAMP", "mode": mode - }, { - "name": "date", "type": "DATE", "mode": mode - }, { - "name": "datetime", "type": "DATETIME", "mode": mode - }, { - "name": "json", "type": "JSON", "mode": mode }] def test_typehints_from_required_schema(self): @@ -1257,8 +1250,8 @@ def test_geography_with_special_characters(self): class TestTypeOverrides(unittest.TestCase): """Tests for type_overrides parameter in BigQuery type mappings.""" - def test_type_overrides_basic(self): - """Test that type_overrides overrides default type mappings.""" + def test_type_overrides_enables_unsupported_types(self): + """Test that type_overrides enables support for DATE/DATETIME/JSON.""" import datetime schema = { "fields": [{ @@ -1268,20 +1261,41 @@ def test_type_overrides_basic(self): "name": "datetime_field", "type": "DATETIME", "mode": "REQUIRED" + }, { + "name": "json_field", "type": "JSON", "mode": "REQUIRED" }] } - # Without overrides, DATE and DATETIME map to str - typehints = get_beam_typehints_from_tableschema(schema) - self.assertEqual(typehints, [("date_field", str), ("datetime_field", str)]) + # Without overrides, these types are not supported + with self.assertRaises(ValueError): + get_beam_typehints_from_tableschema(schema) + + # With overrides, they work + type_overrides = {"DATE": str, "DATETIME": str, "JSON": str} + typehints = get_beam_typehints_from_tableschema(schema, type_overrides) + 
self.assertEqual( + typehints, [("date_field", str), ("datetime_field", str), + ("json_field", str)]) + + def test_type_overrides_with_custom_types(self): + """Test type_overrides with custom Python types.""" + import datetime + schema = { + "fields": [{ + "name": "date_field", "type": "DATE", "mode": "REQUIRED" + }, + { + "name": "datetime_field", + "type": "DATETIME", + "mode": "REQUIRED" + }] + } - # With overrides, use custom types type_overrides = {"DATE": datetime.date, "DATETIME": datetime.datetime} - typehints_with_override = get_beam_typehints_from_tableschema( - schema, type_overrides) + typehints = get_beam_typehints_from_tableschema(schema, type_overrides) self.assertEqual( - typehints_with_override, [("date_field", datetime.date), - ("datetime_field", datetime.datetime)]) + typehints, [("date_field", datetime.date), + ("datetime_field", datetime.datetime)]) def test_type_overrides_with_modes(self): """Test that type_overrides works with NULLABLE and REPEATED modes.""" @@ -1304,8 +1318,8 @@ def test_type_overrides_with_modes(self): ("repeated_dates", Sequence[datetime.date])] self.assertEqual(typehints, expected) - def test_type_overrides_partial(self): - """Test that type_overrides only affects specified types.""" + def test_type_overrides_mixed_with_default_types(self): + """Test type_overrides alongside default type mappings.""" import datetime schema = { "fields": [{ @@ -1317,7 +1331,6 @@ def test_type_overrides_partial(self): }] } - # Only override DATE type_overrides = {"DATE": datetime.date} typehints = get_beam_typehints_from_tableschema(schema, type_overrides) @@ -1361,29 +1374,36 @@ def test_type_overrides_with_nested_struct(self): nested_fields = nested_constraint._fields self.assertEqual(nested_fields[0], ("nested_date", datetime.date)) - def test_type_overrides_empty_dict(self): - """Test that empty type_overrides dict uses default mappings.""" + def test_type_overrides_can_override_default_types(self): + """Test that type_overrides can 
override default type mappings.""" schema = { "fields": [{ - "name": "date_field", "type": "DATE", "mode": "REQUIRED" + "name": "geo_field", "type": "GEOGRAPHY", "mode": "REQUIRED" }] } - typehints_none = get_beam_typehints_from_tableschema(schema, None) - typehints_empty = get_beam_typehints_from_tableschema(schema, {}) + # Without overrides, GEOGRAPHY maps to str (default) + typehints = get_beam_typehints_from_tableschema(schema, None) + self.assertEqual(typehints, [("geo_field", str)]) - self.assertEqual(typehints_none, typehints_empty) - self.assertEqual(typehints_none, [("date_field", str)]) + # With overrides, we can change it + typehints_override = get_beam_typehints_from_tableschema( + schema, {"GEOGRAPHY": bytes}) + self.assertEqual(typehints_override, [("geo_field", bytes)]) def test_type_overrides_json_to_dict(self): - """Test overriding JSON type to dict.""" + """Test using type_overrides to map JSON to dict.""" schema = {"fields": [{"name": "data", "type": "JSON", "mode": "NULLABLE"}]} - # Default: JSON -> str - typehints = get_beam_typehints_from_tableschema(schema) - self.assertEqual(typehints, [("data", Optional[str])]) + # Without overrides, JSON is not supported + with self.assertRaises(ValueError): + get_beam_typehints_from_tableschema(schema) + + # With overrides, can map to str + typehints_str = get_beam_typehints_from_tableschema(schema, {"JSON": str}) + self.assertEqual(typehints_str, [("data", Optional[str])]) - # Override: JSON -> dict + # Or map to dict typehints_dict = get_beam_typehints_from_tableschema(schema, {"JSON": dict}) self.assertEqual(typehints_dict, [("data", Optional[dict])]) From 4c59be33b51a5e54509b8f3f4952c63dbf76bbf9 Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Wed, 4 Feb 2026 14:17:16 -0300 Subject: [PATCH 11/13] fixup! 
docs: add type_overrides feature to CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 9f1fe2a95931..072b0efabbf3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,7 +71,7 @@ * (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)). * (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#X](https://github.com/apache/beam/issues/37429)). -* Added `type_overrides` parameter to `WriteToBigQuery` allowing users to specify custom BigQuery to Python type mappings when using Storage Write API. This enables support for types like DATE, DATETIME, and JSON (Python) ([#25946](https://github.com/apache/beam/issues/25946)). +* (Python) Added `type_overrides` parameter to `WriteToBigQuery` allowing users to specify custom BigQuery to Python type mappings when using Storage Write API. This enables support for types like DATE, DATETIME, and JSON ([#25946](https://github.com/apache/beam/issues/25946)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes From 307d357d3e7e065a2183a2f20b21bb0b35eba1a3 Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Wed, 4 Feb 2026 14:17:16 -0300 Subject: [PATCH 12/13] fixup! 
docs: add type_overrides feature to CHANGES.md --- sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 0625fe21809c..e3d9227dea00 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -342,6 +342,7 @@ class TypeOverridesSchemaToolsTest(unittest.TestCase): def test_bq_field_to_type_with_overrides(self): """Test bq_field_to_type function with type_overrides.""" import datetime + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type # Without overrides, DATE is not supported From 33414bad9410450c0a65e6f2e3a3f0eac095711f Mon Sep 17 00:00:00 2001 From: Enzo Maruffa Date: Mon, 9 Feb 2026 18:14:59 -0300 Subject: [PATCH 13/13] fix(bigquery): add skipIf decorator to type_overrides test classes --- sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py | 3 ++- sdks/python/apache_beam/io/gcp/bigquery_tools_test.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index e3d9227dea00..3cf641a2fb04 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -337,7 +337,8 @@ def test_geography_with_complex_wkt(self): self.assertEqual(usertype.__annotations__, expected_annotations) -class TypeOverridesSchemaToolsTest(unittest.TestCase): +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class TestTypeOverridesSchemaTools(unittest.TestCase): """Tests for type_overrides parameter in bigquery_schema_tools.""" def test_bq_field_to_type_with_overrides(self): """Test bq_field_to_type function with type_overrides.""" diff --git 
a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index c0bbfc0ce59b..078c42160941 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1248,6 +1248,7 @@ def test_geography_with_special_characters(self): self.assertIsInstance(result, str) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestTypeOverrides(unittest.TestCase): """Tests for type_overrides parameter in BigQuery type mappings.""" def test_type_overrides_enables_unsupported_types(self):