From 46aae6b07ab4cb3226b553f32bfcf4b8e0bddc92 Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Wed, 15 Jan 2025 01:08:31 +0000 Subject: [PATCH 1/3] Now message-box/bt class will restart thread if it was aborted because of a non-local exit from processing loop. I've measured performance before these changes and after them and didn't notice significant changes. Results of the benchmark along with the baseline results are available in this gist: https://gist.github.com/svetlyak40wt/a1097ab7d501087ca366d5addb410ebe --- bench.lisp | 71 ++++++++++++++++++----- sento.asd | 5 +- src/mbox/message-box.lisp | 71 +++++++++++++++++++---- tests/message-box-test.lisp | 111 ++++++++++++++++++++++++++++++++++++ tests/test-utils.lisp | 48 ++++++++++++++++ 5 files changed, 279 insertions(+), 27 deletions(-) create mode 100644 tests/message-box-test.lisp create mode 100644 tests/test-utils.lisp diff --git a/bench.lisp b/bench.lisp index ef4c81e..ccc018f 100644 --- a/bench.lisp +++ b/bench.lisp @@ -92,7 +92,8 @@ (wait-if-queue-larger-than 10000 wait-if-queue-larger-than-given-p) (duration 10) (num-iterations 60) - (load-threads 8)) + (load-threads 8) + (time-out nil)) (log:config :warn) @@ -124,7 +125,8 @@ :async-ask-p async-ask-p :num-shared-workers num-shared-workers :queue-size queue-size - :wait-if-queue-larger-than wait-if-queue-larger-than)) + :wait-if-queue-larger-than wait-if-queue-larger-than + :time-out time-out)) (force-output) (with-timing (num-iterations @@ -255,33 +257,72 @@ (defun run-all (&key - (num-iterations 10) - (duration 10)) + (num-iterations 60) + (duration 10) + (queue-size 100) + (time-out 3) + &aux (started-at (get-internal-real-time))) (run-benchmark :num-iterations num-iterations - :duration duration) + :duration duration + :with-reply-p nil + :async-ask-p nil) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p nil) + :with-reply-p t + :async-ask-p nil) + + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t + :async-ask-p t) + + + (format t "With queue size limited to ~A:~2%" + queue-size) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p t) + :with-reply-p nil + :async-ask-p nil + :queue-size queue-size) - (format t "Running ~A:~%" '(run-benchmark :queue-size 100)) (run-benchmark :num-iterations num-iterations :duration duration - :queue-size 100) + :with-reply-p t + :async-ask-p nil + :queue-size queue-size) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p nil :queue-size 100) + :with-reply-p t + :async-ask-p t + :queue-size queue-size) + + + (format t "With time-out ~A:~2%" + time-out) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p t :queue-size 100)) + :with-reply-p nil + :async-ask-p nil + ;; This should not make sense for with-reply-p = nil + :time-out time-out) + + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t + :async-ask-p nil + :time-out time-out) + + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t + :async-ask-p t + :time-out time-out) + + (format t "All tests are performed in ~,2f seconds.~%" + (/ (- (get-internal-real-time) started-at) + internal-time-units-per-second))) diff --git a/sento.asd b/sento.asd index 6ccfc25..a746a68 100644 --- a/sento.asd +++ b/sento.asd @@ -65,6 +65,7 @@ :author "Manfred Bergmann" :depends-on ("sento" "fiveam" + "serapeum" "lparallel" "cl-mock") :components ((:module "tests" @@ -95,7 +96,9 @@ (:file "actor-system-test") (:file "actor-tree-test") (:file "spawn-in-receive-test") - ))) + (:file "test-utils") + (:file "message-box-test" + :depends-on ("test-utils"))))) :description "Test system for sento" :perform (test-op (op c) (symbol-call :fiveam :run! (uiop:find-symbol* '#:test-suite diff --git a/src/mbox/message-box.lisp b/src/mbox/message-box.lisp index aaa2a16..81fa0ba 100644 --- a/src/mbox/message-box.lisp +++ b/src/mbox/message-box.lisp @@ -113,18 +113,41 @@ This is used to break the environment possibly captured as closure at 'submit' s (defclass message-box/bt (message-box-base) ((queue-thread :initform nil :documentation - "The thread that pops queue items.")) + "The thread that pops queue items.") + (thread-is-running-p :initform nil + :type boolean + :documentation + "Will be set to NIL if processing loop will be broken because of an error or a restart invocation.")) (:documentation "Bordeaux-Threads based message-box with a single thread operating on a message queue. This is used when the actor is created using a `:pinned` dispatcher type. There is a limit on the maximum number of actors/agents that can be created with this kind of queue because each message-box (and with that each actor) requires exactly one thread.")) + +(declaim (ftype (function (message-box/bt &key (:thread-name (or null string))) + (values &optional)) + start-thread)) + +(defun start-thread (msgbox &key thread-name) + (with-slots (name queue-thread thread-is-running-p) + msgbox + (flet ((run-processing-loop () + (setf thread-is-running-p t) + (unwind-protect + (message-processing-loop msgbox) + (setf thread-is-running-p + nil)))) + (setf queue-thread + (bt2:make-thread #'run-processing-loop + :name (or thread-name + (mkstr "message-thread-" name)))))) + (values)) + + (defmethod initialize-instance :after ((self message-box/bt) &key) - (with-slots (name queue-thread) self - (setf queue-thread (bt2:make-thread - (lambda () (message-processing-loop self)) - :name (mkstr "message-thread-" name)))) + (start-thread self) + (when (next-method-p) (call-next-method))) @@ -179,6 +202,24 @@ This function sets the result as `handler-result' in `item'. The return of this (bt2:condition-notify withreply-cvar))) (handler-fun))))) + +(declaim (ftype (function (message-box/bt) + (values &optional)) + ensure-thread-is-running)) + +(defun ensure-thread-is-running (msgbox) + (with-slots (queue-thread thread-is-running-p should-run) + msgbox + (when (and (not thread-is-running-p) + should-run) + ;; Just to be sure that thread is not alive: + (unless (bt2:thread-alive-p queue-thread) + (let ((thread-name (bt2:thread-name queue-thread))) + (log:warn "Restarting thread" thread-name) + (start-thread msgbox + :thread-name thread-name)))) + (values))) + (defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args) "The `handler-fun-args` argument must contain a handler function as first list item. It will be apply'ed with the rest of the args when the message was 'popped' from queue." @@ -200,15 +241,21 @@ It will be apply'ed with the rest of the args when the message was 'popped' from :time-out time-out :handler-fun-args handler-fun-args :handler-result 'no-result))) - (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) + (bt2:with-lock-held (withreply-lock) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) - - (if time-out - (wait-and-probe-for-msg-handler-result msgbox push-item) - (bt2:condition-wait withreply-cvar withreply-lock))) - + (ensure-thread-is-running msgbox) + + (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) + + (unless (bt2:condition-wait withreply-cvar withreply-lock + :timeout time-out) + (log:warn "~a: time-out elapsed but result not available yet!" (name msgbox)) + (setf (slot-value push-item 'cancelled-p) t) + (error 'ask-timeout + :wait-time time-out))) + (with-slots (handler-result) push-item (log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result) handler-result))) @@ -222,6 +269,7 @@ The submitting code has to await the side-effect and possibly handle a timeout." :handler-fun-args handler-fun-args))) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) + (ensure-thread-is-running msgbox) t)) (defmethod stop ((self message-box/bt) &optional (wait nil)) @@ -301,6 +349,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i processed-messages dispatcher) self (incf processed-messages) + (let ((push-item (make-message-item/dp :message message :handler-fun-args handler-fun-args diff --git a/tests/message-box-test.lisp b/tests/message-box-test.lisp new file mode 100644 index 0000000..bbe1620 --- /dev/null +++ b/tests/message-box-test.lisp @@ -0,0 +1,111 @@ +(defpackage :sento.message-box-test + (:use :cl :fiveam :cl-mock :sento.actor :sento.future) + (:shadow #:! #:?) + (:import-from #:miscutils + #:assert-cond + #:await-cond + #:filter) + (:import-from #:timeutils + #:ask-timeout) + (:import-from #:sento.messageb + #:message-box/bt + #:submit + #:no-result + #:queue-thread + #:stop) + (:import-from #:sento.test-utils + #:parametrized-test) + (:import-from #:ac + #:actor-of)) + +(in-package :sento.message-box-test) + +(def-suite message-box-tests + :description "message-box tests" + :in sento.tests:test-suite) + +(in-suite message-box-tests) + + +(defun wait-while-thread-will-die (msgbox &key (timeout 10)) + (let ((wait-until (+ (get-internal-real-time) (* timeout + internal-time-units-per-second)))) + (with-slots (queue-thread) + msgbox + (loop :while (bt2:thread-alive-p queue-thread) + :do (sleep 0.1) + (when (< wait-until + (get-internal-real-time)) + (error "Thread didn't die in ~A seconds." + timeout)))))) + + +(parametrized-test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals + ((withreply-p timeout) + (nil nil) + (t 1) + (t nil)) + + "Simulates a situation when error has happened during message processing, and ABORT restart was invoked. + Usually this kill a thread, but here we ensure that by the thread is resurrected when we submit a + subsequent message." + + (flet ((kill-by-restart-invoke (msg) + (declare (ignore msg)) + (handler-case + ;; This way we are simulating that the user choose + ;; an ABORT restart in the IDE during debug session: + (handler-bind ((serious-condition #'abort)) + (error "Die, thread, die!")) + ;; This part the same as error handling code in the + ;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function: + ;; + ;; TODO: t was used to check if it is able to + ;; catch stack unwinding because of INVOKE-RESTART, + ;; but it can't. + (t (c) + (log:error "error condition was raised: ~%~a~%" + c) + (cons :handler-error c))))) + + (let ((box (make-instance 'message-box/bt + :name "foo"))) + (unwind-protect + (progn + (let ((first-reply + (submit box "The Message" + t + ;; Don't wait for result here, because we are + ;; intentionally raise error here and will never + ;; return a result: + nil + (list #'kill-by-restart-invoke)))) + (is (equal first-reply + 'no-result))) + + (wait-while-thread-will-die box) + + (is (not + (bt2:thread-alive-p + (slot-value box 'queue-thread)))) + + (let ((result (handler-case + (submit box "The Message" + withreply-p + timeout + (list (lambda (msg) + (reverse msg)))) + (ask-timeout () + :timeout)))) + + (cond + (withreply-p + (is (string= "egasseM ehT" result))) + (t + (is (eql result t))))) + + (is (bt2:thread-alive-p + (slot-value box 'queue-thread)))) + + ;; Cleanup a thread: + (stop box t))))) diff --git a/tests/test-utils.lisp b/tests/test-utils.lisp new file mode 100644 index 0000000..026987e --- /dev/null +++ b/tests/test-utils.lisp @@ -0,0 +1,48 @@ +(defpackage #:sento.test-utils + (:use #:cl) + (:import-from #:serapeum + #:eval-always) + (:import-from #:alexandria + #:parse-body) + (:export #:parametrized-test)) +(in-package #:sento.test-utils) + + + +(eval-always + (defun generate-test-form (base-test-name parameter-names parameters docstring body-form) + (let* ((test-name-str (format nil + "~A-[~{~A=~S~^ ~}]" + base-test-name + (loop :for name :in parameter-names + :for value :in parameters + :appending (list name value)))) + (test-name (intern test-name-str)) + (bindings (loop :for name :in parameter-names + :for value :in parameters + :collect (list name value)))) + `(5am:test ,test-name + ,docstring + (let ,bindings + ,@body-form))))) + + +(defmacro parametrized-test (name ((&rest parameter-names) &rest parameter-tuples) &body body) + (multiple-value-bind (forms decls docstring) + (parse-body body :documentation t :whole name) + (let* ((docstring (or docstring "")) + (body-forms (append decls forms))) + + (let ((tests (loop :for parameters :in parameter-tuples + :collect (generate-test-form name parameter-names parameters docstring body-forms)))) + `(progn + ;; If somebody has changed parameters, we need to remove obsolte tests from the 5AM test registry. + (loop :with prefix-to-search := ,(format nil "~A-" name) + :for candidate-name in (5am:test-names) + :for candidate-name-str := (symbol-name candidate-name) + :when (and (serapeum:length<= prefix-to-search candidate-name-str) + (string= (subseq candidate-name-str 0 (length prefix-to-search)) + prefix-to-search)) + :do (5am:rem-test candidate-name)) + ,@tests))))) + From c337569c118324d97d67c8e8e02f2f09c27868b2 Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Thu, 30 Jan 2025 12:52:42 +0300 Subject: [PATCH 2/3] Added docstring. --- tests/test-utils.lisp | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/test-utils.lisp b/tests/test-utils.lisp index 026987e..7d552ff 100644 --- a/tests/test-utils.lisp +++ b/tests/test-utils.lisp @@ -28,6 +28,35 @@ (defmacro parametrized-test (name ((&rest parameter-names) &rest parameter-tuples) &body body) + "Generates a separate tests for each parameter combination. + + - NAME is the prefix for all tests in the group. The rest of each test name consists of parameters and values. + - PARAMETER-NAMES should be a list of symbolic names of variables to be bound during BODY execution. + - PARAMETER-TUPLES should be a list of lists of values to be bound to variables given in PARAMETER-NAMES. + + Example: + + (parametrized-test bt-box-test + ((withreply-p timeout) + (nil nil) + (t 1) + (t nil)) + + (do-something with-reply-p timeout)) + + This form will be expanded to the code which will remove all 5AM tests starting with BT-BOX-TEST- + and then will create 3 tests like this one: + + + (test |BT-BOX-TEST-[WITHREPLY-P=T TIMEOUT=1]| + (let ((withreply-p t) (timeout 1)) + (do-something with-reply-p timeout))) + + As you can see, this test binds WITHREPLY-P and TIMEOUT variables to a values given in the second row of PARAMETER-TUPLES. + + Name of each test will include parameter variables for this test. This way it will be easy to tell which parameter combination + fails. +" (multiple-value-bind (forms decls docstring) (parse-body body :documentation t :whole name) (let* ((docstring (or docstring "")) From 0afe862b11228c81d7c15baac20f0941c105c2c0 Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Thu, 30 Jan 2025 12:57:28 +0300 Subject: [PATCH 3/3] Replaced depends-on with serial. --- sento.asd | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sento.asd b/sento.asd index a746a68..6a58d83 100644 --- a/sento.asd +++ b/sento.asd @@ -69,6 +69,7 @@ "lparallel" "cl-mock") :components ((:module "tests" + :serial t :components ((:file "all-test") (:file "miscutils-test") @@ -97,8 +98,7 @@ (:file "actor-tree-test") (:file "spawn-in-receive-test") (:file "test-utils") - (:file "message-box-test" - :depends-on ("test-utils"))))) + (:file "message-box-test")))) :description "Test system for sento" :perform (test-op (op c) (symbol-call :fiveam :run! (uiop:find-symbol* '#:test-suite