Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/actor-context-api.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ This function allows to unsubsribe from event-stream or such.
Additional options:

- `:queue-size` limits the message-box's size. By default, it is unbounded.
- `:mbox-type` specify a custom message-box type similar as can be done is dispatcher config.
- `:mbox-type` specify a custom message-box type similar as can be done is dispatcher config. It also could be a function which accept a `:max-queue-size` argument for `:pinned` dispatcher or `:max-queue-size` and `:dispatcher` arguments for other types of dispatchers.
It must be a subtype of `mesgb:message-box/dp`.
"))

Expand Down
53 changes: 37 additions & 16 deletions src/actor-context.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,43 @@ The `actor-system` and the `actor` itself are composed of an `actor-context`."))
old-actors)))))

(defun %message-box-for-dispatcher-id (context dispatcher-id queue-size mbox-type)
(case dispatcher-id
(:pinned (make-instance 'mesgb:message-box/bt
:max-queue-size queue-size))
(otherwise (let* ((asys (system context))
(sys-config (asys:config asys))
(disp-config (%get-dispatcher-config sys-config dispatcher-id))
(dispatcher (%get-shared-dispatcher asys dispatcher-id)))
(unless dispatcher
(error (format nil "No such dispatcher identifier '~a' exists!" dispatcher-id)))
;; if dispatcher exists, the config does, too.
(let ((eff-mbox-type (if mbox-type
mbox-type
(getf disp-config :mbox-type 'mesgb:message-box/dp))))
(make-instance eff-mbox-type
:dispatcher dispatcher
:max-queue-size queue-size))))))
(let* ((asys (system context))
(sys-config (asys:config asys))
(disp-config (%get-dispatcher-config sys-config dispatcher-id))
(default-mbox-type (case dispatcher-id
(:pinned 'mesgb:message-box/bt)
(otherwise
'mesgb:message-box/dp)))
(eff-mbox-type (if mbox-type
mbox-type
(getf disp-config :mbox-type
default-mbox-type))))
(case dispatcher-id
(:pinned
(etypecase eff-mbox-type
(function
(let ((result (funcall eff-mbox-type :max-queue-size queue-size)))
(check-type result mesgb::message-box-base)
result))
(symbol
(make-instance eff-mbox-type
:max-queue-size queue-size))))
(otherwise
(let ((dispatcher (%get-shared-dispatcher asys dispatcher-id)))
(unless dispatcher
(error (format nil "No such dispatcher identifier '~a' exists!" dispatcher-id)))
;; if dispatcher exists, the config does, too.
(etypecase eff-mbox-type
(function
(let ((result (funcall eff-mbox-type
:dispatcher dispatcher
:max-queue-size queue-size)))
(check-type result mesgb::message-box-base)
result))
(symbol
(make-instance eff-mbox-type
:dispatcher dispatcher
:max-queue-size queue-size))))))))

(defun %find-actor-by-name (context name)
(find-if (lambda (a)
Expand Down
56 changes: 56 additions & 0 deletions tests/actor-context-test.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@
(is (not (eq cut (act:context actor))))
(is (not (null (ac:system (act:context actor))))))))


(defclass my-mbox-type (mesgb:message-box/dp) ())

(defclass my-bt-mbox-type (mesgb:message-box/bt) ())


(test actor-of--shared--custom-mbox-type
(with-fixture test-system ()
(let* ((cut (make-actor-context system))
Expand Down Expand Up @@ -119,6 +124,57 @@
))
(ac:shutdown system))))


(test actor-of--custom-dispatcher-with-mbox-made-by-func
(let ((system))
(unwind-protect
(progn
(setf system (asys:make-actor-system `(:dispatchers
(:foo
(:workers 0
:mbox-type ,(lambda (&key dispatcher max-queue-size &allow-other-keys)
(make-instance 'my-mbox-type
:dispatcher dispatcher
:max-queue-size max-queue-size)))))))
(let* ((cut (make-actor-context system))
(actor (actor-of cut :receive (lambda ())
:dispatcher :foo)))
(is (not (null actor)))
(is (typep (act-cell:msgbox actor) 'my-mbox-type))
(is (eq :foo (slot-value (mesgb::dispatcher (act-cell:msgbox actor)) 'disp::identifier)))))
(ac:shutdown system))))


(test actor-of--pinned-dispatcher-with-custom-mbox-type
(let ((system))
(unwind-protect
(progn
(setf system (asys:make-actor-system))
(let* ((cut (make-actor-context system))
(actor (actor-of cut :receive (lambda ())
:dispatcher :pinned
:mbox-type 'my-bt-mbox-type)))
(is (not (null actor)))
(is (typep (act-cell:msgbox actor) 'my-bt-mbox-type))))
(ac:shutdown system))))


(test actor-of--pinned-dispatcher-with-mbox-made-by-func
(let ((system))
(unwind-protect
(progn
(setf system (asys:make-actor-system))
(let* ((cut (make-actor-context system))
(actor (actor-of cut :receive (lambda ())
:dispatcher :pinned
:mbox-type (lambda (&key max-queue-size &allow-other-keys)
(make-instance 'my-bt-mbox-type
:max-queue-size max-queue-size)))))
(is (not (null actor)))
(is (typep (act-cell:msgbox actor) 'my-bt-mbox-type))))
(ac:shutdown system))))


(test actor-of--err-custom-dispatcher--unknown-mbox-type
"Tests creating an actor with a custom shared dispatcher."
(let ((system))
Expand Down