Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ __pycache__
htmlcov/
dive.log
.vscode
venv
47 changes: 47 additions & 0 deletions celery_exporter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

import click
from .core import CeleryExporter
from prometheus_client import Histogram

__VERSION__ = (2, 0, 0)

LOG_FORMAT = "[%(asctime)s] %(name)s:%(levelname)s: %(message)s"
DEFAULT_BUCKETS = ",".join(map(str, Histogram.DEFAULT_BUCKETS))


@click.command(context_settings={"auto_envvar_prefix": "CELERY_EXPORTER"})
Expand Down Expand Up @@ -62,6 +64,22 @@
allow_from_autoenv=False,
help="Periodically enable Celery events.",
)
@click.option(
"--runtime-histogram-bucket",
type=str,
show_default=True,
show_envvar=True,
default=DEFAULT_BUCKETS,
help="Default buckets of runtime historgram in seconds",
)
@click.option(
"--latency-histogram-bucket",
type=str,
show_default=True,
show_envvar=True,
default=DEFAULT_BUCKETS,
help="Default buckets of latency historgram in seconds.",
)
@click.option(
"--tz", type=str, allow_from_autoenv=False, help="Timezone used by the celery app."
)
Expand All @@ -76,6 +94,8 @@ def main(
namespace,
transport_options,
enable_events,
runtime_histogram_bucket,
latency_histogram_bucket,
tz,
verbose,
): # pragma: no cover
Expand All @@ -101,13 +121,40 @@ def main(
)
sys.exit(1)

def decode_buckets(buckets_list):
    """
    Parse a comma separated string of numbers into a tuple of floats.

    Raises ValueError when an entry is not a valid float literal or when
    the values are not in ascending order.
    """
    buckets = tuple(float(item) for item in buckets_list.split(","))
    # Histogram bucket bounds are expected in ascending order; reject a
    # mis-ordered list here so it is reported through the same ValueError
    # path as an unparsable entry instead of failing later on.
    if list(buckets) != sorted(buckets):
        raise ValueError(
            "buckets are not in sorted order: {}".format(buckets_list)
        )
    return buckets

try:
runtime_histogram_bucket = decode_buckets(runtime_histogram_bucket)
except ValueError:
print(
"Error parsing runtime_histogram_bucket options '{}' must be a comma sperated list of numbers".format(
runtime_histogram_bucket
),
file=sys.stderr,
)
sys.exit(1)

try:
latency_histogram_bucket = decode_buckets(latency_histogram_bucket)
except ValueError:
print(
"Error parsing latency_histogram_bucket options '{}' must be a comma sperated list of numbers".format(
latency_histogram_bucket
),
file=sys.stderr,
)
sys.exit(1)

celery_exporter = CeleryExporter(
broker_url,
listen_address,
max_tasks,
namespace,
transport_options,
enable_events,
runtime_histogram_bucket,
latency_histogram_bucket,
)
celery_exporter.start()

Expand Down
13 changes: 12 additions & 1 deletion celery_exporter/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,36 @@ def __init__(
namespace="celery",
transport_options=None,
enable_events=False,
runtime_histogram_bucket=prometheus_client.Histogram.DEFAULT_BUCKETS,
latency_histogram_bucket=prometheus_client.Histogram.DEFAULT_BUCKETS,
):
self._listen_address = listen_address
self._max_tasks = max_tasks
self._namespace = namespace
self._enable_events = enable_events
self._runtime_histogram_bucket = runtime_histogram_bucket
self._latency_histogram_bucket = latency_histogram_bucket

self._app = celery.Celery(broker=broker_url)
self._app.conf.broker_transport_options = transport_options or {}

def start(self):

setup_metrics(self._app, self._namespace)
setup_metrics(
self._app,
self._namespace,
self._runtime_histogram_bucket,
self._latency_histogram_bucket,
)

self._start_httpd()

t = TaskThread(
app=self._app,
namespace=self._namespace,
max_tasks_in_memory=self._max_tasks,
runtime_histogram_bucket=self._runtime_histogram_bucket,
latency_histogram_bucket=self._latency_histogram_bucket,
)
t.daemon = True
t.start()
Expand Down
18 changes: 0 additions & 18 deletions celery_exporter/metrics.py

This file was deleted.

66 changes: 61 additions & 5 deletions celery_exporter/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,39 @@

import celery
import celery.states
import prometheus_client

from .metrics import TASKS, TASKS_RUNTIME, LATENCY, WORKERS
from celery_state import CeleryState
from .utils import get_config

TASKS = None
WORKERS = None
TASKS_RUNTIME = None
LATENCY = None


class TaskThread(threading.Thread):
"""
TaskThread is the thread that will collect the data that is later
exposed from Celery using its eventing system.
"""

