Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ pom.xml.asc
/.nrepl-port
/.idea
*.iml
/bin/
.classpath
.project
5 changes: 3 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
(defproject curator "0.0.6"
(defproject curator "1.0.0-SNAPSHOT"
:description "Clojurified Apache Curator"
:url "https://github.com/pingles/curator"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[org.apache.curator/curator-recipes "2.8.0"]
[org.apache.curator/curator-framework "2.8.0"]
[org.apache.curator/curator-x-discovery "2.8.0"]]
[org.apache.curator/curator-x-discovery "2.8.0"]
[org.clojure/java.data "0.1.1"]]
:profiles {:dev {:dependencies [[org.slf4j/log4j-over-slf4j "1.7.12"]
[org.slf4j/slf4j-simple "1.7.12"]]
:exclusions [org.slf4j/slf4j-log4j12]}}
Expand Down
85 changes: 53 additions & 32 deletions src/curator/discovery.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
(ns ^{:doc "Namespace for service discovery"} curator.discovery
(:require [clojure.edn :as edn]
[curator.framework :refer (time-units)])
(:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache]
[org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider]
[org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy]
[java.io ByteArrayInputStream InputStreamReader PushbackReader]))
(:require [clojure.edn :as edn]
[curator.framework :refer (time-units)]
[clojure.data.json :as json]
[clojure.java.data :as data])
(:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache]
[org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider]
[org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy]
[java.io ByteArrayInputStream InputStreamReader PushbackReader]))

(defmacro dotonn [x & forms]
(let [gx (gensym)]
Expand All @@ -14,7 +16,7 @@
`(when ~@(next f)
(~(first f) ~gx ~@(next f)))
`(~f ~gx)))
forms)
forms)
~gx)))

(defn uri-spec*
Expand All @@ -29,41 +31,60 @@
"name: my-service
uri-spec: \"{scheme}://foo.com:{port}\"
port: 1234
payload is serialized using json, only supports strings for now"
payload is serialized using json (if it is a clojure map, we do
stringify-keys and transform it to a java hash map)."
[name uri-spec port & {:keys [id address ssl-port service-type payload]}]
{:pre [(or (nil? payload) (string? payload))]}
(let [service-types {:dynamic ServiceType/DYNAMIC
:static ServiceType/STATIC
:permanent ServiceType/PERMANENT}]
(-> (dotonn (ServiceInstance/builder)
(.payload payload)
(.name name)
(.id id)
(.address address)
(.port port)
(.sslPort ssl-port)
(.uriSpec (uri-spec* uri-spec))
(.serviceType (service-types service-type)))
(.build))))
(.payload payload)
(.name name)
(.id id)
(.address address)
(.port port)
(.sslPort ssl-port)
(.uriSpec (uri-spec* uri-spec))
(.serviceType (service-types service-type)))
(.build))))

(defn uri [^ServiceInstance service-instance]
(.buildUriSpec service-instance))

(defn json-serializer []
(JsonInstanceSerializer. String))
(def utf-8-cs (java.nio.charset.Charset/forName "UTF-8"))

(def json-map-serializer
(reify org.apache.curator.x.discovery.details.InstanceSerializer
(serialize [this instance]
(let [map (data/from-java instance)
map-with-uri-spec (assoc map :uriSpec (.build (.getUriSpec instance)))]
(.getBytes (json/write-str map-with-uri-spec) utf-8-cs)))
(deserialize [this bytes]
(let [instance (json/read-str (String. bytes utf-8-cs) )
{:strs [name, id, address, port, sslPort, registrationTimeUTC, serviceType,
uriSpec payload]} instance]
(org.apache.curator.x.discovery.ServiceInstance. name id address
(int port ) (when sslPort (int sslPort)) payload (long registrationTimeUTC),
(org.apache.curator.x.discovery.ServiceType/valueOf serviceType),
(when uriSpec (org.apache.curator.x.discovery.UriSpec. uriSpec)))))))



(defn ^ServiceDiscovery service-discovery
"base-path [/foo]
payload-class [clojure.lang.PersistentArrayMap]
serializer [JsonInstanceSerializer (clojure.lang.PersistentArrayMap)]"
[curator-framework service-instance & {:keys [base-path serializer payload-class]
:or {base-path "/foo"
payload-class String
serializer (json-serializer)}}]
payload-class clojure.lang.PersistentArrayMap
serializer json-map-serializer}}]
{:pre [(.startsWith ^String base-path "/")]}
(-> (dotonn (ServiceDiscoveryBuilder/builder payload-class)
(.client curator-framework)
(.basePath base-path)
(.serializer (json-serializer))
(.thisInstance service-instance))
(.build)))
(.client curator-framework)
(.basePath base-path)
(.serializer serializer)
(.thisInstance service-instance))
(.build)))

(defn services
"Returns the names of the services registered."
Expand All @@ -86,8 +107,8 @@
(defn down-instance-policy
([] (down-instance-policy 30 :seconds 2))
([timeout timeout-unit error-threshold]
{:pre [(some time-units [timeout-unit])]}
(DownInstancePolicy. timeout (time-units timeout-unit) error-threshold)))
{:pre [(some time-units [timeout-unit])]}
(DownInstancePolicy. timeout (time-units timeout-unit) error-threshold)))

(defn ^ServiceProvider service-provider
"Creates a service provider for a named service s."
Expand All @@ -98,15 +119,15 @@
(.serviceName s)
(.downInstancePolicy down-instance-policy)
(.providerStrategy strategy))
(.build)))
(.build)))

(defn service-cache
"Creates a service cache (rather than reading ZooKeeper each time) for
the service named s"
[^ServiceDiscovery service-discovery s]
(-> (.serviceCacheBuilder service-discovery)
( .name s)
(.build)))
( .name s)
(.build)))

(defn note-error
"Clients should use this to indicate a problem when trying to
Expand Down