Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
ae891b2
add files that we'll need for this refactor
bgunnar5 Jul 15, 2025
3254380
add MerlinBaseFactory class
bgunnar5 Jul 15, 2025
1e48dea
refactor MerlinBackendFactory to use MerlinBaseFactory
bgunnar5 Jul 15, 2025
1edf5ba
add tests for MerlinBaseFactory and fix backend tests
bgunnar5 Jul 15, 2025
0134cd3
convert monitor factory to use new MerlinBaseFactory
bgunnar5 Jul 15, 2025
1c0088f
remove comment and update MonitorFactory docstring
bgunnar5 Jul 15, 2025
b8c9e19
update MerlinStatusRendererFactory to use MerlinBaseFactory and fix T…
bgunnar5 Jul 15, 2025
88cda38
add tests for the status renderer factory
bgunnar5 Jul 15, 2025
1a9cff2
update CHANGELOG
bgunnar5 Jul 15, 2025
f2efe91
run fix-style
bgunnar5 Jul 15, 2025
bfbfe03
fix issue with typehint that breaks in python 3.8
bgunnar5 Jul 15, 2025
9a5e257
mocked more items to try to fix broken tests on github
bgunnar5 Jul 16, 2025
4128bf8
create MerlinWorker and CeleryWorker classes to handle the launching …
bgunnar5 Jul 21, 2025
5d554ec
implement worker-handler related classes
bgunnar5 Jul 23, 2025
45afd64
add worker factory class and small cleanup to the rest of the worker …
bgunnar5 Jul 23, 2025
edead13
remove functions that are now in the new worker files
bgunnar5 Jul 23, 2025
7a50198
link the new worker classes to the actual launching of workers
bgunnar5 Jul 23, 2025
e51018d
resolve merge conflicts
bgunnar5 Jul 23, 2025
60ee2c2
fix regex in test
bgunnar5 Jul 23, 2025
c5ba679
remove watchdog files and run fix-style
bgunnar5 Jul 23, 2025
17ae613
fix tests that broke after refactor
bgunnar5 Jul 23, 2025
889541e
run fix-style
bgunnar5 Jul 23, 2025
4e1e1fb
update CHANGELOG
bgunnar5 Jul 23, 2025
b7caa85
refactored how the database command is initially processed
bgunnar5 Jul 30, 2025
b08f358
add filtering logic to the entity managers
bgunnar5 Jul 30, 2025
2e12115
add support for database-level filtering
bgunnar5 Jul 30, 2025
595df60
move get_plural_of_entity and get_singular_of_entity to utils.py
bgunnar5 Jul 30, 2025
b4bab5e
fix database command tests
bgunnar5 Aug 5, 2025
37feee9
fix broken tests for entity manager
bgunnar5 Aug 5, 2025
64f23a2
run fix-style and update CHANGELOG
bgunnar5 Aug 5, 2025
14b6ce0
add tests for new backend filtering
bgunnar5 Aug 5, 2025
7eb7529
add tests for new cli utils functions
bgunnar5 Aug 5, 2025
29151e6
fix typo in cli/utils.py
bgunnar5 Aug 5, 2025
0e747e9
run fix-style
bgunnar5 Aug 5, 2025
24042da
add filter options to the command line page
bgunnar5 Aug 6, 2025
c7949f3
resolve conflicts
bgunnar5 Jul 15, 2025
50743c5
resolve conflicts
bgunnar5 Jul 15, 2025
6daae0f
mocked more items to try to fix broken tests on github
bgunnar5 Jul 16, 2025
dd5d19d
create MerlinWorker and CeleryWorker classes to handle the launching …
bgunnar5 Jul 21, 2025
032405c
implement worker-handler related classes
bgunnar5 Jul 23, 2025
cc327aa
add worker factory class and small cleanup to the rest of the worker …
bgunnar5 Jul 23, 2025
15a3386
remove functions that are now in the new worker files
bgunnar5 Jul 23, 2025
c4cbe13
link the new worker classes to the actual launching of workers
bgunnar5 Jul 23, 2025
d2d324b
add files that we'll need for this refactor
bgunnar5 Jul 15, 2025
9a81f88
fix regex in test
bgunnar5 Jul 23, 2025
caddb71
remove watchdog files and run fix-style
bgunnar5 Jul 23, 2025
547e3c5
fix tests that broke after refactor
bgunnar5 Jul 23, 2025
8b212eb
run fix-style
bgunnar5 Jul 23, 2025
fcc6b8e
update CHANGELOG
bgunnar5 Jul 23, 2025
4d84bd8
run fix-style
bgunnar5 Aug 7, 2025
be71c82
fix worker-related factory classes
bgunnar5 Aug 7, 2025
534769e
add tests for new workers files
bgunnar5 Aug 7, 2025
272a0a7
change imports for Celery worker and handler in tests
bgunnar5 Aug 7, 2025
4d5efd4
run fix-style
bgunnar5 Aug 7, 2025
799d667
attempt to fix broken unit tests
bgunnar5 Aug 7, 2025
bfe02a2
fix style
bgunnar5 Aug 7, 2025
a214e56
pull latest changes from develop
bgunnar5 Aug 7, 2025
51db25c
add mocked merlin db to broken test
bgunnar5 Aug 7, 2025
45e7e64
run fix-style
bgunnar5 Aug 7, 2025
0a4de9c
fix susbcriptable error in test
bgunnar5 Aug 7, 2025
fa2d6f5
merge latest changes from develop
bgunnar5 Aug 7, 2025
dd3d390
Merge branch 'feature/filter-db-queries' into refactor/query-workers
bgunnar5 Aug 7, 2025
f6865c7
first pass at adding new worker formatter classes
bgunnar5 Aug 18, 2025
4d9d31e
link new formatters to new query-workers refactor
bgunnar5 Aug 18, 2025
413c39d
remove _discover_builtins method that's not used and annoying
bgunnar5 Aug 26, 2025
c420d5e
change database to only store base name of queue
bgunnar5 Aug 27, 2025
0d47eaa
get filters working for query-workers command
bgunnar5 Aug 27, 2025
64ac2c8
finalize worker formatters
bgunnar5 Aug 27, 2025
7a1b3a8
add console as class attribute for formmatters and fix pylint issues
bgunnar5 Aug 27, 2025
dfdf7d9
more pylint fixes
bgunnar5 Aug 27, 2025
add1927
add attributes section to json formatter
bgunnar5 Aug 27, 2025
74fcd1d
fix broken tests
bgunnar5 Aug 27, 2025
3defbaa
update CHANGELOG
bgunnar5 Aug 27, 2025
e1033d7
updated the query-workers docs page
bgunnar5 Aug 27, 2025
cac4d2f
update the query-workers CLI docs
bgunnar5 Aug 27, 2025
65e5c09
pull latest changes from develop-2.0
bgunnar5 Sep 22, 2025
2fb872d
fix one more merge problem
bgunnar5 Sep 22, 2025
0320158
add query-workers tests to celery worker handler test file
bgunnar5 Sep 23, 2025
82772b4
add tests for rich formatter
bgunnar5 Sep 23, 2025
d4c7e46
add tests for all formatter modules
bgunnar5 Sep 23, 2025
f070fc9
run fix-style
bgunnar5 Sep 23, 2025
a873e58
fix broken test
bgunnar5 Sep 23, 2025
4c90c69
remove query-workers integration tests
bgunnar5 Sep 25, 2025
f645267
fix issue with tests
bgunnar5 Sep 29, 2025
16d4e42
run fix style
bgunnar5 Sep 29, 2025
68b4486
pull latest changes from develop-2.0
bgunnar5 Oct 8, 2025
c22a9e7
fix issue with pid and use WorkerStatus enum properly
bgunnar5 Oct 9, 2025
dd00638
add functionality to compare db data against live celery data
bgunnar5 Oct 9, 2025
94b7539
pull in garbage collection update
bgunnar5 Oct 23, 2025
c8f3c7b
run fix style and fix tests
bgunnar5 Oct 23, 2025
be5be8c
move function to CeleryWorkerHandler and add/fix tests
bgunnar5 Oct 23, 2025
3a42d0a
run fix-style
bgunnar5 Oct 23, 2025
5a74c8d
merge latest changes from develop-2.0
bgunnar5 Dec 2, 2025
29a3ff9
Merge remote-tracking branch 'upstream/develop-2.0' into refactor/sto…
bgunnar5 Dec 3, 2025
c8f8616
move stop-workers functionality to CeleryWorkerHandler class
bgunnar5 Dec 11, 2025
231e958
remove unused functions from router/celeradapter
bgunnar5 Dec 11, 2025
1bfd537
add/update tests for the refactored stop-workers functionality
bgunnar5 Dec 11, 2025
501883e
remove unused import
bgunnar5 Dec 11, 2025
3e130fe
fix style
bgunnar5 Dec 11, 2025
5adc67f
update CHANGELOG
bgunnar5 Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- Removed old monitor code from v1.0
- Moved `stop-workers` functionality to the `CeleryWorkerHandler` class

