From bc4115b1996f3a9f05b694cdb2ad7d74ea5dafd1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 3 Feb 2026 14:04:08 -0500 Subject: [PATCH 1/4] Move GetExperiments to a common place for other boot.go --- sdks/go/container/tools/pipeline_options.go | 33 +++++++++++++ .../container/tools/pipeline_options_test.go | 48 +++++++++++++++++++ sdks/python/container/boot.go | 33 +------------ 3 files changed, 82 insertions(+), 32 deletions(-) diff --git a/sdks/go/container/tools/pipeline_options.go b/sdks/go/container/tools/pipeline_options.go index 026fb31b0991..d16e91c9cd74 100644 --- a/sdks/go/container/tools/pipeline_options.go +++ b/sdks/go/container/tools/pipeline_options.go @@ -42,3 +42,36 @@ func MakePipelineOptionsFileAndEnvVar(options string) error { os.Setenv("PIPELINE_OPTIONS_FILE", f.Name()) return nil } + +type PipelineOptionsData struct { + Options OptionsData `json:"options"` +} + +type OptionsData struct { + Experiments []string `json:"experiments"` +} + +// GetExperiments extracts a string array from the options string (in JSON format) +// +// The json string of pipeline options is in the following format. +// We only focus on experiments here. +// +// { +// "display_data": [ +// {...}, +// ], +// "options": { +// ... +// "experiments": [ +// ... +// ], +// } +// } +func GetExperiments(options string) []string { + var opts PipelineOptionsData + err := json.Unmarshal([]byte(options), &opts) + if err != nil { + return nil + } + return opts.Options.Experiments +} diff --git a/sdks/go/container/tools/pipeline_options_test.go b/sdks/go/container/tools/pipeline_options_test.go index 7a0d7ebd5f09..f939ba293200 100644 --- a/sdks/go/container/tools/pipeline_options_test.go +++ b/sdks/go/container/tools/pipeline_options_test.go @@ -56,3 +56,51 @@ func TestMakePipelineOptionsFileAndEnvVar(t *testing.T) { } os.Remove("pipeline_options.json") } + +func TestGetExperiments(t *testing.T) { + tests := []struct { + name string + inputOptions string + expectedExps []string + }{ + { + "no experiments", + `{"options": {"a": "b"}}`, + nil, + }, + { + "valid experiments", + `{"options": {"experiments": ["a", "b"]}}`, + []string{"a", "b"}, + }, + { + "empty experiments", + `{"options": {"experiments": []}}`, + []string{}, + }, + { + "invalid json", + `{options: {"experiments": []}}`, + nil, + }, + { + "empty string", + "", + nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + exps := GetExperiments(test.inputOptions) + if len(exps) != len(test.expectedExps) { + t.Errorf("got: %v, want: %v", exps, test.expectedExps) + } + for i, v := range exps { + if v != test.expectedExps[i] { + t.Errorf("got: %v, want: %v", exps, test.expectedExps) + } + } + }) + } +} diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 572dbf011134..d6a098dc01b2 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -117,37 +117,6 @@ func main() { } } -// The json string of pipeline options is in the following format. -// We only focus on experiments here. -// -// { -// "display_data": [ -// {...}, -// ], -// "options": { -// ... -// "experiments": [ -// ... -// ], -// } -// } -type PipelineOptionsData struct { - Options OptionsData `json:"options"` -} - -type OptionsData struct { - Experiments []string `json:"experiments"` -} - -func getExperiments(options string) []string { - var opts PipelineOptionsData - err := json.Unmarshal([]byte(options), &opts) - if err != nil { - return nil - } - return opts.Options.Experiments -} - func launchSDKProcess() error { ctx := grpcx.WriteWorkerID(context.Background(), *id) @@ -187,7 +156,7 @@ func launchSDKProcess() error { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } - experiments := getExperiments(options) + experiments := tools.GetExperiments(options) pipNoBuildIsolation = false if slices.Contains(experiments, "pip_no_build_isolation") { pipNoBuildIsolation = true From e23342402c76c1ad382b0bfd4fb1c4a861540371 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 3 Feb 2026 14:55:39 -0500 Subject: [PATCH 2/4] Support urn format in pipeline options --- sdks/go/container/tools/pipeline_options.go | 46 +++++++++++++------ .../container/tools/pipeline_options_test.go | 12 ++++- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/sdks/go/container/tools/pipeline_options.go b/sdks/go/container/tools/pipeline_options.go index d16e91c9cd74..1ce3e9378e4c 100644 --- a/sdks/go/container/tools/pipeline_options.go +++ b/sdks/go/container/tools/pipeline_options.go @@ -44,28 +44,38 @@ func MakePipelineOptionsFileAndEnvVar(options string) error { } type PipelineOptionsData struct { - Options OptionsData `json:"options"` + Options LegacyOptionsData `json:"options"` + Experiments []string `json:"beam:option:experiments:v1"` } -type OptionsData struct { +type LegacyOptionsData struct { Experiments []string `json:"experiments"` } // GetExperiments extracts a string array from the options string (in JSON format) // -// The json string of pipeline options is in the following format. -// We only focus on experiments here. +// The json string of pipeline options can be in two formats. +// +// Legacy format: // // { -// "display_data": [ -// {...}, -// ], -// "options": { -// ... -// "experiments": [ +// "display_data": [ +// {...}, +// ], +// "options": { +// ... +// "experiments": [ // ... -// ], -// } +// ], +// } +// } +// +// URN format: +// +// { +// "beam:option:experiments:v1": [ +// ... +// ] // } func GetExperiments(options string) []string { var opts PipelineOptionsData @@ -73,5 +83,15 @@ func GetExperiments(options string) []string { if err != nil { return nil } - return opts.Options.Experiments + + // Check the legacy experiments first + if len(opts.Options.Experiments) > 0 { + return opts.Options.Experiments + } + + if len(opts.Experiments) > 0 { + return opts.Experiments + } + + return nil } diff --git a/sdks/go/container/tools/pipeline_options_test.go b/sdks/go/container/tools/pipeline_options_test.go index f939ba293200..a1c4693183a3 100644 --- a/sdks/go/container/tools/pipeline_options_test.go +++ b/sdks/go/container/tools/pipeline_options_test.go @@ -69,10 +69,20 @@ func TestGetExperiments(t *testing.T) { nil, }, { - "valid experiments", + "valid legacy experiments", `{"options": {"experiments": ["a", "b"]}}`, []string{"a", "b"}, }, + { + "valid urn experiments", + `{"beam:option:experiments:v1": ["a", "b"]}`, + []string{"a", "b"}, + }, + { + "valid legacy and urn experiments; legacy first", + `{"options": {"experiments": ["c", "d"]}, "beam:option:experiments:v1": ["a", "b"]}`, + []string{"c", "d"}, + }, { "empty experiments", `{"options": {"experiments": []}}`, From 2ce0a166bad74b29f49b2756646ca7fb7f92e056 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 3 Feb 2026 15:11:22 -0500 Subject: [PATCH 3/4] Simplify and cover some edge cases. --- sdks/go/container/tools/pipeline_options.go | 8 ++------ sdks/go/container/tools/pipeline_options_test.go | 12 +++++++++++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/sdks/go/container/tools/pipeline_options.go b/sdks/go/container/tools/pipeline_options.go index 1ce3e9378e4c..ff74622de2b3 100644 --- a/sdks/go/container/tools/pipeline_options.go +++ b/sdks/go/container/tools/pipeline_options.go @@ -85,13 +85,9 @@ func GetExperiments(options string) []string { } // Check the legacy experiments first - if len(opts.Options.Experiments) > 0 { + if opts.Options.Experiments != nil { return opts.Options.Experiments } - if len(opts.Experiments) > 0 { - return opts.Experiments - } - - return nil + return opts.Experiments } diff --git a/sdks/go/container/tools/pipeline_options_test.go b/sdks/go/container/tools/pipeline_options_test.go index a1c4693183a3..7920eb0f35a2 100644 --- a/sdks/go/container/tools/pipeline_options_test.go +++ b/sdks/go/container/tools/pipeline_options_test.go @@ -84,10 +84,20 @@ func TestGetExperiments(t *testing.T) { []string{"c", "d"}, }, { - "empty experiments", + "valid legacy and urn experiments; legacy first, even if empty", + `{"options": {"experiments": []}, "beam:option:experiments:v1": ["a", "b"]}`, + []string{}, + }, + { + "empty legacy experiments", `{"options": {"experiments": []}}`, []string{}, }, + { + "empty urn experiments", + `{"beam:option:experiments:v1": []}`, + []string{}, + }, { "invalid json", `{options: {"experiments": []}}`, From bf20220366f6dad72afa6488296536074254f7ad Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 3 Feb 2026 15:31:25 -0500 Subject: [PATCH 4/4] Minor change on comments. --- sdks/go/container/tools/pipeline_options.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/go/container/tools/pipeline_options.go b/sdks/go/container/tools/pipeline_options.go index ff74622de2b3..9d5bd5894eec 100644 --- a/sdks/go/container/tools/pipeline_options.go +++ b/sdks/go/container/tools/pipeline_options.go @@ -52,11 +52,12 @@ type LegacyOptionsData struct { Experiments []string `json:"experiments"` } -// GetExperiments extracts a string array from the options string (in JSON format) +// GetExperiments extracts a string array of experiments from the pipeline +// options string (in JSON format) // -// The json string of pipeline options can be in two formats. +// The JSON string can be in two styles: // -// Legacy format: +// Legacy style: // // { // "display_data": [ @@ -70,7 +71,7 @@ type LegacyOptionsData struct { // } // } // -// URN format: +// URN style: // // { // "beam:option:experiments:v1": [ @@ -84,7 +85,7 @@ func GetExperiments(options string) []string { return nil } - // Check the legacy experiments first + // Check the legacy style experiments first if opts.Options.Experiments != nil { return opts.Options.Experiments }