Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
cdd1f6f
.github+sdks+website: update docs and add exmples for CloudSQL handler
mohamedawnallah Jun 29, 2025
5541f75
Update website/www/site/content/en/documentation/transforms/python/el…
mohamedawnallah Aug 15, 2025
5727fd9
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Aug 28, 2025
ca7e09f
sdks/python: fix issue regards generic binding parameteres in CloudSQL
mohamedawnallah Aug 28, 2025
9fd5fe1
sdks/python: use binding parameters instead of `{}`
mohamedawnallah Aug 28, 2025
99fe5d7
CHANGES.md: update release notes
mohamedawnallah Aug 28, 2025
c7c0146
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Sep 5, 2025
af8e4fe
website: update beam version
mohamedawnallah Sep 5, 2025
91bcc72
sdks/python: add `ALLOYDB_PASSWORD` to `tox.ini`
mohamedawnallah Sep 8, 2025
bf42c00
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Sep 8, 2025
34eb695
sdks/python: fix unbounded local variable
mohamedawnallah Sep 9, 2025
d699f04
CHANGES.md: fix white space issue
mohamedawnallah Sep 9, 2025
e9897bd
sdks/python: make table_id globally unique in `enrichment_test`
mohamedawnallah Sep 9, 2025
73a8c5e
sdks/python: fix data type issue
mohamedawnallah Sep 9, 2025
c795ea4
sdks/python: enforce CloudSQL tests to run only on py transforms flow
mohamedawnallah Sep 9, 2025
d78f573
sdks/python: remove `uses_testcontainer` pytest marker from CloudSQL
mohamedawnallah Sep 9, 2025
c59148f
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Sep 9, 2025
a53de92
sdks/python: skip google cloudsql tests unless `ALLOYDB_PASSWORD` found
mohamedawnallah Sep 9, 2025
286ceba
workflows: remove `ALLOYDB_PASSWORD` from beam precommit python coverage
mohamedawnallah Sep 9, 2025
5ea33c9
sdks/python: fix duplicate data issue
mohamedawnallah Sep 9, 2025
8cd248c
sdks/python: fix linting
mohamedawnallah Sep 9, 2025
b12c72e
sdks/python: reorder table drop approach
mohamedawnallah Sep 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/beam_PreCommit_Python_Coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ env:
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
HF_INFERENCE_TOKEN: ${{ secrets.HF_INFERENCE_TOKEN }}
ALLOYDB_PASSWORD: ${{ secrets.ALLOYDB_PASSWORD }}


