diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index d005bfe0f797..d8c04d9718f8 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -23,7 +23,9 @@ import itertools import logging import os +import random import sqlite3 +import string import unittest import uuid @@ -33,6 +35,11 @@ import pytds import sqlalchemy import yaml +from google.cloud import pubsub_v1 +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.google import PubSubContainer +from testcontainers.kafka import KafkaContainer from testcontainers.mssql import SqlServerContainer from testcontainers.mysql import MySqlContainer from testcontainers.postgres import PostgresContainer @@ -351,6 +358,129 @@ def temp_sqlserver_database(): raise err +class OracleTestContainer(DockerContainer): + """ + OracleTestContainer is an updated version of OracleDBContainer that goes + ahead and sets the oracle password, waits for logs to establish that the + container is ready before calling get_exposed_port, and uses a more modern + oracle driver. + """ + def __init__(self): + super().__init__("gvenzl/oracle-xe:21-slim") + self.with_env("ORACLE_PASSWORD", "oracle") + self.with_exposed_ports(1521) + + def start(self): + super().start() + wait_for_logs(self, "DATABASE IS READY TO USE!", timeout=300) + return self + + def get_connection_url(self): + port = self.get_exposed_port(1521) + return \ + f"oracle+oracledb://system:oracle@localhost:{port}/?service_name=XEPDB1" + + +@contextlib.contextmanager +def temp_oracle_database(): + """Context manager to provide a temporary Oracle database for testing. + + This function utilizes the 'testcontainers' library to spin up an + Oracle Database instance within a Docker container. 
It then connects
+  to this temporary database using 'oracledb', creates a predefined
+  'tmp_table' table, and yields a JDBC connection string for the database.
+
+  NOTE: A custom OracleTestContainer class was created due to the current
+  version (OracleDBContainer) that calls get_exposed_port too soon causing the
+  service to hang until timeout.
+
+  Yields:
+    str: A JDBC connection string for the temporary Oracle database.
+      Example format:
+      "jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1"
+
+  Raises:
+    oracledb.Error: If there's an error connecting to or interacting with
+      the Oracle database during setup.
+    Exception: Any other exception encountered during the setup process.
+  """
+  with OracleTestContainer() as oracle:
+    engine = sqlalchemy.create_engine(oracle.get_connection_url())
+    with engine.connect() as connection:
+      connection.execute(
+          sqlalchemy.text(
+              """
+          CREATE TABLE tmp_table (
+              value NUMBER PRIMARY KEY,
+              rank NUMBER
+          )
+          """))
+      connection.commit()
+    port = oracle.get_exposed_port(1521)
+    yield f"jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1"
+
+
+@contextlib.contextmanager
+def temp_kafka_server():
+  """Context manager to provide a temporary Kafka server for testing.
+
+  This function utilizes the 'testcontainers' library to spin up a Kafka
+  instance within a Docker container. It then yields the bootstrap server
+  string, which can be used by Kafka clients to connect to this temporary
+  server.
+
+  The Docker container and the Kafka instance are automatically managed
+  and torn down when the context manager exits.
+
+  Yields:
+    str: The bootstrap server string for the temporary Kafka instance.
+      Example format: "localhost:XXXXX" or "PLAINTEXT://localhost:XXXXX"
+
+  Raises:
+    Exception: If there's an error starting the Kafka container or
+      interacting with the temporary Kafka server.
+ """ + try: + with KafkaContainer() as kafka_container: + yield kafka_container.get_bootstrap_server() + except Exception as err: + logging.error("Error interacting with temporary Kakfa Server: %s", err) + raise err + + +@contextlib.contextmanager +def temp_pubsub_emulator(project_id="apache-beam-testing"): + """ + Context manager to provide a temporary Pub/Sub emulator for testing. + + This function uses 'testcontainers' to spin up a Google Cloud SDK + container running the Pub/Sub emulator. It yields the emulator host + string (e.g., "localhost:xxxxx") that can be used to configure Pub/Sub + clients. + + The Docker container is automatically managed and torn down when the + context manager exits. + + Args: + project_id (str): The GCP project ID to be used by the emulator. + This doesn't need to be a real project for the emulator. + + Yields: + str: The host and port for the Pub/Sub emulator (e.g., "localhost:xxxx"). + This will be the address to point your Pub/Sub client to. + + Raises: + Exception: If the container fails to start or the emulator isn't ready. + """ + with PubSubContainer(project=project_id) as pubsub_container: + publisher = pubsub_v1.PublisherClient() + random_front_charactor = random.choice(string.ascii_lowercase) + topic_id = f"{random_front_charactor}{uuid.uuid4().hex[:8]}" + topic_name_to_create = \ + f"projects/{pubsub_container.project}/topics/{topic_id}" + created_topic_object = publisher.create_topic(name=topic_name_to_create) + yield created_topic_object.name + + def replace_recursive(spec, vars): if isinstance(spec, dict): return { @@ -359,7 +489,10 @@ def replace_recursive(spec, vars): } elif isinstance(spec, list): return [replace_recursive(value, vars) for value in spec] - elif isinstance(spec, str) and '{' in spec and '{\n' not in spec: + # TODO(https://github.com/apache/beam/issues/35067): Consider checking for + # callable in the if branch above instead of checking lambda here. 
+ elif isinstance( + spec, str) and '{' in spec and '{\n' not in spec and 'lambda' not in spec: try: return spec.format(**vars) except Exception as exn: diff --git a/sdks/python/apache_beam/yaml/tests/kafka.yaml b/sdks/python/apache_beam/yaml/tests/kafka.yaml new file mode 100644 index 000000000000..c2e3c8657ee2 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/kafka.yaml @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +fixtures: + - name: TEMP_BOOTSTAP_SERVER + type: "apache_beam.yaml.integration_tests.temp_kafka_server" + +pipelines: + # Kafka write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123} + - {value: 456} + - {value: 789} + - type: MapToFields + config: + language: python + fields: + value: + callable: | + lambda row: str(row.value).encode('utf-8') + output_type: bytes + - type: WriteToKafka + config: + format: "RAW" + topic: "silly_topic" + bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}" + producer_config_updates: + linger.ms: "0" + + # Kafka read pipeline + # Need a separate read pipeline to make sure the write pipeline is flushed + - pipeline: + type: chain + transforms: + - type: ReadFromKafka + config: + format: "RAW" + topic: "silly_topic" + bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}" + consumer_config: + auto.offset.reset: "earliest" + group.id: "yaml-kafka-test-group" + max_read_time_seconds: 10 # will read forever if not set + - type: MapToFields + config: + language: python + fields: + value: + callable: | + # Kafka RAW format reads messages as bytes in the 'payload' field of a Row + lambda row: row.payload.decode('utf-8') + output_type: string + - type: AssertEqual + config: + elements: + - {value: "123"} + - {value: "456"} + - {value: "789"} + +# TODO: Error handling hard to trigger upon initial investigations. Need to +# investigate more. diff --git a/sdks/python/apache_beam/yaml/tests/oracle.yaml b/sdks/python/apache_beam/yaml/tests/oracle.yaml new file mode 100644 index 000000000000..01d23989d166 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/oracle.yaml @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_oracle_database" + +pipelines: + # Oracle write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToOracle + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # Oracle read pipeline + # Need a separate read pipeline to make sure the write pipeline is flushed + - pipeline: + type: chain + transforms: + - type: ReadFromOracle + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "oracle.jdbc.OracleDriver" + - type: MapToFields + config: + language: python + fields: + value: + callable: "lambda x: int(x.VALUE)" + output_type: integer + rank: + callable: "lambda x: int(x.RANK)" + output_type: integer + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} diff --git a/sdks/python/apache_beam/yaml/tests/pubsub.yaml b/sdks/python/apache_beam/yaml/tests/pubsub.yaml new file mode 100644 index 000000000000..41f739ac77e0 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/pubsub.yaml @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: PS_TOPIC
+    type: "apache_beam.yaml.integration_tests.temp_pubsub_emulator"
+    config:
+      project_id: "apache-beam-testing"
+
+pipelines:
+  # Pubsub write pipeline
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {value: "11a"}
+              - {value: "37a"}
+              - {value: "389a"}
+        - type: WriteToPubSub
+          config:
+            topic: "{PS_TOPIC}"
+            format: "RAW"
+
+    options:
+      streaming: true
+
+
+# TODO: Current PubSubIO doesn't have a max_read_time_seconds parameter like
+# Kafka does. Without it, the ReadFromPubSub will run forever. This is not a
+# trivial change. For now, we will live with the mocked tests located
+# [here](https://github.com/apache/beam/blob/bea04446b41c86856c24d0a9761622092ed9936f/sdks/python/apache_beam/yaml/yaml_io_test.py#L83).
+
+  # - pipeline:
+  #     type: chain
+  #     transforms:
+  #       - type: ReadFromPubSub
+  #         config:
+  #           topic: "{PS_TOPIC}"
+  #           format: "RAW"
+  #           # ...
+ + + # options: + # streaming: true + diff --git a/sdks/python/apache_beam/yaml/tests/runinference.yaml b/sdks/python/apache_beam/yaml/tests/runinference.yaml new file mode 100644 index 000000000000..f87ae4b44acf --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/runinference.yaml @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - question: "What is a car?" + - question: "Where is the Eiffel Tower located?" 
+ - type: RunInference + config: + model_handler: + type: "VertexAIModelHandlerJSON" + config: + endpoint_id: 9157860935048626176 + project: "apache-beam-testing" + location: "us-central1" + preprocess: + callable: 'lambda x: {"prompt": x.question}' + private: false + - type: PyTransform + name: TransformInferenceBeamRowsToSimpleSchemaRows + config: + constructor: __callable__ + kwargs: + source: | + import apache_beam as beam + def simplify_elements(pcoll): + return pcoll | beam.Map(lambda row: beam.Row(dummy_field=1)) + - type: Sql + name: CountRows + config: + query: "SELECT COUNT(*) AS actual_inference_count FROM PCOLLECTION" + - type: AssertEqual + config: + elements: + - {actual_inference_count: 2} + + options: + yaml_experimental_features: ['ML'] \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index dcb4c507776d..a2c7a19563bc 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -141,8 +141,7 @@ def __init__( private: bool = False, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, - max_batch_duration_secs: Optional[int] = None, - env_vars: Optional[dict[str, Any]] = None): + max_batch_duration_secs: Optional[int] = None): """ ModelHandler for Vertex AI. @@ -214,7 +213,6 @@ def __init__( inputs. max_batch_duration_secs: The maximum amount of time to buffer a batch before emitting; used in streaming contexts. - env_vars: Environment variables. 
""" try: @@ -233,8 +231,7 @@ def __init__( private=private, min_batch_size=min_batch_size, max_batch_size=max_batch_size, - max_batch_duration_secs=max_batch_duration_secs, - env_vars=env_vars or {}) + max_batch_duration_secs=max_batch_duration_secs) super().__init__(_handler, preprocess, postprocess) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index f19861fa0dd2..d6c4088c1be4 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -424,7 +424,7 @@ def get_portability_package_data(): 'sqlalchemy>=1.3,<3.0', 'psycopg2-binary>=2.8.5,<2.9.10; python_version <= "3.9"', 'psycopg2-binary>=2.8.5,<3.0; python_version >= "3.10"', - 'testcontainers[mysql]>=3.0.3,<4.0.0', + 'testcontainers[mysql,kafka]>=3.0.3,<4.0.0', 'cryptography>=41.0.2', 'hypothesis>5.0.0,<7.0.0', 'virtualenv-clone>=0.5,<1.0',