Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 134 additions & 1 deletion sdks/python/apache_beam/yaml/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import itertools
import logging
import os
import random
import sqlite3
import string
import unittest
import uuid

Expand All @@ -33,6 +35,11 @@
import pytds
import sqlalchemy
import yaml
from google.cloud import pubsub_v1
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.google import PubSubContainer
from testcontainers.kafka import KafkaContainer
from testcontainers.mssql import SqlServerContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.postgres import PostgresContainer
Expand Down Expand Up @@ -351,6 +358,129 @@ def temp_sqlserver_database():
raise err


class OracleTestContainer(DockerContainer):
  """Oracle XE Docker container tailored for these integration tests.

  Compared to the stock OracleDbContainer, this class sets the oracle
  password up front, blocks on the container log until the database
  reports readiness (so get_exposed_port is not called too early), and
  builds a connection URL for the modern python-oracledb driver.
  """
  def __init__(self):
    super().__init__("gvenzl/oracle-xe:21-slim")
    self.with_env("ORACLE_PASSWORD", "oracle")
    self.with_exposed_ports(1521)

  def start(self):
    # Only after the database announces readiness is it safe to query the
    # mapped port; without this wait the port lookup can hang until timeout.
    super().start()
    wait_for_logs(self, "DATABASE IS READY TO USE!", timeout=300)
    return self

  def get_connection_url(self):
    """Return a SQLAlchemy URL for the python-oracledb driver."""
    mapped_port = self.get_exposed_port(1521)
    return (
        "oracle+oracledb://system:oracle@localhost:"
        f"{mapped_port}/?service_name=XEPDB1")


@contextlib.contextmanager
def temp_oracle_database():
  """Context manager to provide a temporary Oracle database for testing.

  This function utilizes the 'testcontainers' library to spin up an
  Oracle Database instance within a Docker container. It then connects
  to this temporary database via SQLAlchemy and creates a predefined
  'tmp_table' table for the YAML tests to read from and write to.

  NOTE: A custom OracleTestContainer class was created due to the current
  version (OracleDBContainer) that calls get_exposed_port too soon causing the
  service to hang until timeout.

  Yields:
    str: A JDBC connection string for the temporary Oracle database.
      Example format:
        "jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1"

  Raises:
    oracledb.Error: If there's an error connecting to or interacting with
      the Oracle database during setup.
    Exception: Any other exception encountered during the setup process.
  """
  with OracleTestContainer() as oracle:
    engine = sqlalchemy.create_engine(oracle.get_connection_url())
    with engine.connect() as connection:
      connection.execute(
          sqlalchemy.text(
              """
              CREATE TABLE tmp_table (
                value NUMBER PRIMARY KEY,
                rank NUMBER
              )
              """))
      connection.commit()
    port = oracle.get_exposed_port(1521)
    yield f"jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1"


@contextlib.contextmanager
def temp_kafka_server():
  """Context manager to provide a temporary Kafka server for testing.

  This function utilizes the 'testcontainers' library to spin up a Kafka
  instance within a Docker container. It then yields the bootstrap server
  string, which can be used by Kafka clients to connect to this temporary
  server.

  The Docker container and the Kafka instance are automatically managed
  and torn down when the context manager exits.

  Yields:
    str: The bootstrap server string for the temporary Kafka instance.
      Example format: "localhost:XXXXX" or "PLAINTEXT://localhost:XXXXX"

  Raises:
    Exception: If there's an error starting the Kafka container or
      interacting with the temporary Kafka server.
  """
  try:
    with KafkaContainer() as kafka_container:
      yield kafka_container.get_bootstrap_server()
  except Exception:
    # logging.exception records the traceback; bare `raise` preserves it
    # for the caller. (Fixes the "Kakfa" typo in the original message.)
    logging.exception("Error interacting with temporary Kafka server")
    raise


@contextlib.contextmanager
def temp_pubsub_emulator(project_id="apache-beam-testing"):
  """Context manager providing a temporary Pub/Sub topic on an emulator.

  This function uses 'testcontainers' to spin up a Google Cloud SDK
  container running the Pub/Sub emulator, then creates a uniquely named
  topic on it and yields that topic's full resource name.

  The Docker container is automatically managed and torn down when the
  context manager exits.

  Args:
    project_id (str): The GCP project ID to be used by the emulator.
      This doesn't need to be a real project for the emulator.

  Yields:
    str: The full resource name of the created topic, e.g.
      "projects/{project_id}/topics/{topic_id}", to be used as the
      pipeline's topic.

  Raises:
    Exception: If the container fails to start or the emulator isn't ready.
  """
  with PubSubContainer(project=project_id) as pubsub_container:
    # NOTE(review): pubsub_v1.PublisherClient() only targets the emulator if
    # PUBSUB_EMULATOR_HOST is set in the environment — confirm, or use
    # pubsub_container.get_publisher_client() instead.
    publisher = pubsub_v1.PublisherClient()
    # Pub/Sub topic IDs must start with a letter, so prefix the random hex.
    first_character = random.choice(string.ascii_lowercase)
    topic_id = f"{first_character}{uuid.uuid4().hex[:8]}"
    topic_name_to_create = \
        f"projects/{pubsub_container.project}/topics/{topic_id}"
    created_topic_object = publisher.create_topic(name=topic_name_to_create)
    yield created_topic_object.name