jobs:
Expand Down Expand Up @@ -113,7 +112,7 @@ jobs:
TESTCONTAINERS_HOST_OVERRIDE: ${{ contains(matrix.os, 'self-hosted') && env.DIND_IP || '' }}
TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/docker.sock"
TESTCONTAINERS_RYUK_DISABLED: "false"
TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true"
TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true"
PYTEST_ADDOPTS: "-v --tb=short --maxfail=3 --durations=20 --reruns=2 --reruns-delay=5"
TC_TIMEOUT: "120"
TC_MAX_TRIES: "120"
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) (Python) ([#35473](https://github.com/apache/beam/issues/35473), [#36095](https://github.com/apache/beam/issues/36095)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,214 @@ def enrichment_with_vertex_ai_legacy():
| "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_vertex_ai_legacy]


def enrichment_with_google_cloudsql_pg():
    """Enrich sample rows via a Google CloudSQL PostgreSQL handler and print them.

    The connection URI, user, password, database id and table id are read
    from the ``GOOGLE_CLOUD_SQL_DB_*`` environment variables. Three sample
    ``beam.Row`` elements are created, passed through ``Enrichment`` and
    printed.
    """
    # [START enrichment_with_google_cloudsql_pg]
    import os

    import apache_beam as beam
    from apache_beam.transforms.enrichment import Enrichment
    from apache_beam.transforms.enrichment_handlers.cloudsql import (
        CloudSQLConnectionConfig,
        CloudSQLEnrichmentHandler,
        DatabaseTypeAdapter,
        TableFieldsQueryConfig,
    )

    # Input elements; ``product_id`` is the field used in the WHERE clause.
    sample_rows = [
        beam.Row(product_id=pid, name=label)
        for pid, label in ((1, 'A'), (2, 'B'), (3, 'C'))
    ]

    table_name = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID")

    conn_cfg = CloudSQLConnectionConfig(
        db_adapter=DatabaseTypeAdapter.POSTGRESQL,
        instance_connection_uri=os.environ.get("GOOGLE_CLOUD_SQL_DB_URI"),
        user=os.environ.get("GOOGLE_CLOUD_SQL_DB_USER"),
        password=os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD"),
        db_id=os.environ.get("GOOGLE_CLOUD_SQL_DB_ID"))

    # The template uses a named bind parameter (``:pid``) that is paired
    # with the ``product_id`` field listed in ``where_clause_fields``.
    lookup_cfg = TableFieldsQueryConfig(
        table_id=table_name,
        where_clause_template="product_id = :pid",
        where_clause_fields=["product_id"])

    handler = CloudSQLEnrichmentHandler(
        connection_config=conn_cfg,
        table_id=table_name,
        query_config=lookup_cfg)

    with beam.Pipeline() as pipeline:
        rows = pipeline | "Create" >> beam.Create(sample_rows)
        enriched = (
            rows
            | "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(handler))
        _ = enriched | "Print" >> beam.Map(print)
    # [END enrichment_with_google_cloudsql_pg]


def enrichment_with_external_pg():
    """Enrich sample rows via an external PostgreSQL database and print them.

    Host, port, user, password, database id and table id are read from the
    ``EXTERNAL_SQL_DB_*`` environment variables. Three sample ``beam.Row``
    elements are created, passed through ``Enrichment`` and printed.
    """
    # [START enrichment_with_external_pg]
    import os

    import apache_beam as beam
    from apache_beam.transforms.enrichment import Enrichment
    from apache_beam.transforms.enrichment_handlers.cloudsql import (
        CloudSQLEnrichmentHandler,
        DatabaseTypeAdapter,
        ExternalSQLDBConnectionConfig,
        TableFieldsQueryConfig,
    )

    # Input elements; ``product_id`` is the field used in the WHERE clause.
    sample_rows = [
        beam.Row(product_id=pid, name=label)
        for pid, label in ((1, 'A'), (2, 'B'), (3, 'C'))
    ]

    table_name = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")

    # NOTE: ``int(...)`` raises if EXTERNAL_SQL_DB_PORT is unset or is not
    # a valid integer string.
    conn_cfg = ExternalSQLDBConnectionConfig(
        db_adapter=DatabaseTypeAdapter.POSTGRESQL,
        host=os.environ.get("EXTERNAL_SQL_DB_HOST"),
        port=int(os.environ.get("EXTERNAL_SQL_DB_PORT")),
        user=os.environ.get("EXTERNAL_SQL_DB_USER"),
        password=os.environ.get("EXTERNAL_SQL_DB_PASSWORD"),
        db_id=os.environ.get("EXTERNAL_SQL_DB_ID"))

    # The template uses a named bind parameter (``:pid``) that is paired
    # with the ``product_id`` field listed in ``where_clause_fields``.
    lookup_cfg = TableFieldsQueryConfig(
        table_id=table_name,
        where_clause_template="product_id = :pid",
        where_clause_fields=["product_id"])

    handler = CloudSQLEnrichmentHandler(
        connection_config=conn_cfg,
        table_id=table_name,
        query_config=lookup_cfg)

    with beam.Pipeline() as pipeline:
        rows = pipeline | "Create" >> beam.Create(sample_rows)
        enriched = rows | "Enrich W/ Unmanaged PostgreSQL" >> Enrichment(handler)
        _ = enriched | "Print" >> beam.Map(print)
    # [END enrichment_with_external_pg]


def enrichment_with_external_mysql():
    """Enrich sample rows via an external MySQL database and print them.

    Host, port, user, password, database id and table id are read from the
    ``EXTERNAL_SQL_DB_*`` environment variables. Three sample ``beam.Row``
    elements are created, passed through ``Enrichment`` and printed.
    """
    # [START enrichment_with_external_mysql]
    import os

    import apache_beam as beam
    from apache_beam.transforms.enrichment import Enrichment
    from apache_beam.transforms.enrichment_handlers.cloudsql import (
        CloudSQLEnrichmentHandler,
        DatabaseTypeAdapter,
        ExternalSQLDBConnectionConfig,
        TableFieldsQueryConfig,
    )

    # Input elements; ``product_id`` is the field used in the WHERE clause.
    sample_rows = [
        beam.Row(product_id=pid, name=label)
        for pid, label in ((1, 'A'), (2, 'B'), (3, 'C'))
    ]

    table_name = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")

    # NOTE: ``int(...)`` raises if EXTERNAL_SQL_DB_PORT is unset or is not
    # a valid integer string.
    conn_cfg = ExternalSQLDBConnectionConfig(
        db_adapter=DatabaseTypeAdapter.MYSQL,
        host=os.environ.get("EXTERNAL_SQL_DB_HOST"),
        port=int(os.environ.get("EXTERNAL_SQL_DB_PORT")),
        user=os.environ.get("EXTERNAL_SQL_DB_USER"),
        password=os.environ.get("EXTERNAL_SQL_DB_PASSWORD"),
        db_id=os.environ.get("EXTERNAL_SQL_DB_ID"))

    # The template uses a named bind parameter (``:pid``) that is paired
    # with the ``product_id`` field listed in ``where_clause_fields``.
    lookup_cfg = TableFieldsQueryConfig(
        table_id=table_name,
        where_clause_template="product_id = :pid",
        where_clause_fields=["product_id"])

    handler = CloudSQLEnrichmentHandler(
        connection_config=conn_cfg,
        table_id=table_name,
        query_config=lookup_cfg)

    with beam.Pipeline() as pipeline:
        rows = pipeline | "Create" >> beam.Create(sample_rows)
        enriched = rows | "Enrich W/ Unmanaged MySQL" >> Enrichment(handler)
        _ = enriched | "Print" >> beam.Map(print)
    # [END enrichment_with_external_mysql]


def enrichment_with_external_sqlserver():
    """Enrich sample rows via an external SQL Server database and print them.

    Host, port, user, password, database id and table id are read from the
    ``EXTERNAL_SQL_DB_*`` environment variables. Three sample ``beam.Row``
    elements are created, passed through ``Enrichment`` and printed.
    """
    # [START enrichment_with_external_sqlserver]
    import os

    import apache_beam as beam
    from apache_beam.transforms.enrichment import Enrichment
    from apache_beam.transforms.enrichment_handlers.cloudsql import (
        CloudSQLEnrichmentHandler,
        DatabaseTypeAdapter,
        ExternalSQLDBConnectionConfig,
        TableFieldsQueryConfig,
    )

    # Input elements; ``product_id`` is the field used in the WHERE clause.
    sample_rows = [
        beam.Row(product_id=pid, name=label)
        for pid, label in ((1, 'A'), (2, 'B'), (3, 'C'))
    ]

    table_name = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")

    # NOTE: ``int(...)`` raises if EXTERNAL_SQL_DB_PORT is unset or is not
    # a valid integer string.
    conn_cfg = ExternalSQLDBConnectionConfig(
        db_adapter=DatabaseTypeAdapter.SQLSERVER,
        host=os.environ.get("EXTERNAL_SQL_DB_HOST"),
        port=int(os.environ.get("EXTERNAL_SQL_DB_PORT")),
        user=os.environ.get("EXTERNAL_SQL_DB_USER"),
        password=os.environ.get("EXTERNAL_SQL_DB_PASSWORD"),
        db_id=os.environ.get("EXTERNAL_SQL_DB_ID"))

    # The template uses a named bind parameter (``:pid``) that is paired
    # with the ``product_id`` field listed in ``where_clause_fields``.
    lookup_cfg = TableFieldsQueryConfig(
        table_id=table_name,
        where_clause_template="product_id = :pid",
        where_clause_fields=["product_id"])

    handler = CloudSQLEnrichmentHandler(
        connection_config=conn_cfg,
        table_id=table_name,
        query_config=lookup_cfg)

    with beam.Pipeline() as pipeline:
        rows = pipeline | "Create" >> beam.Create(sample_rows)
        enriched = rows | "Enrich W/ Unmanaged SQL Server" >> Enrichment(handler)
        _ = enriched | "Print" >> beam.Map(print)
    # [END enrichment_with_external_sqlserver]
Loading
Loading