diff --git a/src/drake/core.clj b/src/drake/core.clj index 8c03207..d5833b8 100644 --- a/src/drake/core.clj +++ b/src/drake/core.clj @@ -485,21 +485,35 @@ (when (not (realized? promise)) (deliver promise 0)))))) +(defn- post + [^com.google.common.eventbus.EventBus event-bus event] + (when event-bus (.post event-bus event))) + +(defn- sanitize-step + [step] + (dissoc step :function :promise :exception-promise)) + (defn- function-for-step "Returns an anonymous function that can be triggered in its own thread to execute a step. Each step delivers its own promise. Dependent steps will block on that promise. " - [parse-tree steps promises-indexed step] + [parse-tree event-bus steps promises-indexed step] (fn [] ; wait for parent promises in the tree promises to be delivered ; accumulate successful parent tasks into a sum : successful-parent-steps - (let [{:keys [promise deps exception-promise]} step] + (let [{:keys [promise deps exception-promise]} step + sanitized-step (sanitize-step step)] (try (let [successful-parent-steps (reduce + (map (fn [i] @(promises-indexed i)) deps))] (if (= successful-parent-steps (count deps)) - (attempt-run-step parse-tree step) + (try + (post event-bus (EventStepBegin sanitized-step)) + (attempt-run-step parse-tree step) + (finally (if (realized? (:exception-promise step)) + (post event-bus (EventStepError sanitized-step @(:exception-promise step))) + (post event-bus (EventStepEnd sanitized-step))))) (deliver promise 0))) (catch Exception e (deliver exception-promise e)) @@ -509,48 +523,35 @@ (defn- assoc-function "Associates a future (anonymous function) for each step" - [parse-tree steps] + [parse-tree event-bus steps] ; for quickly accessing promises via a map :index => :promise (let [promises-indexed (zipmap (map :index steps) (map :promise steps))] ; associate a :function on each step (map (fn [step] (assoc step :function - (function-for-step parse-tree steps promises-indexed step))) + (function-for-step parse-tree event-bus steps promises-indexed step))) steps))) -(defn- post - [^com.google.common.eventbus.EventBus event-bus event] - (when event-bus (.post event-bus event))) - -(defn- sanitize-step - [step] - (dissoc step :function :promise :exception-promise)) - (defn- trigger-futures-helper - [jobs lazy-steps state-atom event-bus] + [jobs lazy-steps state-atom] (let [semaphore (new Semaphore jobs true)] (loop [steps lazy-steps] (.acquire semaphore) (when (seq steps) - (let [step (first steps) - sanitized-step (sanitize-step step)] + (let [step (first steps)] (future (try - (post event-bus (EventStepBegin sanitized-step)) ((:function step)) (finally (swap! state-atom update-state-atom-when-step-finishes step) - (when (realized? (:exception-promise step)) - (post event-bus (EventStepError sanitized-step @(:exception-promise step)))) - (post event-bus (EventStepEnd sanitized-step)) (.release semaphore)))) (recur (rest steps))))))) (defn- trigger-futures "Run all the steps in (jobs) number of threads" - [jobs steps event-bus] + [jobs steps] (let [state-atom (create-state-atom steps)] - (trigger-futures-helper jobs (lazy-step-list state-atom) state-atom event-bus))) + (trigger-futures-helper jobs (lazy-step-list state-atom) state-atom))) (defn- await-promises "waits for all the promises to be fullfilled otherwise @@ -586,11 +587,11 @@ steps-data assoc-exception-promise assoc-promise - (assoc-function parse-tree))] + (assoc-function parse-tree event-bus))] (post event-bus (EventWorkflowBegin steps-data)) - (trigger-futures jobs steps-future event-bus) + (trigger-futures jobs steps-future) (let [successful-steps (await-promises steps-future)] (info (format "Done (%d steps run)." successful-steps))