This project is a fork of pyr/kinsky.
Kinsky is a somewhat opinionated client library for Apache Kafka in Clojure. However, it does not support MapR Streams, which is compatible with Kafka 0.9.0.
This fork adapts pyr/kinsky for use with MapR Streams in Clojure.
This project provides the following:
- MapR Streams/Kafka 0.9.0.x compatibility
- Adequate data representation of Kafka types.
- Default serializer and deserializer implementations such as JSON, EDN and a keyword serializer for keys.
- A core.async facade for producers and consumers.
- Documentation
[org.clojars.incjung/kinsky "0.1.15"]

MapR Streams brings integrated publish/subscribe messaging to the MapR Converged Data Platform.
Producer applications can publish messages to topics, which are logical collections of messages, that are managed by MapR Streams. Consumer applications can then read those messages at their own pace. All messages published to MapR Streams are persisted, allowing future consumers to “catch-up” on processing, and analytics applications to process historical data.
maprcli stream create -path /sample-stream
maprcli stream edit -path /sample-stream -produceperm p -consumeperm p -topicperm p
maprcli stream topic create -path /sample-stream -topic events

The -produceperm and -consumeperm parameters grant security permissions. By default, these permissions are granted to the user ID that ran the maprcli stream create command.
- -consumeperm grants permission to read messages from topics that are in the stream.
- -produceperm grants permission to publish messages to topics that are in the stream.

The examples assume the following require forms:
(:require [kinsky.client :as client]
          [kinsky.async :as async]
          [clojure.core.async :as a :refer [go <! >!]])

MapR Streams producer:

;; topic = "<stream path>:<topic name>", e.g. "/sample-stream:events"
(let [p (client/producer {} :string :string)]
  (client/send! p "/sample-stream:events" "IJUNG" "HELLO WORLD"))

(let [p (client/producer {} :keyword :edn)]
  (client/send! p "/sample-stream:events" :hello {:hello :world}))

Async facade:
(let [[in out] (async/producer {:bootstrap.servers "localhost:9092"} :keyword :edn)]
  (go
    (>! in {:topic "account" :key :account-a :value {:action :login}})
    (>! in {:topic "account" :key :account-a :value {:action :logout}})))

MapR Streams consumer:

(defn receiving []
  (let [c (client/consumer {:group.id "mygroup"} :keyword :edn)]
    (client/subscribe! c ["/sample-stream:events"])
    ;; Poll forever, printing whatever each 1000 ms poll returns.
    (while true
      (println (client/poll! c 1000)))))

(receiving)

Async facade:
;; `a` is an alias for clojure.core.async; `async` is kinsky.async.
(let [[out ctl] (async/consumer {:bootstrap.servers "localhost:9092"
                                 :group.id (str (java.util.UUID/randomUUID))}
                                (client/string-deserializer)
                                (client/string-deserializer))
      topic     "tests"]

  ;; Print every record taken from the output channel until it closes.
  (a/go-loop []
    (when-let [record (a/<! out)]
      (println (pr-str record))
      (recur)))

  ;; Drive the consumer by putting operations on the control channel.
  (a/put! ctl {:op :partitions-for :topic topic})
  (a/put! ctl {:op :subscribe :topic topic})
  (a/put! ctl {:op :commit})
  (a/put! ctl {:op :pause :topic-partitions [{:topic topic :partition 0}
                                             {:topic topic :partition 1}
                                             {:topic topic :partition 2}
                                             {:topic topic :partition 3}]})
  (a/put! ctl {:op :resume :topic-partitions [{:topic topic :partition 0}
                                              {:topic topic :partition 1}
                                              {:topic topic :partition 2}
                                              {:topic topic :partition 3}]})
  (a/put! ctl {:op :stop}))
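
The MapR Streams examples address a topic as the stream path and the topic name joined by a colon ("/sample-stream:events"), and the async consumer example spells out each topic-partition map by hand. A minimal sketch of two helpers for building those values follows. Both stream-topic and topic-partitions are hypothetical names introduced here for illustration; they are not part of kinsky and use only clojure.core.

```clojure
;; Hypothetical helpers for illustration only; not part of kinsky.

(defn stream-topic
  "Joins a stream path and a topic name into the \"<path>:<topic>\" form
  used by the MapR Streams examples in this README."
  [stream-path topic]
  (str stream-path ":" topic))

(defn topic-partitions
  "Builds a vector of {:topic ... :partition ...} maps for partitions
  0 through n-1 of the given topic."
  [topic n]
  (mapv (fn [p] {:topic topic :partition p}) (range n)))

(stream-topic "/sample-stream" "events")
;; => "/sample-stream:events"

(topic-partitions "tests" 2)
;; => [{:topic "tests", :partition 0} {:topic "tests", :partition 1}]
```

With these, the :pause message from the async consumer example could be written as {:op :pause :topic-partitions (topic-partitions topic 4)}, assuming the topic really has four partitions.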