## [2.0.0b4]

Expand Down
26 changes: 23 additions & 3 deletions merlin/cli/commands/stop_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

from merlin.ascii_art import banner_small
from merlin.cli.commands.command_entry_point import CommandEntryPoint
from merlin.router import stop_workers
from merlin.spec.specification import MerlinSpec
from merlin.utils import verify_filepath
from merlin.workers.handlers.handler_factory import worker_handler_factory


LOG = logging.getLogger("merlin")
Expand Down Expand Up @@ -67,6 +67,12 @@ def add_parser(self, subparsers: ArgumentParser):
default=None,
help="regex match for specific workers to stop",
)
stop.add_argument(
"-d",
"--dry-run",
action="store_true",
help="Display which workers would be stopped without actually stopping them",
)

def process_command(self, args: Namespace):
"""
Expand All @@ -87,13 +93,27 @@ def process_command(self, args: Namespace):
worker_names = []

# Load in the spec if one was provided via the CLI
spec = None
if args.spec:
spec_path = verify_filepath(args.spec)
spec = MerlinSpec.load_specification(spec_path)
worker_names = spec.get_worker_names()
for worker_name in worker_names:
if "$" in worker_name:
LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")
LOG.debug(f"Searching for the following workers to stop based on the spec {args.spec}: {worker_names}")

# If we have workers from --workers flag, add them to the list
if args.workers:
worker_names.extend(args.workers)

# Send stop command to router
stop_workers(args.task_server, worker_names, args.queues, args.workers)
# Get the task server from spec or CLI argument
task_server = spec.merlin["resources"]["task_server"] if spec else args.task_server

# Create the handler and send stop command
worker_handler = worker_handler_factory.create(task_server)
worker_handler.stop_workers(
queues=args.queues,
workers=worker_names if worker_names else None,
dry_run=args.dry_run,
)
16 changes: 7 additions & 9 deletions merlin/common/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@
RestartException,
RetryException,
)
from merlin.router import stop_workers
from merlin.spec.expansion import parameter_substitutions_for_cmd, parameter_substitutions_for_sample
from merlin.study.dag import DAG
from merlin.study.status import read_status, status_conflict_handler
from merlin.study.step import Step
from merlin.study.study import MerlinStudy
from merlin.utils import dict_deep_merge
from merlin.workers.handlers.celery_handler import CeleryWorkerHandler


retry_exceptions = (
Expand Down Expand Up @@ -894,12 +894,13 @@ def expand_tasks_with_samples( # pylint: disable=R0913,R0914
name="merlin:shutdown_workers",
priority=get_priority(Priority.HIGH),
)
def shutdown_workers(self: Task, shutdown_queues: List[str]): # pylint: disable=W0613
def shutdown_workers(self: Task, shutdown_queues: List[str] = None): # pylint: disable=W0613
"""
Initiates the shutdown of Celery workers.

This task wraps the [`stop_celery_workers`][study.celeryadapter.stop_celery_workers]
function, allowing for the graceful shutdown of specified Celery worker queues. It is
This task wraps the [`stop_workers`][workers.handlers.celery_handler.CeleryWorkerHandler.stop_workers]
method of the [`CeleryWorkerHandler`][workers.handlers.celery_handler.CeleryWorkerHandler]
class, allowing for the graceful shutdown of specified Celery worker queues. It is
acknowledged immediately upon execution, ensuring that it will not be requeued, even
if executed by a worker.

Expand All @@ -908,11 +909,8 @@ def shutdown_workers(self: Task, shutdown_queues: List[str]): # pylint: disable
shutdown_queues: A list of specific queues to shut down. If None, all queues will
be shut down.
"""
if shutdown_queues is not None:
LOG.warning(f"Shutting down workers in queues {shutdown_queues}!")
else:
LOG.warning("Shutting down workers in all queues!")
return stop_workers("celery", None, shutdown_queues, None)
worker_handler = CeleryWorkerHandler()
worker_handler.stop_workers(queues=shutdown_queues)


# Pylint complains that these args are unused but celery passes args
Expand Down
22 changes: 0 additions & 22 deletions merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
purge_celery_tasks,
query_celery_queues,
run_celery,
stop_celery_workers,
)
from merlin.study.study import MerlinStudy

Expand Down Expand Up @@ -150,24 +149,3 @@ def query_queues(
else:
LOG.error("Celery is not specified as the task server!")
return {}


def stop_workers(task_server: str, spec_worker_names: List[str], queues: List[str], workers_regex: str):
    """
    Ask the given task server to stop the workers matching the supplied criteria.

    Only the "celery" task server is handled. For any other value an error is
    logged and no stop request is sent.

    Args:
        task_server: Name of the task server whose workers should be stopped.
        spec_worker_names: Worker names to stop, as defined in a specification.
        queues: Queues whose associated workers should be stopped.
        workers_regex: Regex pattern used to select the workers to stop.
    """
    LOG.info("Stopping workers...")

    # Guard clause: bail out early for anything that is not Celery
    if task_server != "celery":
        LOG.error("Celery is not specified as the task server!")
        return

    # Delegate the actual stop request to the Celery adapter
    stop_celery_workers(queues, spec_worker_names, workers_regex)
81 changes: 1 addition & 80 deletions merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from merlin.config import Config
from merlin.spec.specification import MerlinSpec
from merlin.study.study import MerlinStudy
from merlin.utils import apply_list_of_regex, get_procs, is_running
from merlin.utils import get_procs, is_running


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -458,82 +458,3 @@ def purge_celery_tasks(queues: str, force: bool) -> int:
purge_command = " ".join(["celery -A merlin purge", force_com, "-Q", queues])
LOG.debug(purge_command)
return subprocess.run(purge_command, shell=True).returncode


def stop_celery_workers(
    queues: List[str] = None, spec_worker_names: List[str] = None, worker_regex: List[str] = None
):  # pylint: disable=R0912
    """
    Broadcast a shutdown command to Celery workers.

    The candidate workers are those attached to `queues` (or to every active
    queue when `queues` is None). When neither `spec_worker_names` nor
    `worker_regex` is given, every candidate is stopped; otherwise only the
    candidates selected by `apply_list_of_regex` for those patterns are stopped.

    Args:
        queues: Queue names whose workers should receive the stop command.
            If None, the workers of all active queues are considered.
        spec_worker_names: Worker names (typically from a spec) used to
            select workers among the candidates. Ignored when None or empty.
        worker_regex: Regex patterns used to select workers among the
            candidates. Ignored when None.

    Side Effects:
        - Broadcasts a "shutdown" control message to the selected workers.
        - Passes `queues` to `celerize_queues` when it is not None; the return
          value is discarded, so this presumably rewrites the caller's list in
          place (verify against `celerize_queues`).

    Example:
        ```python
        stop_celery_workers(queues=['hello'], worker_regex=['celery@*my_machine*'])
        stop_celery_workers()
        ```
    """
    from merlin.celery import app  # pylint: disable=C0415

    LOG.debug(f"Sending stop to queues: {queues}, worker_regex: {worker_regex}, spec_worker_names: {spec_worker_names}")
    active_queues, _ = get_active_celery_queues(app)

    if queues is not None:
        # Celery adds the queue tag in front of each queue so we add that here
        celerize_queues(queues)
    else:
        # Nothing requested explicitly, so consider every active queue
        queues = [*active_queues]

    # Collect the distinct workers attached to the requested queues
    attached_workers = set()
    for queue in queues:
        try:
            queue_workers = active_queues[queue]
            attached_workers.update(queue_workers)
            LOG.debug(f"Workers attached to queue {queue}: {queue_workers}")
        except KeyError:
            LOG.warning(f"No workers are connected to queue {queue}")

    all_workers = list(attached_workers)
    LOG.debug(f"Pre-filter worker stop list: {all_workers}")

    has_spec_names = spec_worker_names is not None and len(spec_worker_names) > 0

    if not has_spec_names and worker_regex is None:
        # No flags provided: every candidate worker gets stopped
        workers_to_stop = list(all_workers)
    else:
        workers_to_stop = []
        # --spec flag
        if has_spec_names:
            apply_list_of_regex(spec_worker_names, all_workers, workers_to_stop)
        # --workers flag
        if worker_regex is not None:
            LOG.debug(f"Searching for workers to stop based on the following regex's: {worker_regex}")
            apply_list_of_regex(worker_regex, all_workers, workers_to_stop)

    # A worker may have been selected by more than one pattern; keep one entry each
    workers_to_stop = list(set(workers_to_stop))
    LOG.debug(f"Post-filter worker stop list: {workers_to_stop}")

    if not workers_to_stop:
        LOG.warning("No workers found to stop")
        return

    LOG.info(f"Sending stop to these workers: {workers_to_stop}")
    # Send the shutdown signal
    app.control.broadcast("shutdown", destination=workers_to_stop)
8 changes: 5 additions & 3 deletions merlin/study/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from merlin.db_scripts.merlin_db import MerlinDatabase
from merlin.exceptions import RunNotFoundError, StudyNotFoundError
from merlin.spec.specification import MerlinSpec
from merlin.study.celeryadapter import purge_celery_tasks, stop_celery_workers
from merlin.study.celeryadapter import purge_celery_tasks
from merlin.workers.handlers.celery_handler import CeleryWorkerHandler


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -97,12 +98,13 @@ def cancel(

# Step 1: Stop the workers
if stop_workers:
# TODO when we refactor `stop-workers`, update this
worker_names = spec.get_worker_names()
for worker_name in worker_names:
if "$" in worker_name:
LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")
stop_celery_workers(spec_worker_names=worker_names)

worker_handler = CeleryWorkerHandler()
worker_handler.stop_workers(workers=worker_names)

# TODO when we refactor `stop-workers`, may want to do some extra validation here to ensure
# all of these workers have actually been stopped
Expand Down
23 changes: 22 additions & 1 deletion merlin/workers/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(
self.batch = self.config.get("batch", {})
self.machines = self.config.get("machines", [])
self.overlap = overlap
self.pid = None # Set when the worker is launched

# Add this worker to the database
merlin_db = MerlinDatabase()
Expand Down Expand Up @@ -189,12 +190,32 @@ def start(self, override_args: str = "", disable_logs: bool = False):
if self.should_launch():
launch_cmd = self.get_launch_command(override_args=override_args, disable_logs=disable_logs)
try:
subprocess.Popen(launch_cmd, env=self.env, shell=True, universal_newlines=True) # pylint: disable=R1732
worker_proc = subprocess.Popen(
launch_cmd, env=self.env, shell=True, universal_newlines=True
) # pylint: disable=R1732
self.pid = worker_proc.pid
LOG.debug(f"Launched worker '{self.name}' with command: {launch_cmd}.")
except Exception as e: # pylint: disable=C0103
LOG.error(f"Cannot start celery workers, {e}")
raise MerlinWorkerLaunchError from e

def stop(self):
    """
    Stop the worker process.

    If a PID was recorded for this worker, SIGTERM is sent to that PID and the
    stored PID is cleared. If the process no longer exists, the stale PID is
    cleared as well so later calls do not keep retrying it. Any other failure
    is logged and the PID is left untouched. When no PID is known, a warning
    is logged and nothing is signalled.
    """
    import signal  # pylint: disable=C0415

    # Truthiness (not `is None`) is deliberate: it also rejects PID 0, which
    # os.kill would interpret as "every process in the current process group".
    if not self.pid:
        LOG.warning(f"Worker '{self.name}' is not running or PID is unknown; cannot stop.")
        return

    try:
        os.kill(self.pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process already exited; the recorded PID is stale, so drop it
        LOG.warning(f"Worker '{self.name}' with PID {self.pid} is no longer running.")
        self.pid = None
    except Exception as e:  # pylint: disable=C0103,W0718
        # Best-effort: report the failure but never let it propagate to the caller
        LOG.error(f"Cannot stop celery worker '{self.name}', {e}")
    else:
        LOG.debug(f"Stopped worker '{self.name}' with PID {self.pid}.")
        self.pid = None  # Reset PID after stopping

def get_metadata(self) -> Dict:
"""
Return metadata about this worker instance.
Expand Down
Loading