From 585f263a6e6ce04c20e0d6ea30871b35dd8aa6cd Mon Sep 17 00:00:00 2001 From: neph Date: Sat, 5 Aug 2017 08:15:19 +0200 Subject: [PATCH] DeepStream client async rework --- .../java/io/deepstream/AnonymousRecord.java | 73 ++- src/main/java/io/deepstream/Connection.java | 28 +- .../java/io/deepstream/DeepstreamClient.java | 129 +++- .../deepstream/DeepstreamClientAbstract.java | 5 - .../java/io/deepstream/DeepstreamFactory.java | 21 + src/main/java/io/deepstream/EventHandler.java | 4 +- .../java/io/deepstream/GetListListener.java | 9 + .../java/io/deepstream/GetRecorrListener.java | 9 + src/main/java/io/deepstream/HasListener.java | 9 + src/main/java/io/deepstream/HasResult.java | 2 - .../io/deepstream/JavaEndpointWebsocket.java | 3 +- src/main/java/io/deepstream/List.java | 24 +- .../io/deepstream/LoginResultListener.java | 9 + .../java/io/deepstream/MessageBuilder.java | 3 +- .../java/io/deepstream/MessageParser.java | 7 +- .../PresenceGetAllResultListener.java | 9 + .../java/io/deepstream/PresenceHandler.java | 95 ++- src/main/java/io/deepstream/Record.java | 193 ++++-- .../io/deepstream/RecordChangedCallback.java | 3 +- .../java/io/deepstream/RecordHandler.java | 572 +++++++++++------- .../io/deepstream/RecordMergeStrategies.java | 3 +- .../io/deepstream/RecordMergeStrategy.java | 3 +- .../RecordMergeStrategyException.java | 3 +- .../deepstream/RecordPathChangedCallback.java | 3 +- src/main/java/io/deepstream/RpcHandler.java | 87 ++- .../java/io/deepstream/RpcMakeListener.java | 9 + .../java/io/deepstream/SetNameListener.java | 9 + .../deepstream/SetWithAckResultListener.java | 5 + .../java/io/deepstream/SnapshotListener.java | 5 + .../io/deepstream/UtilAckTimeoutRegistry.java | 7 +- src/main/java/io/deepstream/UtilJSONPath.java | 5 +- 31 files changed, 930 insertions(+), 416 deletions(-) create mode 100644 src/main/java/io/deepstream/GetListListener.java create mode 100644 src/main/java/io/deepstream/GetRecorrListener.java create mode 100644 src/main/java/io/deepstream/HasListener.java create mode 100644 src/main/java/io/deepstream/LoginResultListener.java create mode 100644 src/main/java/io/deepstream/PresenceGetAllResultListener.java create mode 100644 src/main/java/io/deepstream/RpcMakeListener.java create mode 100644 src/main/java/io/deepstream/SetNameListener.java create mode 100644 src/main/java/io/deepstream/SetWithAckResultListener.java create mode 100644 src/main/java/io/deepstream/SnapshotListener.java diff --git a/src/main/java/io/deepstream/AnonymousRecord.java b/src/main/java/io/deepstream/AnonymousRecord.java index 3047111..d24fffd 100644 --- a/src/main/java/io/deepstream/AnonymousRecord.java +++ b/src/main/java/io/deepstream/AnonymousRecord.java @@ -1,11 +1,12 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; +import com.google.j2objc.annotations.ObjectiveCName; import java.util.ArrayList; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; /** * An AnonymousRecord is a record without a predefined name. It @@ -269,37 +270,63 @@ public AnonymousRecord removeRecordEventsListener(RecordEventsListener recordEve } /** - * Sets the underlying record the anonymous record is bound + * Synchronously sets the underlying record the anonymous record is bound * to. Can be called multiple times. * * @param recordName The name of the underlying record to use * @return The AnonymousRecord */ @ObjectiveCName("setName:") - public AnonymousRecord setName( String recordName ) { - this.unsubscribeRecord(); - this.record = this.recordHandler.getRecord( recordName ); - this.subscribeRecord(); - - final CountDownLatch readyLatch = new CountDownLatch(1); - record.whenReady(new Record.RecordReadyListener() { - @Override - public void onRecordReady(String recordName, Record record) { - readyLatch.countDown(); - } - }); - + public AnonymousRecord setName( String recordName ){ try { - readyLatch.await(); - } catch (InterruptedException e) { + return setNameAsync(recordName, null).get(); + }catch(Exception e){ e.printStackTrace(); + return null; } + } - for( AnonymousRecordNameChangedListener anonymousRecordNameChangedCallback : this.anonymousRecordNameChangedCallbacks ) { - anonymousRecordNameChangedCallback.recordNameChanged( recordName, this ); - } - - return this; + /** + * Asynchronously sets the underlying record the anonymous record is bound + * to. Can be called multiple times. + * + * @param recordName The name of the underlying record to use + * @param listener Callback to be called after query is successfull, may be null + * @return The AnonymousRecord + */ + @ObjectiveCName("setNameAsync:") + public Future setNameAsync(final String recordName, final SetNameListener listener) { + return recordHandler.executor.submit(new Callable() { + @Override + public AnonymousRecord call() throws Exception { + AnonymousRecord.this.unsubscribeRecord(); + AnonymousRecord.this.record = AnonymousRecord.this.recordHandler.getRecord(recordName); + AnonymousRecord.this.subscribeRecord(); + + final CountDownLatch readyLatch = new CountDownLatch(1); + record.whenReady(new Record.RecordReadyListener() { + @Override + public void onRecordReady(String recordName, Record record) { + readyLatch.countDown(); + } + }); + + try { + readyLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + for (AnonymousRecordNameChangedListener anonymousRecordNameChangedCallback : AnonymousRecord.this.anonymousRecordNameChangedCallbacks) { + anonymousRecordNameChangedCallback.recordNameChanged(recordName, AnonymousRecord.this); + } + + if(listener != null){ + return AnonymousRecord.this; + } + return AnonymousRecord.this; + } + }); } /** diff --git a/src/main/java/io/deepstream/Connection.java b/src/main/java/io/deepstream/Connection.java index c948b47..f1a75d4 100644 --- a/src/main/java/io/deepstream/Connection.java +++ b/src/main/java/io/deepstream/Connection.java @@ -1,14 +1,15 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.google.j2objc.annotations.ObjectiveCName; import java.net.URI; import java.net.URISyntaxException; -import java.util.*; +import java.util.ArrayList; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -58,6 +59,23 @@ class Connection implements IConnection { this.endpoint = createEndpoint(); } + /** + * Creates an endpoint and passed it to {@link Connection#Connection(String, DeepstreamConfig, DeepstreamClient, EndpointFactory, Endpoint)} + * + * @see Connection#Connection(String, DeepstreamConfig, DeepstreamClient, EndpointFactory, Endpoint) + * + * @param url The endpoint url + * @param options The options used to initialise the deepstream client + * @param endpointFactory The factory to create endpoints + * @param client The deepstream client + * @throws URISyntaxException An exception if an invalid url is passed in + */ + Connection(final String url, final DeepstreamConfig options, DeepstreamClient client, EndpointFactory endpointFactory, boolean networkAvailable) throws URISyntaxException { + this( url, options, client, endpointFactory, null ); + this.globalConnectivityState = networkAvailable ? GlobalConnectivityState.CONNECTED : GlobalConnectivityState.DISCONNECTED; + this.endpoint = createEndpoint(); + } + /** * Creates a connection, that is responsible for handling all the connection related logic related to state * and messages @@ -260,6 +278,7 @@ else if( this.deliberateClose ) { return; } this.tryReconnect(); + } } @@ -326,13 +345,14 @@ else if( message.action == Actions.ACK ) { @ObjectiveCName("setState:") private void setState( ConnectionState connectionState ) { + System.out.println(connectionState); this.connectionState = connectionState; for (ConnectionStateListener connectStateListener : this.connectStateListeners) { connectStateListener.connectionStateChanged(connectionState); } - if( connectionState == ConnectionState.AWAITING_AUTHENTICATION && this.authParameters != null ) { + if( connectionState == ConnectionState.AWAITING_AUTHENTICATION && this.authParameters != null) { this.sendAuthMessage(); } } diff --git a/src/main/java/io/deepstream/DeepstreamClient.java b/src/main/java/io/deepstream/DeepstreamClient.java index 7f3ae4c..e701a2c 100644 --- a/src/main/java/io/deepstream/DeepstreamClient.java +++ b/src/main/java/io/deepstream/DeepstreamClient.java @@ -7,7 +7,11 @@ import java.util.Date; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * The main entry point for a DeepstreamClient. You can create a client directly using the constructors or use the @@ -15,6 +19,7 @@ * {@link DeepstreamFactory#getClient(String, Properties)} to create one for you and hold them for future reference. */ public class DeepstreamClient extends DeepstreamClientAbstract { + private ExecutorService executor; /** * The getters for data-sync, such as {@link RecordHandler#getRecord(String)}, @@ -124,11 +129,24 @@ private DeepstreamClient(final String url, DeepstreamConfig deepstreamConfig) th */ @ObjectiveCName("init:deepstreamConfig:endpointFactory:") public DeepstreamClient(final String url, DeepstreamConfig deepstreamConfig, EndpointFactory endpointFactory) throws URISyntaxException { - this.connection = new Connection(url, deepstreamConfig, this, endpointFactory); + this(url, deepstreamConfig, endpointFactory, false); + } + + /** + * deepstream.io java client + * @param url URL to connect to. The protocol can be omited, e.g. : + * @param deepstreamConfig A map of options that extend the ones specified in DefaultConfig.properties + * @param endpointFactory An EndpointFactory that returns an Endpoint + * @param networkAvailable indicates whether network is available or not + * @throws URISyntaxException Thrown if the url in incorrect + */ + public DeepstreamClient(final String url, DeepstreamConfig deepstreamConfig, EndpointFactory endpointFactory, boolean networkAvailable) throws URISyntaxException { + this.executor = Executors.newSingleThreadExecutor(); + this.connection = new Connection(url, deepstreamConfig, this, endpointFactory, networkAvailable); this.event = new EventHandler(deepstreamConfig, this.connection, this); - this.rpc = new RpcHandler(deepstreamConfig, this.connection, this); - this.record = new RecordHandler(deepstreamConfig, this.connection, this); - this.presence = new PresenceHandler(deepstreamConfig, this.connection, this); + this.rpc = new RpcHandler(deepstreamConfig, this.connection, this, this.executor); + this.record = new RecordHandler(deepstreamConfig, this.connection, this, this.executor); + this.presence = new PresenceHandler(deepstreamConfig, this.connection, this, this.executor); } /** @@ -189,12 +207,12 @@ public void setRuntimeErrorHandler( DeepstreamRuntimeErrorHandler deepstreamRunt * @return The login result */ @ObjectiveCName("login") - public LoginResult login() { + public LoginResult login(){ return this.login(null); } /** - * Send authentication parameters to the client to fully open + * Synchronously sends authentication parameters to the client to fully open * the connection. * * Please note: Authentication parameters are send over an already established @@ -214,37 +232,83 @@ public LoginResult login() { * login can be called multiple times until either the connection is authenticated or * forcefully closed by the server since its maxAuthAttempts threshold has been exceeded * + * This will block your calling thread + * * @param authParams JSON.serializable authentication data * @return The login result */ @ObjectiveCName("login:") - public LoginResult login(JsonElement authParams) { - final CountDownLatch loggedInLatch = new CountDownLatch(1); - final LoginResult[] loginResult = new LoginResult[1]; - - this.connection.authenticate(authParams, new LoginCallback() { - @Override - @ObjectiveCName("loginSuccess:") - public void loginSuccess(Object userData) { - loginResult[0] = new LoginResult(true, userData); - loggedInLatch.countDown(); - } + public LoginResult login(final JsonElement authParams){ + try { + return this.loginAsync(authParams, null).get(); + }catch(Exception e){ + e.printStackTrace(); + return null; + } + } + /** + * Asynchronously sends authentication parameters to the client to fully open + * the connection and sends callback afterwards. + * + * Please note: Authentication parameters are send over an already established + * connection, rather than appended to the server URL. This means the parameters + * will be encrypted when used with a WSS / HTTPS connection. If the deepstream server + * on the other side has message logging enabled it will however be written to the logs in + * plain text. If additional security is a requirement it might therefor make sense to hash + * the password on the client. + * + * If the connection is not yet established the authentication parameter will be + * stored and send once it becomes available + * + * authParams can be any JSON serializable data structure and its up for the + * permission handler on the server to make sense of them, although something + * like { username: 'someName', password: 'somePass' } will probably make the most sense. + * + * login can be called multiple times until either the connection is authenticated or + * forcefully closed by the server since its maxAuthAttempts threshold has been exceeded + * + * This will not block your main thread. However you can block it by calling get(). + * + * @param authParams JSON.serializable authentication data + * @param listener Callback to be called after login is successfull, may be null + * @return The login result + */ + @ObjectiveCName("loginAsync:") + public Future loginAsync(final JsonElement authParams, final LoginResultListener listener) { + return executor.submit(new Callable() { @Override - @ObjectiveCName("loginFailed:data:") - public void loginFailed(Event errorEvent, Object data) { - loginResult[0] = new LoginResult(false, errorEvent, data); - loggedInLatch.countDown(); + public LoginResult call() throws Exception { + final CountDownLatch loggedInLatch = new CountDownLatch(1); + final LoginResult[] loginResult = new LoginResult[1]; + + connection.authenticate(authParams, new LoginCallback() { + @Override + @ObjectiveCName("loginSuccess:") + public void loginSuccess(Object userData) { + loginResult[0] = new LoginResult(true, userData); + loggedInLatch.countDown(); + } + + @Override + @ObjectiveCName("loginFailed:data:") + public void loginFailed(Event errorEvent, Object data) { + loginResult[0] = new LoginResult(false, errorEvent, data); + loggedInLatch.countDown(); + } + }); + + try { + loggedInLatch.await(); + } catch (InterruptedException e) { + loginResult[0] = new LoginResult(false, null, "An issue occured during login"); + } + if(listener != null) { + listener.loginCompleted(loginResult[0]); + } + return loginResult[0]; } }); - - try { - loggedInLatch.await(); - } catch (InterruptedException e) { - loginResult[0] = new LoginResult(false, null, "An issue occured during login"); - } - - return loginResult[0]; } /** @@ -252,7 +316,8 @@ public void loginFailed(Event errorEvent, Object data) { * @return The deepstream client */ public DeepstreamClient close() { - this.connection.close(false); + this.executor.shutdown(); + this.connection.close(true); this.getAckTimeoutRegistry().close(); return this; } @@ -289,9 +354,7 @@ public ConnectionState getConnectionState() { } /** - * Sets global connectivity state and notifies current connections about it. When connectivity is {@link GlobalConnectivityState#DISCONNECTED)} connection will be closed and - * no reconnects will be attempted. If connectivity is set to {@link GlobalConnectivityState#CONNECTED)} and current {@link ConnectionState)} is {@link ConnectionState#CLOSED)} - * or {@link ConnectionState#ERROR)} then client will try reconnecting. + * Set global connectivity state. * @param {GlobalConnectivityState} globalConnectivityState Current global connectivity state */ public void setGlobalConnectivityState(GlobalConnectivityState globalConnectivityState){ diff --git a/src/main/java/io/deepstream/DeepstreamClientAbstract.java b/src/main/java/io/deepstream/DeepstreamClientAbstract.java index e1fd622..d31404e 100644 --- a/src/main/java/io/deepstream/DeepstreamClientAbstract.java +++ b/src/main/java/io/deepstream/DeepstreamClientAbstract.java @@ -2,8 +2,6 @@ import com.google.j2objc.annotations.ObjectiveCName; -import com.google.gson.JsonElement; - abstract class DeepstreamClientAbstract { private UtilAckTimeoutRegistry utilAckTimeoutRegistry; private DeepstreamRuntimeErrorHandler deepstreamRuntimeErrorHandler; @@ -14,10 +12,7 @@ abstract class DeepstreamClientAbstract { abstract DeepstreamClientAbstract removeConnectionChangeListener(ConnectionStateListener connectionStateListener); abstract ConnectionState getConnectionState(); - abstract LoginResult login(); - @ObjectiveCName("login:") - abstract LoginResult login(JsonElement data); abstract DeepstreamClientAbstract close(); abstract String getUid(); diff --git a/src/main/java/io/deepstream/DeepstreamFactory.java b/src/main/java/io/deepstream/DeepstreamFactory.java index 5290250..d09ca7c 100644 --- a/src/main/java/io/deepstream/DeepstreamFactory.java +++ b/src/main/java/io/deepstream/DeepstreamFactory.java @@ -63,6 +63,27 @@ public DeepstreamClient getClient(String url) throws URISyntaxException { return client; } + /** + * Returns a client that was previous created via the same url using this method or {@link DeepstreamFactory#getClient(String, Properties)}. + * If one wasn't created, it creates it first and stores it for future reference. + * + * @param url The url to connect to, also the key used to retrieve in future calls + * @param networkAvailable boolean to indicate whether network is available or not + * @return A deepstream client + * @throws URISyntaxException An error if the url syntax is invalid + */ + public DeepstreamClient getClient(String url, boolean networkAvailable) throws URISyntaxException { + DeepstreamClient client = this.clients.get(url); + this.lastUrl = url; + if (clientDoesNotExist(client)) { + client = new DeepstreamClient(url, new DeepstreamConfig(), new JavaEndpointFactory(), networkAvailable); + this.clients.put(url, client); + }else{ + client.setGlobalConnectivityState(networkAvailable ? GlobalConnectivityState.CONNECTED : GlobalConnectivityState.DISCONNECTED); + } + return client; + } + /** * Returns a client that was previous created via the same url using this method or {@link DeepstreamFactory#getClient(String)}. * If one wasn't created, it creates it first and stores it for future reference. diff --git a/src/main/java/io/deepstream/EventHandler.java b/src/main/java/io/deepstream/EventHandler.java index 8a76109..7c74cf9 100644 --- a/src/main/java/io/deepstream/EventHandler.java +++ b/src/main/java/io/deepstream/EventHandler.java @@ -2,8 +2,10 @@ import com.google.j2objc.annotations.ObjectiveCName; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * The entry point for events, such as {@link EventHandler#subscribe(String, EventListener)}, diff --git a/src/main/java/io/deepstream/GetListListener.java b/src/main/java/io/deepstream/GetListListener.java new file mode 100644 index 0000000..d7f7fb3 --- /dev/null +++ b/src/main/java/io/deepstream/GetListListener.java @@ -0,0 +1,9 @@ +package io.deepstream; + +/** + * Created by horin on 04.08.2017. + */ + +public interface GetListListener { + void getListResult(List result); +} diff --git a/src/main/java/io/deepstream/GetRecorrListener.java b/src/main/java/io/deepstream/GetRecorrListener.java new file mode 100644 index 0000000..378eb9f --- /dev/null +++ b/src/main/java/io/deepstream/GetRecorrListener.java @@ -0,0 +1,9 @@ +package io.deepstream; + +/** + * Created by horin on 04.08.2017. + */ + +public interface GetRecorrListener { + void getRecordCompleted(Record result); +} diff --git a/src/main/java/io/deepstream/HasListener.java b/src/main/java/io/deepstream/HasListener.java new file mode 100644 index 0000000..d483565 --- /dev/null +++ b/src/main/java/io/deepstream/HasListener.java @@ -0,0 +1,9 @@ +package io.deepstream; + +/** + * Created by horin on 04.08.2017. + */ + +public interface HasListener { + void hasCompleted(HasResult result); +} diff --git a/src/main/java/io/deepstream/HasResult.java b/src/main/java/io/deepstream/HasResult.java index eb7411a..b9c79e0 100644 --- a/src/main/java/io/deepstream/HasResult.java +++ b/src/main/java/io/deepstream/HasResult.java @@ -1,7 +1,5 @@ package io.deepstream; -import com.google.gson.JsonElement; - public class HasResult { boolean exists; diff --git a/src/main/java/io/deepstream/JavaEndpointWebsocket.java b/src/main/java/io/deepstream/JavaEndpointWebsocket.java index e994969..5393ee1 100644 --- a/src/main/java/io/deepstream/JavaEndpointWebsocket.java +++ b/src/main/java/io/deepstream/JavaEndpointWebsocket.java @@ -6,12 +6,13 @@ import org.java_websocket.drafts.Draft_10; import org.java_websocket.handshake.ServerHandshake; -import javax.net.ssl.SSLContext; import java.net.URI; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; +import javax.net.ssl.SSLContext; + class JavaEndpointWebsocket implements Endpoint { private final URI uri; diff --git a/src/main/java/io/deepstream/List.java b/src/main/java/io/deepstream/List.java index 077cf2f..1afd2a1 100644 --- a/src/main/java/io/deepstream/List.java +++ b/src/main/java/io/deepstream/List.java @@ -1,11 +1,13 @@ package io.deepstream; import com.google.gson.Gson; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; +import com.google.j2objc.annotations.ObjectiveCName; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; /** * A List is a specialised Record that contains @@ -28,7 +30,21 @@ public class List { @ObjectiveCName("init:name:") List(RecordHandler recordHandler, String name) { - this.record = recordHandler.getRecord( name ); + this.record = recordHandler.getRecord(name); + this.recordListeners = new List.RecordListeners( this, this.record ); + this.listChangedListeners = new ArrayList<>(); + this.listEntryChangedListeners = new ArrayList<>(); + this.gson = new Gson(); + } + + /** + * Constructor is not public since it is created via {@link RecordHandler#getList(String)} + * @param record The record of list + * @param name The list name + */ + @ObjectiveCName("init:name:") + List(Record record, String name) { + this.record = record; this.recordListeners = new List.RecordListeners( this, this.record ); this.listChangedListeners = new ArrayList<>(); this.listEntryChangedListeners = new ArrayList<>(); diff --git a/src/main/java/io/deepstream/LoginResultListener.java b/src/main/java/io/deepstream/LoginResultListener.java new file mode 100644 index 0000000..113d3bd --- /dev/null +++ b/src/main/java/io/deepstream/LoginResultListener.java @@ -0,0 +1,9 @@ +package io.deepstream; + +/** + * Created by horin on 04.08.2017. + */ + +public interface LoginResultListener { + void loginCompleted(LoginResult result); +} diff --git a/src/main/java/io/deepstream/MessageBuilder.java b/src/main/java/io/deepstream/MessageBuilder.java index 475fda7..f23b29c 100644 --- a/src/main/java/io/deepstream/MessageBuilder.java +++ b/src/main/java/io/deepstream/MessageBuilder.java @@ -1,9 +1,8 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.j2objc.annotations.ObjectiveCName; /** * Creates a deepstream message string, based on the diff --git a/src/main/java/io/deepstream/MessageParser.java b/src/main/java/io/deepstream/MessageParser.java index 087ea63..aa0daa2 100644 --- a/src/main/java/io/deepstream/MessageParser.java +++ b/src/main/java/io/deepstream/MessageParser.java @@ -4,7 +4,12 @@ import com.google.gson.JsonElement; import com.google.gson.stream.JsonReader; -import java.io.*; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; diff --git a/src/main/java/io/deepstream/PresenceGetAllResultListener.java b/src/main/java/io/deepstream/PresenceGetAllResultListener.java new file mode 100644 index 0000000..6e7f237 --- /dev/null +++ b/src/main/java/io/deepstream/PresenceGetAllResultListener.java @@ -0,0 +1,9 @@ +package io.deepstream; + +/** + * Created by horin on 04.08.2017. + */ + +public interface PresenceGetAllResultListener { + void getAllCompleted(String[] result); +} diff --git a/src/main/java/io/deepstream/PresenceHandler.java b/src/main/java/io/deepstream/PresenceHandler.java index b3d5ec0..bebcbe8 100644 --- a/src/main/java/io/deepstream/PresenceHandler.java +++ b/src/main/java/io/deepstream/PresenceHandler.java @@ -3,7 +3,11 @@ import com.google.j2objc.annotations.ObjectiveCName; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; public class PresenceHandler { @@ -14,8 +18,9 @@ public class PresenceHandler { private final DeepstreamClientAbstract client; private final UtilAckTimeoutRegistry ackTimeoutRegistry; private final UtilSingleNotifier notifier; + private final ExecutorService executor; - PresenceHandler(DeepstreamConfig deepstreamConfig, final IConnection connection, DeepstreamClientAbstract client) { + PresenceHandler(DeepstreamConfig deepstreamConfig, final IConnection connection, DeepstreamClientAbstract client, ExecutorService executor) { this.subscriptionTimeout = deepstreamConfig.getSubscriptionTimeout(); this.connection = connection; this.client = client; @@ -23,6 +28,7 @@ public class PresenceHandler { this.deepstreamConfig = deepstreamConfig; this.ackTimeoutRegistry = client.getAckTimeoutRegistry(); this.notifier = new UtilSingleNotifier(client, connection, Topic.PRESENCE, Actions.QUERY, subscriptionTimeout); + this.executor = executor; new UtilResubscribeNotifier(this.client, new UtilResubscribeNotifier.UtilResubscribeListener() { @Override @@ -35,42 +41,75 @@ public void resubscribe() { } /** - * Queries for clients logged into deepstream + * Queries synchronously for clients logged into deepstream + * + * This will block your calling thread * * @return List a list of currently connected clients * @throws DeepstreamError */ - public String[] getAll() throws DeepstreamError { - - final Object[] data = new Object[1]; - final DeepstreamError[] deepstreamException = new DeepstreamError[1]; - - final CountDownLatch snapshotLatch = new CountDownLatch(1); - - notifier.request(Actions.QUERY.toString(), new UtilSingleNotifier.UtilSingleNotifierCallback() { - @Override - public void onSingleNotifierError(String name, DeepstreamError error) { - deepstreamException[0] = error; - snapshotLatch.countDown(); + public String[] getAll() throws DeepstreamError{ + try { + return getAllAsync(null).get(); + }catch(ExecutionException e){ + Throwable t = e.getCause(); + if(t instanceof DeepstreamError){ + throw (DeepstreamError)t; + }else{ + e.printStackTrace(); + return null; } + }catch(Exception e){ + e.printStackTrace(); + return null; + } + } + /** + * Queries asynchronously for clients logged into deepstream. You can block calling thread by executing .get() on result. + * + * @return Future> a list of currently connected clients + * @param listener Callback to be called after query is successfull, may be null + * @throws DeepstreamError + */ + public Future getAllAsync(final PresenceGetAllResultListener listener) { + return executor.submit(new Callable() { @Override - public void onSingleNotifierResponse(String name, Object users) { - data[0] = users; - snapshotLatch.countDown(); + public String[] call() throws Exception { + final Object[] data = new Object[1]; + final DeepstreamError[] deepstreamException = new DeepstreamError[1]; + + final CountDownLatch snapshotLatch = new CountDownLatch(1); + + notifier.request(Actions.QUERY.toString(), new UtilSingleNotifier.UtilSingleNotifierCallback() { + @Override + public void onSingleNotifierError(String name, DeepstreamError error) { + deepstreamException[0] = error; + snapshotLatch.countDown(); + } + + @Override + public void onSingleNotifierResponse(String name, Object users) { + data[0] = users; + snapshotLatch.countDown(); + } + }); + + try { + snapshotLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + if (deepstreamException[0] != null) { + throw deepstreamException[0]; + } + if(listener != null){ + listener.getAllCompleted((String[]) data[0]); + } + return (String[]) data[0]; } }); - - try { - snapshotLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - if (deepstreamException[0] != null) { - throw deepstreamException[0]; - } - return (String[]) data[0]; } /** diff --git a/src/main/java/io/deepstream/Record.java b/src/main/java/io/deepstream/Record.java index 2e33524..be126c9 100644 --- a/src/main/java/io/deepstream/Record.java +++ b/src/main/java/io/deepstream/Record.java @@ -1,12 +1,23 @@ package io.deepstream; -import com.google.gson.*; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.google.j2objc.annotations.ObjectiveCName; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** @@ -29,6 +40,7 @@ public class Record { private final ArrayList onceRecordReadyListeners; private final String name; private final DeepstreamConfig deepstreamConfig; + private final ExecutorService executor; private boolean isReady; private boolean isDestroyed; private int version; @@ -47,7 +59,7 @@ public class Record { * @param client deepstream.io client */ @ObjectiveCName("init:recordOptions:connection:deepstreamConfig:client:") - Record(String name, Map recordOptions, IConnection connection, DeepstreamConfig deepstreamConfig, DeepstreamClientAbstract client) { + Record(String name, Map recordOptions, IConnection connection, DeepstreamConfig deepstreamConfig, DeepstreamClientAbstract client, ExecutorService executor) { this.ackTimeoutRegistry = client.getAckTimeoutRegistry(); this.name = name; this.deepstreamConfig = deepstreamConfig; @@ -75,6 +87,7 @@ public void resubscribe() { } }); this.recordSetNotifier = new UtilSingleNotifier(client, connection, Topic.RECORD, Actions.PATCH, deepstreamConfig.getSubscriptionTimeout()); + this.executor = executor; } /** @@ -220,7 +233,7 @@ public JsonElement get() { * such as {@link Map}. Since this is a root the object should also not be a primitive. * * @see Record#set(String, Object) - */ + */ @ObjectiveCName("set:") public Record set( JsonElement value ) throws DeepstreamRecordDestroyedException { return this.set( null, value, false ); @@ -255,7 +268,7 @@ public Record set(String path, Object value ) throws DeepstreamRecordDestroyedEx * @see Record#set(String, Object) */ @ObjectiveCName("setWithAck:") - public RecordSetResult setWithAck(Object value) { + public RecordSetResult setWithAck(Object value) throws DeepstreamRecordDestroyedException{ return this.setWithAck(null, value); } @@ -275,74 +288,114 @@ public RecordSetResult setWithAck(Object value) { * @throws DeepstreamRecordDestroyedException Thrown if the record has been destroyed and can't perform more actions */ @ObjectiveCName("setWithAck:value:") - public RecordSetResult setWithAck(String path, Object value) { - throwExceptionIfDestroyed( "set" ); - - JsonElement element; - if( value instanceof String ) { - element = new JsonPrimitive((String) value); - } - else if( value instanceof Number ) { - element = new JsonPrimitive((Number) value); - } - else if( value instanceof Boolean ) { - element = new JsonPrimitive((Boolean) value); - } else { - element = gson.toJsonTree( value ); + public RecordSetResult setWithAck(String path, Object value) throws DeepstreamRecordDestroyedException{ + try { + return this.setWithAckAsync(path, value, null).get(); + }catch(ExecutionException e){ + Throwable t = e.getCause(); + if(t instanceof DeepstreamRecordDestroyedException){ + throw (DeepstreamRecordDestroyedException)t; + }else{ + e.printStackTrace(); + return null; + } + }catch(Exception e){ + e.printStackTrace(); + return null; } + } - JsonElement object = this.path.get( path ); + /** + * Set the value for a specific path in your Record data asynchronously and gives acknowledgement + * whether there were any errors storing the record data in cache or storage. You can block calling thread by executing .get() on result.
+ * Make sure that the Object passed in can be serialised to a JsonElement, otherwise it will + * throw a {@link IllegalStateException}.
+ * The best way to guarantee this is by setting Json friendly objects, + * such as {@link Map}.
+ * If your path is not null, you can pass in primitives, which is the + * equivalent of calling {@link Record#set(String, Object)}. + * + * @param path The path with the JsonElement at which to set the value + * @param value The value to set + * @param listener Callback to be called after query is successfull, may be null + * @return future record + * @throws DeepstreamRecordDestroyedException Thrown if the record has been destroyed and can't perform more actions + */ + @ObjectiveCName("setWithAckAsync:value:") + public Future setWithAckAsync(final String path, final Object value, final SetWithAckResultListener listener) { + return executor.submit(new Callable() { + @Override + public RecordSetResult call() throws Exception { + throwExceptionIfDestroyed("set"); + + JsonElement element; + if (value instanceof String) { + element = new JsonPrimitive((String) value); + } else if (value instanceof Number) { + element = new JsonPrimitive((Number) value); + } else if (value instanceof Boolean) { + element = new JsonPrimitive((Boolean) value); + } else { + element = gson.toJsonTree(value); + } - if( object != null && object.equals( value ) ) { - return new RecordSetResult(null); - } else if( path == null && this.data.equals( value ) ) { - return new RecordSetResult(null); - } + JsonElement object = Record.this.path.get(path); - final RecordSetResult[] result = new RecordSetResult[1]; - final Map oldValues = beginChange(); - this.path.set( path, element ); - this.data = this.path.getCoreElement(); + if (object != null && object.equals(value)) { + return new RecordSetResult(null); + } else if (path == null && Record.this.data.equals(value)) { + return new RecordSetResult(null); + } - JsonObject config = new JsonObject(); - config.addProperty("writeSuccess", true); - String newVersion = String.valueOf(this.version + 1); + final RecordSetResult[] result = new RecordSetResult[1]; + final Map oldValues = beginChange(); + Record.this.path.set(path, element); + Record.this.data = Record.this.path.getCoreElement(); - String[] data; - if( path == null ) { - data = new String[]{ this.name(), newVersion, element.toString(), config.toString() }; - } else { - data = new String[]{ this.name(), newVersion, path, MessageBuilder.typed(value), config.toString() }; - } + JsonObject config = new JsonObject(); + config.addProperty("writeSuccess", true); + String newVersion = String.valueOf(Record.this.version + 1); - Actions action; - if (path == null) { - action = Actions.UPDATE; - } else { - action = Actions.PATCH; - } + String[] data; + if (path == null) { + data = new String[]{Record.this.name(), newVersion, element.toString(), config.toString()}; + } else { + data = new String[]{Record.this.name(), newVersion, path, MessageBuilder.typed(value), config.toString()}; + } - final CountDownLatch snapshotLatch = new CountDownLatch(1); - this.recordSetNotifier.request(newVersion, action, data, new UtilSingleNotifier.UtilSingleNotifierCallback() { - @Override - public void onSingleNotifierError(String name, DeepstreamError error) { - result[0] = new RecordSetResult( error.getMessage() ); - snapshotLatch.countDown(); - } + Actions action; + if (path == null) { + action = Actions.UPDATE; + } else { + action = Actions.PATCH; + } - @Override - public void onSingleNotifierResponse(String name, Object data) { - completeChange(oldValues); - result[0] = new RecordSetResult( null ); - snapshotLatch.countDown(); + final CountDownLatch snapshotLatch = new CountDownLatch(1); + Record.this.recordSetNotifier.request(newVersion, action, data, new UtilSingleNotifier.UtilSingleNotifierCallback() { + @Override + public void onSingleNotifierError(String name, DeepstreamError error) { + result[0] = new RecordSetResult(error.getMessage()); + snapshotLatch.countDown(); + } + + @Override + public void onSingleNotifierResponse(String name, Object data) { + completeChange(oldValues); + result[0] = new RecordSetResult(null); + snapshotLatch.countDown(); + } + }); + try { + snapshotLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if(listener != null){ + listener.setWithAckResultCompleted(result[0]); + } + return result[0]; } }); - try { - snapshotLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return result[0]; } /** @@ -532,7 +585,7 @@ void onMessage(Message message) { } else if( message.data[ 0 ].equals( Event.VERSION_EXISTS.toString() ) ) { recoverRecord( Integer.parseInt( message.data[ 2 ] ), gson.fromJson( message.data[ 3 ], JsonElement.class )); } else if( message.data[ 0 ].equals( Event.MESSAGE_DENIED.toString() ) ) { - clearTimeouts(); + clearTimeouts(); } } @@ -944,4 +997,18 @@ interface RecordReadyListener { @ObjectiveCName("onRecordReady:record:") void onRecordReady(String recordName, Record record); } + + @Override + public int hashCode() { + return this.name.hashCode(); + } + + @Override + public boolean equals(Object otherEntry) { + if (otherEntry instanceof Record && otherEntry != null) { + return this.name.equals(((Record) otherEntry).name); + } else { + return false; + } + } } diff --git a/src/main/java/io/deepstream/RecordChangedCallback.java b/src/main/java/io/deepstream/RecordChangedCallback.java index f7a433f..1658126 100644 --- a/src/main/java/io/deepstream/RecordChangedCallback.java +++ b/src/main/java/io/deepstream/RecordChangedCallback.java @@ -1,8 +1,7 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; +import com.google.j2objc.annotations.ObjectiveCName; /** * Record data changed listener, used to be notified whenever the record data has been modified either locally or remotely. diff --git a/src/main/java/io/deepstream/RecordHandler.java b/src/main/java/io/deepstream/RecordHandler.java index 22b63e1..54cb053 100644 --- a/src/main/java/io/deepstream/RecordHandler.java +++ b/src/main/java/io/deepstream/RecordHandler.java @@ -1,11 +1,18 @@ package io.deepstream; -import com.google.gson.*; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.google.j2objc.annotations.ObjectiveCName; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * The getters for data-sync, such as {@link RecordHandler#getRecord(String)}, @@ -24,20 +31,22 @@ public class RecordHandler { private final Map listeners; private final RecordHandlerListeners recordHandlerListeners; private final UtilSingleNotifier recordSetNotifier; + protected final ExecutorService executor; /** * A collection of factories for records. This class * is exposed as client.record * * @param deepstreamConfig The deepstreamConfig the client was created with - * @param connection The connection - * @param client The deepstream client + * @param connection The connection + * @param client The deepstream client */ @ObjectiveCName("init:connection:client:") - RecordHandler(DeepstreamConfig deepstreamConfig, IConnection connection, DeepstreamClientAbstract client) { + RecordHandler(DeepstreamConfig deepstreamConfig, IConnection connection, DeepstreamClientAbstract client, ExecutorService executor) { this.deepstreamConfig = deepstreamConfig; this.connection = connection; this.client = client; + this.executor = executor; recordHandlerListeners = new RecordHandlerListeners(); records = new HashMap<>(); @@ -50,45 +59,74 @@ public class RecordHandler { } /** - * Returns an existing record or creates a new one. If creating a new one the record + * Synchronously returns an existing record or creates a new one. If creating a new one the record * will not be in a ready state till it is loaded from the server. + * This will block your calling thread + * * @param name The name of the record to get * @return Record The record */ @ObjectiveCName("getRecord:") - public Record getRecord( String name ) { - Record record = records.get( name ); - if( record == null ) { - synchronized (this) { - record = records.get( name ); + public Record getRecord(String name){ + try { + return this.getRecordAsync(name, null).get(); + }catch(Exception e){ + e.printStackTrace(); + return null; + } + } + + /** + * Asynchronously returns an existing record or creates a new one. If creating a new one the record + * will not be in a ready state till it is loaded from the server. + * You can block calling thread by calling .get() + * + * @param name The name of the record to get + * @param listener Callback to be called after record is fetch, may be null + * @return Record The record + */ + @ObjectiveCName("getRecordAsync:") + public Future getRecordAsync(final String name, final GetRecorrListener listener) { + return executor.submit(new Callable() { + @Override + public Record call() throws Exception { + Record record = records.get(name); if (record == null) { - record = new Record(name, new HashMap(), connection, deepstreamConfig, client); - records.put(name, record); - record.addRecordEventsListener(recordHandlerListeners); - record.addRecordDestroyPendingListener(recordHandlerListeners); - record.start(); + synchronized (this) { + record = records.get(name); + if (record == null) { + record = new Record(name, new HashMap(), connection, deepstreamConfig, client, executor); + records.put(name, record); + record.addRecordEventsListener(recordHandlerListeners); + record.addRecordDestroyPendingListener(recordHandlerListeners); + record.start(); + } + } } - } - } - record.incrementUsage(); + record.incrementUsage(); + + if (!record.isReady()) { + final CountDownLatch readyLatch = new CountDownLatch(1); + record.whenReady(new Record.RecordReadyListener() { + @Override + public void onRecordReady(String recordName, Record record) { + readyLatch.countDown(); + } + }); + try { + readyLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } - if (!record.isReady()) { - final CountDownLatch readyLatch = new CountDownLatch(1); - record.whenReady(new Record.RecordReadyListener() { - @Override - public void onRecordReady(String recordName, Record record) { - readyLatch.countDown(); + if (listener != null) { + listener.getRecordCompleted(record); } - }); - try { - readyLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); + return record; } - } - - return record; + }); } /** @@ -99,46 +137,68 @@ public void onRecordReady(String recordName, Record record) { * @return List The List */ @ObjectiveCName("getList:") - public List getList( String name ) { - List list = lists.get( name ); - if( list == null ) { + public List getList(String name) { + List list = lists.get(name); + if (list == null) { list = new List(this, name); lists.put(name, list); } return list; } + /** + * Returns an existing List or creates a new one. A list is a specialised + * type of record that holds an array of recordNames. + * + * @param name The name of the list to retrieve + * @return List The List + */ + @ObjectiveCName("getList:") + public Future getListAsync(final String name, GetListListener listener) { + return executor.submit(new Callable() { + @Override + public List call() throws Exception { + List list = lists.get(name); + if (list == null) { + list = new List(RecordHandler.this.getRecord(name), name); + lists.put(name, list); + } + return list; + } + }); + } + /** * Returns an anonymous record. A anonymous record is effectively * a wrapper that mimicks the API of a record, but allows for the * underlying record to be swapped without losing subscriptions etc.
- * + *

* This is particularly useful when selecting from a number of similarly * structured records. E.g. a list of users that can be choosen from a list
- * + *

* The only API differences to a normal record is an additional {@link AnonymousRecord#setName(String)} method * * @return AnonymousRecord */ public AnonymousRecord getAnonymousRecord() { - return new AnonymousRecord( this ); + return new AnonymousRecord(this); } /** * Allows to listen for record subscriptions made by this or other clients. This * is useful to create "active" data providers, e.g. providers that only provide * data for a particular record if a user is actually interested in it.
- * + *

* You can only listen to a pattern once, and if multiple listeners match the same pattern only * a single one will be notified! * - * @param pattern The pattern to match all records your interested in + * @param pattern The pattern to match all records your interested in * @param listenCallback The listen callback when a match has been found or removed. */ @ObjectiveCName("listen:listenCallback:") - public void listen( String pattern, ListenListener listenCallback ) { - if( listeners.containsKey( pattern ) ) { - client.onError( Topic.RECORD, Event.LISTENER_EXISTS, pattern ); + public void listen(String pattern, ListenListener listenCallback) { + if (listeners.containsKey(pattern)) { + client.onError(Topic.RECORD, Event.LISTENER_EXISTS, pattern); } else { synchronized (this) { UtilListener utilListener = new UtilListener(Topic.RECORD, pattern, listenCallback, deepstreamConfig, client, connection); @@ -154,59 +214,85 @@ public void listen( String pattern, ListenListener listenCallback ) { * @param pattern The pattern to stop listening to */ @ObjectiveCName("unlisten:") - public void unlisten( String pattern ) { - UtilListener listener = listeners.get( pattern ); - if( listener != null ) { + public void unlisten(String pattern) { + UtilListener listener = listeners.get(pattern); + if (listener != null) { listener.destroy(); - listeners.remove( pattern ); + listeners.remove(pattern); } else { - client.onError( Topic.RECORD, Event.NOT_LISTENING, pattern ); + client.onError(Topic.RECORD, Event.NOT_LISTENING, pattern); } } /** * Retrieve the current record data without subscribing to changes
- * + *

* If the record does not exist an error will be thrown * * @param name The name of the record which state to retrieve */ @ObjectiveCName("snapshot:") public SnapshotResult snapshot(String name) { - final JsonElement[] data = new JsonElement[1]; - final DeepstreamError[] deepstreamException = new DeepstreamError[1]; - - final Record record = records.get(name); - - if( record != null && record.isReady() ) { - data[0] = record.get(); - } else { - final CountDownLatch snapshotLatch = new CountDownLatch(1); + try { + return snapshotAsync(name, null).get(); + }catch(Exception e){ + e.printStackTrace(); + return null; + } + } - snapshotRegistry.request(name, new UtilSingleNotifier.UtilSingleNotifierCallback() { - @Override - @ObjectiveCName("onSingleNotifierError:error:") - public void onSingleNotifierError(String name, DeepstreamError error) { - deepstreamException[0] = error; - snapshotLatch.countDown(); + /** + * Asynchronously retrieve the current record data without subscribing to changes
+ *

+ * If the record does not exist an error will be thrown + * + * @param name The name of the record which state to retrieve + * @param listener Callback to be called after query is successfull, may be null + */ + @ObjectiveCName("snapshotAsync:") + public Future snapshotAsync(final String name, final SnapshotListener listener) { + return executor.submit(new Callable() { + @Override + public SnapshotResult call() throws Exception { + final JsonElement[] data = new JsonElement[1]; + final DeepstreamError[] deepstreamException = new DeepstreamError[1]; + + final Record record = records.get(name); + + if (record != null && record.isReady()) { + data[0] = record.get(); + } else { + final CountDownLatch snapshotLatch = new CountDownLatch(1); + + snapshotRegistry.request(name, new UtilSingleNotifier.UtilSingleNotifierCallback() { + @Override + @ObjectiveCName("onSingleNotifierError:error:") + public void onSingleNotifierError(String name, DeepstreamError error) { + deepstreamException[0] = error; + snapshotLatch.countDown(); + } + + @Override + @ObjectiveCName("onSingleNotifierResponse:recordData:") + public void onSingleNotifierResponse(String name, Object recordData) { + data[0] = (JsonElement) recordData; + snapshotLatch.countDown(); + } + }); + + try { + snapshotLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - @Override - @ObjectiveCName("onSingleNotifierResponse:recordData:") - public void onSingleNotifierResponse(String name, Object recordData) { - data[0] = (JsonElement) recordData; - snapshotLatch.countDown(); + if(listener != null){ + listener.snapshotCompleted(new SnapshotResult(data[0], deepstreamException[0])); } - }); - - try { - snapshotLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); + return new SnapshotResult(data[0], deepstreamException[0]); } - } - - return new SnapshotResult(data[0], deepstreamException[0]); + }); } /** @@ -214,9 +300,9 @@ public void onSingleNotifierResponse(String name, Object recordData) { * is a forceful set and will override any remote data * * @param recordName name of record to set - * @param data the data the record will be set to. Make sure that the Object passed - * in can be serialised to a JsonElement, such as {@link Map}. Since this - * is a root the object should also not be a primitive. + * @param data the data the record will be set to. Make sure that the Object passed + * in can be serialised to a JsonElement, such as {@link Map}. Since this + * is a root the object should also not be a primitive. * @return The RecordHandler */ @ObjectiveCName("setData:data:") @@ -229,8 +315,8 @@ public RecordHandler setData(String recordName, Object data) { * is a forceful set and will override any remote data * * @param recordName name of record to set - * @param path the path the data will be written to - * @param data the data the record will be set to + * @param path the path the data will be written to + * @param data the data the record will be set to * @return The RecordHandler */ @ObjectiveCName("setData:path:data:") @@ -243,24 +329,22 @@ public RecordHandler setData(String recordName, String path, Object data) { * for a record. * * @param recordName name of record to set - * @param version version to set the record to. If -1 then record data is overwritten - * @param path the path the data will be written to - * @param value the data the record will be set to + * @param version version to set the record to. If -1 then record data is overwritten + * @param path the path the data will be written to + * @param value the data the record will be set to * @return The RecordHandler */ @ObjectiveCName("setData:version:path:data:") public RecordHandler setData(String recordName, int version, String path, Object value) { JsonElement element; - if( value instanceof String ) { + if (value instanceof String) { element = new JsonPrimitive((String) value); - } - else if( value instanceof Number ) { + } else if (value instanceof Number) { element = new JsonPrimitive((Number) value); - } - else if( value instanceof Boolean ) { + } else if (value instanceof Boolean) { element = new JsonPrimitive((Boolean) value); } else { - element = deepstreamConfig.getJsonParser().toJsonTree( value ); + element = deepstreamConfig.getJsonParser().toJsonTree(value); } Record record = this.records.get(recordName); @@ -277,11 +361,11 @@ else if( value instanceof Boolean ) { String remoteMessage; if (path == null) { remoteMessage = MessageBuilder.getMsg( - Topic.RECORD, Actions.CREATEANDUPDATE, new String[]{ recordName, String.valueOf(version), element.toString(), config.toString() } + Topic.RECORD, Actions.CREATEANDUPDATE, new String[]{recordName, String.valueOf(version), element.toString(), config.toString()} ); } else { remoteMessage = MessageBuilder.getMsg( - Topic.RECORD, Actions.CREATEANDUPDATE, new String[]{ recordName, String.valueOf(version), path, MessageBuilder.typed(element), config.toString() } + Topic.RECORD, Actions.CREATEANDUPDATE, new String[]{recordName, String.valueOf(version), path, MessageBuilder.typed(element), config.toString()} ); } this.connection.send(remoteMessage); @@ -289,222 +373,292 @@ else if( value instanceof Boolean ) { } /** - * Set the value of a record without being subsribed to it. A write acknowledgement + * Synchronously set the value of a record without being subsribed to it. A write acknowledgement * will be returned with the state of the write. This operation is force write and will * overwrite any remote data. * * @param recordName the name of the record being set. - * @param value the value to set the record to + * @param value the value to set the record to * @return RecordSetResult the result of the write */ @ObjectiveCName("setDataWithAck:value:") - public RecordSetResult setDataWithAck(String recordName, Object value) { + public RecordSetResult setDataWithAck(String recordName, Object value) throws InterruptedException, ExecutionException { return this.setDataWithAck(recordName, null, -1, value); } /** - * Set the value of a record without being subsribed to it. A write acknowledgement + * Synchronously set the value of a record without being subsribed to it. A write acknowledgement * will be returned with the state of the write. This operation is force write and will * overwrite any remote data. * * @param recordName the name of the record being set. - * @param path the path of the record being set - * @param value the value to set the record to + * @param path the path of the record being set + * @param value the value to set the record to * @return RecordSetResult the result of the write */ @ObjectiveCName("setDataWithAck:path:value:") - public RecordSetResult setDataWithAck(String recordName, String path, Object value) { + public RecordSetResult setDataWithAck(String recordName, String path, Object value) throws InterruptedException, ExecutionException { return this.setDataWithAck(recordName, path, -1, value); } /** - * Set the value of a record without being subsribed to it. A write acknowledgement + * Synchronously set the value of a record without being subsribed to it. A write acknowledgement * will be returned with the state of the write. * * @param recordName the name of the record being set. - * @param path the path of the record being set - * @param version the version to set the record to - * @param value the value to set the record to + * @param path the path of the record being set + * @param version the version to set the record to + * @param value the value to set the record to * @return RecordSetResult the result of the write */ @ObjectiveCName("setDataWithAck:path:version:value:") - public RecordSetResult setDataWithAck(String recordName, String path, int version, Object value) { + public RecordSetResult setDataWithAck(String recordName, String path, int version, Object value){ Record record = this.records.get(recordName); if (record != null) { if (path != null) { return record.setWithAck(path, value); } else { - return record.setWithAck(deepstreamConfig.getJsonParser().toJsonTree(value)); + try { + return record.setWithAck(deepstreamConfig.getJsonParser().toJsonTree(value)); + }catch(Exception e){ + e.printStackTrace(); + return null; + } } } - JsonElement element; - if( value instanceof String ) { - element = new JsonPrimitive((String) value); - } - else if( value instanceof Number ) { - element = new JsonPrimitive((Number) value); - } - else if( value instanceof Boolean ) { - element = new JsonPrimitive((Boolean) value); - } else { - element = deepstreamConfig.getJsonParser().toJsonTree( value ); - } - JsonObject config = new JsonObject(); - config.addProperty("writeSuccess", true); - - String[] data; - if( path == null ) { - data = new String[]{ recordName, String.valueOf(version), element.toString(), config.toString() }; - } else { - data = new String[]{ recordName, String.valueOf(version), path, MessageBuilder.typed(value), config.toString() }; + try { + return this.setDataWithAckAsync(recordName, path, version, value, null).get(); + }catch(Exception e){ + e.printStackTrace(); + return null; } + } - final RecordSetResult[] result = new RecordSetResult[1]; - final CountDownLatch snapshotLatch = new CountDownLatch(1); - this.recordSetNotifier.request(String.valueOf(version), Actions.CREATEANDUPDATE, data, new UtilSingleNotifier.UtilSingleNotifierCallback() { - @Override - public void onSingleNotifierError(String name, DeepstreamError error) { - result[0] = new RecordSetResult( error.getMessage() ); - snapshotLatch.countDown(); + /** + * Asynchronously set the value of a record without being subsribed to it. A write acknowledgement + * will be returned with the state of the write. You can block calling thread by executing .get() on result. + * + * @param recordName the name of the record being set. + * @param path the path of the record being set + * @param version the version to set the record to + * @param value the value to set the record to + * @param listener Callback to be called after query is successfull, may be null + * @return future record + */ + @ObjectiveCName("setDataWithAckAsync:path:version:value:") + public Future setDataWithAckAsync(final String recordName, final String path, final int version, final Object value, final SetWithAckResultListener listener) throws InterruptedException, ExecutionException { + Record record = this.records.get(recordName); + if (record != null) { + if (path != null) { + return record.setWithAckAsync(path, value, listener); + } else { + return record.setWithAckAsync(null, deepstreamConfig.getJsonParser().toJsonTree(value), listener); } + } else { - @Override - public void onSingleNotifierResponse(String name, Object data) { - result[0] = new RecordSetResult( null ); - snapshotLatch.countDown(); - } - }); - try { - snapshotLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); + return executor.submit(new Callable() { + @Override + public RecordSetResult call() throws Exception { + JsonElement element; + if (value instanceof String) { + element = new JsonPrimitive((String) value); + } else if (value instanceof Number) { + element = new JsonPrimitive((Number) value); + } else if (value instanceof Boolean) { + element = new JsonPrimitive((Boolean) value); + } else { + element = deepstreamConfig.getJsonParser().toJsonTree(value); + } + + JsonObject config = new JsonObject(); + config.addProperty("writeSuccess", true); + + String[] data; + if (path == null) { + data = new String[]{recordName, String.valueOf(version), element.toString(), config.toString()}; + } else { + data = new String[]{recordName, String.valueOf(version), path, MessageBuilder.typed(value), config.toString()}; + } + + final RecordSetResult[] result = new RecordSetResult[1]; + final CountDownLatch snapshotLatch = new CountDownLatch(1); + RecordHandler.this.recordSetNotifier.request(String.valueOf(version), Actions.CREATEANDUPDATE, data, new UtilSingleNotifier.UtilSingleNotifierCallback() { + @Override + public void onSingleNotifierError(String name, DeepstreamError error) { + result[0] = new RecordSetResult(error.getMessage()); + snapshotLatch.countDown(); + } + + @Override + public void onSingleNotifierResponse(String name, Object data) { + result[0] = new RecordSetResult(null); + snapshotLatch.countDown(); + } + }); + try { + snapshotLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + if(listener != null){ + listener.setWithAckResultCompleted(result[0]); + } + return result[0]; + } + }); } - return result[0]; } /** - * Allows the user to query to see whether or not the record exists
- * + * Allows the user to synchronously query to see whether or not the record exists
+ *

* If the record is created locally the listener will be called sync, else * once the record is ready.
* * @param name The name of the record to check */ @ObjectiveCName("has:") - public HasResult has(String name) { - final DeepstreamError[] deepstreamException = new DeepstreamError[1]; - final boolean[] hasRecord = new boolean[1]; - - Record record = records.get( name ); - if( record != null && record.isReady() ) { - hasRecord[0] = true; - } else { - final CountDownLatch hasLatch = new CountDownLatch(1); + public HasResult has(String name){ + try { + return this.hasAsync(name, null).get(); + }catch(Exception e){ + e.printStackTrace(); + return null; + } + } - hasRegistry.request(name, new UtilSingleNotifier.UtilSingleNotifierCallback() { - @Override - @ObjectiveCName("onSingleNotifierError:error:") - public void onSingleNotifierError(String name, DeepstreamError error) { - deepstreamException[0] = error; - hasLatch.countDown(); + /** + * Allows the user to asynchronously query to see whether or not the record exists. Calling thread can be stopped by using .get()
+ *

+ * If the record is created locally the listener will be called sync, else + * once the record is ready.
+ * + * @param name The name of the record to check + * @param listener Callback to be called after record is fetch, may be null + */ + @ObjectiveCName("hasAsync:") + public Future hasAsync(final String name, final HasListener listener) { + return executor.submit(new Callable() { + @Override + public HasResult call() throws Exception { + final DeepstreamError[] deepstreamException = new DeepstreamError[1]; + final boolean[] hasRecord = new boolean[1]; + + Record record = records.get(name); + if (record != null && record.isReady()) { + hasRecord[0] = true; + } else { + final CountDownLatch hasLatch = new CountDownLatch(1); + + hasRegistry.request(name, new UtilSingleNotifier.UtilSingleNotifierCallback() { + @Override + @ObjectiveCName("onSingleNotifierError:error:") + public void onSingleNotifierError(String name, DeepstreamError error) { + deepstreamException[0] = error; + hasLatch.countDown(); + } + + @Override + @ObjectiveCName("onSingleNotifierResponse:data:") + public void onSingleNotifierResponse(String name, Object data) { + hasRecord[0] = (boolean) data; + hasLatch.countDown(); + } + }); + + try { + hasLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - @Override - @ObjectiveCName("onSingleNotifierResponse:data:") - public void onSingleNotifierResponse(String name, Object data) { - hasRecord[0] = (boolean) data; - hasLatch.countDown(); + if (listener != null) { + listener.hasCompleted(new HasResult(hasRecord[0], deepstreamException[0])); } - }); - - try { - hasLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); + return new HasResult(hasRecord[0], deepstreamException[0]); } - } - - return new HasResult(hasRecord[0], deepstreamException[0]); + }); } - /** * Will be called by the client for incoming messages on the RECORD topic */ @ObjectiveCName("handle:") - protected void handle( Message message ) { + protected void handle(Message message) { Record record; boolean processed = false; String recordName; - if( isUnhandledError( message ) ) { - client.onError( Topic.RECORD, Event.getEvent( message.data[ 0 ] ), message.data[ 1 ] ); + if (isUnhandledError(message)) { + client.onError(Topic.RECORD, Event.getEvent(message.data[0]), message.data[1]); return; } - if( message.action == Actions.ACK || message.action == Actions.ERROR) { - recordName = message.data[ 1 ]; + if (message.action == Actions.ACK || message.action == Actions.ERROR) { + recordName = message.data[1]; - if( isDiscardAck( message ) ) { + if (isDiscardAck(message)) { //TODO: destroyEventEmitter.emit( "destroy_ack_" + recordName, message ); - record = records.get( recordName ); - if( Actions.getAction( message.data[ 0 ] ) == Actions.DELETE && record != null ) { - record.onMessage( message ); + record = records.get(recordName); + if (Actions.getAction(message.data[0]) == Actions.DELETE && record != null) { + record.onMessage(message); } return; } - if( message.data[ 0 ].equals( Actions.SNAPSHOT.toString() ) ) { + if (message.data[0].equals(Actions.SNAPSHOT.toString())) { snapshotRegistry.recieve(recordName, new DeepstreamError(message.data[2]), null); return; } - if( message.data[ 0 ].equals(Actions.HAS.toString() )) { + if (message.data[0].equals(Actions.HAS.toString())) { hasRegistry.recieve(recordName, new DeepstreamError(message.data[2]), null); return; } } else { - recordName = message.data[ 0 ]; + recordName = message.data[0]; } - record = records.get( recordName ); - if( record != null ) { + record = records.get(recordName); + if (record != null) { processed = true; - record.onMessage( message ); + record.onMessage(message); } - if( message.action == Actions.READ && snapshotRegistry.hasRequest( recordName )) { + if (message.action == Actions.READ && snapshotRegistry.hasRequest(recordName)) { processed = true; - snapshotRegistry.recieve( recordName, null, MessageParser.parseObject(message.data[2], deepstreamConfig.getJsonParser())); + snapshotRegistry.recieve(recordName, null, MessageParser.parseObject(message.data[2], deepstreamConfig.getJsonParser())); } - if( message.action == Actions.HAS && hasRegistry.hasRequest( recordName )) { + if (message.action == Actions.HAS && hasRegistry.hasRequest(recordName)) { processed = true; - hasRegistry.recieve( recordName, null, MessageParser.convertTyped(message.data[1], client, deepstreamConfig.getJsonParser())); + hasRegistry.recieve(recordName, null, MessageParser.convertTyped(message.data[1], client, deepstreamConfig.getJsonParser())); } if (message.action == Actions.WRITE_ACKNOWLEDGEMENT) { processed = true; String val = String.valueOf(message.data[1]); - Object versions = deepstreamConfig.getJsonParser().fromJson( val, JsonArray.class ); + Object versions = deepstreamConfig.getJsonParser().fromJson(val, JsonArray.class); Object error = MessageParser.convertTyped(message.data[2], this.client, deepstreamConfig.getJsonParser()); - if( error != null ) { + if (error != null) { this.recordSetNotifier.recieve((JsonArray) versions, new DeepstreamError((String) error)); } else { this.recordSetNotifier.recieve((JsonArray) versions, null); } } - UtilListener listener = listeners.get( recordName ); - if( listener != null ) { + UtilListener listener = listeners.get(recordName); + if (listener != null) { processed = true; - listener.onMessage( message ); + listener.onMessage(message); } - if( !processed ) { + if (!processed) { client.onError(Topic.RECORD, Event.UNSOLICITED_MESSAGE, String.format("%s %s", message.action, recordName)); } } @@ -512,29 +666,29 @@ record = records.get( recordName ); /** * The following methods checks to prevent errors that occur when a record is discarded or deleted and * recreated before the discard / delete ack message is received. - * + *

* A (presumably unsolvable) problem remains when a client deletes a record in the exact moment * between another clients creation and read message for the same record */ @ObjectiveCName("isDiscardAck:") - private boolean isDiscardAck( Message message ) { - Event event = Event.getEvent( message.data[ 0 ] ); - if( event == Event.MESSAGE_DENIED && Actions.getAction(message.data[ 2 ] ) == Actions.DELETE ) { + private boolean isDiscardAck(Message message) { + Event event = Event.getEvent(message.data[0]); + if (event == Event.MESSAGE_DENIED && Actions.getAction(message.data[2]) == Actions.DELETE) { return true; } - Actions action = Actions.getAction( message.data[ 0 ] ); + Actions action = Actions.getAction(message.data[0]); return action == Actions.DELETE || action == Actions.UNSUBSCRIBE; } @ObjectiveCName("isUnhandledError:") private Boolean isUnhandledError(Message message) { - if( message.action != Actions.ERROR ) { + if (message.action != Actions.ERROR) { return false; } - String errorType = message.data[ 0 ]; + String errorType = message.data[0]; return !(errorType.equals(Event.VERSION_EXISTS.toString()) || errorType.equals(Event.MESSAGE_DENIED.toString()) || errorType.equals(Actions.SNAPSHOT.toString()) diff --git a/src/main/java/io/deepstream/RecordMergeStrategies.java b/src/main/java/io/deepstream/RecordMergeStrategies.java index 05e4364..91deed6 100644 --- a/src/main/java/io/deepstream/RecordMergeStrategies.java +++ b/src/main/java/io/deepstream/RecordMergeStrategies.java @@ -1,8 +1,7 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; +import com.google.j2objc.annotations.ObjectiveCName; import java.util.HashMap; import java.util.Map; diff --git a/src/main/java/io/deepstream/RecordMergeStrategy.java b/src/main/java/io/deepstream/RecordMergeStrategy.java index 6e73433..669a923 100644 --- a/src/main/java/io/deepstream/RecordMergeStrategy.java +++ b/src/main/java/io/deepstream/RecordMergeStrategy.java @@ -1,8 +1,7 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; +import com.google.j2objc.annotations.ObjectiveCName; /** * Allows users to reconcile record versions if the data is not correctly in sync ( out of sync record versions ) diff --git a/src/main/java/io/deepstream/RecordMergeStrategyException.java b/src/main/java/io/deepstream/RecordMergeStrategyException.java index a08207e..b2d7feb 100644 --- a/src/main/java/io/deepstream/RecordMergeStrategyException.java +++ b/src/main/java/io/deepstream/RecordMergeStrategyException.java @@ -1,8 +1,7 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; +import com.google.j2objc.annotations.ObjectiveCName; /** * Thrown when a version conflict occurs, and is only exposed to the client diff --git a/src/main/java/io/deepstream/RecordPathChangedCallback.java b/src/main/java/io/deepstream/RecordPathChangedCallback.java index be8da1a..a51237b 100644 --- a/src/main/java/io/deepstream/RecordPathChangedCallback.java +++ b/src/main/java/io/deepstream/RecordPathChangedCallback.java @@ -1,8 +1,7 @@ package io.deepstream; -import com.google.j2objc.annotations.ObjectiveCName; - import com.google.gson.JsonElement; +import com.google.j2objc.annotations.ObjectiveCName; /** * Record data changed listener, used to be notified whenever the record data under a path has been modified either locally or remotely. diff --git a/src/main/java/io/deepstream/RpcHandler.java b/src/main/java/io/deepstream/RpcHandler.java index 73d0cbc..0c7973d 100644 --- a/src/main/java/io/deepstream/RpcHandler.java +++ b/src/main/java/io/deepstream/RpcHandler.java @@ -4,7 +4,10 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * The entry point for rpcs, both requesting them via {@link RpcHandler#make(String, Object)} and @@ -17,6 +20,7 @@ public class RpcHandler { private final Map providers; private final UtilAckTimeoutRegistry ackTimeoutRegistry; private final Map rpcs; + private ExecutorService executor; /** * The main class for remote procedure calls @@ -29,13 +33,14 @@ public class RpcHandler { * @param client The deepstream client */ @ObjectiveCName("init:connection:client:") - RpcHandler(DeepstreamConfig deepstreamConfig, final IConnection connection, DeepstreamClientAbstract client) { + RpcHandler(DeepstreamConfig deepstreamConfig, final IConnection connection, DeepstreamClientAbstract client, ExecutorService executor) { this.deepstreamConfig = deepstreamConfig; this.connection = connection; this.client = client; this.providers = new HashMap<>(); this.rpcs = new HashMap<>(); this.ackTimeoutRegistry = client.getAckTimeoutRegistry(); + this.executor = executor; new UtilResubscribeNotifier(this.client, new UtilResubscribeNotifier.UtilResubscribeListener() { @Override public void resubscribe() { @@ -86,45 +91,71 @@ public void unprovide( String rpcName ) { } /** - * Create a remote procedure call. This requires a rpc name for routing, a JSON serializable object for any associated + * Synchronously create a remote procedure call. This requires a rpc name for routing, a JSON serializable object for any associated * arguments and a callback to notify you with the rpc result or potential error. * @param rpcName The name of the rpc * @param data Serializable data that will be passed to the provider * @return Find out if the rpc succeeded via {@link RpcResult#success()} and associated data via {@link RpcResult#getData()} */ @ObjectiveCName("make:data:") - public RpcResult make(String rpcName, Object data) { - final RpcResult[] rpcResponse = new RpcResult[1]; - final CountDownLatch responseLatch = new CountDownLatch(1); + public RpcResult make(String rpcName, Object data){ + try { + return makeAsync(rpcName, data, null).get(); + }catch(Exception e){ + e.printStackTrace(); + return null; + } + } + /** + * Asynchronously create a remote procedure call. This requires a rpc name for routing, a JSON serializable object for any associated + * arguments and a callback to notify you with the rpc result or potential error. You can block calling thread by executing .get() on result. + * @param rpcName The name of the rpc + * @param data Serializable data that will be passed to the provider + * @param listener Callback to be called after RPC is successfull created, may be null + * @return Find out if the rpc succeeded via {@link RpcResult#success()} and associated data via {@link RpcResult#getData()} + */ + @ObjectiveCName("makeAsync:data:") + public Future makeAsync(final String rpcName, final Object data, final RpcMakeListener listener) { + return executor.submit(new Callable() { + @Override + public RpcResult call() throws Exception { + final RpcResult[] rpcResponse = new RpcResult[1]; + final CountDownLatch responseLatch = new CountDownLatch(1); - synchronized (this) { - String uid = this.client.getUid(); - this.rpcs.put(uid, new Rpc(this.deepstreamConfig, this.client, rpcName, uid, new RpcResponseCallback() { - @Override - public void onRpcSuccess(String rpcName, Object data) { - rpcResponse[0] = new RpcResult(true, data); - responseLatch.countDown(); - } - @Override - public void onRpcError(String rpcName, Object error) { - rpcResponse[0] = new RpcResult(false, error); - responseLatch.countDown(); - } - })); + synchronized (this) { + String uid = RpcHandler.this.client.getUid(); + RpcHandler.this.rpcs.put(uid, new Rpc(RpcHandler.this.deepstreamConfig, RpcHandler.this.client, rpcName, uid, new RpcResponseCallback() { + @Override + public void onRpcSuccess(String rpcName, Object data) { + rpcResponse[0] = new RpcResult(true, data); + responseLatch.countDown(); + } - String typedData = MessageBuilder.typed(data); - this.connection.sendMsg(Topic.RPC, Actions.REQUEST, new String[]{rpcName, uid, typedData}); - } + @Override + public void onRpcError(String rpcName, Object error) { + rpcResponse[0] = new RpcResult(false, error); + responseLatch.countDown(); + } + })); - try { - responseLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } + String typedData = MessageBuilder.typed(data); + RpcHandler.this.connection.sendMsg(Topic.RPC, Actions.REQUEST, new String[]{rpcName, uid, typedData}); + } - return rpcResponse[0]; + try { + responseLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + if(listener != null){ + listener.makeCompleted(rpcResponse[0]); + } + return rpcResponse[0]; + } + }); } /** diff --git a/src/main/java/io/deepstream/RpcMakeListener.java b/src/main/java/io/deepstream/RpcMakeListener.java new file mode 100644 index 0000000..19f9936 --- /dev/null +++ b/src/main/java/io/deepstream/RpcMakeListener.java @@ -0,0 +1,9 @@ +package io.deepstream; + +/** + * Created by horin on 04.08.2017. + */ + +public interface RpcMakeListener { + void makeCompleted(RpcResult result); +} diff --git a/src/main/java/io/deepstream/SetNameListener.java b/src/main/java/io/deepstream/SetNameListener.java new file mode 100644 index 0000000..1c94c3a --- /dev/null +++ b/src/main/java/io/deepstream/SetNameListener.java @@ -0,0 +1,9 @@ +package io.deepstream; + +/** + * Created by horin on 04.08.2017. + */ + +public interface SetNameListener { + void setNameCompleted(AnonymousRecord result); +} diff --git a/src/main/java/io/deepstream/SetWithAckResultListener.java b/src/main/java/io/deepstream/SetWithAckResultListener.java new file mode 100644 index 0000000..38e8318 --- /dev/null +++ b/src/main/java/io/deepstream/SetWithAckResultListener.java @@ -0,0 +1,5 @@ +package io.deepstream; + +public interface SetWithAckResultListener { + void setWithAckResultCompleted(RecordSetResult result); +} diff --git a/src/main/java/io/deepstream/SnapshotListener.java b/src/main/java/io/deepstream/SnapshotListener.java new file mode 100644 index 0000000..a1ba91d --- /dev/null +++ b/src/main/java/io/deepstream/SnapshotListener.java @@ -0,0 +1,5 @@ +package io.deepstream; + +public interface SnapshotListener { + void snapshotCompleted(SnapshotResult result); +} diff --git a/src/main/java/io/deepstream/UtilAckTimeoutRegistry.java b/src/main/java/io/deepstream/UtilAckTimeoutRegistry.java index 235059f..d0f2948 100644 --- a/src/main/java/io/deepstream/UtilAckTimeoutRegistry.java +++ b/src/main/java/io/deepstream/UtilAckTimeoutRegistry.java @@ -3,7 +3,12 @@ import com.google.j2objc.annotations.ObjectiveCName; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; class UtilAckTimeoutRegistry implements ConnectionStateListener, UtilTimeoutListener { diff --git a/src/main/java/io/deepstream/UtilJSONPath.java b/src/main/java/io/deepstream/UtilJSONPath.java index 761a81f..f92ce28 100644 --- a/src/main/java/io/deepstream/UtilJSONPath.java +++ b/src/main/java/io/deepstream/UtilJSONPath.java @@ -1,6 +1,9 @@ package io.deepstream; -import com.google.gson.*; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.gson.JsonObject; import java.util.ArrayList; import java.util.Iterator;