diff --git a/.github/workflows/beam_PreCommit_Python_Coverage.yml b/.github/workflows/beam_PreCommit_Python_Coverage.yml index 6e288ceb5f51..9a32336e96a0 100644 --- a/.github/workflows/beam_PreCommit_Python_Coverage.yml +++ b/.github/workflows/beam_PreCommit_Python_Coverage.yml @@ -54,7 +54,6 @@ env: GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} HF_INFERENCE_TOKEN: ${{ secrets.HF_INFERENCE_TOKEN }} - ALLOYDB_PASSWORD: ${{ secrets.ALLOYDB_PASSWORD }} jobs: @@ -113,7 +112,7 @@ jobs: TESTCONTAINERS_HOST_OVERRIDE: ${{ contains(matrix.os, 'self-hosted') && env.DIND_IP || '' }} TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/docker.sock" TESTCONTAINERS_RYUK_DISABLED: "false" - TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true" + TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true" PYTEST_ADDOPTS: "-v --tb=short --maxfail=3 --durations=20 --reruns=2 --reruns-delay=5" TC_TIMEOUT: "120" TC_MAX_TRIES: "120" diff --git a/CHANGES.md b/CHANGES.md index b52d55fdf63e..e59e28b60838 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) (Python) ([#35473](https://github.com/apache/beam/issues/35473)). 
## Breaking Changes diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py index acee633b6f67..d71faa6d8477 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py @@ -116,3 +116,214 @@ def enrichment_with_vertex_ai_legacy(): | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler) | "Print" >> beam.Map(print)) # [END enrichment_with_vertex_ai_legacy] + + +def enrichment_with_google_cloudsql_pg(): + # [START enrichment_with_google_cloudsql_pg] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + CloudSQLConnectionConfig) + import os + + database_adapter = DatabaseTypeAdapter.POSTGRESQL + database_uri = os.environ.get("GOOGLE_CLOUD_SQL_DB_URI") + database_user = os.environ.get("GOOGLE_CLOUD_SQL_DB_USER") + database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD") + database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID") + table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID") + where_clause_template = "product_id = :pid" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = CloudSQLConnectionConfig( + db_adapter=database_adapter, + instance_connection_uri=database_uri, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + 
query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | + "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_google_cloudsql_pg] + + +def enrichment_with_external_pg(): + # [START enrichment_with_external_pg] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + ExternalSQLDBConnectionConfig) + import os + + database_adapter = DatabaseTypeAdapter.POSTGRESQL + database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") + database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) + database_user = os.environ.get("EXTERNAL_SQL_DB_USER") + database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") + database_id = os.environ.get("EXTERNAL_SQL_DB_ID") + table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") + where_clause_template = "product_id = :pid" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=database_adapter, + host=database_host, + port=database_port, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | "Enrich W/ Unmanaged PostgreSQL" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_external_pg] + + +def enrichment_with_external_mysql(): + # [START 
enrichment_with_external_mysql] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + ExternalSQLDBConnectionConfig) + import os + + database_adapter = DatabaseTypeAdapter.MYSQL + database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") + database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) + database_user = os.environ.get("EXTERNAL_SQL_DB_USER") + database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") + database_id = os.environ.get("EXTERNAL_SQL_DB_ID") + table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") + where_clause_template = "product_id = :pid" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=database_adapter, + host=database_host, + port=database_port, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | "Enrich W/ Unmanaged MySQL" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_external_mysql] + + +def enrichment_with_external_sqlserver(): + # [START enrichment_with_external_sqlserver] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + CloudSQLEnrichmentHandler, + DatabaseTypeAdapter, + TableFieldsQueryConfig, + ExternalSQLDBConnectionConfig) + import 
os + + database_adapter = DatabaseTypeAdapter.SQLSERVER + database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") + database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) + database_user = os.environ.get("EXTERNAL_SQL_DB_USER") + database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") + database_id = os.environ.get("EXTERNAL_SQL_DB_ID") + table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") + where_clause_template = "product_id = :pid" + where_clause_fields = ["product_id"] + + data = [ + beam.Row(product_id=1, name='A'), + beam.Row(product_id=2, name='B'), + beam.Row(product_id=3, name='C'), + ] + + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=database_adapter, + host=database_host, + port=database_port, + user=database_user, + password=database_password, + db_id=database_id) + + query_config = TableFieldsQueryConfig( + table_id=table_id, + where_clause_template=where_clause_template, + where_clause_fields=where_clause_fields) + + cloudsql_handler = CloudSQLEnrichmentHandler( + connection_config=connection_config, + table_id=table_id, + query_config=query_config) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_external_sqlserver] diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py index afa2bca7ec68..770b75351bd4 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py @@ -18,19 +18,42 @@ # pytype: skip-file # pylint: disable=line-too-long +import os import unittest +import uuid +from collections.abc import Callable +from contextlib import contextmanager +from dataclasses import dataclass from io import StringIO +from typing 
import Optional import mock +import pytest +from sqlalchemy.engine import Connection as DBAPIConnection # pylint: disable=unused-import try: - from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_bigtable, \ - enrichment_with_vertex_ai_legacy - from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_vertex_ai + from sqlalchemy import ( + Column, Integer, VARCHAR, Engine, MetaData, create_engine) + from apache_beam.examples.snippets.transforms.elementwise.enrichment import ( + enrichment_with_bigtable, enrichment_with_vertex_ai_legacy) + from apache_beam.examples.snippets.transforms.elementwise.enrichment import ( + enrichment_with_vertex_ai, + enrichment_with_google_cloudsql_pg, + enrichment_with_external_pg, + enrichment_with_external_mysql, + enrichment_with_external_sqlserver) + from apache_beam.transforms.enrichment_handlers.cloudsql import ( + DatabaseTypeAdapter) + from apache_beam.transforms.enrichment_handlers.cloudsql_it_test import ( + SQLEnrichmentTestHelper, + SQLDBContainerInfo, + ConnectionConfig, + CloudSQLConnectionConfig, + ExternalSQLDBConnectionConfig) from apache_beam.io.requestresponse import RequestResponseIO -except ImportError: - raise unittest.SkipTest('RequestResponseIO dependencies are not installed') +except ImportError as e: + raise unittest.SkipTest(f'RequestResponseIO dependencies not installed: {e}') def validate_enrichment_with_bigtable(): @@ -60,52 +83,232 @@ def validate_enrichment_with_vertex_ai_legacy(): return expected -def std_out_to_dict(stdout_lines, row_key): - output_dict = {} - for stdout_line in stdout_lines: - # parse the stdout in a dictionary format so that it can be - # evaluated/compared as one. This allows us to compare without - # considering the order of the stdout or the order that the fields of the - # row are arranged in. 
- fmtd = '{\"' + stdout_line[4:-1].replace('=', '\": ').replace( - ', ', ', \"').replace('\"\'', '\'') + "}" - stdout_dict = eval(fmtd) # pylint: disable=eval-used - output_dict[stdout_dict[row_key]] = stdout_dict - return output_dict +def validate_enrichment_with_google_cloudsql_pg(): + expected = '''[START enrichment_with_google_cloudsql_pg] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_google_cloudsql_pg]'''.splitlines()[1:-1] + return expected + + +def validate_enrichment_with_external_pg(): + expected = '''[START enrichment_with_external_pg] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_external_pg]'''.splitlines()[1:-1] + return expected + + +def validate_enrichment_with_external_mysql(): + expected = '''[START enrichment_with_external_mysql] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_external_mysql]'''.splitlines()[1:-1] + return expected + + +def validate_enrichment_with_external_sqlserver(): + expected = '''[START enrichment_with_external_sqlserver] +Row(product_id=1, name='A', quantity=2, region_id=3) +Row(product_id=2, name='B', quantity=3, region_id=1) +Row(product_id=3, name='C', quantity=10, region_id=4) + [END enrichment_with_external_sqlserver]'''.splitlines()[1:-1] + return expected @mock.patch('sys.stdout', new_callable=StringIO) +@pytest.mark.uses_testcontainer class EnrichmentTest(unittest.TestCase): def test_enrichment_with_bigtable(self, mock_stdout): enrichment_with_bigtable() output = mock_stdout.getvalue().splitlines() expected = validate_enrichment_with_bigtable() - - self.assertEqual(len(output), len(expected)) - 
self.assertEqual( - std_out_to_dict(output, 'sale_id'), - std_out_to_dict(expected, 'sale_id')) + self.assertEqual(output, expected) def test_enrichment_with_vertex_ai(self, mock_stdout): enrichment_with_vertex_ai() output = mock_stdout.getvalue().splitlines() expected = validate_enrichment_with_vertex_ai() - self.assertEqual(len(output), len(expected)) - self.assertEqual( - std_out_to_dict(output, 'user_id'), - std_out_to_dict(expected, 'user_id')) + for i in range(len(expected)): + self.assertEqual(set(output[i].split(',')), set(expected[i].split(','))) def test_enrichment_with_vertex_ai_legacy(self, mock_stdout): enrichment_with_vertex_ai_legacy() output = mock_stdout.getvalue().splitlines() expected = validate_enrichment_with_vertex_ai_legacy() self.maxDiff = None + self.assertEqual(output, expected) + + @unittest.skipUnless( + os.environ.get('ALLOYDB_PASSWORD'), + "ALLOYDB_PASSWORD environment var is not provided") + def test_enrichment_with_google_cloudsql_pg(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.POSTGRESQL + with EnrichmentTestHelpers.sql_test_context(True, db_adapter): + try: + enrichment_with_google_cloudsql_pg() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_google_cloudsql_pg() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + def test_enrichment_with_external_pg(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.POSTGRESQL + with EnrichmentTestHelpers.sql_test_context(False, db_adapter): + try: + enrichment_with_external_pg() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_external_pg() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + def test_enrichment_with_external_mysql(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.MYSQL + with EnrichmentTestHelpers.sql_test_context(False, db_adapter): + try: + 
enrichment_with_external_mysql() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_external_mysql() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + def test_enrichment_with_external_sqlserver(self, mock_stdout): + db_adapter = DatabaseTypeAdapter.SQLSERVER + with EnrichmentTestHelpers.sql_test_context(False, db_adapter): + try: + enrichment_with_external_sqlserver() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_external_sqlserver() + self.assertEqual(output, expected) + except Exception as e: + self.fail(f"Test failed with unexpected error: {e}") + + +@dataclass +class CloudSQLEnrichmentTestDataConstruct: + client_handler: Callable[[], DBAPIConnection] + engine: Engine + metadata: MetaData + db: SQLDBContainerInfo = None + + +class EnrichmentTestHelpers: + @contextmanager + def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter): + result: Optional[CloudSQLEnrichmentTestDataConstruct] = None + try: + result = EnrichmentTestHelpers.pre_sql_enrichment_test( + is_cloudsql, db_adapter) + yield + finally: + if result: + EnrichmentTestHelpers.post_sql_enrichment_test(result) + + @staticmethod + def pre_sql_enrichment_test( + is_cloudsql: bool, + db_adapter: DatabaseTypeAdapter) -> CloudSQLEnrichmentTestDataConstruct: + unique_suffix = str(uuid.uuid4())[:8] + table_id = f"products_{unique_suffix}" + columns = [ + Column("product_id", Integer, primary_key=True), + Column("name", VARCHAR(255), nullable=False), + Column("quantity", Integer, nullable=False), + Column("region_id", Integer, nullable=False), + ] + table_data = [ + { + "product_id": 1, "name": "A", 'quantity': 2, 'region_id': 3 + }, + { + "product_id": 2, "name": "B", 'quantity': 3, 'region_id': 1 + }, + { + "product_id": 3, "name": "C", 'quantity': 10, 'region_id': 4 + }, + ] + metadata = MetaData() + + connection_config: ConnectionConfig + db = None 
+ if is_cloudsql: + gcp_project_id = "apache-beam-testing" + region = "us-central1" + instance_name = "beam-integration-tests" + instance_connection_uri = f"{gcp_project_id}:{region}:{instance_name}" + db_id = "postgres" + user = "postgres" + password = os.getenv("ALLOYDB_PASSWORD") + os.environ['GOOGLE_CLOUD_SQL_DB_URI'] = instance_connection_uri + os.environ['GOOGLE_CLOUD_SQL_DB_ID'] = db_id + os.environ['GOOGLE_CLOUD_SQL_DB_USER'] = user + os.environ['GOOGLE_CLOUD_SQL_DB_PASSWORD'] = password + os.environ['GOOGLE_CLOUD_SQL_DB_TABLE_ID'] = table_id + connection_config = CloudSQLConnectionConfig( + db_adapter=db_adapter, + instance_connection_uri=instance_connection_uri, + user=user, + password=password, + db_id=db_id) + else: + db = SQLEnrichmentTestHelper.start_sql_db_container(db_adapter) + os.environ['EXTERNAL_SQL_DB_HOST'] = db.host + os.environ['EXTERNAL_SQL_DB_PORT'] = str(db.port) + os.environ['EXTERNAL_SQL_DB_ID'] = db.id + os.environ['EXTERNAL_SQL_DB_USER'] = db.user + os.environ['EXTERNAL_SQL_DB_PASSWORD'] = db.password + os.environ['EXTERNAL_SQL_DB_TABLE_ID'] = table_id + connection_config = ExternalSQLDBConnectionConfig( + db_adapter=db_adapter, + host=db.host, + port=db.port, + user=db.user, + password=db.password, + db_id=db.id) + + conenctor = connection_config.get_connector_handler() + engine = create_engine( + url=connection_config.get_db_url(), creator=conenctor) + + SQLEnrichmentTestHelper.create_table( + table_id=table_id, + engine=engine, + columns=columns, + table_data=table_data, + metadata=metadata) + + result = CloudSQLEnrichmentTestDataConstruct( + db=db, client_handler=conenctor, engine=engine, metadata=metadata) + return result + + @staticmethod + def post_sql_enrichment_test(res: CloudSQLEnrichmentTestDataConstruct): + # Clean up the data inserted previously. 
+ res.metadata.drop_all(res.engine) + res.engine.dispose(close=True) - self.assertEqual(len(output), len(expected)) - self.assertEqual( - std_out_to_dict(output, 'entity_id'), - std_out_to_dict(expected, 'entity_id')) + # Check if the test used a container-based external SQL database. + if res.db: + SQLEnrichmentTestHelper.stop_sql_db_container(res.db) + os.environ.pop('EXTERNAL_SQL_DB_HOST', None) + os.environ.pop('EXTERNAL_SQL_DB_PORT', None) + os.environ.pop('EXTERNAL_SQL_DB_ID', None) + os.environ.pop('EXTERNAL_SQL_DB_USER', None) + os.environ.pop('EXTERNAL_SQL_DB_PASSWORD', None) + os.environ.pop('EXTERNAL_SQL_DB_TABLE_ID', None) + else: + os.environ.pop('GOOGLE_CLOUD_SQL_DB_URI', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_ID', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_USER', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_PASSWORD', None) + os.environ.pop('GOOGLE_CLOUD_SQL_DB_TABLE_ID', None) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py index f070158d1c54..3fe3a62f9546 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py @@ -478,16 +478,14 @@ def _build_parameters_dict( # For batched queries, use unique parameter names per batch item. if batch_size > 1: - # Extract parameter names from the template using regex. - # Batching is only used with table-based query configs + # Batching is only used with table-based query configs. table_query_configs = (TableFieldsQueryConfig, TableFunctionQueryConfig) assert isinstance(self._query_config, table_query_configs) - param_names = self._extract_parameter_names( - self._query_config.where_clause_template) - for param_name, val in zip(param_names, current_values): + batch_param_dict = self._build_single_param_dict(current_values) + # Prefix batch parameters to make them globally unique. 
+ for param_name, val in batch_param_dict.items(): param_dict[f'batch_{i}_{param_name}'] = val else: - # For single request, use the helper function. single_param_dict = self._build_single_param_dict(current_values) param_dict.update(single_param_dict) @@ -502,17 +500,15 @@ def _build_single_param_dict(self, values: list[Any]) -> dict[str, Any]: Returns: Dictionary mapping parameter names to values """ - if isinstance(self._query_config, TableFieldsQueryConfig): - return { - field_name: val - for field_name, val in zip( - self._query_config.where_clause_fields, values) - } - else: # TableFunctionQueryConfig. - assert isinstance(self._query_config, TableFunctionQueryConfig) - _, param_dict = self._get_unique_template_and_params( - self._query_config.where_clause_template, values) - return param_dict + table_query_configs = (TableFieldsQueryConfig, TableFunctionQueryConfig) + if not isinstance(self._query_config, table_query_configs): + raise ValueError( + f"Parameter binding not supported for " + f"{type(self._query_config).__name__}") + + _, param_dict = self._get_unique_template_and_params( + self._query_config.where_clause_template, values) + return param_dict def _get_unique_template_and_params( self, template: str, values: list[Any]) -> tuple[str, dict[str, Any]]: diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py index 3d9cd18151b6..15ab0ec0a3a1 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_it_test.py @@ -208,7 +208,6 @@ def create_table( raise Exception(f"Failed to insert table data: {e}") -@pytest.mark.uses_testcontainer class BaseTestSQLEnrichment(unittest.TestCase): _table_data = [ { @@ -303,7 +302,18 @@ def _start_cache_container(self): @classmethod def tearDownClass(cls): + # Drop all tables using metadata as the primary approach. 
cls._metadata.drop_all(cls._engine) + + # Fallback to raw SQL drop if needed. + try: + with cls._engine.connect() as conn: + conn.execute(f"DROP TABLE IF EXISTS {cls._table_id}") + conn.commit() + _LOGGER.info("Dropped table %s", cls._table_id) + except Exception as e: + _LOGGER.warning("Failed to drop table %s: %s", cls._table_id, e) + cls._engine.dispose(close=True) cls._engine = None @@ -320,7 +330,7 @@ def test_sql_enrichment(self): query_config = TableFieldsQueryConfig( table_id=self._table_id, - where_clause_template="id = :id", + where_clause_template="id = :id_param", where_clause_fields=fields) handler = CloudSQLEnrichmentHandler( @@ -330,7 +340,7 @@ def test_sql_enrichment(self): max_batch_size=100, ) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -357,7 +367,7 @@ def test_sql_enrichment_batched(self): min_batch_size=2, max_batch_size=100, ) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -384,7 +394,7 @@ def test_sql_enrichment_batched_multiple_fields(self): min_batch_size=8, max_batch_size=100, ) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -404,7 +414,7 @@ def test_sql_enrichment_with_query_fn(self): handler = CloudSQLEnrichmentHandler( connection_config=self._connection_config, query_config=query_config) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -429,7 +439,7 @@ 
def test_sql_enrichment_with_condition_value_fn(self): query_config=query_config, min_batch_size=2, max_batch_size=100) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler)) assert_that(pcoll, equal_to(expected_rows)) @@ -481,7 +491,7 @@ def test_sql_enrichment_with_redis(self): query_config=query_config, min_batch_size=2, max_batch_size=100) - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll_populate_cache = ( test_pipeline | beam.Create(requests) @@ -506,7 +516,7 @@ def test_sql_enrichment_with_redis(self): side_effect=Exception("Database should not be called on a cache hit.")) # Run a second pipeline to verify cache is being used. - with TestPipeline(is_integration_test=True) as test_pipeline: + with TestPipeline() as test_pipeline: pcoll_cached = ( test_pipeline | beam.Create(requests) @@ -553,7 +563,8 @@ class TestCloudSQLPostgresEnrichment(BaseCloudSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.POSTGRESQL # Configuration required for locating the CloudSQL instance. 
- _table_id = "product_details_cloudsql_pg_enrichment" + _unique_suffix = str(uuid.uuid4())[:8] + _table_id = f"product_details_cloudsql_pg_enrichment_{_unique_suffix}" _gcp_project_id = "apache-beam-testing" _region = "us-central1" _instance_name = "beam-integration-tests" @@ -567,7 +578,6 @@ class TestCloudSQLPostgresEnrichment(BaseCloudSQLDBEnrichment): _metadata = MetaData() -@pytest.mark.uses_testcontainer class BaseExternalSQLDBEnrichment(BaseTestSQLEnrichment): @classmethod def setUpClass(cls): @@ -595,7 +605,6 @@ def tearDownClass(cls): cls._db = None -@pytest.mark.uses_testcontainer class TestExternalPostgresEnrichment(BaseExternalSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.POSTGRESQL _unique_suffix = str(uuid.uuid4())[:8] @@ -603,7 +612,6 @@ class TestExternalPostgresEnrichment(BaseExternalSQLDBEnrichment): _metadata = MetaData() -@pytest.mark.uses_testcontainer class TestExternalMySQLEnrichment(BaseExternalSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.MYSQL _unique_suffix = str(uuid.uuid4())[:8] @@ -611,7 +619,6 @@ class TestExternalMySQLEnrichment(BaseExternalSQLDBEnrichment): _metadata = MetaData() -@pytest.mark.uses_testcontainer class TestExternalSQLServerEnrichment(BaseExternalSQLDBEnrichment): _db_adapter = DatabaseTypeAdapter.SQLSERVER _unique_suffix = str(uuid.uuid4())[:8] diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index f344cfc61ccf..b478c6fc59be 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -31,7 +31,7 @@ select = E3 # https://github.com/apache/beam/issues/25668 pip_pre = True # allow apps that support color to use it. -passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_* +passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_*,ALLOYDB_PASSWORD # Set [] options for pip installation of apache-beam tarball. extras = test,dataframe # Don't warn that these commands aren't installed. 
diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md new file mode 100644 index 000000000000..a29b2672e678 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md @@ -0,0 +1,146 @@ +--- +title: "Enrichment with CloudSQL" +--- + + +# Use CloudSQL to enrich data + +{{< localstorage language language-py >}} + + + + + +
+ + {{< button-pydoc path="apache_beam.transforms.enrichment_handlers.cloudsql" class="CloudSQLEnrichmentHandler" >}} + +
+ +Starting with Apache Beam 2.69.0, the enrichment transform includes +built-in enrichment handler support for the +[Google CloudSQL](https://cloud.google.com/sql/docs). This handler allows your +Beam pipeline to enrich data using SQL databases, with built-in support for: + +- Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL +- Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted or + on-premises databases) + +The following example demonstrates how to create a pipeline that use the +enrichment transform with the +[`CloudSQLEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.cloudsql.html#apache_beam.transforms.enrichment_handlers.cloudsql.CloudSQLEnrichmentHandler) handler. + +## Example 1: Enrichment with Google CloudSQL (Managed PostgreSQL) + +The data in the CloudSQL PostgreSQL table `products` follows this format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_google_cloudsql_pg >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_google_cloudsql_pg >}} +{{< /highlight >}} + +## Example 2: Enrichment with Unmanaged PostgreSQL + +The data in the Unmanaged PostgreSQL table `products` follows this format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample 
"sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_external_pg >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_external_pg >}} +{{< /highlight >}} + +## Example 3: Enrichment with Unmanaged MySQL + +The data in the Unmanaged MySQL table `products` follows this format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_external_mysql >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_external_mysql >}} +{{< /highlight >}} + +## Example 4: Enrichment with Unmanaged Microsoft SQL Server + +The data in the Unmanaged Microsoft SQL Server table `products` follows this +format: + +{{< table >}} +| product_id | name | quantity | region_id | +|:----------:|:----:|:--------:|:---------:| +| 1 | A | 2 | 3 | +| 2 | B | 3 | 1 | +| 3 | C | 10 | 4 | +{{< /table >}} + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_external_sqlserver >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_external_sqlserver >}} +{{< /highlight >}} + +## Related transforms + +Not applicable. 
+ +{{< button-pydoc path="apache_beam.transforms.enrichment_handlers.cloudsql" class="CloudSQLEnrichmentHandler" >}} diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md index 6c05b6b515a4..4b352d0447ad 100644 --- a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md @@ -42,6 +42,7 @@ The following examples demonstrate how to create a pipeline that use the enrichm | Service | Example | |:-----------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Cloud Bigtable | [Enrichment with Bigtable](/documentation/transforms/python/elementwise/enrichment-bigtable/#example) | +| Cloud SQL (PostgreSQL, MySQL, SQLServer) | [Enrichment with CloudSQL](/documentation/transforms/python/elementwise/enrichment-cloudsql) | | Vertex AI Feature Store | [Enrichment with Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-1-enrichment-with-vertex-ai-feature-store) | | Vertex AI Feature Store (Legacy) | [Enrichment with Legacy Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-2-enrichment-with-vertex-ai-feature-store-legacy) | {{< /table >}} diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index 2386ecb39d9d..1a60cfbdd9f1 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -297,6 +297,7 @@