From 288932e98fbcfd24b9a1be6e1a68dcf3fada382f Mon Sep 17 00:00:00 2001 From: Frank Date: Mon, 9 Oct 2017 15:39:48 -0700 Subject: [PATCH 1/7] Fix thread safety problems. Fixes #109. - Fix the most obvious thread safety issues in RecordHandler when dealing with HashMaps. - Fix a thread safety issue with the Record usage count. The scenario this fixes is as follows (t1 and t2 are independent threads): t1 - foo_t1 = client.record.getRecord("foo") t1 - foo_t1.discard() preempts just after if( this.usages <= 0 ) t2 - foo_t2 = client.record.getRecord("foo") t1 - resumes and finishes foo_t1.discard() t2 - resumes and is now using a record that had destroy() called on it - Fix the following whenReady thread safety bug: t1: Record.whenReady(): preempts after "} else {" t2: Record.setReady() t2: Resumes and adds record to onceRecordReadyListeners, which will never be called because setReady has already been called. This will result in a hang. - Add threaded test to RecordTest. This test will hang when run on code before this commit. --- src/main/java/io/deepstream/Record.java | 91 +++++++++++++------ .../java/io/deepstream/RecordHandler.java | 84 +++++++++++------ src/test/java/io/deepstream/RecordTest.java | 41 +++++++++ 3 files changed, 159 insertions(+), 57 deletions(-) diff --git a/src/main/java/io/deepstream/Record.java b/src/main/java/io/deepstream/Record.java index c26cac2..a9e9860 100644 --- a/src/main/java/io/deepstream/Record.java +++ b/src/main/java/io/deepstream/Record.java @@ -7,6 +7,8 @@ import java.util.*; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; /** @@ -29,14 +31,16 @@ public class Record { private final ArrayList onceRecordReadyListeners; private final String name; private final DeepstreamConfig deepstreamConfig; - private boolean isReady; - private boolean isDestroyed; + private volatile boolean isReady; + private volatile boolean isDestroyed; + private boolean isDiscarded; private int version; - private int usages; + private AtomicInteger usages; private RecordMergeStrategy mergeStrategy; private RecordRemoteUpdateHandler recordRemoteUpdateHandler; private JsonElement data; private boolean hasProvider; + private ReentrantLock readyLock; /** * Constructor is not public since it is created via {@link RecordHandler#getRecord(String)} @@ -47,11 +51,11 @@ public class Record { * @param client deepstream.io client */ @ObjectiveCName("init:recordOptions:connection:deepstreamConfig:client:") - Record(String name, Map recordOptions, IConnection connection, DeepstreamConfig deepstreamConfig, DeepstreamClientAbstract client) { + Record (String name, Map recordOptions, IConnection connection, DeepstreamConfig deepstreamConfig, DeepstreamClientAbstract client) { this.ackTimeoutRegistry = client.getAckTimeoutRegistry(); this.name = name; this.deepstreamConfig = deepstreamConfig; - this.usages = 0; + this.usages = new AtomicInteger(); this.version = -1; this.connection = connection; this.client = client; @@ -62,6 +66,8 @@ public class Record { this.isReady = false; this.isDestroyed = false; this.hasProvider = false; + this.isDiscarded = false; + this.readyLock = new ReentrantLock(); this.mergeStrategy = this.deepstreamConfig.getRecordMergeStrategy() != null ? RecordMergeStrategies.INSTANCE.getMergeStrategy(this.deepstreamConfig.getRecordMergeStrategy()) : null ; this.recordEventsListeners = new ArrayList(); @@ -448,24 +454,36 @@ public Record unsubscribe( String path, RecordPathChangedCallback recordPathChan */ public Record discard() throws DeepstreamRecordDestroyedException { throwExceptionIfDestroyed("discard"); - this.usages--; - if( this.usages <= 0 ) { - this.whenReady(new RecordReadyListener() { - @Override - public void onRecordReady(String recordName, Record record) { - ackTimeoutRegistry.add(Topic.RECORD, Actions.UNSUBSCRIBE, name, deepstreamConfig.getSubscriptionTimeout()); - connection.send( MessageBuilder.getMsg( Topic.RECORD, Actions.UNSUBSCRIBE, name ) ); - - for(RecordDestroyPendingListener recordDestroyPendingHandler: recordDestroyPendingListeners) { - recordDestroyPendingHandler.onDestroyPending( name ); - } - } - }); - this.destroy(); + if (this.usages.decrementAndGet() <= 0) { + finishDiscard(); } return this; } + void finishDiscard () { + // This must be a synchronized block so that RecordHandler.getRecord will not continue until the + // record has been removed from the cache in onDestroyPending. + synchronized (this) { + // We only want one thread to do the discard one time so check the isDiscarded flag within + // this synchronized section. + if (!isDiscarded) { + this.whenReady(new RecordReadyListener() { + @Override + public void onRecordReady (String recordName, Record record) { + ackTimeoutRegistry.add(Topic.RECORD, Actions.UNSUBSCRIBE, name, deepstreamConfig.getSubscriptionTimeout()); + connection.send(MessageBuilder.getMsg(Topic.RECORD, Actions.UNSUBSCRIBE, name)); + + for (RecordDestroyPendingListener recordDestroyPendingHandler : recordDestroyPendingListeners) { + recordDestroyPendingHandler.onDestroyPending(name); + } + } + }); + this.destroy(); + isDiscarded = true; + } + } + } + /** * Delete the record. This is called when you want to remove the record entirely from deepstream, deleting it from storage * and cache and telling all other users that it has been deleted. This in turn will force all clients to discard the record.
@@ -503,13 +521,20 @@ public void onRecordReady(String recordName, Record record) { */ @ObjectiveCName("whenReady:") Record whenReady(RecordReadyListener recordReadyListener) { - if( this.isReady ) { - recordReadyListener.onRecordReady( this.name, this ); - } else { - synchronized (this) { + readyLock.lock(); + try { + if( this.isReady ) { + readyLock.unlock(); + recordReadyListener.onRecordReady( this.name, this ); + } else { this.onceRecordReadyListeners.add( recordReadyListener ); } } + finally { + if (readyLock.isHeldByCurrentThread()) { + readyLock.unlock(); + } + } return this; } @@ -778,12 +803,22 @@ private void onRead( Message message ) { * and emits the ready event */ private void setReady() { - this.isReady = true; + ArrayList listCopy = null; + + readyLock.lock(); + try { + this.isReady = true; + // Capture the list inside the lock so we can execute the listeners outside the lock. + listCopy = new ArrayList(this.onceRecordReadyListeners); + this.onceRecordReadyListeners.clear(); + } + finally { + readyLock.unlock(); + } - for(RecordReadyListener recordReadyListener: this.onceRecordReadyListeners) { + for(RecordReadyListener recordReadyListener: listCopy) { recordReadyListener.onRecordReady( this.name, this ); } - this.onceRecordReadyListeners.clear(); } /** @@ -901,8 +936,8 @@ void addRecordDestroyPendingListener(RecordDestroyPendingListener recordDestroyP this.recordDestroyPendingListeners.add( recordDestroyPendingListener ); } - void incrementUsage() { - this.usages++; + int getAndIncrementUsage() { + return this.usages.getAndIncrement(); } @ObjectiveCName("RecordRemoteUpdateHandler") diff --git a/src/main/java/io/deepstream/RecordHandler.java b/src/main/java/io/deepstream/RecordHandler.java index a89e226..994dd8e 100644 --- a/src/main/java/io/deepstream/RecordHandler.java +++ b/src/main/java/io/deepstream/RecordHandler.java @@ -5,7 +5,9 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; /** * The getters for data-sync, such as {@link RecordHandler#getRecord(String)}, @@ -17,13 +19,14 @@ public class RecordHandler { private final DeepstreamConfig deepstreamConfig; private final IConnection connection; private final DeepstreamClientAbstract client; - private final Map records; + private final ConcurrentHashMap records; private final Map lists; private final UtilSingleNotifier hasRegistry; private final UtilSingleNotifier snapshotRegistry; - private final Map listeners; + private final ConcurrentHashMap listeners; private final RecordHandlerListeners recordHandlerListeners; private final UtilSingleNotifier recordSetNotifier; + private final ReentrantLock recordsLock; /** * A collection of factories for records. This class @@ -38,11 +41,12 @@ public class RecordHandler { this.deepstreamConfig = deepstreamConfig; this.connection = connection; this.client = client; + this.recordsLock = new ReentrantLock(); recordHandlerListeners = new RecordHandlerListeners(); - records = new HashMap(); + records = new ConcurrentHashMap(); lists = new HashMap(); - listeners = new HashMap(); + listeners = new ConcurrentHashMap(); hasRegistry = new UtilSingleNotifier(client, connection, Topic.RECORD, Actions.HAS, deepstreamConfig.getRecordReadTimeout()); snapshotRegistry = new UtilSingleNotifier(client, connection, Topic.RECORD, Actions.SNAPSHOT, deepstreamConfig.getRecordReadTimeout()); @@ -57,22 +61,32 @@ public class RecordHandler { */ @ObjectiveCName("getRecord:") public Record getRecord( String name ) { - Record record = records.get( name ); - if( record == null ) { - synchronized (this) { - record = records.get( name ); - if (record == null) { - record = new Record(name, new HashMap(), connection, deepstreamConfig, client); - records.put(name, record); - record.addRecordEventsListener(recordHandlerListeners); - record.addRecordDestroyPendingListener(recordHandlerListeners); - record.start(); - } + Record record = null; + + recordsLock.lock(); + try { + record = records.get( name ); + if( record == null ) { + record = createRecord(name); + recordsLock.unlock(); + record.start(); + } else if (record.getAndIncrementUsage() <= 0) { + // Some other thread is discarding this record. We need the record out of the records + // map before we can put the new record there so we will finish the discard here. Either this + // thread or the other thread will end up finishing the discard. + record.finishDiscard(); + + assert records.get(name) == null; + record = createRecord(name); + recordsLock.unlock(); + record.start(); + } + } finally { + if (recordsLock.isHeldByCurrentThread()) { + recordsLock.unlock(); } } - record.incrementUsage(); - if (!record.isReady()) { final CountDownLatch readyLatch = new CountDownLatch(1); record.whenReady(new Record.RecordReadyListener() { @@ -91,6 +105,15 @@ public void onRecordReady(String recordName, Record record) { return record; } + Record createRecord (String name) { + Record record = new Record(name, new HashMap(), connection, deepstreamConfig, client); + record.addRecordEventsListener(recordHandlerListeners); + record.addRecordDestroyPendingListener(recordHandlerListeners); + record.getAndIncrementUsage(); + records.put(name, record); + return record; + } + /** * Returns an existing List or creates a new one. A list is a specialised * type of record that holds an array of recordNames. @@ -100,12 +123,14 @@ public void onRecordReady(String recordName, Record record) { */ @ObjectiveCName("getList:") public List getList( String name ) { - List list = lists.get( name ); - if( list == null ) { - list = new List(this, name); - lists.put(name, list); + synchronized (lists) { + List list = lists.get(name); + if (list == null) { + list = new List(this, name); + lists.put(name, list); + } + return list; } - return list; } /** @@ -137,10 +162,10 @@ public AnonymousRecord getAnonymousRecord() { */ @ObjectiveCName("listen:listenCallback:") public void listen( String pattern, ListenListener listenCallback ) { - if( listeners.containsKey( pattern ) ) { - client.onError( Topic.RECORD, Event.LISTENER_EXISTS, pattern ); - } else { - synchronized (this) { + synchronized (listeners) { + if (listeners.containsKey( pattern )) { + client.onError(Topic.RECORD, Event.LISTENER_EXISTS, pattern); + } else { UtilListener utilListener = new UtilListener(Topic.RECORD, pattern, listenCallback, deepstreamConfig, client, connection); listeners.put(pattern, utilListener); utilListener.start(); @@ -155,10 +180,9 @@ public void listen( String pattern, ListenListener listenCallback ) { */ @ObjectiveCName("unlisten:") public void unlisten( String pattern ) { - UtilListener listener = listeners.get( pattern ); + UtilListener listener = listeners.remove( pattern ); if( listener != null ) { listener.destroy(); - listeners.remove( pattern ); } else { client.onError( Topic.RECORD, Event.NOT_LISTENING, pattern ); } @@ -592,7 +616,9 @@ public void onRecordDeleted(String recordName) { @ObjectiveCName("onRecordDiscarded:") public void onRecordDiscarded(String recordName) { records.remove(recordName); - lists.remove(recordName); + synchronized (lists) { + lists.remove(recordName); + } } } } diff --git a/src/test/java/io/deepstream/RecordTest.java b/src/test/java/io/deepstream/RecordTest.java index 05ddea4..bdec66b 100644 --- a/src/test/java/io/deepstream/RecordTest.java +++ b/src/test/java/io/deepstream/RecordTest.java @@ -9,6 +9,9 @@ import java.util.HashMap; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.*; @@ -61,6 +64,44 @@ record = new Record( "recordA", new HashMap(), connectionMock, config, deepstrea public void tearDown() { } + @Test + public void recordThreadTest() { + ExecutorService recordService = Executors.newFixedThreadPool(20); + ExecutorService handleService = Executors.newFixedThreadPool(1); + + for (int ii = 0; ii < 10000; ++ii) { + recordService.submit(new Runnable() { + @Override + public void run () { + Record record = recordHandler.getRecord("foo"); + Assert.assertFalse(record.isDestroyed()); + record.discard(); + } + }); + handleService.submit(new Runnable() { + @Override + public void run () { + recordHandler.handle(MessageParser.parseMessage(TestUtil.replaceSeperators("R|R|foo|1|{\"bar\":\"baz\"}"), deepstreamClientMock)); + } + }); + } + + recordService.shutdown(); + handleService.shutdown(); + + try + { + if (!recordService.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + Assert.fail("recordService has hung threads"); + } + if (!handleService.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + Assert.fail("handleService has hung threads"); + } + } catch (InterruptedException exc) { + Assert.fail(exc.getMessage()); + } + } + @Test public void recordSendsAckAndRead() { record.onMessage( MessageParser.parseMessage( TestUtil.replaceSeperators( "R|A|S|recordA" ), deepstreamClientMock ) ); From 446b46074771f7906e35caf50a4b7056aa566f82 Mon Sep 17 00:00:00 2001 From: Yasser Fadl Date: Tue, 17 Oct 2017 16:35:50 +0200 Subject: [PATCH 2/7] Remove bintray stage --- .travis.yml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 34bf08a..ede1871 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,8 +16,11 @@ jobs: os: linux language: java script: bash ./gradlew -b build.gradle test - - stage: Release to bintray and pod + - stage: J2Objc install: bash ./scripts/setupJ2OBJC.sh true - script: bash ./gradlew -b build-prod.gradle bintrayUpload - on: - tags: true + script: bash ./gradlew -b build-prod.gradle + # - stage: Release to bintray and pod + # install: bash ./scripts/setupJ2OBJC.sh true + # script: bash ./gradlew -b build-prod.gradle bintrayUpload + # on: + # tags: true From b0720df573148f2716958ef22a00011f101e336a Mon Sep 17 00:00:00 2001 From: Frank Riley Date: Tue, 24 Oct 2017 03:24:56 -0700 Subject: [PATCH 3/7] I before E except after C (#114) --- .../io/deepstream/DeepstreamRuntimeErrorHandler.java | 2 +- src/main/java/io/deepstream/LoginResult.java | 2 +- src/main/java/io/deepstream/Message.java | 2 +- src/main/java/io/deepstream/PresenceHandler.java | 2 +- src/main/java/io/deepstream/Record.java | 4 ++-- src/main/java/io/deepstream/RecordHandler.java | 12 ++++++------ src/main/java/io/deepstream/RpcHandler.java | 2 +- .../java/io/deepstream/RpcRequestedListener.java | 4 ++-- src/main/java/io/deepstream/UtilSingleNotifier.java | 10 +++++----- 9 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/deepstream/DeepstreamRuntimeErrorHandler.java b/src/main/java/io/deepstream/DeepstreamRuntimeErrorHandler.java index fa8f0cb..def6b79 100644 --- a/src/main/java/io/deepstream/DeepstreamRuntimeErrorHandler.java +++ b/src/main/java/io/deepstream/DeepstreamRuntimeErrorHandler.java @@ -16,7 +16,7 @@ public interface DeepstreamRuntimeErrorHandler { /** * Triggered whenever a runtime error occurs ( mostly async such as TimeOuts or MergeConflicts ). - * Recieves a topic to indicate if it was e.g. RPC, event and a english error message to simplify + * Receives a topic to indicate if it was e.g. RPC, event and a english error message to simplify * debugging purposes. * @param topic The Topic the error occured on * @param event The Error Event diff --git a/src/main/java/io/deepstream/LoginResult.java b/src/main/java/io/deepstream/LoginResult.java index 9a01dba..ab41a18 100644 --- a/src/main/java/io/deepstream/LoginResult.java +++ b/src/main/java/io/deepstream/LoginResult.java @@ -51,7 +51,7 @@ public boolean loggedIn() { * Return the data associated with login. If login was successful, * this would be the user associated data. Otherwise data explaining * the reason why it wasn't. - * @return A JsonElement containing the data recieved from the server during login + * @return A JsonElement containing the data received from the server during login */ public Object getData() { return this.data; diff --git a/src/main/java/io/deepstream/Message.java b/src/main/java/io/deepstream/Message.java index c39cb86..d9289ca 100644 --- a/src/main/java/io/deepstream/Message.java +++ b/src/main/java/io/deepstream/Message.java @@ -13,7 +13,7 @@ class Message { final Topic topic; /** - * @param raw The raw data recieved + * @param raw The raw data received * @param topic The message topic * @param action The message action * @param data The message data, as an array diff --git a/src/main/java/io/deepstream/PresenceHandler.java b/src/main/java/io/deepstream/PresenceHandler.java index 4448675..99fea15 100644 --- a/src/main/java/io/deepstream/PresenceHandler.java +++ b/src/main/java/io/deepstream/PresenceHandler.java @@ -119,7 +119,7 @@ else if( message.action == Actions.PRESENCE_LEAVE ) { this.broadcastEvent( Topic.PRESENCE.toString(), message.data[0], false ); } else if( message.action == Actions.QUERY ) { - this.notifier.recieve(Actions.QUERY.toString(), null, message.data); + this.notifier.receive(Actions.QUERY.toString(), null, message.data); } else { this.client.onError( Topic.PRESENCE, Event.UNSOLICITED_MESSAGE, message.action.toString() ); diff --git a/src/main/java/io/deepstream/Record.java b/src/main/java/io/deepstream/Record.java index c26cac2..2193a7d 100644 --- a/src/main/java/io/deepstream/Record.java +++ b/src/main/java/io/deepstream/Record.java @@ -541,9 +541,9 @@ private void handleWriteAcknowledgement(Message message) { Object versions = gson.fromJson( val, JsonArray.class ); Object error = MessageParser.convertTyped(message.data[ 2 ], this.client, gson); if( error != null ) { - this.recordSetNotifier.recieve((JsonArray) versions, new DeepstreamError((String) error)); + this.recordSetNotifier.receive((JsonArray) versions, new DeepstreamError((String) error)); } else { - this.recordSetNotifier.recieve((JsonArray) versions, null); + this.recordSetNotifier.receive((JsonArray) versions, null); } } diff --git a/src/main/java/io/deepstream/RecordHandler.java b/src/main/java/io/deepstream/RecordHandler.java index a89e226..23e7cb8 100644 --- a/src/main/java/io/deepstream/RecordHandler.java +++ b/src/main/java/io/deepstream/RecordHandler.java @@ -458,12 +458,12 @@ record = records.get( recordName ); } if( message.data[ 0 ].equals( Actions.SNAPSHOT.toString() ) ) { - snapshotRegistry.recieve(recordName, new DeepstreamError(message.data[2]), null); + snapshotRegistry.receive(recordName, new DeepstreamError(message.data[2]), null); return; } if( message.data[ 0 ].equals(Actions.HAS.toString() )) { - hasRegistry.recieve(recordName, new DeepstreamError(message.data[2]), null); + hasRegistry.receive(recordName, new DeepstreamError(message.data[2]), null); return; } } else { @@ -478,12 +478,12 @@ record = records.get( recordName ); if( message.action == Actions.READ && snapshotRegistry.hasRequest( recordName )) { processed = true; - snapshotRegistry.recieve( recordName, null, MessageParser.parseObject(message.data[2], deepstreamConfig.getJsonParser())); + snapshotRegistry.receive( recordName, null, MessageParser.parseObject(message.data[2], deepstreamConfig.getJsonParser())); } if( message.action == Actions.HAS && hasRegistry.hasRequest( recordName )) { processed = true; - hasRegistry.recieve( recordName, null, MessageParser.convertTyped(message.data[1], client, deepstreamConfig.getJsonParser())); + hasRegistry.receive( recordName, null, MessageParser.convertTyped(message.data[1], client, deepstreamConfig.getJsonParser())); } if (message.action == Actions.WRITE_ACKNOWLEDGEMENT) { @@ -492,9 +492,9 @@ record = records.get( recordName ); Object versions = deepstreamConfig.getJsonParser().fromJson( val, JsonArray.class ); Object error = MessageParser.convertTyped(message.data[2], this.client, deepstreamConfig.getJsonParser()); if( error != null ) { - this.recordSetNotifier.recieve((JsonArray) versions, new DeepstreamError((String) error)); + this.recordSetNotifier.receive((JsonArray) versions, new DeepstreamError((String) error)); } else { - this.recordSetNotifier.recieve((JsonArray) versions, null); + this.recordSetNotifier.receive((JsonArray) versions, null); } } diff --git a/src/main/java/io/deepstream/RpcHandler.java b/src/main/java/io/deepstream/RpcHandler.java index d119c11..1838b88 100644 --- a/src/main/java/io/deepstream/RpcHandler.java +++ b/src/main/java/io/deepstream/RpcHandler.java @@ -130,7 +130,7 @@ public void onRpcError(String rpcName, Object error) { /** * Main interface. Handles incoming messages * from the message distributor - * @param message The message recieved from the server + * @param message The message received from the server */ @ObjectiveCName("handle:") void handle( Message message ) { diff --git a/src/main/java/io/deepstream/RpcRequestedListener.java b/src/main/java/io/deepstream/RpcRequestedListener.java index e8ca71f..36e8ffe 100644 --- a/src/main/java/io/deepstream/RpcRequestedListener.java +++ b/src/main/java/io/deepstream/RpcRequestedListener.java @@ -3,11 +3,11 @@ import com.google.j2objc.annotations.ObjectiveCName; /** - * Listener for any rpc requests recieved from the server + * Listener for any rpc requests received from the server */ public interface RpcRequestedListener { /** - * This listener will be invoked whenever the client recieves an rpc request from the server, and will be able + * This listener will be invoked whenever the client receives an rpc request from the server, and will be able * to respond via {@link RpcResponse#send(Object)} or {@link RpcResponse#reject()} * @param rpcName The name of the rpc being requested * @param data The data the request was made with diff --git a/src/main/java/io/deepstream/UtilSingleNotifier.java b/src/main/java/io/deepstream/UtilSingleNotifier.java index 931bd4e..494e2ef 100644 --- a/src/main/java/io/deepstream/UtilSingleNotifier.java +++ b/src/main/java/io/deepstream/UtilSingleNotifier.java @@ -75,7 +75,7 @@ public void request( String name, UtilSingleNotifierCallback utilSingleNotifierC /** * Add a request where a response may contain more than one bit of data. Commonly used with - * {@link UtilSingleNotifier#recieve(JsonArray, DeepstreamError)} + * {@link UtilSingleNotifier#receive(JsonArray, DeepstreamError)} * * @param name The name or version to store callbacks on * @param data The data to send in the request @@ -103,8 +103,8 @@ public void request( String name, Actions action, String[] data, UtilSingleNotif * @param error An error that may have occurred during the request * @param data The result data from the request */ - @ObjectiveCName("recieve:error:data:") - public void recieve(String name, DeepstreamError error, Object data) { + @ObjectiveCName("receive:error:data:") + public void receive(String name, DeepstreamError error, Object data) { ArrayList callbacks = requests.get( name ); for (UtilSingleNotifierCallback callback : callbacks) { ackTimeoutRegistry.clear(topic, action, name); @@ -127,7 +127,7 @@ public void recieve(String name, DeepstreamError error, Object data) { * @param data The data received in the message * @param error Any errors from the message */ - public void recieve(JsonArray data, DeepstreamError error) { + public void receive(JsonArray data, DeepstreamError error) { for (JsonElement version : data) { ArrayList callbacks = requests.get( version.getAsString() ); UtilSingleNotifierCallback cb = callbacks.get(0); @@ -164,7 +164,7 @@ private void send( Actions action, String[] data ) { @Override @ObjectiveCName("onTimeout:action:event:name:") public void onTimeout(Topic topic, Actions action, Event event, String name) { - this.recieve(name, new DeepstreamError(String.format("Response for % timed out", name)), null); + this.receive(name, new DeepstreamError(String.format("Response for % timed out", name)), null); } interface UtilSingleNotifierCallback { From f6f2a085cd4e60a95d2471e89440db47dd33f0ba Mon Sep 17 00:00:00 2001 From: Alex Harley Date: Tue, 24 Oct 2017 12:54:11 +0200 Subject: [PATCH 4/7] fix: update ws library --- build-prod.gradle | 2 +- build.gradle | 2 +- .../java/io/deepstream/JavaEndpointWebsocket.java | 14 +++++++++----- src/main/java/io/deepstream/RecordHandler.java | 2 +- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/build-prod.gradle b/build-prod.gradle index 9074ef2..2033ebd 100644 --- a/build-prod.gradle +++ b/build-prod.gradle @@ -36,7 +36,7 @@ sourceSets { dependencies { compile 'com.google.code.gson:gson:2.6.2' - compile 'org.java-websocket:Java-WebSocket:1.3.0' + compile 'org.java-websocket:Java-WebSocket:1.3.5' compile 'com.google.j2objc:j2objc-annotations:1.1' testCompile group: 'junit', name: 'junit', version: '4.11' diff --git a/build.gradle b/build.gradle index 5308c45..ca6c159 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ task apidoc(type: Javadoc) { dependencies { compile 'com.google.code.gson:gson:2.6.2' - compile 'org.java-websocket:Java-WebSocket:1.3.0' + compile 'org.java-websocket:Java-WebSocket:1.3.5' compile 'com.google.j2objc:j2objc-annotations:1.1' testCompile group: 'junit', name: 'junit', version: '4.11' diff --git a/src/main/java/io/deepstream/JavaEndpointWebsocket.java b/src/main/java/io/deepstream/JavaEndpointWebsocket.java index 0c0d148..f4cc7bf 100644 --- a/src/main/java/io/deepstream/JavaEndpointWebsocket.java +++ b/src/main/java/io/deepstream/JavaEndpointWebsocket.java @@ -1,12 +1,13 @@ package io.deepstream; -import org.java_websocket.client.DefaultSSLWebSocketClientFactory; import org.java_websocket.client.WebSocketClient; import org.java_websocket.drafts.Draft; -import org.java_websocket.drafts.Draft_10; +import org.java_websocket.drafts.Draft_6455; import org.java_websocket.handshake.ServerHandshake; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.security.KeyManagementException; @@ -41,7 +42,7 @@ public void forceClose() { @Override public void open() { - this.websocket = new WebSocket( this.uri, new Draft_10() ); + this.websocket = new WebSocket(this.uri, new Draft_6455()); this.websocket.connect(); } @@ -51,16 +52,19 @@ private class WebSocket extends WebSocketClient { // Set the SSL context if the socket server is using Secure WebSockets if (serverUri.toString().startsWith("wss:")) { SSLContext sslContext; + SSLSocketFactory factory; try { sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, null, null); + factory = sslContext.getSocketFactory(); + this.setSocket(factory.createSocket()); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } catch (KeyManagementException e) { throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); } - // set the SSL context to the client factory - this.setWebSocketFactory(new DefaultSSLWebSocketClientFactory(sslContext)); } } diff --git a/src/main/java/io/deepstream/RecordHandler.java b/src/main/java/io/deepstream/RecordHandler.java index 23e7cb8..a9f2bb9 100644 --- a/src/main/java/io/deepstream/RecordHandler.java +++ b/src/main/java/io/deepstream/RecordHandler.java @@ -486,7 +486,7 @@ record = records.get( recordName ); hasRegistry.receive( recordName, null, MessageParser.convertTyped(message.data[1], client, deepstreamConfig.getJsonParser())); } - if (message.action == Actions.WRITE_ACKNOWLEDGEMENT) { + if (message.action == Actions.WRITE_ACKNOWLEDGEMENT && !processed) { processed = true; String val = String.valueOf(message.data[1]); Object versions = deepstreamConfig.getJsonParser().fromJson( val, JsonArray.class ); From 406e6d1b1a0dd8a6fb1c5da09b51440b59fe7e99 Mon Sep 17 00:00:00 2001 From: Stephan Schuster Date: Tue, 16 Oct 2018 15:46:46 +0200 Subject: [PATCH 5/7] Fixed NPE when being called for unknown event types (see issue #131) --- src/main/java/io/deepstream/DeepstreamClientAbstract.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/deepstream/DeepstreamClientAbstract.java b/src/main/java/io/deepstream/DeepstreamClientAbstract.java index e1fd622..46bf123 100644 --- a/src/main/java/io/deepstream/DeepstreamClientAbstract.java +++ b/src/main/java/io/deepstream/DeepstreamClientAbstract.java @@ -45,8 +45,8 @@ void onError(Topic topic, Event event, String msg) throws DeepstreamException { * Help to diagnose the problem quicker by checking for * some mon problems */ - if( event.equals( Event.ACK_TIMEOUT ) || event.equals( Event.RESPONSE_TIMEOUT ) ) { - if( getConnectionState().equals( ConnectionState.AWAITING_AUTHENTICATION ) ) { + if( Event.ACK_TIMEOUT.equals( event ) || Event.RESPONSE_TIMEOUT.equals( event ) ) { + if( ConnectionState.AWAITING_AUTHENTICATION.equals( getConnectionState() ) ) { String errMsg = "Your message timed out because you\'re not authenticated. Have you called login()?"; onError( Topic.ERROR, Event.NOT_AUTHENTICATED, errMsg ); return; From 003230fc108a14ec88c6a8d2e0634e325d2e99ca Mon Sep 17 00:00:00 2001 From: Rawad Daher Date: Mon, 9 Mar 2020 18:33:31 +0200 Subject: [PATCH 6/7] Fix connection status when deepstream is lost connection to be error in order to handle this disconnection --- src/main/java/io/deepstream/Connection.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/io/deepstream/Connection.java b/src/main/java/io/deepstream/Connection.java index 15acc07..019e629 100644 --- a/src/main/java/io/deepstream/Connection.java +++ b/src/main/java/io/deepstream/Connection.java @@ -254,6 +254,7 @@ else if( this.deliberateClose ) { this.setState( ConnectionState.CLOSED ); } else { + this.setState( ConnectionState.ERROR ); if(!this.originalUrl.equals(this.url)) { this.url = this.originalUrl; this.endpoint = this.createEndpoint(); From f9d50b131eb128de1de561bf5c6941d8316ad13e Mon Sep 17 00:00:00 2001 From: Doug Rodriguez Date: Wed, 29 Apr 2020 01:38:13 -0400 Subject: [PATCH 7/7] Adding new changes original repo --- build-prod.gradle | 2 +- .../io/deepstream/JavaEndpointWebsocket.java | 17 ++--------------- .../java/io/deepstream/UtilSingleNotifier.java | 2 +- 3 files changed, 4 insertions(+), 17 deletions(-) diff --git a/build-prod.gradle b/build-prod.gradle index aa7cc40..4a2c0b9 100644 --- a/build-prod.gradle +++ b/build-prod.gradle @@ -65,7 +65,7 @@ j2objcConfig { } group 'io.deepstream' -version '2.2.3' +version '2.2.4' publishing { publications { diff --git a/src/main/java/io/deepstream/JavaEndpointWebsocket.java b/src/main/java/io/deepstream/JavaEndpointWebsocket.java index 399fa73..8bc6710 100644 --- a/src/main/java/io/deepstream/JavaEndpointWebsocket.java +++ b/src/main/java/io/deepstream/JavaEndpointWebsocket.java @@ -43,18 +43,11 @@ public void forceClose() { @Override public void open() { - this.websocket = new WebSocket( this.uri, new Draft_10() ); - try { - this.websocket.setSocket(websocket.factory.createSocket()); - this.websocket.connect(); - } catch (IOException e) { - e.printStackTrace(); - } + this.websocket = new WebSocket( this.uri, new Draft_6455() ); + this.websocket.connect(); } private class WebSocket extends WebSocketClient { - SSLSocketFactory factory; - WebSocket( URI serverUri , Draft draft ) { super( serverUri, draft ); // Set the SSL context if the socket server is using Secure WebSockets @@ -73,13 +66,7 @@ private class WebSocket extends WebSocketClient { } catch (IOException e) { throw new RuntimeException(e); } - // set the SSL context to the client factory - - factory = sslContext.getSocketFactory(); -// this.setWebSocketFactory(new DefaultSSLWebSocketClientFactory(sslContext)); } - - } @Override diff --git a/src/main/java/io/deepstream/UtilSingleNotifier.java b/src/main/java/io/deepstream/UtilSingleNotifier.java index 494e2ef..c46b84d 100644 --- a/src/main/java/io/deepstream/UtilSingleNotifier.java +++ b/src/main/java/io/deepstream/UtilSingleNotifier.java @@ -164,7 +164,7 @@ private void send( Actions action, String[] data ) { @Override @ObjectiveCName("onTimeout:action:event:name:") public void onTimeout(Topic topic, Actions action, Event event, String name) { - this.receive(name, new DeepstreamError(String.format("Response for % timed out", name)), null); + this.receive(name, new DeepstreamError(String.format("Response for %s timed out", name)), null); } interface UtilSingleNotifierCallback {