From d1a5d490333d7a88306ead680a1a6c4864e3c781 Mon Sep 17 00:00:00 2001 From: Bernhard Streit Date: Fri, 21 Aug 2015 15:06:08 +0200 Subject: [PATCH 1/2] Now supporting arbitrary payload classes * The default payload class is now a clojure map * You can pass any other type as well, but in that case, you will have to provide your own (de)serializer and the payload-class when invoking service-discovery. * Clojure maps are converted to Java Hashmaps to get nicer JSON output * This breaks compatibility with the previous version where String was accepted as payload --- .gitignore | 3 + project.clj | 2 +- src/curator/discovery.clj | 165 ++++++++++++++++++++++---------------- 3 files changed, 98 insertions(+), 72 deletions(-) diff --git a/.gitignore b/.gitignore index 2eae690..4db62b2 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ pom.xml.asc /.nrepl-port /.idea *.iml +/bin/ +.classpath +.project diff --git a/project.clj b/project.clj index 263e43b..ed11844 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject curator "0.0.6" +(defproject curator "1.0.0" :description "Clojurified Apache Curator" :url "https://github.com/pingles/curator" :license {:name "Eclipse Public License" diff --git a/src/curator/discovery.clj b/src/curator/discovery.clj index 5d67004..35bc358 100644 --- a/src/curator/discovery.clj +++ b/src/curator/discovery.clj @@ -1,6 +1,7 @@ (ns ^{:doc "Namespace for service discovery"} curator.discovery (:require [clojure.edn :as edn] - [curator.framework :refer (time-units)]) + [curator.framework :refer (time-units)] + [clojure.walk :refer (stringify-keys)]) (:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache] [org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider] [org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy] @@ -29,14 +30,17 @@ "name: my-service uri-spec: \"{scheme}://foo.com:{port}\" port: 1234 - payload is serialized using json, only supports strings for now" + payload is serialized using json (if it is a clojure map, we do + stringify-keys and transform it to a java hash map)." [name uri-spec port & {:keys [id address ssl-port service-type payload]}] - {:pre [(or (nil? payload) (string? payload))]} (let [service-types {:dynamic ServiceType/DYNAMIC :static ServiceType/STATIC :permanent ServiceType/PERMANENT}] (-> (dotonn (ServiceInstance/builder) - (.payload payload) + (.payload (if (map? payload) + ;; if we got a clojure map, stringify keys in order to get nicer json output + (java.util.HashMap. (stringify-keys payload)) + payload)) (.name name) (.id id) (.address address) @@ -49,76 +53,95 @@ (defn uri [^ServiceInstance service-instance] (.buildUriSpec service-instance)) -(defn json-serializer [] - (JsonInstanceSerializer. String)) - -(defn ^ServiceDiscovery service-discovery - [curator-framework service-instance & {:keys [base-path serializer payload-class] - :or {base-path "/foo" - payload-class String - serializer (json-serializer)}}] - {:pre [(.startsWith ^String base-path "/")]} - (-> (dotonn (ServiceDiscoveryBuilder/builder payload-class) - (.client curator-framework) - (.basePath base-path) - (.serializer (json-serializer)) - (.thisInstance service-instance)) - (.build))) - -(defn services - "Returns the names of the services registered." - [^ServiceDiscovery service-discovery] - (.queryForNames service-discovery)) - -(defn random-strategy - [] - (RandomStrategy. )) - -(defn round-robin-strategy - [] - (RoundRobinStrategy. )) - -(defn sticky-strategy - [^ProviderStrategy strategy] - (StickyStrategy. strategy)) - - -(defn down-instance-policy - ([] (down-instance-policy 30 :seconds 2)) - ([timeout timeout-unit error-threshold] - {:pre [(some time-units [timeout-unit])]} - (DownInstancePolicy. timeout (time-units timeout-unit) error-threshold))) - -(defn ^ServiceProvider service-provider - "Creates a service provider for a named service s." - [^ServiceDiscovery service-discovery s & {:keys [strategy down-instance-policy] - :or {strategy (random-strategy) - down-instance-policy (down-instance-policy)}}] - (-> (doto (.serviceProviderBuilder service-discovery) - (.serviceName s) - (.downInstancePolicy down-instance-policy) - (.providerStrategy strategy)) - (.build))) - -(defn service-cache - "Creates a service cache (rather than reading ZooKeeper each time) for +(defn json-serializer [payload-class] + (let [serializer (JsonInstanceSerializer. payload-class)] + (reify org.apache.curator.x.discovery.details.InstanceSerializer + (serialize [this instance] + (.serialize serializer instance)) + (deserialize [this bytes] + (if (= payload-class java.util.HashMap) + ;; if we have a hash map, convert to clojure map + (let [instance (.deserialize serializer bytes) + {:keys [name, id, address, port, sslPort, registrationTimeUTC, serviceType, + uriSpec]} (bean instance)] + (org.apache.curator.x.discovery.ServiceInstance. name id address + port sslPort + (clojure.walk/keywordize-keys (into {} (.getPayload instance))) + registrationTimeUTC, serviceType, uriSpec)) + (.deserialize serializer bytes)))))) + + + + (defn ^ServiceDiscovery service-discovery + "base-path [/foo] + payload-class [HashMap] + serializer [JsonInstanceSerializer (HashMap)]" + [curator-framework service-instance & {:keys [base-path serializer payload-class] + :or {base-path "/foo" + payload-class java.util.HashMap + serializer (json-serializer java.util.HashMap)}}] + {:pre [(.startsWith ^String base-path "/")]} + (-> (dotonn (ServiceDiscoveryBuilder/builder payload-class) + (.client curator-framework) + (.basePath base-path) + (.serializer serializer) + (.thisInstance service-instance)) + (.build))) + + (defn services + "Returns the names of the services registered." + [^ServiceDiscovery service-discovery] + (.queryForNames service-discovery)) + + (defn random-strategy + [] + (RandomStrategy. )) + + (defn round-robin-strategy + [] + (RoundRobinStrategy. )) + + (defn sticky-strategy + [^ProviderStrategy strategy] + (StickyStrategy. strategy)) + + + (defn down-instance-policy + ([] (down-instance-policy 30 :seconds 2)) + ([timeout timeout-unit error-threshold] + {:pre [(some time-units [timeout-unit])]} + (DownInstancePolicy. timeout (time-units timeout-unit) error-threshold))) + + (defn ^ServiceProvider service-provider + "Creates a service provider for a named service s." + [^ServiceDiscovery service-discovery s & {:keys [strategy down-instance-policy] + :or {strategy (random-strategy) + down-instance-policy (down-instance-policy)}}] + (-> (doto (.serviceProviderBuilder service-discovery) + (.serviceName s) + (.downInstancePolicy down-instance-policy) + (.providerStrategy strategy)) + (.build))) + + (defn service-cache + "Creates a service cache (rather than reading ZooKeeper each time) for the service named s" - [^ServiceDiscovery service-discovery s] - (-> (.serviceCacheBuilder service-discovery) - ( .name s) - (.build))) + [^ServiceDiscovery service-discovery s] + (-> (.serviceCacheBuilder service-discovery) + ( .name s) + (.build))) -(defn note-error - "Clients should use this to indicate a problem when trying to + (defn note-error + "Clients should use this to indicate a problem when trying to connect to a service instance. The instance may be marked as down depending on the service provider's down instance policy." - [^ServiceProvider service-provider ^ServiceInstance instance] - (.noteError service-provider instance)) + [^ServiceProvider service-provider ^ServiceInstance instance] + (.noteError service-provider instance)) -(defmulti instances (fn [^Object x & args] (.getClass x))) -(defmethod instances ServiceDiscovery [^ServiceDiscovery sd s] (.queryForInstances sd s)) -(defmethod instances ServiceCache [^ServiceCache sc] (.getInstances sc)) + (defmulti instances (fn [^Object x & args] (.getClass x))) + (defmethod instances ServiceDiscovery [^ServiceDiscovery sd s] (.queryForInstances sd s)) + (defmethod instances ServiceCache [^ServiceCache sc] (.getInstances sc)) -(defmulti instance (fn [^Object x & args] (.getClass x))) -(defmethod instance ServiceProvider [^ServiceProvider provider] (.getInstance provider)) -(defmethod instance ServiceCache [cache ^ProviderStrategy strategy] (.getInstance strategy cache)) + (defmulti instance (fn [^Object x & args] (.getClass x))) + (defmethod instance ServiceProvider [^ServiceProvider provider] (.getInstance provider)) + (defmethod instance ServiceCache [cache ^ProviderStrategy strategy] (.getInstance strategy cache)) From e42f07c3bdb948c3d319f4900bf433b3f5c81426 Mon Sep 17 00:00:00 2001 From: Bernhard Streit Date: Thu, 3 Sep 2015 13:31:16 +0200 Subject: [PATCH 2/2] Self-written (de)serialiser --- project.clj | 5 +- src/curator/discovery.clj | 210 +++++++++++++++++++------------------- 2 files changed, 107 insertions(+), 108 deletions(-) diff --git a/project.clj b/project.clj index ed11844..a013382 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject curator "1.0.0" +(defproject curator "1.0.0-SNAPSHOT" :description "Clojurified Apache Curator" :url "https://github.com/pingles/curator" :license {:name "Eclipse Public License" @@ -6,7 +6,8 @@ :dependencies [[org.clojure/clojure "1.6.0"] [org.apache.curator/curator-recipes "2.8.0"] [org.apache.curator/curator-framework "2.8.0"] - [org.apache.curator/curator-x-discovery "2.8.0"]] + [org.apache.curator/curator-x-discovery "2.8.0"] + [org.clojure/java.data "0.1.1"]] :profiles {:dev {:dependencies [[org.slf4j/log4j-over-slf4j "1.7.12"] [org.slf4j/slf4j-simple "1.7.12"]] :exclusions [org.slf4j/slf4j-log4j12]}} diff --git a/src/curator/discovery.clj b/src/curator/discovery.clj index 35bc358..582bf51 100644 --- a/src/curator/discovery.clj +++ b/src/curator/discovery.clj @@ -1,11 +1,12 @@ (ns ^{:doc "Namespace for service discovery"} curator.discovery - (:require [clojure.edn :as edn] - [curator.framework :refer (time-units)] - [clojure.walk :refer (stringify-keys)]) - (:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache] - [org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider] - [org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy] - [java.io ByteArrayInputStream InputStreamReader PushbackReader])) + (:require [clojure.edn :as edn] + [curator.framework :refer (time-units)] + [clojure.data.json :as json] + [clojure.java.data :as data]) + (:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache] + [org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider] + [org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy] + [java.io ByteArrayInputStream InputStreamReader PushbackReader])) (defmacro dotonn [x & forms] (let [gx (gensym)] @@ -15,7 +16,7 @@ `(when ~@(next f) (~(first f) ~gx ~@(next f))) `(~f ~gx))) - forms) + forms) ~gx))) (defn uri-spec* @@ -37,111 +38,108 @@ :static ServiceType/STATIC :permanent ServiceType/PERMANENT}] (-> (dotonn (ServiceInstance/builder) - (.payload (if (map? payload) - ;; if we got a clojure map, stringify keys in order to get nicer json output - (java.util.HashMap. (stringify-keys payload)) - payload)) - (.name name) - (.id id) - (.address address) - (.port port) - (.sslPort ssl-port) - (.uriSpec (uri-spec* uri-spec)) - (.serviceType (service-types service-type))) - (.build)))) + (.payload payload) + (.name name) + (.id id) + (.address address) + (.port port) + (.sslPort ssl-port) + (.uriSpec (uri-spec* uri-spec)) + (.serviceType (service-types service-type))) + (.build)))) (defn uri [^ServiceInstance service-instance] (.buildUriSpec service-instance)) -(defn json-serializer [payload-class] - (let [serializer (JsonInstanceSerializer. payload-class)] - (reify org.apache.curator.x.discovery.details.InstanceSerializer - (serialize [this instance] - (.serialize serializer instance)) - (deserialize [this bytes] - (if (= payload-class java.util.HashMap) - ;; if we have a hash map, convert to clojure map - (let [instance (.deserialize serializer bytes) - {:keys [name, id, address, port, sslPort, registrationTimeUTC, serviceType, - uriSpec]} (bean instance)] - (org.apache.curator.x.discovery.ServiceInstance. name id address - port sslPort - (clojure.walk/keywordize-keys (into {} (.getPayload instance))) - registrationTimeUTC, serviceType, uriSpec)) - (.deserialize serializer bytes)))))) - - - - (defn ^ServiceDiscovery service-discovery - "base-path [/foo] - payload-class [HashMap] - serializer [JsonInstanceSerializer (HashMap)]" - [curator-framework service-instance & {:keys [base-path serializer payload-class] - :or {base-path "/foo" - payload-class java.util.HashMap - serializer (json-serializer java.util.HashMap)}}] - {:pre [(.startsWith ^String base-path "/")]} - (-> (dotonn (ServiceDiscoveryBuilder/builder payload-class) - (.client curator-framework) - (.basePath base-path) - (.serializer serializer) - (.thisInstance service-instance)) - (.build))) - - (defn services - "Returns the names of the services registered." - [^ServiceDiscovery service-discovery] - (.queryForNames service-discovery)) - - (defn random-strategy - [] - (RandomStrategy. )) - - (defn round-robin-strategy - [] - (RoundRobinStrategy. )) - - (defn sticky-strategy - [^ProviderStrategy strategy] - (StickyStrategy. strategy)) - - - (defn down-instance-policy - ([] (down-instance-policy 30 :seconds 2)) - ([timeout timeout-unit error-threshold] - {:pre [(some time-units [timeout-unit])]} - (DownInstancePolicy. timeout (time-units timeout-unit) error-threshold))) - - (defn ^ServiceProvider service-provider - "Creates a service provider for a named service s." - [^ServiceDiscovery service-discovery s & {:keys [strategy down-instance-policy] - :or {strategy (random-strategy) - down-instance-policy (down-instance-policy)}}] - (-> (doto (.serviceProviderBuilder service-discovery) - (.serviceName s) - (.downInstancePolicy down-instance-policy) - (.providerStrategy strategy)) - (.build))) - - (defn service-cache - "Creates a service cache (rather than reading ZooKeeper each time) for +(def utf-8-cs (java.nio.charset.Charset/forName "UTF-8")) + +(def json-map-serializer + (reify org.apache.curator.x.discovery.details.InstanceSerializer + (serialize [this instance] + (let [map (data/from-java instance) + map-with-uri-spec (assoc map :uriSpec (.build (.getUriSpec instance)))] + (.getBytes (json/write-str map-with-uri-spec) utf-8-cs))) + (deserialize [this bytes] + (let [instance (json/read-str (String. bytes utf-8-cs) ) + {:strs [name, id, address, port, sslPort, registrationTimeUTC, serviceType, + uriSpec payload]} instance] + (org.apache.curator.x.discovery.ServiceInstance. name id address + (int port ) (when sslPort (int sslPort)) payload (long registrationTimeUTC), + (org.apache.curator.x.discovery.ServiceType/valueOf serviceType), + (when uriSpec (org.apache.curator.x.discovery.UriSpec. uriSpec))))))) + + + +(defn ^ServiceDiscovery service-discovery + "base-path [/foo] + payload-class [clojure.lang.PersistentArrayMap] + serializer [JsonInstanceSerializer (clojure.lang.PersistentArrayMap)]" + [curator-framework service-instance & {:keys [base-path serializer payload-class] + :or {base-path "/foo" + payload-class clojure.lang.PersistentArrayMap + serializer json-map-serializer}}] + {:pre [(.startsWith ^String base-path "/")]} + (-> (dotonn (ServiceDiscoveryBuilder/builder payload-class) + (.client curator-framework) + (.basePath base-path) + (.serializer serializer) + (.thisInstance service-instance)) + (.build))) + +(defn services + "Returns the names of the services registered." + [^ServiceDiscovery service-discovery] + (.queryForNames service-discovery)) + +(defn random-strategy + [] + (RandomStrategy. )) + +(defn round-robin-strategy + [] + (RoundRobinStrategy. )) + +(defn sticky-strategy + [^ProviderStrategy strategy] + (StickyStrategy. strategy)) + + +(defn down-instance-policy + ([] (down-instance-policy 30 :seconds 2)) + ([timeout timeout-unit error-threshold] + {:pre [(some time-units [timeout-unit])]} + (DownInstancePolicy. timeout (time-units timeout-unit) error-threshold))) + +(defn ^ServiceProvider service-provider + "Creates a service provider for a named service s." + [^ServiceDiscovery service-discovery s & {:keys [strategy down-instance-policy] + :or {strategy (random-strategy) + down-instance-policy (down-instance-policy)}}] + (-> (doto (.serviceProviderBuilder service-discovery) + (.serviceName s) + (.downInstancePolicy down-instance-policy) + (.providerStrategy strategy)) + (.build))) + +(defn service-cache + "Creates a service cache (rather than reading ZooKeeper each time) for the service named s" - [^ServiceDiscovery service-discovery s] - (-> (.serviceCacheBuilder service-discovery) - ( .name s) - (.build))) + [^ServiceDiscovery service-discovery s] + (-> (.serviceCacheBuilder service-discovery) + ( .name s) + (.build))) - (defn note-error - "Clients should use this to indicate a problem when trying to +(defn note-error + "Clients should use this to indicate a problem when trying to connect to a service instance. The instance may be marked as down depending on the service provider's down instance policy." - [^ServiceProvider service-provider ^ServiceInstance instance] - (.noteError service-provider instance)) + [^ServiceProvider service-provider ^ServiceInstance instance] + (.noteError service-provider instance)) - (defmulti instances (fn [^Object x & args] (.getClass x))) - (defmethod instances ServiceDiscovery [^ServiceDiscovery sd s] (.queryForInstances sd s)) - (defmethod instances ServiceCache [^ServiceCache sc] (.getInstances sc)) +(defmulti instances (fn [^Object x & args] (.getClass x))) +(defmethod instances ServiceDiscovery [^ServiceDiscovery sd s] (.queryForInstances sd s)) +(defmethod instances ServiceCache [^ServiceCache sc] (.getInstances sc)) - (defmulti instance (fn [^Object x & args] (.getClass x))) - (defmethod instance ServiceProvider [^ServiceProvider provider] (.getInstance provider)) - (defmethod instance ServiceCache [cache ^ProviderStrategy strategy] (.getInstance strategy cache)) +(defmulti instance (fn [^Object x & args] (.getClass x))) +(defmethod instance ServiceProvider [^ServiceProvider provider] (.getInstance provider)) +(defmethod instance ServiceCache [cache ^ProviderStrategy strategy] (.getInstance strategy cache))