From 919c441611d20be2a88596a550cd2e017686ce31 Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Fri, 6 Dec 2024 17:04:45 +0100 Subject: [PATCH] feat: add 'messaging.queue.length' metric --- src/Queue/Server.php | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/Queue/Server.php b/src/Queue/Server.php index ce6b573..bd0ff70 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -8,6 +8,7 @@ use Utopia\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; +use Utopia\Telemetry\Gauge; use Utopia\Telemetry\Histogram; use Utopia\Validator; @@ -69,6 +70,7 @@ class Server private Histogram $jobWaitTime; private Histogram $processDuration; + private Gauge $queueLength; /** * Creates an instance of a Queue server. @@ -161,6 +163,8 @@ public function setTelemetry(Telemetry $telemetry) null, ['ExplicitBucketBoundaries' => [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]] ); + + $this->queueLength = $telemetry->createGauge('messaging.queue.length'); } /** @@ -219,6 +223,11 @@ public function start(): self call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } while (true) { + $this->queueLength->record( + $this->adapter->connection->listSize("{$this->adapter->namespace}.queue.{$this->adapter->queue}"), + ['queue' => $this->adapter->queue] + ); + /** * Waiting for next Job. */ @@ -239,7 +248,7 @@ public function start(): self Console::info("[Job] Received Job ({$message->getPid()})."); $waitDuration = microtime(true) - $message->getTimestamp(); - $this->jobWaitTime->record($waitDuration); + $this->jobWaitTime->record($waitDuration, ['queue' => $this->adapter->queue]); /** * Move Job to Jobs and it's PID to the processing list. @@ -327,7 +336,7 @@ public function start(): self } } finally { $processDuration = microtime(true) - $receivedAtTimestamp; - $this->processDuration->record($processDuration); + $this->processDuration->record($processDuration, ['queue' => $this->adapter->queue]); /** * Remove Job from Processing.