Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Examon common utilities package
===============================
v0.2.6
v0.2.7
2 changes: 2 additions & 0 deletions examon/utils/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def monitor_process_children():
if hasattr(d['d'], 'pid'):
kill_proc_tree(d['d'].pid)

d['d'].join()

if len(d['worker']) > 1:
d_ = Process(target=d['worker'][0], args=d['worker'][1:])
else:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup

setup(name='examon-common',
version='v0.2.6',
version='v0.2.7',
description='Examon common utilities',
url='http://github.com/fbeneventi/examon-common',
author='Francesco Beneventi',
Expand Down
140 changes: 140 additions & 0 deletions tests/test_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import pytest
import time
import multiprocessing
from unittest.mock import patch, MagicMock
from examon.utils.executor import Executor


# Simple worker functions for testing
def simple_worker():
"""A worker that completes quickly"""
return "done"


def worker_with_args(x, y):
"""A worker that takes arguments"""
return x + y


def long_running_worker():
"""Simulates a daemon that runs for a while"""
time.sleep(0.5)


def dying_worker():
"""A worker that dies immediately"""
raise Exception("I'm dying!")


class TestExecutorInit:
"""Tests for Executor initialization"""

def test_default_initialization(self):
executor = Executor()
assert executor.executor == 'ProcessPool'
assert executor.keepalivesec == 60
assert executor.workers == []

def test_custom_initialization(self):
executor = Executor(executor='Daemon', keepalivesec=30)
assert executor.executor == 'Daemon'
assert executor.keepalivesec == 30


class TestExecutorAddWorker:
"""Tests for adding workers"""

def test_add_worker_no_args(self):
executor = Executor()
executor.add_worker(simple_worker)
assert len(executor.workers) == 1
assert executor.workers[0] == (simple_worker,)

def test_add_worker_with_args(self):
executor = Executor()
executor.add_worker(worker_with_args, 1, 2)
assert len(executor.workers) == 1
assert executor.workers[0] == (worker_with_args, 1, 2)

def test_add_multiple_workers(self):
executor = Executor()
executor.add_worker(simple_worker)
executor.add_worker(worker_with_args, 10, 20)
assert len(executor.workers) == 2


class TestExecutorProcessPool:
"""Tests for ProcessPool executor mode"""

def test_exec_par_single_worker(self):
executor = Executor(executor='ProcessPool')
executor.add_worker(worker_with_args, 2, 3)
results = executor.exec_par()
assert results == [5]

def test_exec_par_multiple_workers(self):
executor = Executor(executor='ProcessPool')
executor.add_worker(worker_with_args, 2, 3)
executor.add_worker(worker_with_args, 10, 20)
results = executor.exec_par()
assert sorted(results) == [5, 30]

def test_exec_par_no_workers(self):
executor = Executor(executor='ProcessPool')
results = executor.exec_par()
assert results == []


class TestExecutorDaemon:
"""Tests for Daemon executor mode"""

def test_daemon_with_keepalive_zero(self):
"""When keepalivesec=0, skips monitoring loop and joins workers"""
executor = Executor(executor='Daemon', keepalivesec=0)
executor.add_worker(simple_worker)

# sys.exit(0) is called, so we catch it
with pytest.raises(SystemExit) as exc_info:
executor.exec_par()
assert exc_info.value.code == 0

def test_daemon_keyboard_interrupt(self):
"""Test graceful shutdown on KeyboardInterrupt"""
executor = Executor(executor='Daemon', keepalivesec=1)
executor.add_worker(long_running_worker)

# Mock time.sleep to raise KeyboardInterrupt after first call
call_count = 0
original_sleep = time.sleep

def mock_sleep(seconds):
nonlocal call_count
call_count += 1
if call_count > 1:
raise KeyboardInterrupt()
original_sleep(0.01) # Very short actual sleep

with patch('time.sleep', side_effect=mock_sleep):
with pytest.raises(SystemExit) as exc_info:
executor.exec_par()
assert exc_info.value.code == 0

def test_daemon_workers_started_as_daemon(self):
"""Verify workers are started as daemon processes"""
executor = Executor(executor='Daemon', keepalivesec=0)
executor.add_worker(simple_worker)

started_processes = []
original_process = multiprocessing.Process

class MockProcess(original_process):
def start(self):
started_processes.append(self)
super().start()

with patch('examon.utils.executor.Process', MockProcess):
with pytest.raises(SystemExit):
executor.exec_par()

assert len(started_processes) == 1
assert started_processes[0].daemon is True