diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 85e5b07a121e..7c0f22675daf 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -29,7 +29,6 @@ import ( "os/exec" "os/signal" "path/filepath" - "regexp" "slices" "strings" "sync" @@ -47,8 +46,6 @@ import ( ) var ( - acceptableWhlSpecs []string - // SetupOnly option is used to invoke the boot sequence to only process the provided artifacts and builds new dependency pre-cached images. setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).") artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).") @@ -406,37 +403,15 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri return dir, nil } -// setupAcceptableWheelSpecs setup wheel specs according to installed python version -func setupAcceptableWheelSpecs() error { - cmd := exec.Command("python", "-V") - stdoutStderr, err := cmd.CombinedOutput() - if err != nil { - return err - } - re := regexp.MustCompile(`Python (\d)\.(\d+).*`) - pyVersions := re.FindStringSubmatch(string(stdoutStderr[:])) - if len(pyVersions) != 3 { - return fmt.Errorf("cannot get parse Python version from %s", stdoutStderr) - } - pyVersion := fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2]) - wheelName := fmt.Sprintf("cp%s-cp%s-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", pyVersion, pyVersion) - acceptableWhlSpecs = append(acceptableWhlSpecs, wheelName) - return nil -} - // installSetupPackages installs Beam SDK and user dependencies. func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error { bufLogger := tools.NewBufferedLogger(logger) bufLogger.Printf(ctx, "Installing setup packages ...") - if err := setupAcceptableWheelSpecs(); err != nil { - bufLogger.Printf(ctx, "Failed to setup acceptable wheel specs, leave it as empty: %v", err) - } - // Install the Dataflow Python SDK if one was staged. In released // container images, SDK is already installed, but can be overriden // using the --sdk_location pipeline option. - if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil { + if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, false); err != nil { return fmt.Errorf("failed to install SDK: %v", err) } pkgName := "apache-beam" diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go index 1faf8421a02d..2024c16dde50 100644 --- a/sdks/python/container/piputil.go +++ b/sdks/python/container/piputil.go @@ -24,6 +24,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strings" "time" @@ -191,16 +192,34 @@ func installExtraPackages(ctx context.Context, logger *tools.Logger, files []str return nil } -func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, acceptableWhlSpecs []string) string { +// getPythonVersionSpec returns the Python version specifier (e.g., "310" for Python 3.10) +func getPythonVersionSpec() (string, error) { + cmd := exec.Command("python", "-V") + stdoutStderr, err := cmd.CombinedOutput() + if err != nil { + return "", err + } + re := regexp.MustCompile(`Python (\d)\.(\d+).*`) + pyVersions := re.FindStringSubmatch(string(stdoutStderr[:])) + if len(pyVersions) != 3 { + return "", fmt.Errorf("cannot parse Python version from %s", stdoutStderr) + } + return fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2]), nil +} + +func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string) string { bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval) + + pyVersionSpec, err := getPythonVersionSpec() + if err != nil { + bufLogger.Printf(ctx, "Failed to get Python version specifier: %v", err) + return "" + } + for _, file := range files { - if strings.HasPrefix(file, "apache_beam") { - for _, s := range acceptableWhlSpecs { - if strings.HasSuffix(file, s) { - bufLogger.Printf(ctx, "Found Apache Beam SDK wheel: %v", file) - return file - } - } + if strings.HasPrefix(file, "apache_beam") && strings.HasSuffix(file, ".whl") && strings.Contains(file, "cp"+pyVersionSpec) { + bufLogger.Printf(ctx, "Found Apache Beam SDK wheel: %v", file) + return file } } return "" @@ -211,8 +230,8 @@ func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, a // assume that the pipleine was started with the Beam SDK found in the wheel // file, and we try to install it. If not successful, we fall back to installing // SDK from source tarball provided in sdkSrcFile. -func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error { - sdkWhlFile := findBeamSdkWhl(ctx, logger, files, acceptableWhlSpecs) +func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, required bool) error { + sdkWhlFile := findBeamSdkWhl(ctx, logger, files) bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval) if sdkWhlFile != "" { // by default, pip rejects to install wheel if same version already installed