diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 4f0516a1ea93..15cf46218e8e 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -563,8 +563,11 @@ def _wordcount_minimal_test_preprocessor( return _wordcount_random_shuffler(test_spec, all_words, env) -@YamlExamplesTestSuite.register_test_preprocessor( - ['test_wordCountInclude_yaml', 'test_wordCountImport_yaml']) +@YamlExamplesTestSuite.register_test_preprocessor([ + 'test_wordCountInclude_yaml', + 'test_wordCountImport_yaml', + 'test_wordCountInheritance_yaml' +]) def _wordcount_jinja_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): """ @@ -679,6 +682,7 @@ def _kafka_test_preprocessor( 'test_anomaly_scoring_yaml', 'test_wordCountInclude_yaml', 'test_wordCountImport_yaml', + 'test_wordCountInheritance_yaml', 'test_iceberg_to_alloydb_yaml' ]) def _io_write_test_preprocessor( @@ -1256,8 +1260,11 @@ def _batch_log_analysis_test_preprocessor( return test_spec -@YamlExamplesTestSuite.register_test_preprocessor( - ['test_wordCountInclude_yaml', 'test_wordCountImport_yaml']) +@YamlExamplesTestSuite.register_test_preprocessor([ + 'test_wordCountInclude_yaml', + 'test_wordCountImport_yaml', + 'test_wordCountInheritance_yaml' +]) def _jinja_preprocessor(raw_spec_string: str, test_name: str): """ Preprocessor for Jinja-based YAML tests. 
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py b/sdks/python/apache_beam/yaml/examples/testing/input_data.py index fb468567355d..7fe9b5291e01 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py +++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py @@ -86,6 +86,11 @@ def word_count_jinja_template_data(test_name: str) -> list[str]: 'apache_beam/yaml/examples/transforms/jinja/' 'import/macros/wordCountMacros.yaml' ] + elif test_name == 'test_wordCountInheritance_yaml': + return [ + 'apache_beam/yaml/examples/transforms/jinja/' + 'inheritance/base/base_pipeline.yaml' + ] return [] diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/README.md b/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/README.md new file mode 100644 index 000000000000..e22e54a56696 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/README.md @@ -0,0 +1,77 @@ + + +# Jinja Inheritance Example + +This folder contains an example of how to use Jinja2 inheritance in Beam YAML pipelines. + +## Files + +* **base/base_pipeline.yaml**: The WordCount pipeline skeleton (Read -> Split -> Explode -> `extra_steps` -> MapToFields -> Write). It defines an empty block `extra_steps` between `Explode` and `MapToFields` so that child pipelines can inject additional transforms, such as the `Combine` step that performs the actual counting. +* **wordCountInheritance.yaml**: Extends `base/base_pipeline.yaml` and injects a `Combine` transform into the `extra_steps` block to count the occurrences of each word. 
+ +## Running the Example + +To run the child pipeline (which includes the inherited base pipeline logic plus the injected `Combine` transform): + +General setup: +```sh +export PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/inheritance/wordCountInheritance.yaml +export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt" +export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/" +export PROJECT="MY-PROJECT" +export REGION="MY-REGION" + +cd /beam/sdks/python +``` + +Multiline Run Example: +```sh +python -m apache_beam.yaml.main \ + --project=${PROJECT} \ + --region=${REGION} \ + --yaml_pipeline_file="${PIPELINE_FILE}" \ + --jinja_variables='{ + "readFromTextTransform": {"path": "'"${KINGLEAR}"'"}, + "mapToFieldsSplitConfig": { + "language": "python", + "fields": { + "value": "1" + } + }, + "explodeTransform": {"fields": "word"}, + "combineTransform": { + "group_by": "word", + "combine": {"value": "sum"} + }, + "mapToFieldsCountConfig": { + "language": "python", + "fields": {"output": "word + \" - \" + str(value)"} + }, + "writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"} + }' +``` + +Single Line Run Example: +```sh +python -m apache_beam.yaml.main --project=${PROJECT} --region=${REGION} \ +--yaml_pipeline_file="${PIPELINE_FILE}" --jinja_variables='{"readFromTextTransform": +{"path": "'"${KINGLEAR}"'"}, "mapToFieldsSplitConfig": {"language": "python", "fields":{"value":"1"}}, "explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", "fields":{"output":"word + \" - \" + str(value)"}}, "writeToTextTransform":{"path":"'"${TEMP_LOCATION}"'"}}' +``` + diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/base/base_pipeline.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/base/base_pipeline.yaml new file mode 100644 index 000000000000..209646b894a5 --- /dev/null +++ 
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/base/base_pipeline.yaml @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipeline: + type: chain + transforms: + - type: ReadFromText + config: + path: {{readFromTextTransform.path}} + + - type: MapToFields + name: Split words + config: + language: python + fields: + word: + callable: |- + import re + def my_mapping(row): + return re.findall(r'[A-Za-z\']+', row.line.lower()) + value: {{mapToFieldsSplitConfig.fields.value}} + - type: Explode + config: + fields: + - {{explodeTransform.fields}} + + # Inheritance injection point: content added here by child pipelines will be executed + # after Explode and before MapToFields. 
+{% block extra_steps %} +{% endblock %} + + - type: MapToFields + name: Format output + config: + language: {{mapToFieldsCountConfig.language}} + fields: + output: {{mapToFieldsCountConfig.fields.output}} + - name: Write to GCS + type: WriteToText + config: + path: {{writeToTextTransform.path}} diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/wordCountInheritance.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/wordCountInheritance.yaml new file mode 100644 index 000000000000..ad9f44df7851 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/inheritance/wordCountInheritance.yaml @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +{% extends "apache_beam/yaml/examples/transforms/jinja/inheritance/base/base_pipeline.yaml" %} + +{% block extra_steps %} + - name: Count words + type: Combine + config: + group_by: + - {{combineTransform.group_by}} + combine: + value: {{combineTransform.combine.value}} +{% endblock %} + +# Expected: +# Row(output='king - 311') +# Row(output='lear - 253') +# Row(output='dramatis - 1') +# Row(output='personae - 1') +# Row(output='of - 483') +# Row(output='britain - 2') +# Row(output='france - 32') +# Row(output='duke - 26') +# Row(output='burgundy - 20') +# Row(output='cornwall - 75') \ No newline at end of file