DSL, Serialization, and AOT #9
Description
I'm using Storm 0.9.2-incubating and Kafka 0.8.1.1.
I prefer not to AOT my topology if possible, but I have found that if:
- The topology uses an IPartitionedTridentSpout (here, TransactionalTridentKafkaSpout); and,
- The rest of the topology has a parallelism hint applied; and,
- The namespace enclosing the topology is not AOT-compiled,

then I encounter unbound fn errors when deploying the topology if I use any of the Marceline DSL macros (filters, tridentfns, etc.).
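The smallest reproducible case is similar to:

```clojure
;; Assumed requires/imports (storm-kafka 0.9.2-incubating package names).
(require '[marceline.storm.trident :as t])
(import '[storm.trident TridentTopology]
        '[storm.kafka ZkHosts]
        '[storm.kafka.trident TransactionalTridentKafkaSpout TridentKafkaConfig])

;; A trivial filter that keeps every tuple.
(t/deffilter a-filter
  [tuple]
  true)

(defn ->topology
  []
  (let [topology (TridentTopology.)
        spout    (TransactionalTridentKafkaSpout.
                   (TridentKafkaConfig.
                     (ZkHosts. "zk-connect-url") "a-topic"))]
    (-> (t/new-stream topology "zk-tx-id" spout)
        (t/each ["bytes"] a-filter)
        (t/parallelism-hint 2))
    (.build topology)))
```

This leads to exceptions like:

```
java.lang.RuntimeException: java.lang.IllegalStateException: Attempting to call unbound fn: #'...filter/filter__
```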
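For reference, plain Clojure throws this exact IllegalStateException whenever a var has been interned but has no root binding. My assumption (not confirmed) is that the worker ends up holding a reference to the filter's var without the namespace that defines it having been loaded there. The sketch below reproduces the message without Storm or Marceline; the var name `filter__` is hypothetical and only mirrors the error above.

```clojure
;; `declare` interns #'user/filter__ but gives it no root binding. This is
;; assumed to be analogous to referencing a var whose defining namespace
;; has not been loaded on the worker.
(declare filter__)

(def result
  (try
    (filter__ ["bytes"])
    (catch IllegalStateException e
      (.getMessage e))))

;; When evaluated in the `user` namespace this prints:
;; Attempting to call unbound fn: #'user/filter__
(println result)
```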
Changing the spout to a non-partitioned one (e.g. FixedBatchSpout), or removing the parallelism hint, allows the topology to deploy without a problem. I suspect the spout type and parallelism hint are something of a red herring, though: in my simple cluster with a single worker, they ensure that some serialization takes place, and I assume that is the key factor.
I've previously run into similar problems de/serializing defrecords with Carbonite/Kryo/Storm, related to this Clojure issue: http://dev.clojure.org/jira/browse/CLJ-1208
Marceline is a pleasure to use compared to my previous approach of elaborate :gen-class constructs. AOT itself is not a killer for me, though I want to limit it where possible. Do we have a good idea of when and where AOT is necessary, and why?