diff --git a/CHANGES.md b/CHANGES.md index 1f92a23282bf..5499cb066476 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,11 +68,11 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Added support for large pipeline options via a file (Python) ([#37370](https://github.com/apache/beam/issues/37370)). ## Breaking Changes -* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* The Python SDK container's `boot.go` now passes pipeline options through a file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a new Python SDK container with an older SDK version (which does not support the file-based approach), the pipeline options will not be recognized and the pipeline will fail. Users must ensure their SDK and container versions are synchronized ([#37370](https://github.com/apache/beam/issues/37370)). ## Deprecations diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index e4dd6cc2121f..754a631eaf33 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -73,6 +73,10 @@ def _import_beam_plugins(plugins): def create_harness(environment, dry_run=False): """Creates SDK Fn Harness.""" + # Bootstrap log level to capture startup events until pipeline options are + # parsed and the actual log level is set. + logging.getLogger().setLevel(logging.INFO) + deferred_exception = None if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment: try: @@ -93,8 +97,24 @@ def create_harness(environment, dry_run=False): else: fn_log_handler = None - pipeline_options_dict = _load_pipeline_options( - environment.get('PIPELINE_OPTIONS')) + options_json = environment.get('PIPELINE_OPTIONS') + + # If PIPELINE_OPTIONS_FILE is set, the options read from that file replace the PIPELINE_OPTIONS value.
+ if 'PIPELINE_OPTIONS_FILE' in environment: + options_file = environment['PIPELINE_OPTIONS_FILE'] + try: + with open(options_file, 'r') as f: + options_json = f.read() + _LOGGER.info('Load pipeline options from file: %s', options_file) + except Exception: + _LOGGER.error( + 'Failed to load pipeline options from file: %s', + options_file, + exc_info=True) + raise + + pipeline_options_dict = _load_pipeline_options(options_json) + default_log_level = _get_log_level_from_options_dict(pipeline_options_dict) logging.getLogger().setLevel(default_log_level) _set_log_level_overrides(pipeline_options_dict) @@ -239,6 +259,7 @@ def terminate_sdk_harness(): def _load_pipeline_options(options_json): + """Deserializes the pipeline options from a JSON string into a dictionary.""" if options_json is None: return {} options = json.loads(options_json) @@ -256,6 +277,8 @@ def _load_pipeline_options(options_json): def _parse_pipeline_options(options_json): + """Parses the pipeline options from a JSON string into a PipelineOptions + object.""" return PipelineOptions.from_dictionary(_load_pipeline_options(options_json)) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 572dbf011134..85e5b07a121e 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -259,7 +259,11 @@ func launchSDKProcess() error { // (3) Invoke python - os.Setenv("PIPELINE_OPTIONS", options) + // Write the JSON string of pipeline options into a file to avoid the "argument list too long" error. + if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil { + logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err) + } + os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir) os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String()) os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())