From de2579521315c0bf254de9a99b7b4704bbe9b696 Mon Sep 17 00:00:00 2001 From: Chao Ren Date: Thu, 17 Nov 2016 12:26:34 -0800 Subject: [PATCH] Pass workflow / instance / job / execution to the job from Pinball --- pinball/workflow/job_executor.py | 11 ++++ tests/pinball/workflow/job_executor_test.py | 66 +++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/pinball/workflow/job_executor.py b/pinball/workflow/job_executor.py index e81ac69..94fad4e 100644 --- a/pinball/workflow/job_executor.py +++ b/pinball/workflow/job_executor.py @@ -509,6 +509,17 @@ def execute(self): # Pinball sets Django module path which may interfere # with the command being executed. env.pop('DJANGO_SETTINGS_MODULE', None) + + # Pass workflow/instance/job/execution to the job. + # This will be consumed by Dr. Elephant. + env['PINBALL_WORKFLOW'] = self._workflow + env['PINBALL_INSTANCE'] = self._instance + env['PINBALL_JOB'] = self._job_name + env['PINBALL_EXECUTION'] = str(len(self.job.history) - 1) + + if PinballConfig.UI_HOST is not None: + env['PINBALL_BASE_URL'] = PinballConfig.UI_HOST + # The os.setsid() is passed in the argument preexec_fn # so it's run after the fork() and before exec() to # run the shell. It attaches a session id of the child diff --git a/tests/pinball/workflow/job_executor_test.py b/tests/pinball/workflow/job_executor_test.py index 4c6e11e..87a0b85 100644 --- a/tests/pinball/workflow/job_executor_test.py +++ b/tests/pinball/workflow/job_executor_test.py @@ -16,6 +16,7 @@ import mock import os import subprocess +import time import unittest from pinball.config.pinball_config import PinballConfig @@ -220,6 +221,71 @@ def test_execute_long_line(self, open_mock, exists_mock, get_s3_key_mock): self.assertEqual(2, get_s3_key_mock.call_count) + @mock.patch('pinball.workflow.log_saver.S3FileLogSaver._get_or_create_s3_key') + @mock.patch('os.path.exists') + @mock.patch('__builtin__.open') + def test_execute_env_var(self, open_mock, exists_mock, get_s3_key_mock): + file_mock = mock.MagicMock() + open_mock.return_value = file_mock + file_mock.__enter__.return_value = file_mock + + s3_key_mock = mock.MagicMock() + get_s3_key_mock.return_value = s3_key_mock + s3_key_mock.__enter__.return_value = s3_key_mock + + job_name = 'some_job' + workflow_name = 'some_workflow' + instance = '123' + + job = ShellJob(name=job_name, + command="echo $PINBALL_WORKFLOW && " + "echo $PINBALL_JOB && " + "echo $PINBALL_INSTANCE && " + "echo $PINBALL_EXECUTION && " + "echo $PINBALL_BASE_URL", + emails=['some_email@pinterest.com'], + warn_timeout_sec=10, + abort_timeout_sec=20) + executor = ShellJobExecutor(workflow_name, instance, job_name, + job, self._data_builder, + self._emailer) + + execution_record = ExecutionRecord(instance=instance, + start_time=time.time()) + execution_record.end_time = time.time() + execution_record.exit_code = 1 + job.history.append(execution_record) + + self.assertTrue(executor.prepare()) + self.assertTrue(executor.execute()) + + file_mock.write.assert_has_calls( + [mock.call(workflow_name + '\n'), + mock.call(job_name + '\n'), + mock.call(instance + '\n'), + mock.call('1\n')]) + + self.assertEqual(len(executor.job.history), 2) + self.assertEqual(get_s3_key_mock.call_count, len(executor.job.history)) + latest_execution_record = executor.job.history[1] + self.assertEqual(latest_execution_record.exit_code, 0) + + exists_mock.assert_any_call( + '/tmp/pinball_job_logs/{wf}/{inst}'.format( + wf=workflow_name, inst=instance + ), + ) + exists_mock.assert_any_call( + '/tmp/pinball_job_logs/{wf}/{inst}/some_job.{ts}.stdout'.format( + wf=workflow_name, inst=instance, ts=int(latest_execution_record.start_time) + ), + ) + exists_mock.assert_any_call( + '/tmp/pinball_job_logs/{wf}/{inst}/some_job.{ts}.stderr'.format( + wf=workflow_name, inst=instance, ts=int(latest_execution_record.start_time) + ), + ) + def test_process_log_line(self): job = ShellJob(name='some_job', command="echo ok",