From 632342bf8d2296759c6e3811bc58c43c2ceed0f1 Mon Sep 17 00:00:00 2001 From: Michael Gaare Date: Tue, 27 Nov 2018 17:22:12 -0500 Subject: [PATCH 1/4] Next dev iteration --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index a479843..5bae858 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject org.purefn/sentenza "0.1.8" +(defproject org.purefn/sentenza "0.1.9-SNAPSHOT" :description "Data pipeline for building email records" :dependencies [[org.clojure/clojure "1.9.0-alpha16"] [clj-time "0.14.2"] From 5e19ed503cc9a27282840257f11918a53defcc07 Mon Sep 17 00:00:00 2001 From: Michael Gaare Date: Tue, 27 Nov 2018 17:24:51 -0500 Subject: [PATCH 2/4] bump clojure dependency to 1.9 release with provided scope --- project.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project.clj b/project.clj index 5bae858..f297a02 100644 --- a/project.clj +++ b/project.clj @@ -1,11 +1,11 @@ (defproject org.purefn/sentenza "0.1.9-SNAPSHOT" :description "Data pipeline for building email records" - :dependencies [[org.clojure/clojure "1.9.0-alpha16"] + :dependencies [[org.clojure/clojure "1.9.0" :scope "provided"] [clj-time "0.14.2"] [me.raynes/fs "1.4.6"] [org.clojure/core.async "0.3.443"] [com.taoensso/timbre "4.10.0"] - [cheshire "5.8.0"] ] + [cheshire "5.8.0"]] :profiles {:dev {:dependencies [[org.clojure/tools.namespace "0.2.11"] [com.stuartsierra/component "0.3.2"] [criterium "0.4.4"]] From a3d4929fa6b00d30629a6cafe54ddec8b81bae77 Mon Sep 17 00:00:00 2001 From: Michael Gaare Date: Tue, 27 Nov 2018 17:26:41 -0500 Subject: [PATCH 3/4] update project description --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index f297a02..51bb119 100644 --- a/project.clj +++ b/project.clj @@ -1,5 +1,5 @@ (defproject org.purefn/sentenza "0.1.9-SNAPSHOT" - :description "Data pipeline for building email records" + :description "A library for easier parallel (single-node) processing" :dependencies [[org.clojure/clojure "1.9.0" :scope "provided"] [clj-time "0.14.2"] [me.raynes/fs "1.4.6"] From 88e8261812a62af46b111f620ab426069d24dac4 Mon Sep 17 00:00:00 2001 From: Michael Gaare Date: Tue, 27 Nov 2018 17:43:35 -0500 Subject: [PATCH 4/4] update `flow` docstring to match its behavior - Also renamed the sentenza.annotate alias in the api namespace from sann to annotate so that I could refer its functions in flow's docstring in a way that looked nicer and I felt good about since it matched the alias --- src/org/purefn/sentenza/api.clj | 34 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/org/purefn/sentenza/api.clj b/src/org/purefn/sentenza/api.clj index 6bcdbd4..3b4b15e 100644 --- a/src/org/purefn/sentenza/api.clj +++ b/src/org/purefn/sentenza/api.clj @@ -9,7 +9,7 @@ [clojure.core.async.impl.protocols :as async-impl] [taoensso.timbre :as log] [org.purefn.sentenza.proto :as proto] - [org.purefn.sentenza.annotate :as sann])) + [org.purefn.sentenza.annotate :as annotate])) (defn threaded-pipe "Takes elements from the from channel and calls the transformer on it in @@ -20,7 +20,7 @@ [n from xf] (log/info "Creating " n " threads.") (let [chs (map - (fn [_] (chan 50 xf (sann/warner xf))) + (fn [_] (chan 50 xf (annotate/warner xf))) (range n))] (doseq [ch chs] (async/thread @@ -117,8 +117,8 @@ (defn paral? "Returns whether a given trasnformer should be treated as parallelizable." [xf] - (or (sann/threaded? xf) - (sann/cored? xf))) + (or (annotate/threaded? xf) + (annotate/cored? xf))) ;; ============================= ;; Main API @@ -141,13 +141,11 @@ core.async channels. Returns the last transducer's channel. For each transducer, it will create a channel and pipe in the - contents of the previous channel. If the transducer has metadata - indicating that it involves blocking, IO-bound operations it will - pipe using core.async's thread and blocking operations. Otherwise - it simply defers to async/pipe to connect the two channels. - - We chose not to use async/pipeline since it performs extra work for - features (ordering,multiple values) that we don't need." + contents of the previous channel. If the transducer has been + annotated as being parallel with `annotate/cored` or + `annotate/threaded`, the appropriate core.async pipeline will be + created for its computation. Otherwise it simply defers to + async/pipe to connect the two channels." [source & xfs] (if (channel? source) (loop [from source @@ -160,23 +158,23 @@ ;; the same thread. to (if (paral? xf) (chan 500) - (chan 500 xf (sann/warner xf)))] + (chan 500 xf (annotate/warner xf)))] (cond - (sann/threaded? xf) - (async/pipeline-blocking (sann/paral xf) + (annotate/threaded? xf) + (async/pipeline-blocking (annotate/paral xf) to xf from true - (sann/warner xf)) + (annotate/warner xf)) - (sann/cored? xf) - (async/pipeline (sann/paral xf) + (annotate/cored? xf) + (async/pipeline (annotate/paral xf) to xf from true - (sann/warner xf)) + (annotate/warner xf)) :default (async/pipe from to))