def __init__(self, app, namespace, max_tasks_in_memory, *args, **kwargs):
def __init__(
self,
app,
namespace,
max_tasks_in_memory,
runtime_histogram_bucket,
latency_histogram_bucket,
*args,
**kwargs
):
self._app = app
self._namespace = namespace
self.log = logging.getLogger("task-thread")
self._state = CeleryState(max_tasks_in_memory=max_tasks_in_memory)
self._runtime_histogram_bucket = runtime_histogram_bucket
self._latency_histogram_bucket = latency_histogram_bucket
self._known_states = set()
self._known_states_names = set()
self._tasks_started = dict()
Expand Down Expand Up @@ -56,12 +72,22 @@ def _monitor(self): # pragma: no cover
recv = self._app.events.Receiver(
conn, handlers={"*": self._process_event}
)
setup_metrics(self._app, self._namespace)
setup_metrics(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does setup_metrics need to be called again? Same below. It was already called in core.py::start

self._app,
self._namespace,
self._runtime_histogram_bucket,
self._latency_histogram_bucket,
)
self.log.info("Start capturing events...")
recv.capture(limit=None, timeout=None, wakeup=True)
except Exception:
self.log.exception("Connection failed")
setup_metrics(self._app, self._namespace)
setup_metrics(
self._app,
self._namespace,
self._runtime_histogram_bucket,
self._latency_histogram_bucket,
)
time.sleep(5)


Expand Down Expand Up @@ -109,11 +135,41 @@ def enable_events(self):
self._app.control.enable_events()


def setup_metrics(app, namespace):
def setup_metrics(app, namespace, task_buckets, latency_buckets):
"""
This initializes the available metrics with default values so that
even before the first event is received, data can be exposed.
"""
global TASKS
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would refactor this into a class or object to extend initialization, instead of doing it here, but I wanted to limit changes, as it's my first PR for this project.

global TASKS_RUNTIME
global LATENCY
global WORKERS

if TASKS == None:
TASKS = prometheus_client.Counter(
"celery_tasks_total",
"Number of task events.",
["namespace", "name", "state", "queue"],
)
if TASKS_RUNTIME == None:
TASKS_RUNTIME = prometheus_client.Histogram(
"celery_tasks_runtime_seconds",
"Task runtime.",
["namespace", "name", "queue"],
buckets=task_buckets,
)
if LATENCY == None:
LATENCY = prometheus_client.Histogram(
"celery_tasks_latency_seconds",
"Time between a task is received and started.",
["namespace", "name", "queue"],
buckets=latency_buckets,
)
if WORKERS == None:
WORKERS = prometheus_client.Gauge(
"celery_workers", "Number of alive workers", ["namespace"]
)

WORKERS.labels(namespace=namespace)
config = get_config(app)

Expand Down
8 changes: 7 additions & 1 deletion test/test_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import celery
import celery_exporter.monitor
from celery_exporter.core import CeleryExporter
from prometheus_client import Histogram

prom_http_server_mock = MagicMock(return_value=None)
setup_metrics_mock = MagicMock(return_value=None)
Expand Down Expand Up @@ -33,7 +34,10 @@ def setUp(self):
def test_setup_metrics(self):
self.cel_exp.start()
setup_metrics_mock.assert_called_with(
self.cel_exp._app, TestCeleryExporter.namespace
self.cel_exp._app,
TestCeleryExporter.namespace,
Histogram.DEFAULT_BUCKETS,
Histogram.DEFAULT_BUCKETS,
)

def test_http_server(self):
Expand All @@ -46,6 +50,8 @@ def test_task_thread(self):
self.cel_exp._app,
TestCeleryExporter.namespace,
TestCeleryExporter.max_tasks,
runtime_histogram_bucket=Histogram.DEFAULT_BUCKETS,
latency_histogram_bucket=Histogram.DEFAULT_BUCKETS,
)

def test_worker_thread(self):
Expand Down
15 changes: 12 additions & 3 deletions test/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from celery.events import Event
from celery.utils import uuid
from prometheus_client import REGISTRY
from prometheus_client import REGISTRY, Histogram
from unittest import TestCase
from unittest.mock import patch

Expand Down Expand Up @@ -36,7 +36,12 @@ def setUp(self):
"celery@12311847jsa2": {},
}
registered.return_value = {"celery@d6f95e9e24fc": [self.task, "trial"]}
setup_metrics(self.app, self.namespace) # reset metrics
setup_metrics(
self.app,
self.namespace,
task_buckets=Histogram.DEFAULT_BUCKETS,
latency_buckets=Histogram.DEFAULT_BUCKETS,
) # reset metrics

def test_initial_metric_values(self):
self._assert_task_states(celery.states.ALL_STATES, 0)
Expand Down Expand Up @@ -116,7 +121,11 @@ def test_tasks_events(self):
runtime = 234.5

m = TaskThread(
app=self.app, namespace=self.namespace, max_tasks_in_memory=self.max_tasks
app=self.app,
namespace=self.namespace,
max_tasks_in_memory=self.max_tasks,
runtime_histogram_bucket=Histogram.DEFAULT_BUCKETS,
latency_histogram_bucket=Histogram.DEFAULT_BUCKETS,
)

self._assert_task_states(celery.states.ALL_STATES, 0)
Expand Down