From d7a3526172a7657e3f3db1d1aca852f67f7248e8 Mon Sep 17 00:00:00 2001 From: System Administrator Date: Mon, 24 Oct 2016 13:22:52 -0700 Subject: [PATCH] Pass workflow / instance / job / execution to the job from Pinball. --- pinball/workflow/job_executor.py | 11 ++++ tests/pinball/workflow/job_executor_test.py | 57 +++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/pinball/workflow/job_executor.py b/pinball/workflow/job_executor.py index e81ac69..94fad4e 100644 --- a/pinball/workflow/job_executor.py +++ b/pinball/workflow/job_executor.py @@ -509,6 +509,17 @@ def execute(self): # Pinball sets Django module path which may interfere # with the command being executed. env.pop('DJANGO_SETTINGS_MODULE', None) + + # Pass workflow/instance/job/execution to the job. + # This will be consumed by Dr. Elephant. + env['PINBALL_WORKFLOW'] = self._workflow + env['PINBALL_INSTANCE'] = self._instance + env['PINBALL_JOB'] = self._job_name + env['PINBALL_EXECUTION'] = str(len(self.job.history) - 1) + + if PinballConfig.UI_HOST is not None: + env['PINBALL_BASE_URL'] = PinballConfig.UI_HOST + # The os.setsid() is passed in the argument preexec_fn # so it's run after the fork() and before exec() to # run the shell. 
It attaches a session id of the child diff --git a/tests/pinball/workflow/job_executor_test.py b/tests/pinball/workflow/job_executor_test.py index 4c6e11e..e6e7951 100644 --- a/tests/pinball/workflow/job_executor_test.py +++ b/tests/pinball/workflow/job_executor_test.py @@ -220,6 +220,64 @@ def test_execute_long_line(self, open_mock, exists_mock, get_s3_key_mock): self.assertEqual(2, get_s3_key_mock.call_count) + @mock.patch('pinball.workflow.log_saver.S3FileLogSaver._get_or_create_s3_key') + @mock.patch('os.path.exists') + @mock.patch('__builtin__.open') + @mock.patch('os.remove') + def test_execute_env_var(self, rm_mock, open_mock, exists_mock, get_s3_key_mock): + file_mock = mock.MagicMock() + open_mock.return_value = file_mock + file_mock.__enter__.return_value = file_mock + + s3_key_mock = mock.MagicMock() + get_s3_key_mock.return_value = s3_key_mock + s3_key_mock.__enter__.return_value = s3_key_mock + + job_name = 'some_job' + workflow_name = 'some_workflow' + instance = '123' + + job = ShellJob(name=job_name, + command="echo $PINBALL_WORKFLOW && " + "echo $PINBALL_JOB && " + "echo $PINBALL_INSTANCE && " + "echo $PINBALL_EXECUTION && " + "echo $PINBALL_BASE_URL", + emails=['some_email@pinterest.com'], + warn_timeout_sec=10, + abort_timeout_sec=20) + executor = ShellJobExecutor(workflow_name, instance, job_name, + job, self._data_builder, + self._emailer) + + import time + execution_record = ExecutionRecord(instance=instance, + start_time=time.time()) + execution_record.end_time = time.time() + execution_record.exit_code = 1 + job.history.append(execution_record) + + self.assertTrue(executor.prepare()) + self.assertTrue(executor.execute()) + + file_mock.write.assert_has_calls( + [mock.call(workflow_name + '\n'), + mock.call(job_name + '\n'), + mock.call(instance + '\n'), + mock.call('1\n')]) + + exists_mock.assert_any_call( + '/tmp/pinball/logs/' + workflow_name + '/' + instance) + + # stdout and stderr + self.assertEqual(rm_mock.call_count, 3) + + self.assertEqual(2, 
len(executor.job.history)) + execution_record = executor.job.history[1] + self.assertEqual(0, execution_record.exit_code) + + self.assertEqual(3, get_s3_key_mock.call_count) + def test_process_log_line(self): job = ShellJob(name='some_job', command="echo ok",