Task distribution
Tasks (or update_requests) are distributed to workers via the main task queue (or global task queue). This queue (in fact, a system of multiple queues, as explained below) is currently implemented using the RabbitMQ message broker.
The number of worker processes is configured in two places: the NERD backend configuration (/etc/nerd/nerdd.yml)
and the supervisord configuration (/etc/nerd/supervisord.conf.d/workers.ini).
In addition, the RabbitMQ exchanges and queues must be configured properly before NERD starts
(using the rmq_reconfigure.sh script; see below for details).
IMPORTANT: The number of workers must not be changed while the system is running.
To change the number of worker processes, the following process must be followed:
- Stop all processes generating tasks (i.e. primary input modules and updater).
- Wait until all tasks are processed (i.e. all `nerd-worker-#` queues are empty).
- Stop all workers.
- Change the `worker_processes` key in `/etc/nerd/nerd.yml`.
- Change the `numprocs` key in `/etc/nerd/supervisord.conf.d/workers.ini`.
- Reload Apache (`systemctl reload httpd`).
- Delete and recreate queues in RabbitMQ using `/nerd/scripts/rmq_reconfigure.sh N`, where `N` is the new number of worker processes.
- Reload supervisord config (`nerdctl update`) and start all components again.
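For illustration, the two values must be kept equal; the excerpts below use an example value of 4 (the supervisord section name is illustrative and may differ from the real one):

```
# /etc/nerd/nerd.yml (excerpt; example value)
worker_processes: 4

# /etc/nerd/supervisord.conf.d/workers.ini (excerpt; section name illustrative)
# numprocs must be equal to worker_processes above
[program:nerd-worker]
numprocs = 4
```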
If an error occurs during development/debugging and tasks that need to be removed remain in the queues,
the rmq_purge_worker_queues.sh script can be used to remove them.
Upon default installation of NERD, there is a RabbitMQ management interface running at
http://127.0.0.1:15672/ (default login: guest/guest, accessible only from localhost).
This interface can be used to monitor the status of the RabbitMQ queues.
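Queue lengths can also be inspected from the command line, e.g.:

```
rabbitmqctl list_queues name messages
```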
On the Python code level, this task distribution system is represented by a pair of classes,
TaskQueueWriter and TaskQueueReader.
Multiple system components can submit tasks (i.e. requests to perform one or more changes in a specific entity record)
into the main queue using TaskQueueWriter.
Worker processes register to receive the tasks using an instance of TaskQueueReader.
These two classes encapsulate all communication with the broker server, so the rest of the application is
independent of the actual implementation used (i.e. RabbitMQ can easily be replaced by another system).
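As a rough illustration of how the two classes fit together (the import path, constructor arguments and method names below are assumptions, not the actual API - see the NERD sources for that):

```python
# Hypothetical usage sketch - names and signatures are assumptions.
from common.task_queue import TaskQueueWriter, TaskQueueReader  # import path assumed

NUM_WORKERS = 4  # must match the worker_processes configuration value

# A component submitting a task (a request to change the record of IP 1.2.3.4):
writer = TaskQueueWriter(NUM_WORKERS)
writer.put_task('ip', '1.2.3.4', [('set', 'test_attr', 1)])

# A worker process (its index is passed as a program parameter) registering for tasks:
def handle_task(etype, eid, requested_changes):
    pass  # hand the task over to UpdateManager for processing

reader = TaskQueueReader(handle_task, worker_index=0)
reader.start()
```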
There are two RabbitMQ queues for each worker process in the system - a normal one and a priority one.
Most components submit tasks into the normal queue of the given worker process
(the process index is determined automatically by a hash of the entity ID inside TaskQueueWriter);
only the UpdateManager.update() method within workers puts tasks into the priority queue.
The normal queue has a limited length (100 messages by default), so an attempt to submit a task into it may block
indefinitely.
The priority queue is unlimited, so task submission to it never blocks (unless there is a connection problem).
These non-blocking priority queues are needed to prevent a deadlock.
Task submission via UpdateManager.update() usually happens as part of processing another task.
If it blocked because there was no space in the queue, that task would never finish, which in turn would mean that
no new tasks could be fetched from the queue and the whole program would stop.
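For illustration only, a client-side length limit of this kind could be enforced roughly as follows (this is a sketch, not how NERD's TaskQueueWriter is actually implemented):

```python
import time
import pika

MAX_LEN = 100  # default length limit of the normal queue

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

def publish_blocking(exchange, routing_key, body):
    # Passive declare only inspects the queue and returns its current message count.
    queue_name = 'nerd-worker-{}'.format(routing_key)
    while channel.queue_declare(queue=queue_name, passive=True).method.message_count >= MAX_LEN:
        # may wait indefinitely - which is why UpdateManager.update() must use the unlimited priority queue instead
        time.sleep(0.5)
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
```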
By default, the exchanges are named nerd-main-task-exchange and nerd-priority-task-exchange,
the queues are named nerd-worker-{} and nerd-worker-{}-pri (where {} is replaced by the index of the worker).
The whole queue architecture is depicted in the picture below.
Tasks are distributed to worker processes (and threads) in a deterministic way based on the hash of the identifier of the entity which is to be updated.
The index of the worker process is determined as md5(key) % N, where key = entity_type + ':' + entity_id and
N is the number of worker processes in the system.
The index is computed automatically by TaskQueueWriter and passed to RabbitMQ as the routing_key of the message.
With this approach, each entity record is always processed by a specific worker, so there is no need to lock records or otherwise synchronize the workers. Also, all the tasks related to a single entity are guaranteed to be processed in the same order as they were written to the main queue.
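Expressed as a minimal sketch of the formula above (the exact hash-to-integer conversion used in NERD may differ):

```python
import hashlib

def worker_index(etype: str, eid: str, num_processes: int) -> int:
    # md5(key) % N, where key = entity_type + ':' + entity_id
    key = '{}:{}'.format(etype, eid)
    return int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16) % num_processes

worker_index('ip', '1.2.3.4', 4)  # always yields the same index for the same entity
```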
Since the routing key must be given when a task is written, each system component writing tasks must know how many
worker processes there are - this is defined in the configuration (the worker_processes key).
There must always be exactly this number of worker processes running.
Each worker must know its index, which is passed as a program parameter.
The main/global queue system (RabbitMQ) only distributes the tasks among worker processes, but each process has multiple worker threads. The distribution to threads within a process is done in a similar way - using local queues (one for each thread) and distribution based on the hash of the entity ID. See the right part of the picture above.
The block labeled QueueReader is an instance of the TaskQueueReader class, with the UpdateManager._distribute_task()
method set as the callback to call upon receiving a task from the global queue.
This method determines the index of the worker thread (as hash(etype+id) mod num_threads) and puts the task into its
local queue.
Each thread reads tasks from its queue and processes them.
If, during task processing, an update of another entity record is required, a new task is issued by calling
UpdateManager.update().
This method passes the task to an instance of TaskQueueWriter,
which determines the target worker process index and puts the task into the global queue.
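A simplified sketch of this in-process distribution (the names and values are illustrative; the real logic lives in UpdateManager):

```python
import queue
import threading

NUM_THREADS = 8  # illustrative value
local_queues = [queue.Queue() for _ in range(NUM_THREADS)]

def distribute_task(etype, eid, requested_changes):
    # Same principle as UpdateManager._distribute_task():
    # pick the worker thread by hashing the entity identifier.
    idx = hash(etype + ':' + str(eid)) % NUM_THREADS
    local_queues[idx].put((etype, eid, requested_changes))

def worker_thread(idx):
    while True:
        etype, eid, requested_changes = local_queues[idx].get()
        # ... process the task; changes to other entities are submitted
        # back to the global queue via UpdateManager.update()

for i in range(NUM_THREADS):
    threading.Thread(target=worker_thread, args=(i,), daemon=True).start()
```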
Each task received from RabbitMQ must be acknowledged when it is successfully processed (unacknowledged tasks are kept in the queue and re-delivered if the worker disconnects, e.g. crashes, and connects again).
Tasks are acknowledged automatically by UpdateManager when they are read from the local queue, just before they
are passed for processing.
Therefore, in case of unexpected worker shutdown, the tasks "pre-fetched" in local queues are still unacknowledged and will be re-delivered after recovery. However, all the tasks currently being processed are lost (this is because we do not know in which phase the processing was interrupted and therefore if some changes were already made; also, one of these tasks may be the cause of the crash, so we don't want it to be re-delivered).
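As a general illustration of manual acknowledgement at the RabbitMQ client level (in NERD this is handled inside TaskQueueReader/UpdateManager, and the acknowledgement is sent only when the task is taken from the local queue, not directly in the delivery callback as in this simplified sketch):

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

def on_message(ch, method, properties, body):
    # Acknowledge just before the task is handed over for processing; anything
    # not yet acknowledged is re-delivered when the worker reconnects.
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # ... pass the task on for processing

channel.basic_consume(queue='nerd-worker-0', on_message_callback=on_message, auto_ack=False)
channel.start_consuming()
```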
TODO: move to separate page
Each task is a 3-tuple specifying the entity record to work with and the list of operations to perform (see the example after this list):
- `etype`: Type of the entity, e.g. `'ip'` or `'asn'`.
- `eid`: Identifier of the entity, e.g. `'1.2.3.4'` or `2852`.
- `requested_changes`: A list of n-tuples specifying the operations to perform. Each tuple has the following format: `(operation, key, [value/params, ...])`
  - Generally, they mean to perform the given operation (e.g. `set`, `add`, `add_to_set`) with the value of the given attribute (`key`) in the record.
  - A special operation type is `event`, which does not change any attribute, but can be used to trigger handler functions in the modules. The second parameter is then the event name (which should start with `!` by convention).
  - If the operation is prefixed by `*`, it is weak, which means the entity record is not created if it does not exist (the default is to create a new one in such a case).
  - Full documentation is currently only available as a comment at the beginning of the update_manager.py file.
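For example, a task asking a worker to update the record of IP address 1.2.3.4 might look like this (the attribute and event names are made up for illustration):

```python
task = (
    'ip',                                   # etype
    '1.2.3.4',                              # eid
    [                                       # requested_changes
        ('set', 'test_attr', 1),            # set the attribute to a value
        ('add_to_set', 'tags', 'example'),  # add a value into a set-valued attribute
        ('event', '!some_event'),           # trigger handlers hooked to this event
        ('*set', 'other_attr', 'x'),        # weak operation: do not create the record if it does not exist
    ],
)
```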
In the queueing system, tasks are encoded as JSON strings (with a special way of encoding and decoding datetime and timedelta objects).
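The exact encoding scheme is an implementation detail of the queueing code; the sketch below only illustrates the general idea of round-tripping datetime/timedelta values through JSON (the wrapper keys are made up):

```python
import json
from datetime import datetime, timedelta

class TaskJSONEncoder(json.JSONEncoder):
    # Illustrative encoder: wrap datetime/timedelta values so they survive JSON serialization.
    def default(self, obj):
        if isinstance(obj, datetime):
            return {'$datetime': obj.isoformat()}
        if isinstance(obj, timedelta):
            return {'$timedelta': obj.total_seconds()}
        return super().default(obj)

def task_object_hook(dct):
    # Counterpart used with json.loads() to restore the original objects.
    if '$datetime' in dct:
        return datetime.fromisoformat(dct['$datetime'])
    if '$timedelta' in dct:
        return timedelta(seconds=dct['$timedelta'])
    return dct

msg = json.dumps(['ip', '1.2.3.4', [['set', 'last_seen', datetime(2020, 1, 1, 12, 0)]]],
                 cls=TaskJSONEncoder)
task = json.loads(msg, object_hook=task_object_hook)
```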