Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dub.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"copyright": "Copyright © 2025, Tristan B. Velloza Kildaire",
"dependencies": {
"gogga": ">=3.2.0",
"niknaks": ">=0.25.1"
"niknaks": ">=0.26.0"
},
"description": "Simple waitable-queue management",
"license": "LGPL-2.0-only",
Expand Down
55 changes: 42 additions & 13 deletions source/qix/manager.d
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ import gogga.mixins;
*/
private enum NEWQUEUE_MAX_ITER = 1000;

import qix.exceptions;

public final class ManagerException : QixException
{
private this(string m)
{
super(m);
}
}

/**
* A queue manager
*/
Expand Down Expand Up @@ -194,22 +204,25 @@ public template Manager(Item)
return id in this._q;
}

private Result!(QueueType*, string) getQueue(QueueKey id)
private Result!(QueueType*, QixException) getQueue(QueueKey id)
{
auto q = getQueue0(id);
if(q is null)
{
return error!(string, QueueType*)
return error!(QixException, QueueType*)
(
format
new ManagerException
(
"Could not find a queue with id %d",
id
format
(
"Could not find a queue with id %d",
id
)
)
);
}

return ok!(QueueType*, string)(q);
return ok!(QueueType*, QixException)(q);
}

// TODO: In future version let's add:
Expand All @@ -219,19 +232,35 @@ public template Manager(Item)
// 6. wait(QueueKey, Duration)
//

public Result!(bool, string) receive(QueueKey id, Item item)
public Result!(bool, QixException) receive(QueueKey id, Item item)
{
auto q_r = getQueue(id);
if(!q_r)
{
return error!(string, bool)(q_r.error());
return error!(QixException, bool)(q_r.error());
}

auto q = q_r.ok();
return ok!(bool, string)(q.receive(item));
return ok!(bool, QixException)(q.receive(item));
}


public Result!(Item, QixException) wait(QueueKey id)
{
return wait(id, Duration.zero);
}

import std.datetime : Duration;
public Result!(Item, QixException) wait(QueueKey id, Duration timeout)
{
auto q_r = getQueue(id);
if(!q_r)
{
return error!(QixException, Item)(q_r.error());
}

auto q = q_r.ok();
return q.wait(timeout);
}
}
}

Expand All @@ -253,7 +282,7 @@ unittest
}

// queue manager for queues that hold messages
auto m = new Manager!(Message);
auto m = new Manager!(Message)();

// no queues present
assert(m.removeQueue(0) == false);
Expand All @@ -273,8 +302,8 @@ unittest
// we won't block as the messages are already arrived
Message m1_in = Message("First message");
Message m2_in = Message("Second message");
assert(m.receive(q1.id(), m1_in)); // should not be rejected
assert(q2.receive(m2_in)); // should not be rejected
assert(m.receive(q1.id(), m1_in)); // (indirect usage via manager) should not be rejected
assert(q2.receive(m2_in)); // (direct usage via queue itself) should not be rejected
assert(q1.wait() == m1_in); // should be the same message we sent in
assert(q2.wait() == m2_in); // should be the same message we sent in

Expand Down
51 changes: 23 additions & 28 deletions source/qix/queue.d
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import std.datetime : Duration;
import qix.exceptions;
import niknaks.functional : Result, ok, error;

import gogga.mixins;

Expand Down Expand Up @@ -71,7 +72,7 @@ public template Queue(Item)
* id = the id
* ap = admittance policy
*/
package this(QueueKey id, AP ap)
package this(QueueKey id, AP ap) @safe
{
this._id = id;
this._l = new Mutex();
Expand All @@ -86,7 +87,7 @@ public template Queue(Item)
* Params:
* id = the id
*/
package this(QueueKey id)
package this(QueueKey id) @safe
{
this(id, null);
}
Expand All @@ -96,7 +97,7 @@ public template Queue(Item)
*
* Returns: the id
*/
public QueueKey id()
public QueueKey id() @safe
{
return this._id;
}
Expand Down Expand Up @@ -161,23 +162,29 @@ public template Queue(Item)
*/
public Item wait()
{
return wait(Duration.zero());
auto res = wait(Duration.zero());
//sanity: only way an error is if timed out
// but that should not be possible with
// a timeout of 0
assert(res.is_okay());
return res.ok();
}

/**
* Blocks up until the timeout for an
* item to become available for dequeuing.
*
* However, if the timeout is reached
* then an exception is thrown.
* then an exception is returned.
*
* Params:
* timeout = the timeout
* Throws: `TimeoutException` if the
* timeout is reached
* Returns: the dequeued item
*
* Returns: a `Result` containing the
* the dequeued item or a `QixException`
* if the timeout was exceeded
*/
public Item wait(Duration timeout)
public Result!(Item, QixException) wait(Duration timeout)
{
this._l.lock();

Expand All @@ -193,7 +200,7 @@ public template Queue(Item)
if(early_return)
{
DEBUG("early return");
return pop();
return ok!(Item, QixException)(pop());
}

// then no timeout
Expand All @@ -213,13 +220,12 @@ public template Queue(Item)

if(!in_time)
{
// todo: throw exception here
throw new TimeoutException(); // todo: log time taken
return error!(QixException, Item)(new TimeoutException()); // todo: log time taken
}
}

// pop single item off
return pop();
return ok!(Item, QixException)(pop());
}

// mt: assumes lock held
Expand All @@ -241,7 +247,7 @@ public template Queue(Item)
*
* Returns: the count
*/
public size_t size()
public size_t size() // TODO: Make safe justd ebug that is bad
{
this._l.lock();

Expand Down Expand Up @@ -345,20 +351,9 @@ unittest

// wait with timeout and knowing nothing will
// be enqueued
try
{
q.wait(dur!("seconds")(1));
assert(false);
}
catch(TimeoutException e)
{
assert(true);
}
catch(Exception e)
{
assert(false);
}

auto res = q.wait(dur!("seconds")(1));
assert(res.is_error());
assert(cast(TimeoutException)res.error());
}

// test admit policy
Expand Down