def replace_recursive(spec, vars):
if isinstance(spec, dict):
return {
Expand All @@ -359,7 +489,10 @@ def replace_recursive(spec, vars):
}
elif isinstance(spec, list):
return [replace_recursive(value, vars) for value in spec]
elif isinstance(spec, str) and '{' in spec and '{\n' not in spec:
# TODO(https://github.com/apache/beam/issues/35067): Consider checking for
# callable in the if branch above instead of checking lambda here.
elif isinstance(
spec, str) and '{' in spec and '{\n' not in spec and 'lambda' not in spec:
try:
return spec.format(**vars)
except Exception as exn:
Expand Down
80 changes: 80 additions & 0 deletions sdks/python/apache_beam/yaml/tests/kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

fixtures:
- name: TEMP_BOOTSTAP_SERVER
type: "apache_beam.yaml.integration_tests.temp_kafka_server"

pipelines:
# Kafka write pipeline
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {value: 123}
- {value: 456}
- {value: 789}
- type: MapToFields
config:
language: python
fields:
value:
callable: |
lambda row: str(row.value).encode('utf-8')
output_type: bytes
- type: WriteToKafka
config:
format: "RAW"
topic: "silly_topic"
bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
producer_config_updates:
linger.ms: "0"

# Kafka read pipeline
# Need a separate read pipeline to make sure the write pipeline is flushed
- pipeline:
type: chain
transforms:
- type: ReadFromKafka
config:
format: "RAW"
topic: "silly_topic"
bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
consumer_config:
auto.offset.reset: "earliest"
group.id: "yaml-kafka-test-group"
max_read_time_seconds: 10 # will read forever if not set
- type: MapToFields
config:
language: python
fields:
value:
callable: |
# Kafka RAW format reads messages as bytes in the 'payload' field of a Row
lambda row: row.payload.decode('utf-8')
output_type: string
- type: AssertEqual
config:
elements:
- {value: "123"}
- {value: "456"}
- {value: "789"}

# TODO: Error handling hard to trigger upon initial investigations. Need to
# investigate more.
63 changes: 63 additions & 0 deletions sdks/python/apache_beam/yaml/tests/oracle.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

fixtures:
- name: TEMP_DB
type: "apache_beam.yaml.integration_tests.temp_oracle_database"

pipelines:
# Oracle write pipeline
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {value: 123, rank: 0}
- {value: 456, rank: 1}
- {value: 789, rank: 2}
- type: WriteToOracle
config:
url: "{TEMP_DB}"
query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)"

# Oracle read pipeline
# Need a separate read pipeline to make sure the write pipeline is flushed
- pipeline:
type: chain
transforms:
- type: ReadFromOracle
config:
url: "{TEMP_DB}"
query: "SELECT * FROM tmp_table"
driver_class_name: "oracle.jdbc.OracleDriver"
- type: MapToFields
config:
language: python
fields:
value:
callable: "lambda x: int(x.VALUE)"
output_type: integer
rank:
callable: "lambda x: int(x.RANK)"
output_type: integer
- type: AssertEqual
config:
elements:
- {value: 123, rank: 0}
- {value: 456, rank: 1}
- {value: 789, rank: 2}
61 changes: 61 additions & 0 deletions sdks/python/apache_beam/yaml/tests/pubsub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

fixtures:
- name: PS_TOPIC
type: "apache_beam.yaml.integration_tests.temp_pubsub_emulator"
config:
project_id: "apache-beam-testing"

pipelines:
# Pubsub write pipeline
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {value: "11a"}
- {value: "37a"}
- {value: "389a"}
- type: WriteToPubSub
config:
topic: "{PS_TOPIC}"
format: "RAW"

options:
streaming: true


# TODO: Current PubSubIO doesn't have a max_read_time_seconds parameter like
# Kafka does. Without it, the ReadFromPubSub will run forever. This is not a
# trivial change. For now, we will live with the mocked tests located
# [here](https://github.com/apache/beam/blob/bea04446b41c86856c24d0a9761622092ed9936f/sdks/python/apache_beam/yaml/yaml_io_test.py#L83).

# - pipeline:
# type: chain
# transforms:
# - type: ReadFromPubSub
# config:
# topic: "{PS_TOPIC}"
# format: "RAW"
# # ...


# options:
# streaming: true

Loading
Loading