diff --git a/.gitignore b/.gitignore index 2eae690..4db62b2 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ pom.xml.asc /.nrepl-port /.idea *.iml +/bin/ +.classpath +.project diff --git a/project.clj b/project.clj index 263e43b..a013382 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject curator "0.0.6" +(defproject curator "1.0.0-SNAPSHOT" :description "Clojurified Apache Curator" :url "https://github.com/pingles/curator" :license {:name "Eclipse Public License" @@ -6,7 +6,8 @@ :dependencies [[org.clojure/clojure "1.6.0"] [org.apache.curator/curator-recipes "2.8.0"] [org.apache.curator/curator-framework "2.8.0"] - [org.apache.curator/curator-x-discovery "2.8.0"]] + [org.apache.curator/curator-x-discovery "2.8.0"] + [org.clojure/java.data "0.1.1"]] :profiles {:dev {:dependencies [[org.slf4j/log4j-over-slf4j "1.7.12"] [org.slf4j/slf4j-simple "1.7.12"]] :exclusions [org.slf4j/slf4j-log4j12]}} diff --git a/src/curator/discovery.clj b/src/curator/discovery.clj index 5d67004..582bf51 100644 --- a/src/curator/discovery.clj +++ b/src/curator/discovery.clj @@ -1,10 +1,12 @@ (ns ^{:doc "Namespace for service discovery"} curator.discovery - (:require [clojure.edn :as edn] - [curator.framework :refer (time-units)]) - (:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache] - [org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider] - [org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy] - [java.io ByteArrayInputStream InputStreamReader PushbackReader])) + (:require [clojure.edn :as edn] + [curator.framework :refer (time-units)] + [clojure.data.json :as json] + [clojure.java.data :as data]) + (:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache] + [org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider] + [org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy] + [java.io ByteArrayInputStream InputStreamReader PushbackReader])) (defmacro dotonn [x & forms] (let [gx (gensym)] @@ -14,7 +16,7 @@ `(when ~@(next f) (~(first f) ~gx ~@(next f))) `(~f ~gx))) - forms) + forms) ~gx))) (defn uri-spec* @@ -29,41 +31,60 @@ "name: my-service uri-spec: \"{scheme}://foo.com:{port}\" port: 1234 - payload is serialized using json, only supports strings for now" + payload is serialized using json (if it is a clojure map, we do + stringify-keys and transform it to a java hash map)." [name uri-spec port & {:keys [id address ssl-port service-type payload]}] - {:pre [(or (nil? payload) (string? payload))]} (let [service-types {:dynamic ServiceType/DYNAMIC :static ServiceType/STATIC :permanent ServiceType/PERMANENT}] (-> (dotonn (ServiceInstance/builder) - (.payload payload) - (.name name) - (.id id) - (.address address) - (.port port) - (.sslPort ssl-port) - (.uriSpec (uri-spec* uri-spec)) - (.serviceType (service-types service-type))) - (.build)))) + (.payload payload) + (.name name) + (.id id) + (.address address) + (.port port) + (.sslPort ssl-port) + (.uriSpec (uri-spec* uri-spec)) + (.serviceType (service-types service-type))) + (.build)))) (defn uri [^ServiceInstance service-instance] (.buildUriSpec service-instance)) -(defn json-serializer [] - (JsonInstanceSerializer. String)) +(def utf-8-cs (java.nio.charset.Charset/forName "UTF-8")) + +(def json-map-serializer + (reify org.apache.curator.x.discovery.details.InstanceSerializer + (serialize [this instance] + (let [map (data/from-java instance) + map-with-uri-spec (assoc map :uriSpec (.build (.getUriSpec instance)))] + (.getBytes (json/write-str map-with-uri-spec) utf-8-cs))) + (deserialize [this bytes] + (let [instance (json/read-str (String. bytes utf-8-cs) ) + {:strs [name, id, address, port, sslPort, registrationTimeUTC, serviceType, + uriSpec payload]} instance] + (org.apache.curator.x.discovery.ServiceInstance. name id address + (int port ) (when sslPort (int sslPort)) payload (long registrationTimeUTC), + (org.apache.curator.x.discovery.ServiceType/valueOf serviceType), + (when uriSpec (org.apache.curator.x.discovery.UriSpec. uriSpec))))))) + + (defn ^ServiceDiscovery service-discovery + "base-path [/foo] + payload-class [clojure.lang.PersistentArrayMap] + serializer [JsonInstanceSerializer (clojure.lang.PersistentArrayMap)]" [curator-framework service-instance & {:keys [base-path serializer payload-class] :or {base-path "/foo" - payload-class String - serializer (json-serializer)}}] + payload-class clojure.lang.PersistentArrayMap + serializer json-map-serializer}}] {:pre [(.startsWith ^String base-path "/")]} (-> (dotonn (ServiceDiscoveryBuilder/builder payload-class) - (.client curator-framework) - (.basePath base-path) - (.serializer (json-serializer)) - (.thisInstance service-instance)) - (.build))) + (.client curator-framework) + (.basePath base-path) + (.serializer serializer) + (.thisInstance service-instance)) + (.build))) (defn services "Returns the names of the services registered." @@ -86,8 +107,8 @@ (defn down-instance-policy ([] (down-instance-policy 30 :seconds 2)) ([timeout timeout-unit error-threshold] - {:pre [(some time-units [timeout-unit])]} - (DownInstancePolicy. timeout (time-units timeout-unit) error-threshold))) + {:pre [(some time-units [timeout-unit])]} + (DownInstancePolicy. timeout (time-units timeout-unit) error-threshold))) (defn ^ServiceProvider service-provider "Creates a service provider for a named service s." @@ -98,15 +119,15 @@ (.serviceName s) (.downInstancePolicy down-instance-policy) (.providerStrategy strategy)) - (.build))) + (.build))) (defn service-cache "Creates a service cache (rather than reading ZooKeeper each time) for the service named s" [^ServiceDiscovery service-discovery s] (-> (.serviceCacheBuilder service-discovery) - ( .name s) - (.build))) + ( .name s) + (.build))) (defn note-error "Clients should use this to indicate a problem when trying to