diff --git a/project.clj b/project.clj index a479843..51bb119 100644 --- a/project.clj +++ b/project.clj @@ -1,11 +1,11 @@ -(defproject org.purefn/sentenza "0.1.8" - :description "Data pipeline for building email records" - :dependencies [[org.clojure/clojure "1.9.0-alpha16"] +(defproject org.purefn/sentenza "0.1.9-SNAPSHOT" + :description "A library for easier parallel (single-node) processing" + :dependencies [[org.clojure/clojure "1.9.0" :scope "provided"] [clj-time "0.14.2"] [me.raynes/fs "1.4.6"] [org.clojure/core.async "0.3.443"] [com.taoensso/timbre "4.10.0"] - [cheshire "5.8.0"] ] + [cheshire "5.8.0"]] :profiles {:dev {:dependencies [[org.clojure/tools.namespace "0.2.11"] [com.stuartsierra/component "0.3.2"] [criterium "0.4.4"]] diff --git a/src/org/purefn/sentenza/api.clj b/src/org/purefn/sentenza/api.clj index 6bcdbd4..3b4b15e 100644 --- a/src/org/purefn/sentenza/api.clj +++ b/src/org/purefn/sentenza/api.clj @@ -9,7 +9,7 @@ [clojure.core.async.impl.protocols :as async-impl] [taoensso.timbre :as log] [org.purefn.sentenza.proto :as proto] - [org.purefn.sentenza.annotate :as sann])) + [org.purefn.sentenza.annotate :as annotate])) (defn threaded-pipe "Takes elements from the from channel and calls the transformer on it in @@ -20,7 +20,7 @@ [n from xf] (log/info "Creating " n " threads.") (let [chs (map - (fn [_] (chan 50 xf (sann/warner xf))) + (fn [_] (chan 50 xf (annotate/warner xf))) (range n))] (doseq [ch chs] (async/thread @@ -117,8 +117,8 @@ (defn paral? "Returns whether a given trasnformer should be treated as parallelizable." [xf] - (or (sann/threaded? xf) - (sann/cored? xf))) + (or (annotate/threaded? xf) + (annotate/cored? xf))) ;; ============================= ;; Main API @@ -141,13 +141,11 @@ core.async channels. Returns the last transducer's channel. For each transducer, it will create a channel and pipe in the - contents of the previous channel. If the transducer has metadata - indicating that it involves blocking, IO-bound operations it will - pipe using core.async's thread and blocking operations. Otherwise - it simply defers to async/pipe to connect the two channels. - - We chose not to use async/pipeline since it performs extra work for - features (ordering,multiple values) that we don't need." + contents of the previous channel. If the transducer has been + annotated as being parallel with `annotate/cored` or + `annotate/threaded`, the appropriate core.async pipeline will be + created for its computation. Otherwise it simply defers to + async/pipe to connect the two channels." [source & xfs] (if (channel? source) (loop [from source @@ -160,23 +158,23 @@ ;; the same thread. to (if (paral? xf) (chan 500) - (chan 500 xf (sann/warner xf)))] + (chan 500 xf (annotate/warner xf)))] (cond - (sann/threaded? xf) - (async/pipeline-blocking (sann/paral xf) + (annotate/threaded? xf) + (async/pipeline-blocking (annotate/paral xf) to xf from true - (sann/warner xf)) + (annotate/warner xf)) - (sann/cored? xf) - (async/pipeline (sann/paral xf) + (annotate/cored? xf) + (async/pipeline (annotate/paral xf) to xf from true - (sann/warner xf)) + (annotate/warner xf)) :default (async/pipe from to))