Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/Queue/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Utopia\Hook;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
use Utopia\Telemetry\Gauge;
use Utopia\Telemetry\Histogram;
use Utopia\Validator;

Expand Down Expand Up @@ -69,6 +70,7 @@ class Server

private Histogram $jobWaitTime;
private Histogram $processDuration;
private Gauge $queueLength;

/**
* Creates an instance of a Queue server.
Expand Down Expand Up @@ -161,6 +163,8 @@ public function setTelemetry(Telemetry $telemetry)
null,
['ExplicitBucketBoundaries' => [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]]
);

$this->queueLength = $telemetry->createGauge('messaging.queue.length');

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the term 'messaging' here? will that cause confusion between the Messaging queue?

}

/**
Expand Down Expand Up @@ -219,6 +223,11 @@ public function start(): self
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
}
while (true) {
$this->queueLength->record(
$this->adapter->connection->listSize("{$this->adapter->namespace}.queue.{$this->adapter->queue}"),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also track processing length?

It should be "{$this->adapter->namespace}.processing.{$this->adapter->queue}"

['queue' => $this->adapter->queue]
);

/**
* Waiting for next Job.
*/
Expand All @@ -239,7 +248,7 @@ public function start(): self
Console::info("[Job] Received Job ({$message->getPid()}).");

$waitDuration = microtime(true) - $message->getTimestamp();
$this->jobWaitTime->record($waitDuration);
$this->jobWaitTime->record($waitDuration, ['queue' => $this->adapter->queue]);

/**
* Move Job to Jobs and it's PID to the processing list.
Expand Down Expand Up @@ -327,7 +336,7 @@ public function start(): self
}
} finally {
$processDuration = microtime(true) - $receivedAtTimestamp;
$this->processDuration->record($processDuration);
$this->processDuration->record($processDuration, ['queue' => $this->adapter->queue]);

/**
* Remove Job from Processing.
Expand Down
Loading