From cdd1f6fd2cc9a54356d65fea8a27b361584a37af Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 29 Jun 2025 02:42:53 +0000 Subject: [PATCH 01/18] .github+sdks+website: update docs and add exmples for CloudSQL handler --- .../beam_PreCommit_Python_Examples.yml | 1 + .../transforms/elementwise/enrichment.py | 211 ++++++++++++++++ .../transforms/elementwise/enrichment_test.py | 231 +++++++++++++++++- .../python/elementwise/enrichment-cloudsql.md | 146 +++++++++++ .../python/elementwise/enrichment.md | 1 + .../section-menu/en/documentation.html | 1 + 6 files changed, 586 insertions(+), 5 deletions(-) create mode 100644 website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md diff --git a/.github/workflows/beam_PreCommit_Python_Examples.yml b/.github/workflows/beam_PreCommit_Python_Examples.yml index c76d140eadeb..3b7681faa892 100644 --- a/.github/workflows/beam_PreCommit_Python_Examples.yml +++ b/.github/workflows/beam_PreCommit_Python_Examples.yml @@ -53,6 +53,7 @@ env: DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + ALLOYDB_PASSWORD: ${{ secrets.ALLOYDB_PASSWORD }} jobs: beam_PreCommit_Python_Examples: diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py index acee633b6f67..9d4e41bb07fd 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py @@ -116,3 +116,214 @@ def enrichment_with_vertex_ai_legacy(): | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler) | "Print" >> beam.Map(print)) # [END enrichment_with_vertex_ai_legacy] + + +def enrichment_with_google_cloudsql_pg(): + # [START enrichment_with_google_cloudsql_pg] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + CloudSQLConnectionConfig) + import os + + database_adapter = DatabaseTypeAdapter.POSTGRESQL + database_uri = os.environ.get("GOOGLE_CLOUD_SQL_DB_URI") + database_user = int(os.environ.get("GOOGLE_CLOUD_SQL_DB_USER")) + database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD") + database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID") + table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID") + where_clause_template = "product_id = {}" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = CloudSQLConnectionConfig( + db_adapter=database_adapter, + instance_connection_uri=database_uri, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | + "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_google_cloudsql_pg] + + +def enrichment_with_external_pg(): + # [START enrichment_with_external_pg] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + ExternalSQLDBConnectionConfig) + import os + + database_adapter = DatabaseTypeAdapter.POSTGRESQL + database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") + database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) + database_user = os.environ.get("EXTERNAL_SQL_DB_USER") + database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") + database_id = os.environ.get("EXTERNAL_SQL_DB_ID") + table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") + where_clause_template = "product_id = {}" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=database_adapter, + host=database_host, + port=database_port, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | "Enrich W/ Unmanaged PostgreSQL" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_external_pg] + + +def enrichment_with_external_mysql(): + # [START enrichment_with_external_mysql] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + ExternalSQLDBConnectionConfig) + import os + + database_adapter = DatabaseTypeAdapter.MYSQL + database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") + database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) + database_user = os.environ.get("EXTERNAL_SQL_DB_USER") + database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") + database_id = os.environ.get("EXTERNAL_SQL_DB_ID") + table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") + where_clause_template = "product_id = {}" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=database_adapter, + host=database_host, + port=database_port, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | "Enrich W/ Unmanaged MySQL" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_external_mysql] + + +def enrichment_with_external_sqlserver(): + # [START enrichment_with_external_sqlserver] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + ExternalSQLDBConnectionConfig) + import os + + database_adapter = DatabaseTypeAdapter.SQLSERVER + database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") + database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) + database_user = os.environ.get("EXTERNAL_SQL_DB_USER") + database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") + database_id = os.environ.get("EXTERNAL_SQL_DB_ID") + table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") + where_clause_template = "product_id = {}" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=database_adapter, + host=database_host, + port=database_port, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_external_sqlserver] diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py index 8a7cdfbe9263..1b3fdd36423c 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py @@ -18,19 +18,41 @@ # pytype: skip-file # pylint: disable=line-too-long +import os import unittest +from collections.abc import Callable +from contextlib import contextmanager +from dataclasses import dataclass from io import StringIO +from typing import Optional import mock +import pytest +from sqlalchemy.engine import Connection as DBAPIConnection # pylint: disable=unused-import try: - from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_bigtable, \ - enrichment_with_vertex_ai_legacy - from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_vertex_ai + from sqlalchemy import ( + Column, Integer, VARCHAR, Engine, MetaData, create_engine) + from apache_beam.examples.snippets.transforms.elementwise.enrichment import ( + enrichment_with_bigtable, enrichment_with_vertex_ai_legacy) + from apache_beam.examples.snippets.transforms.elementwise.enrichment import ( + enrichment_with_vertex_ai, + enrichment_with_google_cloudsql_pg, + enrichment_with_external_pg, + enrichment_with_external_mysql, + enrichment_with_external_sqlserver) + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + DatabaseTypeAdapter) + from apache_beam.transforms.enrichment_handlers.cloudsql_it_test import ( + SQLEnrichmentTestHelper, + SQLDBContainerInfo, + ConnectionConfig, + CloudSQLConnectionConfig, + ExternalSQLDBConnectionConfig) from apache_beam.io.requestresponse import RequestResponseIO -except ImportError: - raise unittest.SkipTest('RequestResponseIO dependencies are not installed') +except ImportError as e: + raise unittest.SkipTest(f'RequestResponseIO dependencies not installed: {e}') def validate_enrichment_with_bigtable(): @@ -60,7 +82,44 @@ def validate_enrichment_with_vertex_ai_legacy(): return expected +def validate_enrichment_with_google_cloudsql_pg(): + expected = '''[START enrichment_with_google_cloudsql_pg] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_google_cloudsql_pg]'''.splitlines()[1:-1] + return expected + + +def validate_enrichment_with_external_pg(): + expected = '''[START enrichment_with_external_pg] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_external_pg]'''.splitlines()[1:-1] + return expected + + +def validate_enrichment_with_external_mysql(): + expected = '''[START enrichment_with_external_mysql] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_external_mysql]'''.splitlines()[1:-1] + return expected + + +def validate_enrichment_with_external_sqlserver(): + expected = '''[START enrichment_with_external_sqlserver] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_external_sqlserver]'''.splitlines()[1:-1] + return expected + + @mock.patch('sys.stdout', new_callable=StringIO) +@pytest.mark.uses_testcontainer class EnrichmentTest(unittest.TestCase): def test_enrichment_with_bigtable(self, mock_stdout): enrichment_with_bigtable() @@ -83,6 +142,168 @@ def test_enrichment_with_vertex_ai_legacy(self, mock_stdout): self.maxDiff = None self.assertEqual(output, expected) + def test_enrichment_with_google_cloudsql_pg(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.POSTGRESQL + with EnrichmentTestHelpers.sql_test_context(True, db_adapter): + try: + enrichment_with_google_cloudsql_pg() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_google_cloudsql_pg() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + def test_enrichment_with_external_pg(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.POSTGRESQL + with EnrichmentTestHelpers.sql_test_context(False, db_adapter): + try: + enrichment_with_external_pg() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_external_pg() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + def test_enrichment_with_external_mysql(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.MYSQL + with EnrichmentTestHelpers.sql_test_context(False, db_adapter): + try: + enrichment_with_external_mysql() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_external_mysql() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + def test_enrichment_with_external_sqlserver(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.SQLSERVER + with EnrichmentTestHelpers.sql_test_context(False, db_adapter): + try: + enrichment_with_external_sqlserver() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_external_sqlserver() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + +@dataclass +class CloudSQLEnrichmentTestDataConstruct: + client_handler: Callable[[], DBAPIConnection] + engine: Engine + metadata: MetaData + db: SQLDBContainerInfo = None + + +class EnrichmentTestHelpers: + @contextmanager + def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter): + result: Optional[CloudSQLEnrichmentTestDataConstruct] = None + try: + result = EnrichmentTestHelpers.pre_sql_enrichment_test( + is_cloudsql, db_adapter) + yield + finally: + if result: + EnrichmentTestHelpers.post_sql_enrichment_test(result) + + @staticmethod + def pre_sql_enrichment_test( + is_cloudsql: bool, + db_adapter: DatabaseTypeAdapter) -> CloudSQLEnrichmentTestDataConstruct: + table_id = "products" + columns = [ + Column("product_id", Integer, primary_key=True), + Column("name", VARCHAR(255), nullable=False), + Column("quantity", Integer, nullable=False), + Column("region_id", Integer, nullable=False), + ] + table_data = [ + { + "product_id": 1, "name": "A", 'quantity': 2, 'region_id': 3 + }, + { + "product_id": 2, "name": "B", 'quantity': 3, 'region_id': 1 + }, + { + "product_id": 3, "name": "C", 'quantity': 10, 'region_id': 4 + }, + ] + metadata = MetaData() + + connection_config: ConnectionConfig + if is_cloudsql: + gcp_project_id = "apache-beam-testing" + region = "us-central1" + instance_name = "beam-integration-tests" + instance_connection_uri = f"{gcp_project_id}:{region}:{instance_name}" + db_id = "postgres" + user = "postgres" + password = os.getenv("ALLOYDB_PASSWORD") + os.environ['GOOGLE_CLOUD_SQL_DB_URI'] = instance_connection_uri + os.environ['GOOGLE_CLOUD_SQL_DB_ID'] = db_id + os.environ['GOOGLE_CLOUD_SQL_DB_USER'] = user + os.environ['GOOGLE_CLOUD_SQL_DB_PASSWORD'] = password + os.environ['GOOGLE_CLOUD_SQL_DB_TABLE_ID'] = table_id + connection_config = CloudSQLConnectionConfig( + db_adapter=db_adapter, + instance_connection_uri=instance_connection_uri, + user=user, + password=password, + db_id=db_id) + else: + db = SQLEnrichmentTestHelper.start_sql_db_container(db_adapter) + os.environ['EXTERNAL_SQL_DB_HOST'] = db.host + os.environ['EXTERNAL_SQL_DB_PORT'] = str(db.port) + os.environ['EXTERNAL_SQL_DB_ID'] = db.id + os.environ['EXTERNAL_SQL_DB_USER'] = db.user + os.environ['EXTERNAL_SQL_DB_PASSWORD'] = db.password + os.environ['EXTERNAL_SQL_DB_TABLE_ID'] = table_id + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=db_adapter, + host=db.host, + port=db.port, + user=db.user, + password=db.password, + db_id=db.id) + + conenctor = connection_config.get_connector_handler() + engine = create_engine( + url=connection_config.get_db_url(), creator=conenctor) + + SQLEnrichmentTestHelper.create_table( + table_id=table_id, + engine=engine, + columns=columns, + table_data=table_data, + metadata=metadata) + + result = CloudSQLEnrichmentTestDataConstruct( + db=db, client_handler=conenctor, engine=engine, metadata=metadata) + return result + + @staticmethod + def post_sql_enrichment_test(res: CloudSQLEnrichmentTestDataConstruct): + # Clean up the data inserted previously. + res.metadata.drop_all(res.engine) + res.engine.dispose(close=True) + + # Check if the test used a container-based external SQL database. + if res.db: + SQLEnrichmentTestHelper.stop_sql_db_container(res.db) + os.environ.pop('EXTERNAL_SQL_DB_HOST', None) + os.environ.pop('EXTERNAL_SQL_DB_PORT', None) + os.environ.pop('EXTERNAL_SQL_DB_ID', None) + os.environ.pop('EXTERNAL_SQL_DB_USER', None) + os.environ.pop('EXTERNAL_SQL_DB_PASSWORD', None) + os.environ.pop('EXTERNAL_SQL_DB_TABLE_ID', None) + else: + os.environ.pop('GOOGLE_CLOUD_SQL_DB_URI', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_ID', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_USER', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_PASSWORD', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_TABLE_ID', None) + if __name__ == '__main__': unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md new file mode 100644 index 000000000000..8dfc5a174186 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md @@ -0,0 +1,146 @@ +--- +title: "Enrichment with CloudSQL" +--- + + +# Use CloudSQL to enrich data + +{{< localstorage language language-py >}} + + + + + +
+ + {{< button-pydoc path="apache_beam.transforms.enrichment_handlers.cloudsql" class="CloudSQLEnrichmentHandler" >}} + +
+ +Starting with Apache Beam 2.67.0, the enrichment transform includes +built-in enrichment handler support for the +[Google CloudSQL](https://cloud.google.com/sql/docs). This handler allows your +Beam pipeline to enrich data using SQL databases, with built-in support for: + +- Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL +- Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted or + on-premises databases) + +The following example demonstrates how to create a pipeline that use the +enrichment transform with the +[`CloudSQLEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.cloudsql.html#apache_beam.transforms.enrichment_handlers.cloudsql.CloudSQLEnrichmentHandler) handler. + +## Example 1: Enrichment with Google CloudSQL (Managed PostgreSQL) + +The data in the CloudSQL PostgreSQL table `products` follows this format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_google_cloudsql_pg >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_google_cloudsql_pg >}} +{{< /highlight >}} + +## Example 2: Enrichment with Unmanaged PostgreSQL + +The data in the Unmanaged PostgreSQL table `products` follows this format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_external_pg >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_external_pg >}} +{{< /highlight >}} + +## Example 3: Enrichment with Unmanaged MySQL + +The data in the Unmanaged MySQL table `products` follows this format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_external_mysql >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_external_mysql >}} +{{< /highlight >}} + +## Example 4: Enrichment with Unmanaged Microsoft SQL Server + +The data in the Unmanaged Microsoft SQL Server table `products` follows this +format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_external_sqlserver >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_external_sqlserver >}} +{{< /highlight >}} + +## Related transforms + +Not applicable. + +{{< button-pydoc path="apache_beam.transforms.enrichment_handlers.cloudsql" class="CloudSQLEnrichmentHandler" >}} \ No newline at end of file diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md index 6c05b6b515a4..4b352d0447ad 100644 --- a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md @@ -42,6 +42,7 @@ The following examples demonstrate how to create a pipeline that use the enrichm | Service | Example | |:-----------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Cloud Bigtable | [Enrichment with Bigtable](/documentation/transforms/python/elementwise/enrichment-bigtable/#example) | +| Cloud SQL (PostgreSQL, MySQL, SQLServer) | [Enrichment with CloudSQL](/documentation/transforms/python/elementwise/enrichment-cloudsql) | | Vertex AI Feature Store | [Enrichment with Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-1-enrichment-with-vertex-ai-feature-store) | | Vertex AI Feature Store (Legacy) | [Enrichment with Legacy Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-2-enrichment-with-vertex-ai-feature-store-legacy) | {{< /table >}} diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index 6b37450786f9..9fb842228e30 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -297,6 +297,7 @@ From 5541f753e838749907131d8699856912bec6d68c Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sat, 16 Aug 2025 01:02:23 +0300 Subject: [PATCH 02/18] Update website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md Co-authored-by: Danny McCormick --- .../transforms/python/elementwise/enrichment-cloudsql.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md index 8dfc5a174186..31c72be6be03 100644 --- a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md @@ -29,7 +29,7 @@ limitations under the License. -Starting with Apache Beam 2.67.0, the enrichment transform includes +Starting with Apache Beam 2.68.0, the enrichment transform includes built-in enrichment handler support for the [Google CloudSQL](https://cloud.google.com/sql/docs). This handler allows your Beam pipeline to enrich data using SQL databases, with built-in support for: From ca7e09ffd1dfd2e719df792ccfa568378706a0af Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Thu, 28 Aug 2025 17:26:17 +0000 Subject: [PATCH 03/18] sdks/python: fix issue regards generic binding parameteres in CloudSQL --- .../enrichment_handlers/cloudsql.py | 30 ++++++++----------- .../enrichment_handlers/cloudsql_it_test.py | 2 +- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py index f070158d1c54..3fe3a62f9546 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py @@ -478,16 +478,14 @@ def _build_parameters_dict( # For batched queries, use unique parameter names per batch item. if batch_size > 1: - # Extract parameter names from the template using regex. - # Batching is only used with table-based query configs + # Batching is only used with table-based query configs. table_query_configs = (TableFieldsQueryConfig, TableFunctionQueryConfig) assert isinstance(self._query_config, table_query_configs) - param_names = self._extract_parameter_names( - self._query_config.where_clause_template) - for param_name, val in zip(param_names, current_values): + batch_param_dict = self._build_single_param_dict(current_values) + # Prefix batch parameters to make them globally unique. + for param_name, val in batch_param_dict.items(): param_dict[f'batch_{i}_{param_name}'] = val else: - # For single request, use the helper function. single_param_dict = self._build_single_param_dict(current_values) param_dict.update(single_param_dict) @@ -502,17 +500,15 @@ def _build_single_param_dict(self, values: list[Any]) -> dict[str, Any]: Returns: Dictionary mapping parameter names to values """ - if isinstance(self._query_config, TableFieldsQueryConfig): - return { - field_name: val - for field_name, val in zip( - self._query_config.where_clause_fields, values) - } - else: # TableFunctionQueryConfig. - assert isinstance(self._query_config, TableFunctionQueryConfig) - _, param_dict = self._get_unique_template_and_params( - self._query_config.where_clause_template, values) - return param_dict + table_query_configs = (TableFieldsQueryConfig, TableFunctionQueryConfig) + if not isinstance(self._query_config, table_query_configs): + raise ValueError( + f"Parameter binding not supported for " + f"{type(self._query_config).__name__}") + + _, param_dict = self._get_unique_template_and_params( + self._query_config.where_clause_template, values) + return param_dict def _get_unique_template_and_params( self, template: str, values: list[Any]) -> tuple[str, dict[str, Any]]: diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py index 3d9cd18151b6..f0f2d37ff5a0 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py @@ -320,7 +320,7 @@ def test_sql_enrichment(self): query_config = TableFieldsQueryConfig( table_id=self._table_id, - where_clause_template="id = :id", + where_clause_template="id = :id_param", where_clause_fields=fields) handler = CloudSQLEnrichmentHandler( From 9fd5fe127548bd1f2373ebdc245c3be1589b77c4 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Thu, 28 Aug 2025 17:03:59 +0000 Subject: [PATCH 04/18] sdks/python: use binding parameters instead of `{}` --- .../snippets/transforms/elementwise/enrichment.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py index 9d4e41bb07fd..8b25a3c23094 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py @@ -135,7 +135,7 @@ def enrichment_with_google_cloudsql_pg(): database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD") database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID") table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID") - where_clause_template = "product_id = {}" + where_clause_template = "product_id = :pid" where_clause_fields = ["product_id"] data = [ @@ -188,7 +188,7 @@ def enrichment_with_external_pg(): database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") database_id = os.environ.get("EXTERNAL_SQL_DB_ID") table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") - where_clause_template = "product_id = {}" + where_clause_template = "product_id = :pid" where_clause_fields = ["product_id"] data = [ @@ -241,7 +241,7 @@ def enrichment_with_external_mysql(): database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") database_id = os.environ.get("EXTERNAL_SQL_DB_ID") table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") - where_clause_template = "product_id = {}" + where_clause_template = "product_id = :pid" where_clause_fields = ["product_id"] data = [ @@ -294,7 +294,7 @@ def enrichment_with_external_sqlserver(): database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") database_id = os.environ.get("EXTERNAL_SQL_DB_ID") table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") - where_clause_template = "product_id = {}" + where_clause_template = "product_id = :pid" where_clause_fields = ["product_id"] data = [ From 99fe5d77b41636cd43aeff7728dbd2f948952322 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Thu, 28 Aug 2025 17:38:06 +0000 Subject: [PATCH 05/18] CHANGES.md: update release notes --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index dd7fb7b734a3..e7a160d7122a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,6 +80,8 @@ Beam now supports data enrichment capabilities using SQL databases, with built-in support for: - Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL - Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted or on-premises databases) +* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) + (Python) ([#35473](https://github.com/apache/beam/pull/35473)). ## Breaking Changes From af8e4fe198b89a0abfd9a88fb2c1ab0303338772 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Fri, 5 Sep 2025 13:19:48 +0000 Subject: [PATCH 06/18] website: update beam version --- .../transforms/python/elementwise/enrichment-cloudsql.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md index 31c72be6be03..a29b2672e678 100644 --- a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md @@ -29,7 +29,7 @@ limitations under the License. -Starting with Apache Beam 2.68.0, the enrichment transform includes +Starting with Apache Beam 2.69.0, the enrichment transform includes built-in enrichment handler support for the [Google CloudSQL](https://cloud.google.com/sql/docs). This handler allows your Beam pipeline to enrich data using SQL databases, with built-in support for: @@ -143,4 +143,4 @@ Output: Not applicable. -{{< button-pydoc path="apache_beam.transforms.enrichment_handlers.cloudsql" class="CloudSQLEnrichmentHandler" >}} \ No newline at end of file +{{< button-pydoc path="apache_beam.transforms.enrichment_handlers.cloudsql" class="CloudSQLEnrichmentHandler" >}} From 91bcc72577017d3563a97ceac21f641a1735c8c0 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Mon, 8 Sep 2025 15:11:36 +0000 Subject: [PATCH 07/18] sdks/python: add `ALLOYDB_PASSWORD` to `tox.ini` --- sdks/python/tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index f344cfc61ccf..b478c6fc59be 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -31,7 +31,7 @@ select = E3 # https://github.com/apache/beam/issues/25668 pip_pre = True # allow apps that support color to use it. -passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_* +passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_*,ALLOYDB_PASSWORD # Set [] options for pip installation of apache-beam tarball. extras = test,dataframe # Don't warn that these commands aren't installed. From 34eb69544ae018ba300ac37cd9ab2ee4e92330c7 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 06:47:14 +0000 Subject: [PATCH 08/18] sdks/python: fix unbounded local variable --- .../examples/snippets/transforms/elementwise/enrichment_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py index 1b3fdd36423c..079adaf7fed1 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py @@ -232,6 +232,7 @@ def pre_sql_enrichment_test( metadata = MetaData() connection_config: ConnectionConfig + db = None if is_cloudsql: gcp_project_id = "apache-beam-testing" region = "us-central1" From d699f04add70c3253135c8d48493f3c3df5c4a87 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 06:47:41 +0000 Subject: [PATCH 09/18] CHANGES.md: fix white space issue --- CHANGES.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ebdfa9ebc2bf..a618cde1e92c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,8 +74,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) - (Python) ([#35473](https://github.com/apache/beam/pull/35473)). +* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) (Python) ([#35473](https://github.com/apache/beam/issues/36095)). ## Breaking Changes From e9897bd45bb2adab105c537dddf89dd0ff667fc6 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 08:48:18 +0000 Subject: [PATCH 10/18] sdks/python: make table_id globally unique in `enrichment_test` --- .../snippets/transforms/elementwise/enrichment_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py index 079adaf7fed1..7f5108ab6b20 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py @@ -20,6 +20,7 @@ import os import unittest +import uuid from collections.abc import Callable from contextlib import contextmanager from dataclasses import dataclass @@ -211,7 +212,8 @@ def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter): def pre_sql_enrichment_test( is_cloudsql: bool, db_adapter: DatabaseTypeAdapter) -> CloudSQLEnrichmentTestDataConstruct: - table_id = "products" + unique_suffix = str(uuid.uuid4())[:8] + table_id = f"products_{unique_suffix}" columns = [ Column("product_id", Integer, primary_key=True), Column("name", VARCHAR(255), nullable=False), From 73a8c5e543a7253f87f53595ec730102a37b17a2 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 09:38:21 +0000 Subject: [PATCH 11/18] sdks/python: fix data type issue --- .../examples/snippets/transforms/elementwise/enrichment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py index 8b25a3c23094..d71faa6d8477 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py @@ -131,7 +131,7 @@ def enrichment_with_google_cloudsql_pg(): database_adapter = DatabaseTypeAdapter.POSTGRESQL database_uri = os.environ.get("GOOGLE_CLOUD_SQL_DB_URI") - database_user = int(os.environ.get("GOOGLE_CLOUD_SQL_DB_USER")) + database_user = os.environ.get("GOOGLE_CLOUD_SQL_DB_USER") database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD") database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID") table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID") From c795ea42097c18fb6e764b8c4e05b5222784bdbe Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 10:39:25 +0000 Subject: [PATCH 12/18] sdks/python: enforce CloudSQL tests to run only on py transforms flow --- .../enrichment_handlers/cloudsql_it_test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py index f0f2d37ff5a0..ecb2d585d58d 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py @@ -330,7 +330,7 @@ def test_sql_enrichment(self): max_batch_size=100, ) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -357,7 +357,7 @@ def test_sql_enrichment_batched(self): min_batch_size=2, max_batch_size=100, ) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -384,7 +384,7 @@ def test_sql_enrichment_batched_multiple_fields(self): min_batch_size=8, max_batch_size=100, ) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -404,7 +404,7 @@ def test_sql_enrichment_with_query_fn(self): handler = CloudSQLEnrichmentHandler( connection_config=self._connection_config, query_config=query_config) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -429,7 +429,7 @@ def test_sql_enrichment_with_condition_value_fn(self): query_config=query_config, min_batch_size=2, max_batch_size=100) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -481,7 +481,7 @@ def test_sql_enrichment_with_redis(self): query_config=query_config, min_batch_size=2, max_batch_size=100) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll_populate_cache = ( test_pipeline | beam.Create(requests) @@ -506,7 +506,7 @@ def test_sql_enrichment_with_redis(self): side_effect=Exception("Database should not be called on a cache hit.")) # Run a second pipeline to verify cache is being used. - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll_cached = ( test_pipeline | beam.Create(requests) From d78f573d807d8802e69e19b344b15f4f970ef1d3 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 11:07:48 +0000 Subject: [PATCH 13/18] sdks/python: remove `uses_testcontainer` pytest marker from CloudSQL --- .../transforms/enrichment_handlers/cloudsql_it_test.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py index ecb2d585d58d..f04b1999f660 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py @@ -208,7 +208,6 @@ def create_table( raise Exception(f"Failed to insert table data: {e}") -@pytest.mark.uses_testcontainer class BaseTestSQLEnrichment(unittest.TestCase): _table_data = [ { @@ -567,7 +566,6 @@ class TestCloudSQLPostgresEnrichment(BaseCloudSQLDBEnrichment): _metadata = MetaData() -@pytest.mark.uses_testcontainer class BaseExternalSQLDBEnrichment(BaseTestSQLEnrichment): @classmethod def setUpClass(cls): @@ -595,7 +593,6 @@ def tearDownClass(cls): cls._db = None -@pytest.mark.uses_testcontainer class TestExternalPostgresEnrichment(BaseExternalSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.POSTGRESQL _unique_suffix = str(uuid.uuid4())[:8] @@ -603,7 +600,6 @@ class TestExternalPostgresEnrichment(BaseExternalSQLDBEnrichment): _metadata = MetaData() -@pytest.mark.uses_testcontainer class TestExternalMySQLEnrichment(BaseExternalSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.MYSQL _unique_suffix = str(uuid.uuid4())[:8] @@ -611,7 +607,6 @@ class TestExternalMySQLEnrichment(BaseExternalSQLDBEnrichment): _metadata = MetaData() -@pytest.mark.uses_testcontainer class TestExternalSQLServerEnrichment(BaseExternalSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.SQLSERVER _unique_suffix = str(uuid.uuid4())[:8] From a53de92f3df385d40faaa75d26c3f50471d7afe0 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 15:37:56 +0000 Subject: [PATCH 14/18] sdks/python: skip google cloudsql tests unless `ALLOYDB_PASSWORD` found --- .../snippets/transforms/elementwise/enrichment_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py index 7f5108ab6b20..770b75351bd4 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py @@ -143,6 +143,9 @@ def test_enrichment_with_vertex_ai_legacy(self, mock_stdout): self.maxDiff = None self.assertEqual(output, expected) + @unittest.skipUnless( + os.environ.get('ALLOYDB_PASSWORD'), + "ALLOYDB_PASSWORD environment var is not provided") def test_enrichment_with_google_cloudsql_pg(self, mock_stdout): db_adapter = DatabaseTypeAdapter.POSTGRESQL with EnrichmentTestHelpers.sql_test_context(True, db_adapter): From 286ceba8fd099bdc2fcb966c0fdc83643da713ad Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 15:40:32 +0000 Subject: [PATCH 15/18] workflows: remove `ALLOYDB_PASSWORD` from beam precommit python coverage --- .github/workflows/beam_PreCommit_Python_Coverage.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/beam_PreCommit_Python_Coverage.yml b/.github/workflows/beam_PreCommit_Python_Coverage.yml index 6e288ceb5f51..9a32336e96a0 100644 --- a/.github/workflows/beam_PreCommit_Python_Coverage.yml +++ b/.github/workflows/beam_PreCommit_Python_Coverage.yml @@ -54,7 +54,6 @@ env: GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} HF_INFERENCE_TOKEN: ${{ secrets.HF_INFERENCE_TOKEN }} - ALLOYDB_PASSWORD: ${{ secrets.ALLOYDB_PASSWORD }} jobs: @@ -113,7 +112,7 @@ jobs: TESTCONTAINERS_HOST_OVERRIDE: ${{ contains(matrix.os, 'self-hosted') && env.DIND_IP || '' }} TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/docker.sock" TESTCONTAINERS_RYUK_DISABLED: "false" - TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true" + TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true" PYTEST_ADDOPTS: "-v --tb=short --maxfail=3 --durations=20 --reruns=2 --reruns-delay=5" TC_TIMEOUT: "120" TC_MAX_TRIES: "120" From 5ea33c9d2d4733f20c628ecef5167391d48ac132 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 20:35:50 +0000 Subject: [PATCH 16/18] sdks/python: fix duplicate data issue --- .../enrichment_handlers/cloudsql_it_test.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py index f04b1999f660..d0de3fa72b74 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py @@ -302,6 +302,16 @@ def _start_cache_container(self): @classmethod def tearDownClass(cls): + # Attempt to drop table first using raw SQL. + try: + with cls._engine.connect() as conn: + conn.execute(f"DROP TABLE IF EXISTS {cls._table_id}") + conn.commit() + _LOGGER.info(f"Dropped table {cls._table_id}") + except Exception as e: + _LOGGER.warning(f"Failed to drop table {cls._table_id}: {e}") + + # Fallback to metadata drop as a backup. cls._metadata.drop_all(cls._engine) cls._engine.dispose(close=True) cls._engine = None @@ -552,7 +562,8 @@ class TestCloudSQLPostgresEnrichment(BaseCloudSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.POSTGRESQL # Configuration required for locating the CloudSQL instance. - _table_id = "product_details_cloudsql_pg_enrichment" + _unique_suffix = str(uuid.uuid4())[:8] + _table_id = f"product_details_cloudsql_pg_enrichment_{_unique_suffix}" _gcp_project_id = "apache-beam-testing" _region = "us-central1" _instance_name = "beam-integration-tests" From 8cd248cb282d5f5b2c35bcc9d0de90d1684d8dd7 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 21:14:49 +0000 Subject: [PATCH 17/18] sdks/python: fix linting --- .../transforms/enrichment_handlers/cloudsql_it_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py index d0de3fa72b74..d42e19513627 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py @@ -307,9 +307,9 @@ def tearDownClass(cls): with cls._engine.connect() as conn: conn.execute(f"DROP TABLE IF EXISTS {cls._table_id}") conn.commit() - _LOGGER.info(f"Dropped table {cls._table_id}") + _LOGGER.info("Dropped table %s", cls._table_id) except Exception as e: - _LOGGER.warning(f"Failed to drop table {cls._table_id}: {e}") + _LOGGER.warning("Failed to drop table %s: %s", cls._table_id, e) # Fallback to metadata drop as a backup. cls._metadata.drop_all(cls._engine) From b12c72ea2719372914dde4476e8c8f9e17d79cc4 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Tue, 9 Sep 2025 21:16:37 +0000 Subject: [PATCH 18/18] sdks/python: reorder table drop approach --- .../transforms/enrichment_handlers/cloudsql_it_test.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py index d42e19513627..15ab0ec0a3a1 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py @@ -302,7 +302,10 @@ def _start_cache_container(self): @classmethod def tearDownClass(cls): - # Attempt to drop table first using raw SQL. + # Drop all tables using metadata as the primary approach. + cls._metadata.drop_all(cls._engine) + + # Fallback to raw SQL drop if needed. try: with cls._engine.connect() as conn: conn.execute(f"DROP TABLE IF EXISTS {cls._table_id}") @@ -311,8 +314,6 @@ def tearDownClass(cls): except Exception as e: _LOGGER.warning("Failed to drop table %s: %s", cls._table_id, e) - # Fallback to metadata drop as a backup. - cls._metadata.drop_all(cls._engine) cls._engine.dispose(close=True) cls._engine = None