HighQueue API

Dale Wilson edited this page Feb 14, 2017 · 15 revisions

If you are looking for documentation on HighStages, click here

The HighQueue API consists of the public interfaces of four primary classes: Message, Connection, Consumer, and Producer.

Three additional classes, CreationParameters, WaitStrategy, and MemoryPool, are used to initialize the HighQueue.

###Start with a Connection

A Connection provides access to a HighQueue. A Connection may be used to create and initialize a new HighQueue, or it may be used to attach to an existing HighQueue (presumably in shared memory). Once the HighQueue is created or located, the Connection can be used to initialize Producer and Consumer clients.

####Step-by-step:

  • Construct a Connection object on the heap and assign it to a shared pointer (typedef: ConnectionPtr). No construction arguments are required.
  • If the Connection will be used to create a new HighQueue (either local or in shared memory):
    • Construct a WaitStrategy (or two.)
      • The Consumer WaitStrategy specifies how Consumers wait for messages,
      • The Producer WaitStrategy specifies how Producers will wait when the queue is full.
        • Note: the same strategy can be used for both the Consumer and the Producers.
    • Construct a CreationParameters object containing the WaitStrategys and other configuration information such as the number of messages that can be queued, the maximum size of a message, etc.
      • Note that for performance reasons HighQueue does not do any dynamic memory allocation. It must be configured properly at initialization time to achieve the best performance.
    • Create a MemoryPool to manage the memory used by Messages.
      • This is optional but strongly recommended for local HighQueues -- those that do not reside in Shared Memory.
    • Call Connection::createLocal() or Connection::createOrOpenShared() passing in the CreationParameters and the optional MemoryPool.
  • If the Connection will be used only to locate an existing HighQueue in shared memory, no WaitStrategys or CreationParameters are needed.
    • Call Connection::openExistingShared().

####Example:

    WaitStrategy producerStrategy(spinCount, yieldCount); 
    WaitStrategy consumerStrategy(....);
    bool discardMessagesIfNoConsumer = false;
    CreationParameters parameters(producerStrategy, consumerStrategy, 
         discardMessagesIfNoConsumer, entryCount, messageBytes);
    auto memoryPool = std::make_shared<MemoryPool>(messageBytes, messageCount);
    auto connection = std::make_shared<Connection>();
    connection->createLocal("HighQueue Name", parameters, memoryPool);   

The Connection may now be used to create a Consumer, one or more Producers, and the necessary Messages.

###To Add a Consumer

  • Construct a Consumer passing the ConnectionPtr as an argument.
  • Construct a Message passing the ConnectionPtr as an argument.
    • Most consumers will need only one Message which should be created at the time the consumer is being initialized. This Message will be reused for each message received.

####Example:

    Consumer consumer(connection);
    Message consumerMessage(connection);

###To Add One or More Producers

  • Construct a Producer passing the ConnectionPtr as the construction argument.
  • Construct a Message passing the ConnectionPtr as an argument.
    • Most producers will need only one Message object which should be created at the time the producer is being initialized. This Message object will be reused for each message being sent.

####Example:

    Producer producer(connection);
    Message producerMessage(connection);

##Populating and Publishing a Message

At any particular time the Producer's Message object owns a block of memory. The Producer client should populate the block of memory currently owned by its Message with the data to be sent, then call Producer::publish() to actually send the message.

When the publish() method returns, the Message will have a new, empty block of memory into which the Producer can write the next message to be published.
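The buffer exchange described above can be pictured with a small stand-alone model. This is not HighQueue code: `ToyBuffer`, `ToyPool`, `ToyMessage`, and `toyPublish` are hypothetical names invented for this sketch, and the real library's internals may differ. It only shows the idea that publishing hands the filled block to the queue and leaves the message holding a fresh, empty one.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only, NOT the HighQueue classes.
struct ToyBuffer {
    std::vector<unsigned char> bytes;
    std::size_t used = 0;
};

// A fixed set of buffers created up front.
class ToyPool {
public:
    ToyPool(std::size_t bufferBytes, std::size_t bufferCount) {
        for (std::size_t i = 0; i < bufferCount; ++i) {
            ToyBuffer buffer;
            buffer.bytes.resize(bufferBytes);
            free_.push_back(std::move(buffer));
        }
    }
    // This sketch assumes the pool is never exhausted.
    ToyBuffer take() {
        assert(!free_.empty());
        ToyBuffer buffer = std::move(free_.back());
        free_.pop_back();
        return buffer;
    }
private:
    std::vector<ToyBuffer> free_;
};

struct ToyMessage {
    ToyBuffer buffer;
    // Bounds-checked copy into the block the message currently owns.
    bool appendBinaryCopy(const void* data, std::size_t size) {
        if (size > buffer.bytes.size() - buffer.used) return false;
        std::memcpy(buffer.bytes.data() + buffer.used, data, size);
        buffer.used += size;
        return true;
    }
};

// Publishing moves the filled block into the queue, then gives the
// message a new, empty block from the pool.
void toyPublish(std::deque<ToyBuffer>& queue, ToyPool& pool, ToyMessage& message) {
    queue.push_back(std::move(message.buffer));
    message.buffer = pool.take();
}
```

In this model no message data is copied at publish time; only ownership of the block changes hands.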

###Populating a message

There are several ways in which the Producer can populate the message depending on what type of data is being sent and where the Producer gets the data.

###Using emplaceBack() (placement construction) to send an object.

If the data to be delivered is actually an object that the Producer will construct, the most effective way to populate the Message is to use the Message::emplaceBack() method. This uses a placement new operation to construct the object in place.

[This is similar to the emplace_back() operation on a std::vector, hence the name emplaceBack.]

####A typical use might look like:

    while(! stopping)
    {
        auto myObject = producerMessage.emplaceBack<MyClass>(
                            /* construction arguments go here */);
        myObject.addAdditionalInformationAsNecessary(/* additional info */);
        producer.publish(producerMessage);
    }

For local (in-process) HighQueues, there are no restrictions on the type of object that can be passed using this technique.

Shared memory HighQueues impose some restrictions which are documented below.

Note that if the object to be sent is already present in memory not controlled by the Message, use the object's copy or move constructor to emplace the data into the Message.
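As a rough illustration of emplacing an existing object through its copy or move constructor, the sketch below uses a stand-in buffer rather than the real Message class. `Quote`, `ToyMessage`, `toyEmplace`, and `demoMove` are hypothetical names, and the toy buffer holds only one object at a time; the actual emplaceBack() signature and return type are not shown on this page and may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <string>
#include <utility>

// Hypothetical payload type used only for this illustration.
struct Quote {
    int id;
    std::string symbol;
};

// Hypothetical stand-in for a message buffer, NOT HighQueue's Message.
struct ToyMessage {
    alignas(std::max_align_t) unsigned char bytes[256];
    std::size_t used = 0;

    // Construct a T at the start of the buffer with placement new,
    // forwarding whatever arguments the caller supplies.
    template <typename T, typename... Args>
    T& toyEmplace(Args&&... args) {
        static_assert(sizeof(T) <= sizeof(bytes), "object too large for buffer");
        T* object = new (bytes) T(std::forward<Args>(args)...);
        used = sizeof(T);
        return *object;
    }
};

int demoMove() {
    Quote existing{7, "ACME"};
    ToyMessage message;
    // Passing an rvalue selects Quote's move constructor;
    // passing an lvalue would select the copy constructor instead.
    Quote& inBuffer = message.toyEmplace<Quote>(std::move(existing));
    int id = inBuffer.id;
    inBuffer.~Quote(); // objects built with placement new need an explicit destructor call
    return id;
}
```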

###Reading complete messages directly into the Message's buffer.

If the data to be sent comes from a source that can read or compose data into a caller-supplied buffer (for example a UDP socket), you can have the source operate directly on the Message's buffer by calling Message::getWritePosition() and Message::available() to find the location and length of the buffer.

####For example:

    while(! stopping)
    {
        auto buffer = producerMessage.getWritePosition();
        auto size = producerMessage.available();
        auto bytesRead = dataSource.read(buffer, size);
        producerMessage.setUsed(bytesRead);
        producer.publish(producerMessage);
    }

###Sending binary copyable data

If the data to be sent is not in a suitable object for emplacing, but it can be safely copied via a binary copy, the Message::appendBinaryCopy(pointer, size) operation can be used. Note that any type of pointer is acceptable, as long as it points to a contiguous block of memory containing the data to be sent.

For example, suppose you have an std::string and you don't wish to copy construct it into the Message because even a placement new of a string can cause an expensive memory allocation.

####You could populate the message like this:

    while(! stopping)
    {
        const std::string & msg = getSomeDataToSend();
        producerMessage.appendBinaryCopy(msg.data(), msg.size());
        producer.publish(producerMessage);
    }

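###Sending data from buffer(s) where the message boundaries do not match the buffer boundaries

This situation arises, for example, with data read from a TCP/IP socket or with random-sized messages read from a file.

When a message is split across two buffers, call appendBinaryCopy() twice: once for the part at the end of the first buffer and once for the remainder at the start of the next buffer.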
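A minimal sketch of this re-framing, assuming fixed-size messages arriving in arbitrarily sized chunks. `ToyMessage` and `reframe` are hypothetical stand-ins, not HighQueue classes; the `published` vector stands in for calls to Producer::publish(), and real protocols often use a length prefix rather than a fixed size.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for Message, NOT the HighQueue class.
struct ToyMessage {
    std::string bytes;
    void appendBinaryCopy(const char* data, std::size_t size) { bytes.append(data, size); }
    std::size_t getUsed() const { return bytes.size(); }
};

// Re-frame a stream of arbitrary chunks into messages of messageBytes each.
std::vector<std::string> reframe(const std::vector<std::string>& chunks,
                                 std::size_t messageBytes) {
    std::vector<std::string> published;
    if (messageBytes == 0) return published; // nothing sensible to frame
    ToyMessage message;
    for (const std::string& chunk : chunks) {
        std::size_t offset = 0;
        while (offset < chunk.size()) {
            // Copy as much of this chunk as the current message still needs.
            std::size_t needed = messageBytes - message.getUsed();
            std::size_t take = std::min(needed, chunk.size() - offset);
            message.appendBinaryCopy(chunk.data() + offset, take);
            offset += take;
            if (message.getUsed() == messageBytes) {
                published.push_back(message.bytes); // stands in for producer.publish(message)
                message.bytes.clear();               // publish() would leave an empty buffer
            }
        }
    }
    return published; // a trailing partial message is left unpublished
}
```

A message that straddles a chunk boundary receives two appendBinaryCopy() calls here: the tail of one chunk, then the head of the next.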

###Et cetera

Other techniques can be used to populate the Message.

For example fields can be appended to the buffer one-at-a-time in a safe manner. See the append* methods for details.
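The names and signatures of the real append* methods are not listed on this page, so the following is only a sketch of what a bounds-checked, field-at-a-time append could look like. `ToyMessage` and `appendField` are hypothetical and are not part of the HighQueue API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>

// Hypothetical stand-in for a message buffer, NOT HighQueue's Message.
struct ToyMessage {
    unsigned char bytes[16];
    std::size_t used = 0;

    // Append one field by byte-wise copy, refusing to overrun the buffer.
    template <typename T>
    bool appendField(const T& value) {
        static_assert(std::is_trivially_copyable<T>::value,
                      "field must be safe to copy byte-wise");
        if (sizeof(T) > sizeof(bytes) - used) {
            return false; // not enough room; the message is left unchanged
        }
        std::memcpy(bytes + used, &value, sizeof(T));
        used += sizeof(T);
        return true;
    }
};
```

The "safe" part in this sketch is the size check before each copy: a field that does not fit is rejected instead of writing past the end of the buffer.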

##Access Data From a Received Message

The Consumer may accept a message from the HighQueue by calling either:

  • Consumer::tryGetNext(Message &), or
  • Consumer::getNext(Message &)
    • If a message is available, these methods are identical.
    • If no message is available:
      • tryGetNext() returns immediately with a false result, whereas
      • getNext() will wait for a message to arrive using the Consumer WaitStrategy that was specified when the HighQueue was created.
        • getNext() may also return false if the HighQueue is shutting down.
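The difference between the two calls can be pictured as "try once" versus "keep trying according to a wait policy". The sketch below is an assumption about the general shape of a spin-then-yield wait, based only on the spinCount and yieldCount arguments seen in the WaitStrategy example earlier; `toyWait` is a hypothetical helper, and the real WaitStrategy may behave differently.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical wait loop, NOT HighQueue's WaitStrategy.
// tryOnce plays the role of tryGetNext(): it returns true when a message was obtained.
template <typename TryOnce>
bool toyWait(TryOnce tryOnce, std::size_t spinCount, std::size_t yieldCount,
             const std::atomic<bool>& stopping) {
    // Phase 1: busy-spin for the lowest latency, at the cost of CPU time.
    for (std::size_t i = 0; i < spinCount; ++i) {
        if (tryOnce()) return true;
    }
    // Phase 2: keep polling, but let other threads run between attempts.
    for (std::size_t i = 0; i < yieldCount; ++i) {
        if (tryOnce()) return true;
        std::this_thread::yield();
    }
    // Phase 3: back off with short sleeps until a message arrives or shutdown begins.
    while (!stopping.load()) {
        if (tryOnce()) return true;
        std::this_thread::sleep_for(std::chrono::microseconds(50));
    }
    return false; // shutting down without a message, like getNext() returning false
}
```

A consumer that has other work to do between messages would call the non-blocking form directly; one that exists only to process messages would use the waiting form.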

Use methods on the Message to access the information from the message.

When the consumer no longer needs the contents of a message, it can simply reuse the message for the next tryGetNext() or getNext() call.

The Message used by the Consumer in the tryGetNext() or getNext() method will have access to the same memory that was used by the Producer to publish the message (note: the Producer no longer has access to this memory).

Several techniques are available to allow the Consumer to safely access the data in a message.

###Accessing message data as an application defined object.

The Message::get() method will return a reference to an object of the specified type residing in the buffer. This is the most effective way to access incoming data.

####Example:

    while(consumer.getNext(consumerMessage))
    {
        auto & myObject = consumerMessage.get<MyClass>(); // bind the returned reference; plain auto would copy
        myObject.doSomethingInteresting(); // or
        doSomethingInteresting(myObject);
        consumerMessage.delete<MyClass>(); // optional: calls the object's destructor if necessary.
    }

###Accessing binary data

    while(consumer.getNext(consumerMessage))
    {
        auto bytePointer = consumerMessage.get();
        auto size = consumerMessage.getUsed();
        doSomethingInteresting(bytePointer, size);
        // no need to delete binary data, just be sure it is no longer needed
        // before the next call to consumer.getNext();
    }

###Et cetera

Again, other techniques are available. See Message::get() for ideas.

##Sharing a pool of memory between multiple HighQueues

Although the Connection has the ability to construct a MemoryPool in the same memory block as the HighQueue, this is intended for use with HighQueues that reside in shared memory. In that case having the memory in the same block as the HighQueue itself is important to ensure that all processes sharing the HighQueue have access to the message buffers.

For local (non-shared memory) HighQueues, the memory pool should be allocated separately and all HighQueues in the process should share the same pool. This avoids an issue at shutdown. Because the memory blocks get passed around as the HighQueues operate, it is quite likely that one HighQueue will actually "own" a Message buffer that was originally associated with another HighQueue. If the MemoryPool is embedded in the HighQueue block itself, it might be released while there are other references to it -- resulting in invalid memory accesses at shutdown time. Using a common MemoryPool avoids these issues and allows the system to shut down cleanly.
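The lifetime argument above can be modeled with ordinary shared pointers. This sketch uses hypothetical stand-ins (`ToyPool`, `ToyQueue`, `poolOutlivesFirstQueue`) rather than the real MemoryPool and Connection classes; it only demonstrates that a pool shared by several queues stays alive until the last queue holding it is gone.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only, NOT the HighQueue classes.
struct ToyPool {
    std::vector<unsigned char> storage;
    explicit ToyPool(std::size_t bytes) : storage(bytes) {}
};

struct ToyQueue {
    std::shared_ptr<ToyPool> pool;   // every queue shares ownership of the one pool
    unsigned char* heldBuffer;       // a block currently "owned" by this queue
    explicit ToyQueue(std::shared_ptr<ToyPool> p)
        : pool(std::move(p)), heldBuffer(nullptr) {}
};

bool poolOutlivesFirstQueue() {
    auto pool = std::make_shared<ToyPool>(1024);
    std::weak_ptr<ToyPool> watcher = pool; // observes the pool without owning it

    ToyQueue second(pool);
    {
        ToyQueue first(pool);
        // A block that started out with the first queue migrates to the second.
        second.heldBuffer = first.pool->storage.data();
    } // the first queue is destroyed here

    pool.reset(); // the application drops its own reference as well

    // The pool is still alive because the second queue shares ownership,
    // so the migrated block is still valid memory.
    return !watcher.expired() && second.heldBuffer == second.pool->storage.data();
}
```

Had the storage been embedded in the first queue instead, destroying that queue would have left the second queue holding a dangling pointer.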

##Restrictions on objects passed via shared memory

TODO: Will be documented here RSN. As of the time of this writing, shared memory HighQueues are disabled. They worked in the original proof-of-concept, but support for them has not evolved along with other features of HighQueue. It will be easy to re-enable shared memory HighQueues when the dust settles from other development work. If you need them NOW, contact me about sponsoring the work!

###API Notes

  • The producer may take as long as it needs to construct a message directly in the Message. If there are other producers using the same HighQueue they will be unaffected by the time it takes a particular producer to prepare its message for publication.
  • Clients lose access to information in a message once they use the Message object in a publish() or getNext() call. In particular, pointers and references to the data contained in a message are invalidated when the Message object is reused.
  • When a client is ready to exit, it simply lets the Message and Producer or Consumer objects go out of scope (or otherwise be deleted.) This cleans up the resources used by the client.
    • If the application still has a shared pointer to the Connection, the Connection can still be used to service additional clients.
