From 173c85cab1de640068143ff8b1df989affd26150 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 21 May 2025 18:00:51 +0000 Subject: [PATCH 01/10] add kafka yaml test --- sdks/python/apache_beam/yaml/tests/kafka.yaml | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/kafka.yaml diff --git a/sdks/python/apache_beam/yaml/tests/kafka.yaml b/sdks/python/apache_beam/yaml/tests/kafka.yaml new file mode 100644 index 000000000000..c2e3c8657ee2 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/kafka.yaml @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +fixtures: + - name: TEMP_BOOTSTRAP_SERVER + type: "apache_beam.yaml.integration_tests.temp_kafka_server" + +pipelines: + # Kafka write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123} + - {value: 456} + - {value: 789} + - type: MapToFields + config: + language: python + fields: + value: + callable: | + lambda row: str(row.value).encode('utf-8') + output_type: bytes + - type: WriteToKafka + config: + format: "RAW" + topic: "silly_topic" + bootstrap_servers: "{TEMP_BOOTSTRAP_SERVER}" + producer_config_updates: + linger.ms: "0" + + # Kafka read pipeline + # Need a separate read pipeline to make sure the write pipeline is flushed + - pipeline: + type: chain + transforms: + - type: ReadFromKafka + config: + format: "RAW" + topic: "silly_topic" + bootstrap_servers: "{TEMP_BOOTSTRAP_SERVER}" + consumer_config: + auto.offset.reset: "earliest" + group.id: "yaml-kafka-test-group" + max_read_time_seconds: 10 # will read forever if not set + - type: MapToFields + config: + language: python + fields: + value: + callable: | + # Kafka RAW format reads messages as bytes in the 'payload' field of a Row + lambda row: row.payload.decode('utf-8') + output_type: string + - type: AssertEqual + config: + elements: + - {value: "123"} + - {value: "456"} + - {value: "789"} + +# TODO: Error handling hard to trigger upon initial investigations. Need to +# investigate more. 
From 5fc2671000886be3e9344e3b0cce8b1f8e8bc138 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 21 May 2025 18:01:47 +0000 Subject: [PATCH 02/10] add oracle yaml test --- .../python/apache_beam/yaml/tests/oracle.yaml | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/oracle.yaml diff --git a/sdks/python/apache_beam/yaml/tests/oracle.yaml b/sdks/python/apache_beam/yaml/tests/oracle.yaml new file mode 100644 index 000000000000..01d23989d166 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/oracle.yaml @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_oracle_database" + +pipelines: + # Oracle write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToOracle + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # Oracle read pipeline + # Need a separate read pipeline to make sure the write pipeline is flushed + - pipeline: + type: chain + transforms: + - type: ReadFromOracle + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "oracle.jdbc.OracleDriver" + - type: MapToFields + config: + language: python + fields: + value: + callable: "lambda x: int(x.VALUE)" + output_type: integer + rank: + callable: "lambda x: int(x.RANK)" + output_type: integer + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} From 92de2777a126975214b3a6be809afc5b78ce1600 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 21 May 2025 18:02:09 +0000 Subject: [PATCH 03/10] add pubsub yaml test --- .../python/apache_beam/yaml/tests/pubsub.yaml | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/pubsub.yaml diff --git a/sdks/python/apache_beam/yaml/tests/pubsub.yaml b/sdks/python/apache_beam/yaml/tests/pubsub.yaml new file mode 100644 index 000000000000..41f739ac77e0 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/pubsub.yaml @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: PS_TOPIC + type: "apache_beam.yaml.integration_tests.temp_pubsub_emulator" + config: + project_id: "apache-beam-testing" + +pipelines: + # Pubsub write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: "11a"} + - {value: "37a"} + - {value: "389a"} + - type: WriteToPubSub + config: + topic: "{PS_TOPIC}" + format: "RAW" + + options: + streaming: true + + +# TODO: Current PubSubIO doesn't have a max_read_time_seconds parameter like +# Kafka does. Without it, the ReadFromPubSub will run forever. This is not a +# trivial change. For now, we will live with the mocked tests located +# [here](https://github.com/apache/beam/blob/bea04446b41c86856c24d0a9761622092ed9936f/sdks/python/apache_beam/yaml/yaml_io_test.py#L83). + + # - pipeline: + # type: chain + # transforms: + # - type: ReadFromPubSub + # config: + # topic: "{PS_TOPIC}" + # format: "RAW" + # # ... 
+ + + # options: + # streaming: true + From e0a61d44f392800dc73345737e07352aee645173 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 21 May 2025 18:02:26 +0000 Subject: [PATCH 04/10] add runinference yaml test --- .../apache_beam/yaml/tests/runinference.yaml | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/runinference.yaml diff --git a/sdks/python/apache_beam/yaml/tests/runinference.yaml b/sdks/python/apache_beam/yaml/tests/runinference.yaml new file mode 100644 index 000000000000..f87ae4b44acf --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/runinference.yaml @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - question: "What is a car?" + - question: "Where is the Eiffel Tower located?" 
+ - type: RunInference + config: + model_handler: + type: "VertexAIModelHandlerJSON" + config: + endpoint_id: 9157860935048626176 + project: "apache-beam-testing" + location: "us-central1" + preprocess: + callable: 'lambda x: {"prompt": x.question}' + private: false + - type: PyTransform + name: TransformInferenceBeamRowsToSimpleSchemaRows + config: + constructor: __callable__ + kwargs: + source: | + import apache_beam as beam + def simplify_elements(pcoll): + return pcoll | beam.Map(lambda row: beam.Row(dummy_field=1)) + - type: Sql + name: CountRows + config: + query: "SELECT COUNT(*) AS actual_inference_count FROM PCOLLECTION" + - type: AssertEqual + config: + elements: + - {actual_inference_count: 2} + + options: + yaml_experimental_features: ['ML'] \ No newline at end of file From 5410a7e539797d283f5140839c4ff9c271459f71 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 27 May 2025 19:59:26 +0000 Subject: [PATCH 05/10] fix rebase confilct --- .../apache_beam/yaml/integration_tests.py | 136 +++++++++++++++++- 1 file changed, 135 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index d005bfe0f797..6f0902346b44 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -31,12 +31,20 @@ import mysql.connector import psycopg2 import pytds +import random import sqlalchemy +import string import yaml +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.google import PubSubContainer +from testcontainers.kafka import KafkaContainer from testcontainers.mssql import SqlServerContainer from testcontainers.mysql import MySqlContainer from testcontainers.postgres import PostgresContainer +from google.cloud import pubsub_v1 + import apache_beam as beam from apache_beam.io import filesystems from 
apache_beam.io.gcp.bigquery_tools import BigQueryWrapper @@ -351,6 +359,129 @@ def temp_sqlserver_database(): raise err +class OracleTestContainer(DockerContainer): + """ + OracleTestContainer is an updated version of OracleDBContainer that goes + ahead and sets the oracle password, waits for logs to establish that the + container is ready before calling get_exposed_port, and uses a more modern + oracle driver. + """ + def __init__(self): + super().__init__("gvenzl/oracle-xe:21-slim") + self.with_env("ORACLE_PASSWORD", "oracle") + self.with_exposed_ports(1521) + + def start(self): + super().start() + wait_for_logs(self, "DATABASE IS READY TO USE!", timeout=300) + return self + + def get_connection_url(self): + port = self.get_exposed_port(1521) + return \ + f"oracle+oracledb://system:oracle@localhost:{port}/?service_name=XEPDB1" + + +@contextlib.contextmanager +def temp_oracle_database(): + """Context manager to provide a temporary Oracle database for testing. + + This function utilizes the 'testcontainers' library to spin up an + Oracle Database instance within a Docker container. It then connects + to this temporary database using 'oracledb', creates a predefined + + NOTE: A custom OracleTestContainer class was created due to the current + version (OracleDBContainer) that calls get_exposed_port too soon causing the + service to hang until timeout. + + Yields: + str: A JDBC connection string for the temporary Oracle database. + Example format: + "jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1" + + Raises: + oracledb.Error: If there's an error connecting to or interacting with + the Oracle database during setup. + Exception: Any other exception encountered during the setup process. 
+ """ + with OracleTestContainer() as oracle: + engine = sqlalchemy.create_engine(oracle.get_connection_url()) + with engine.connect() as connection: + connection.execute( + sqlalchemy.text( + """ + CREATE TABLE tmp_table ( + value NUMBER PRIMARY KEY, + rank NUMBER + ) + """)) + connection.commit() + port = oracle.get_exposed_port(1521) + yield f"jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1" + + +@contextlib.contextmanager +def temp_kafka_server(): + """Context manager to provide a temporary Kafka server for testing. + + This function utilizes the 'testcontainers' library to spin up a Kafka + instance within a Docker container. It then yields the bootstrap server + string, which can be used by Kafka clients to connect to this temporary + server. + + The Docker container and the Kafka instance are automatically managed + and torn down when the context manager exits. + + Yields: + str: The bootstrap server string for the temporary Kafka instance. + Example format: "localhost:XXXXX" or "PLAINTEXT://localhost:XXXXX" + + Raises: + Exception: If there's an error starting the Kafka container or + interacting with the temporary Kafka server. + """ + try: + with KafkaContainer() as kafka_container: + yield kafka_container.get_bootstrap_server() + except Exception as err: + logging.error("Error interacting with temporary Kafka Server: %s", err) + raise err + + +@contextlib.contextmanager +def temp_pubsub_emulator(project_id="apache-beam-testing"): + """ + Context manager to provide a temporary Pub/Sub emulator for testing. + + This function uses 'testcontainers' to spin up a Google Cloud SDK + container running the Pub/Sub emulator. It yields the emulator host + string (e.g., "localhost:xxxxx") that can be used to configure Pub/Sub + clients. + + The Docker container is automatically managed and torn down when the + context manager exits. + + Args: + project_id (str): The GCP project ID to be used by the emulator. 
+ This doesn't need to be a real project for the emulator. + + Yields: + str: The host and port for the Pub/Sub emulator (e.g., "localhost:xxxx"). + This will be the address to point your Pub/Sub client to. + + Raises: + Exception: If the container fails to start or the emulator isn't ready. + """ + with PubSubContainer(project=project_id) as pubsub_container: + publisher = pubsub_v1.PublisherClient() + random_front_character = random.choice(string.ascii_lowercase) + topic_id = f"{random_front_character}{uuid.uuid4().hex[:8]}" + topic_name_to_create = \ + f"projects/{pubsub_container.project}/topics/{topic_id}" + created_topic_object = publisher.create_topic(name=topic_name_to_create) + yield created_topic_object.name + + def replace_recursive(spec, vars): if isinstance(spec, dict): return { @@ -359,7 +490,10 @@ def replace_recursive(spec, vars): } elif isinstance(spec, list): return [replace_recursive(value, vars) for value in spec] - elif isinstance(spec, str) and '{' in spec and '{\n' not in spec: + # TODO(derrickaw): Consider checking for callable in the if branch above + # instead of checking lambda here. 
+ elif isinstance( + spec, str) and '{' in spec and '{\n' not in spec and 'lambda' not in spec: try: return spec.format(**vars) except Exception as exn: From 4c14fd8b49a8a49e3d7b2be48d8eb0c5af8c4e4e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 21 May 2025 18:22:42 +0000 Subject: [PATCH 06/10] remove env vars parameter that shouldn't be exposed --- sdks/python/apache_beam/yaml/yaml_ml.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index dcb4c507776d..a2c7a19563bc 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -141,8 +141,7 @@ def __init__( private: bool = False, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, - max_batch_duration_secs: Optional[int] = None, - env_vars: Optional[dict[str, Any]] = None): + max_batch_duration_secs: Optional[int] = None): """ ModelHandler for Vertex AI. @@ -214,7 +213,6 @@ def __init__( inputs. max_batch_duration_secs: The maximum amount of time to buffer a batch before emitting; used in streaming contexts. - env_vars: Environment variables. 
""" try: @@ -233,8 +231,7 @@ def __init__( private=private, min_batch_size=min_batch_size, max_batch_size=max_batch_size, - max_batch_duration_secs=max_batch_duration_secs, - env_vars=env_vars or {}) + max_batch_duration_secs=max_batch_duration_secs) super().__init__(_handler, preprocess, postprocess) From bed04b66935ac64692c4fd1f05011e2f6bbeb396 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 21 May 2025 19:40:25 +0000 Subject: [PATCH 07/10] add kafka dependency --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index f19861fa0dd2..d6c4088c1be4 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -424,7 +424,7 @@ def get_portability_package_data(): 'sqlalchemy>=1.3,<3.0', 'psycopg2-binary>=2.8.5,<2.9.10; python_version <= "3.9"', 'psycopg2-binary>=2.8.5,<3.0; python_version >= "3.10"', - 'testcontainers[mysql]>=3.0.3,<4.0.0', + 'testcontainers[mysql,kafka]>=3.0.3,<4.0.0', 'cryptography>=41.0.2', 'hypothesis>5.0.0,<7.0.0', 'virtualenv-clone>=0.5,<1.0', From f209a8ca1896328178deb798955f4ef29c8ba961 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 27 May 2025 20:01:12 +0000 Subject: [PATCH 08/10] fix rebase confilct --- sdks/python/apache_beam/yaml/integration_tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 6f0902346b44..77fea4d4e387 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -23,7 +23,9 @@ import itertools import logging import os +import random import sqlite3 +import string import unittest import uuid @@ -31,9 +33,7 @@ import mysql.connector import psycopg2 import pytds -import random import sqlalchemy -import string import yaml from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import 
wait_for_logs From cb0200f650e669c1a50687827639dc84f26cfe45 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 28 May 2025 14:01:01 +0000 Subject: [PATCH 09/10] fix lint issue --- sdks/python/apache_beam/yaml/integration_tests.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 77fea4d4e387..4071a9fb5f24 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -35,6 +35,7 @@ import pytds import sqlalchemy import yaml +from google.cloud import pubsub_v1 from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.google import PubSubContainer @@ -43,8 +44,6 @@ from testcontainers.mysql import MySqlContainer from testcontainers.postgres import PostgresContainer -from google.cloud import pubsub_v1 - import apache_beam as beam from apache_beam.io import filesystems from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper From 9f2c949aaf7d67ca3f999bdb5c6da1ef99c4df12 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 28 May 2025 16:42:57 +0000 Subject: [PATCH 10/10] update todo from derrickaw to issue # --- sdks/python/apache_beam/yaml/integration_tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 4071a9fb5f24..d8c04d9718f8 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -489,8 +489,8 @@ def replace_recursive(spec, vars): } elif isinstance(spec, list): return [replace_recursive(value, vars) for value in spec] - # TODO(derrickaw): Consider checking for callable in the if branch above - # instead of checking lambda here. 
+ # TODO(https://github.com/apache/beam/issues/35067): Consider checking for + # callable in the if branch above instead of checking lambda here. elif isinstance( spec, str) and '{' in spec and '{\n' not in spec and 'lambda' not in spec: try: