Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `MerlinWorkerHandler`: base class for managing launching, stopping, and querying multiple workers
- `CeleryWorkerHandler`: implementation of `MerlinWorkerHandler` specifically for manager Celery workers
- `WorkerHandlerFactory`: to help determine which task server handler to use
- New configuration dataclasses:
- `BatchConfig`: To load in batch configuration settings
- `WorkerConfig`: To define worker settings
- A new celery task called `mark_run_as_complete` that is automatically added to the task queue associated with the final step in a workflow
- Ability to filter database queries for the `get all-*` and `delete all-*` commands
- New `MerlinBaseFactory` class to help enable future plugins for backends, monitors, status renderers, etc.
Expand All @@ -45,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Maestro version requirement is now at minimum 1.1.10 for status renderer changes
- The `BackendFactory`, `MonitorFactory`, and `StatusRendererFactory` classes all now inherit from `MerlinBaseFactory`
- Launching workers is now handled through worker classes rather than functions in the `celeryadapter.py` file
- `batch.py` is now a class rather than standalone functions


## [1.13.0]
Expand Down
10 changes: 9 additions & 1 deletion merlin/abstracts/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,15 @@ def create(self, component_type: str, config: Dict = None) -> Any:

# Create and return an instance of the component_class
try:
instance = component_class() if config is None else component_class(**config)
if config is None:
# No configuration
instance = component_class()
elif isinstance(config, dict):
# Dict-based config
instance = component_class(**config)
else:
# Try to pass directly (config could be dataclass)
instance = component_class(config)
LOG.info(f"Created component '{canonical_name}'")
return instance
except Exception as e:
Expand Down
1 change: 0 additions & 1 deletion merlin/config/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import logging
import ssl
from typing import Dict, List, Optional, Union
from urllib.parse import quote

from merlin.config.configfile import CONFIG, get_ssl_entries
from merlin.config.utils import resolve_password
Expand Down
1 change: 1 addition & 0 deletions merlin/config/configfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def set_local_mode(enable: bool = True):
if enable:
LOG.info("Running Merlin in local mode (no configuration file required)")


def is_local_mode() -> bool:
"""
Checks if Merlin is running in local mode.
Expand Down
29 changes: 14 additions & 15 deletions merlin/spec/specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from maestrowf.specification import YAMLSpecification

from merlin.spec import all_keys, defaults
from merlin.study.configurations import BatchConfig, WorkerConfig
from merlin.utils import find_vlaunch_var, get_yaml_var, load_array_file, needs_merlin_expansion, repr_timedelta
from merlin.workers.worker import MerlinWorker
from merlin.workers.worker_factory import worker_factory
Expand Down Expand Up @@ -1262,23 +1263,21 @@ def build_worker_list(self, workers_to_start: Set[str]) -> List[MerlinWorker]:

for worker_name in workers_to_start:
settings = all_workers[worker_name]
config = {
"args": settings.get("args", ""),
"machines": settings.get("machines", []),
"queues": set(self.get_queue_list(settings["steps"])),
"batch": settings["batch"] if settings["batch"] is not None else self.batch.copy(),
}

if "nodes" in settings and settings["nodes"] is not None:
if config["batch"]:
config["batch"]["nodes"] = settings["nodes"]
else:
config["batch"] = {"nodes": settings["nodes"]}

LOG.debug(f"config for worker '{worker_name}': {config}")
batch_settings = settings["batch"] if settings["batch"] is not None else self.batch.copy()

worker_config = WorkerConfig(
name=worker_name,
args=settings.get("args", ""),
queues=set(self.get_queue_list(settings["steps"])),
machines=settings.get("machines", []),
env=full_env,
overlap=overlap,
nodes=settings.get("nodes", None),
batch=BatchConfig.from_dict(batch_settings),
)

worker_params = {"name": worker_name, "config": config, "env": full_env, "overlap": overlap}
worker_instance = worker_factory.create(self.merlin["resources"]["task_server"], worker_params)
worker_instance = worker_factory.create(self.merlin["resources"]["task_server"], worker_config)
workers.append(worker_instance)
LOG.debug(f"Created CeleryWorker object for worker '{worker_name}'.")

Expand Down
Loading