From e0f37aebbba899e8643706b84a9441bc82c100bc Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Fri, 6 Feb 2026 16:08:39 +0500 Subject: [PATCH 1/6] Fix IndexError in processes.py when pip subprocess fails with short command - Add _pip_package_from_args() to safely get package string (avoids args[0][6] on short lists) - Guard pip branch with len(args[0]) > 2 before indexing - Use helper in call, check_call, and check_output - Add tests for short pip command path (no IndexError) --- sdks/python/apache_beam/utils/processes.py | 28 ++++++++++--- .../apache_beam/utils/processes_test.py | 39 +++++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index f6daecea2125..d7af08eeeb0b 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -44,6 +44,19 @@ else: + def _pip_package_from_args(args): + """Return a safe string for the package field in pip error messages. + + Avoids IndexError when the command list is shorter than 7 elements + (e.g. ['python', '-m', 'pip', 'install', 'pkg']). + """ + if not isinstance(args, tuple) or not args: + return "see output below" + cmd = args[0] + if not isinstance(cmd, (list, tuple)) or len(cmd) <= 6: + return "see output below" + return cmd[6] + def call(*args, **kwargs): if force_shell: kwargs['shell'] = True @@ -52,11 +65,12 @@ def call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): + if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": raise RuntimeError( \ "Full traceback: {}\n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error. output)) from error + .format(traceback.format_exc(), + _pip_package_from_args(args), error.output)) from error else: raise RuntimeError("Full trace: {}\ \n Output of the failed child process: {} " \ @@ -71,11 +85,12 @@ def check_call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): + if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": raise RuntimeError( \ "Full traceback: {} \n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) from error + .format(traceback.format_exc(), + _pip_package_from_args(args), error.output)) from error else: raise RuntimeError("Full trace: {} \ \n Output of the failed child process: {}" \ @@ -90,11 +105,12 @@ def check_output(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): + if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": raise RuntimeError( \ "Full traceback: {} \n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) from error + .format(traceback.format_exc(), + _pip_package_from_args(args), error.output)) from error else: raise RuntimeError("Full trace: {}, \ output of the failed child process {} "\ diff --git a/sdks/python/apache_beam/utils/processes_test.py b/sdks/python/apache_beam/utils/processes_test.py index 13425550dbbe..777d2948f2c8 100644 --- a/sdks/python/apache_beam/utils/processes_test.py +++ b/sdks/python/apache_beam/utils/processes_test.py @@ -131,6 +131,19 @@ def test_check_call_pip_install_non_existing_package(self): self.assertIn("Pip install failed for package: {}".format(package),\ error.args[0]) + def test_check_call_pip_short_command_no_index_error(self): + """Short pip command (e.g. pip install pkg) must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version that satisfies" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.check_call(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) + self.assertIn("see output below", ctx.exception.args[0]) + class TestErrorHandlingCheckOutput(unittest.TestCase): @classmethod @@ -172,6 +185,19 @@ def test_check_output_pip_install_non_existing_package(self): self.assertIn("Pip install failed for package: {}".format(package),\ error.args[0]) + def test_check_output_pip_short_command_no_index_error(self): + """Short pip command must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.check_output(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) + self.assertIn("see output below", ctx.exception.args[0]) + class TestErrorHandlingCall(unittest.TestCase): @classmethod @@ -213,6 +239,19 @@ def test_check_output_pip_install_non_existing_package(self): self.assertIn("Pip install failed for package: {}".format(package),\ error.args[0]) + def test_call_pip_short_command_no_index_error(self): + """Short pip command must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.call(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) + self.assertIn("see output below", ctx.exception.args[0]) + if __name__ == '__main__': unittest.main() From 3a655ed4523f6092ce0094af1ec1143411249663 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Fri, 6 Feb 2026 16:17:38 +0500 Subject: [PATCH 2/6] Update CHANGES.md for processes pip IndexError fix --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index f4a04320d66c..7c971698913d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -83,6 +83,7 @@ ## Bugfixes +* Fixed IndexError in `apache_beam.utils.processes` when pip subprocess fails with short command (e.g. `pip install pkg`) (Python) ([#37515](https://github.com/apache/beam/issues/37515)). * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes From 1645f97a12b94758a86990c935ad0cd34100a3f5 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Fri, 6 Feb 2026 17:25:35 +0500 Subject: [PATCH 3/6] Mark MultiProcessSharedTest with no_xdist to fix CI setup failures --- sdks/python/apache_beam/utils/multi_process_shared_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index 7b2b11857bfd..3ae0e7b2a92d 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -24,6 +24,8 @@ import unittest from typing import Any +import pytest + from apache_beam.utils import multi_process_shared @@ -85,6 +87,7 @@ def __getattribute__(self, __name: str) -> Any: return object.__getattribute__(self, __name) +@pytest.mark.no_xdist class MultiProcessSharedTest(unittest.TestCase): @classmethod def setUpClass(cls): From 337a137ab5502ae5a43cb75b80771e63bdc0ff64 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Thu, 19 Feb 2026 11:16:27 +0500 Subject: [PATCH 4/6] Simplify subprocess error handling and remove no_xdist from MultiProcessSharedTest * processes.py: Remove pip-specific parsing (_pip_package_from_args and if blocks). Raise a single RuntimeError with full command, traceback, and child process output for all CalledProcessError cases. * multi_process_shared_test.py: Remove @pytest.mark.no_xdist and unused pytest import so the test suite can run in parallel as intended. --- .../utils/multi_process_shared_test.py | 3 - sdks/python/apache_beam/utils/processes.py | 61 ++++++------------- 2 files changed, 18 insertions(+), 46 deletions(-) diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index 3ae0e7b2a92d..7b2b11857bfd 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -24,8 +24,6 @@ import unittest from typing import Any -import pytest - from apache_beam.utils import multi_process_shared @@ -87,7 +85,6 @@ def __getattribute__(self, __name: str) -> Any: return object.__getattribute__(self, __name) -@pytest.mark.no_xdist class MultiProcessSharedTest(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index d7af08eeeb0b..bae55f8d742a 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -44,19 +44,6 @@ else: - def _pip_package_from_args(args): - """Return a safe string for the package field in pip error messages. - - Avoids IndexError when the command list is shorter than 7 elements - (e.g. ['python', '-m', 'pip', 'install', 'pkg']). - """ - if not isinstance(args, tuple) or not args: - return "see output below" - cmd = args[0] - if not isinstance(cmd, (list, tuple)) or len(cmd) <= 6: - return "see output below" - return cmd[6] - def call(*args, **kwargs): if force_shell: kwargs['shell'] = True @@ -65,16 +52,12 @@ def call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": - raise RuntimeError( \ - "Full traceback: {}\n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), - _pip_package_from_args(args), error.output)) from error - else: - raise RuntimeError("Full trace: {}\ - \n Output of the failed child process: {} " \ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Command that failed: {}\nFull trace: {}\nOutput of the failed " + "child process: {}".format( + args, traceback.format_exc(), + error.output if error.output is not None else "(not captured)") + ) from error return out def check_call(*args, **kwargs): @@ -85,16 +68,12 @@ def check_call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": - raise RuntimeError( \ - "Full traceback: {} \n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), - _pip_package_from_args(args), error.output)) from error - else: - raise RuntimeError("Full trace: {} \ - \n Output of the failed child process: {}" \ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Command that failed: {}\nFull trace: {}\nOutput of the failed " + "child process: {}".format( + args, traceback.format_exc(), + error.output if error.output is not None else "(not captured)") + ) from error return out def check_output(*args, **kwargs): @@ -105,16 +84,12 @@ def check_output(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": - raise RuntimeError( \ - "Full traceback: {} \n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), - _pip_package_from_args(args), error.output)) from error - else: - raise RuntimeError("Full trace: {}, \ - output of the failed child process {} "\ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Command that failed: {}\nFull trace: {}\nOutput of the failed " + "child process: {}".format( + args, traceback.format_exc(), + error.output if error.output is not None else "(not captured)") + ) from error return out def Popen(*args, **kwargs): From fce90a959b32d9a0e4f9ecb2b9eb2b57bc6f5f06 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Sat, 21 Feb 2026 12:23:32 +0500 Subject: [PATCH 5/6] Align subprocess error message with tests and drop pip parsing - processes.py: Use 'Output from execution of subprocess:' in RuntimeError and include command and full trace. Keeps test convention without parsing package from args. - processes_test.py: Assert on new message format; remove assertions for 'Pip install failed for package' and 'see output below'. --- sdks/python/apache_beam/utils/processes.py | 24 +++++------ .../apache_beam/utils/processes_test.py | 41 ++++++++----------- 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index bae55f8d742a..a5dd5d90aef7 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -53,10 +53,10 @@ def call(*args, **kwargs): raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: raise RuntimeError( - "Command that failed: {}\nFull trace: {}\nOutput of the failed " - "child process: {}".format( - args, traceback.format_exc(), - error.output if error.output is not None else "(not captured)") + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, traceback.format_exc()) ) from error return out @@ -69,10 +69,10 @@ def check_call(*args, **kwargs): raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: raise RuntimeError( - "Command that failed: {}\nFull trace: {}\nOutput of the failed " - "child process: {}".format( - args, traceback.format_exc(), - error.output if error.output is not None else "(not captured)") + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, traceback.format_exc()) ) from error return out @@ -85,10 +85,10 @@ def check_output(*args, **kwargs): raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: raise RuntimeError( - "Command that failed: {}\nFull trace: {}\nOutput of the failed " - "child process: {}".format( - args, traceback.format_exc(), - error.output if error.output is not None else "(not captured)") + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, traceback.format_exc()) ) from error return out diff --git a/sdks/python/apache_beam/utils/processes_test.py b/sdks/python/apache_beam/utils/processes_test.py index 777d2948f2c8..6330631afdeb 100644 --- a/sdks/python/apache_beam/utils/processes_test.py +++ b/sdks/python/apache_beam/utils/processes_test.py @@ -121,15 +121,13 @@ def test_check_call_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.check_call(cmd) + processes.check_call(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.check_call") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) def test_check_call_pip_short_command_no_index_error(self): """Short pip command (e.g. pip install pkg) must not raise IndexError.""" @@ -142,7 +140,6 @@ def test_check_call_pip_short_command_no_index_error(self): processes.check_call(cmd) self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) self.assertIn(output, ctx.exception.args[0]) - self.assertIn("see output below", ctx.exception.args[0]) class TestErrorHandlingCheckOutput(unittest.TestCase): @@ -175,15 +172,13 @@ def test_check_output_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.check_output(cmd) + processes.check_output(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.check_output") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) def test_check_output_pip_short_command_no_index_error(self): """Short pip command must not raise IndexError.""" @@ -196,7 +191,6 @@ def test_check_output_pip_short_command_no_index_error(self): processes.check_output(cmd) self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) self.assertIn(output, ctx.exception.args[0]) - self.assertIn("see output below", ctx.exception.args[0]) class TestErrorHandlingCall(unittest.TestCase): @@ -219,7 +213,7 @@ def test_oserror_check_output_message(self): self.assertIn('Executable {} not found'.format(str(cmd)),\ error.args[0]) - def test_check_output_pip_install_non_existing_package(self): + def test_call_pip_install_non_existing_package(self): returncode = 1 package = "non-exsisting-package" cmd = ['python', '-m', 'pip', 'download', '--dest', '/var',\ @@ -229,15 +223,13 @@ def test_check_output_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.call(cmd) + processes.call(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.call") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) def test_call_pip_short_command_no_index_error(self): """Short pip command must not raise IndexError.""" @@ -250,7 +242,6 @@ def test_call_pip_short_command_no_index_error(self): processes.call(cmd) self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) self.assertIn(output, ctx.exception.args[0]) - self.assertIn("see output below", ctx.exception.args[0]) if __name__ == '__main__': From d0461e628167f85bb8d650ed31c8f1435d9d29da Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 24 Feb 2026 10:26:21 +0500 Subject: [PATCH 6/6] Fix yapf formatting in processes.py --- sdks/python/apache_beam/utils/processes.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index a5dd5d90aef7..f34b4a48085f 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -56,8 +56,8 @@ def call(*args, **kwargs): "Output from execution of subprocess: {}\n" "Command that failed: {}\nFull trace: {}".format( error.output if error.output is not None else "(not captured)", - args, traceback.format_exc()) - ) from error + args, + traceback.format_exc())) from error return out def check_call(*args, **kwargs): @@ -72,8 +72,8 @@ def check_call(*args, **kwargs): "Output from execution of subprocess: {}\n" "Command that failed: {}\nFull trace: {}".format( error.output if error.output is not None else "(not captured)", - args, traceback.format_exc()) - ) from error + args, + traceback.format_exc())) from error return out def check_output(*args, **kwargs): @@ -88,8 +88,8 @@ def check_output(*args, **kwargs): "Output from execution of subprocess: {}\n" "Command that failed: {}\nFull trace: {}".format( error.output if error.output is not None else "(not captured)", - args, traceback.format_exc()) - ) from error + args, + traceback.format_exc())) from error return out def Popen(*args, **kwargs):