Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ jobs:
os: linux
language: java
script: bash ./gradlew -b build.gradle test
- stage: Release to bintray and pod
- stage: J2Objc
install: bash ./scripts/setupJ2OBJC.sh true
script: bash ./gradlew -b build-prod.gradle bintrayUpload
on:
tags: true
script: bash ./gradlew -b build-prod.gradle
# - stage: Release to bintray and pod
# install: bash ./scripts/setupJ2OBJC.sh true
# script: bash ./gradlew -b build-prod.gradle bintrayUpload
# on:
# tags: true
4 changes: 2 additions & 2 deletions build-prod.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ sourceSets {

dependencies {
compile 'com.google.code.gson:gson:2.6.2'
compile 'org.java-websocket:Java-WebSocket:1.3.0'
compile 'org.java-websocket:Java-WebSocket:1.3.5'
compile 'com.google.j2objc:j2objc-annotations:1.1'

testCompile group: 'junit', name: 'junit', version: '4.11'
Expand Down Expand Up @@ -65,7 +65,7 @@ j2objcConfig {
}

group 'io.deepstream'
version '2.2.3'
version '2.2.4'

publishing {
publications {
Expand Down
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ task apidoc(type: Javadoc) {

dependencies {
compile 'com.google.code.gson:gson:2.6.2'
// compile 'org.java-websocket:Java-WebSocket:1.3.0'
compile 'org.java-websocket:Java-WebSocket:1.3.4'
compile 'org.java-websocket:Java-WebSocket:1.3.5'
compile 'com.google.j2objc:j2objc-annotations:1.1'

testCompile group: 'junit', name: 'junit', version: '4.11'
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/deepstream/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ else if( this.deliberateClose ) {
this.setState( ConnectionState.CLOSED );
}
else {
this.setState( ConnectionState.ERROR );
if(!this.originalUrl.equals(this.url)) {
this.url = this.originalUrl;
this.endpoint = this.createEndpoint();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/deepstream/DeepstreamClientAbstract.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ void onError(Topic topic, Event event, String msg) throws DeepstreamException {
* Help to diagnose the problem quicker by checking for
* some mon problems
*/
if( event.equals( Event.ACK_TIMEOUT ) || event.equals( Event.RESPONSE_TIMEOUT ) ) {
if( getConnectionState().equals( ConnectionState.AWAITING_AUTHENTICATION ) ) {
if( Event.ACK_TIMEOUT.equals( event ) || Event.RESPONSE_TIMEOUT.equals( event ) ) {
if( ConnectionState.AWAITING_AUTHENTICATION.equals( getConnectionState() ) ) {
String errMsg = "Your message timed out because you\'re not authenticated. Have you called login()?";
onError( Topic.ERROR, Event.NOT_AUTHENTICATED, errMsg );
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface DeepstreamRuntimeErrorHandler {

/**
* Triggered whenever a runtime error occurs ( mostly async such as TimeOuts or MergeConflicts ).
* Recieves a topic to indicate if it was e.g. RPC, event and a english error message to simplify
* Receives a topic to indicate if it was e.g. RPC, event and a english error message to simplify
* debugging purposes.
* @param topic The Topic the error occured on
* @param event The Error Event
Expand Down
24 changes: 8 additions & 16 deletions src/main/java/io/deepstream/JavaEndpointWebsocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_10;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;

import java.io.IOException;
Expand Down Expand Up @@ -43,38 +43,30 @@ public void forceClose() {

@Override
public void open() {
this.websocket = new WebSocket( this.uri, new Draft_10() );
try {
this.websocket.setSocket(websocket.factory.createSocket());
this.websocket.connect();
} catch (IOException e) {
e.printStackTrace();
}
this.websocket = new WebSocket( this.uri, new Draft_6455() );
this.websocket.connect();
}

private class WebSocket extends WebSocketClient {
SSLSocketFactory factory;

WebSocket( URI serverUri , Draft draft ) {
super( serverUri, draft );
// Set the SSL context if the socket server is using Secure WebSockets
if (serverUri.toString().startsWith("wss:")) {
SSLContext sslContext;
SSLSocketFactory factory;
try {
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null);
factory = sslContext.getSocketFactory();
this.setSocket(factory.createSocket());
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
} catch (KeyManagementException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
// set the SSL context to the client factory

factory = sslContext.getSocketFactory();
// this.setWebSocketFactory(new DefaultSSLWebSocketClientFactory(sslContext));
}


}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/deepstream/LoginResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public boolean loggedIn() {
* Return the data associated with login. If login was successful,
* this would be the user associated data. Otherwise data explaining
* the reason why it wasn't.
* @return A JsonElement containing the data recieved from the server during login
* @return A JsonElement containing the data received from the server during login
*/
public Object getData() {
return this.data;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/deepstream/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Message {
final Topic topic;

/**
* @param raw The raw data recieved
* @param raw The raw data received
* @param topic The message topic
* @param action The message action
* @param data The message data, as an array
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/deepstream/PresenceHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ else if( message.action == Actions.PRESENCE_LEAVE ) {
this.broadcastEvent( Topic.PRESENCE.toString(), message.data[0], false );
}
else if( message.action == Actions.QUERY ) {
this.notifier.recieve(Actions.QUERY.toString(), null, message.data);
this.notifier.receive(Actions.QUERY.toString(), null, message.data);
}
else {
this.client.onError( Topic.PRESENCE, Event.UNSOLICITED_MESSAGE, message.action.toString() );
Expand Down
95 changes: 65 additions & 30 deletions src/main/java/io/deepstream/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;


/**
Expand All @@ -29,14 +31,16 @@ public class Record {
private final ArrayList<RecordReadyListener> onceRecordReadyListeners;
private final String name;
private final DeepstreamConfig deepstreamConfig;
private boolean isReady;
private boolean isDestroyed;
private volatile boolean isReady;
private volatile boolean isDestroyed;
private boolean isDiscarded;
private int version;
private int usages;
private AtomicInteger usages;
private RecordMergeStrategy mergeStrategy;
private RecordRemoteUpdateHandler recordRemoteUpdateHandler;
private JsonElement data;
private boolean hasProvider;
private ReentrantLock readyLock;

/**
* Constructor is not public since it is created via {@link RecordHandler#getRecord(String)}
Expand All @@ -47,11 +51,11 @@ public class Record {
* @param client deepstream.io client
*/
@ObjectiveCName("init:recordOptions:connection:deepstreamConfig:client:")
Record(String name, Map recordOptions, IConnection connection, DeepstreamConfig deepstreamConfig, DeepstreamClientAbstract client) {
Record (String name, Map recordOptions, IConnection connection, DeepstreamConfig deepstreamConfig, DeepstreamClientAbstract client) {
this.ackTimeoutRegistry = client.getAckTimeoutRegistry();
this.name = name;
this.deepstreamConfig = deepstreamConfig;
this.usages = 0;
this.usages = new AtomicInteger();
this.version = -1;
this.connection = connection;
this.client = client;
Expand All @@ -62,6 +66,8 @@ public class Record {
this.isReady = false;
this.isDestroyed = false;
this.hasProvider = false;
this.isDiscarded = false;
this.readyLock = new ReentrantLock();
this.mergeStrategy = this.deepstreamConfig.getRecordMergeStrategy() != null ?
RecordMergeStrategies.INSTANCE.getMergeStrategy(this.deepstreamConfig.getRecordMergeStrategy()) : null ;
this.recordEventsListeners = new ArrayList<RecordEventsListener>();
Expand Down Expand Up @@ -448,24 +454,36 @@ public Record unsubscribe( String path, RecordPathChangedCallback recordPathChan
*/
public Record discard() throws DeepstreamRecordDestroyedException {
throwExceptionIfDestroyed("discard");
this.usages--;
if( this.usages <= 0 ) {
this.whenReady(new RecordReadyListener() {
@Override
public void onRecordReady(String recordName, Record record) {
ackTimeoutRegistry.add(Topic.RECORD, Actions.UNSUBSCRIBE, name, deepstreamConfig.getSubscriptionTimeout());
connection.send( MessageBuilder.getMsg( Topic.RECORD, Actions.UNSUBSCRIBE, name ) );

for(RecordDestroyPendingListener recordDestroyPendingHandler: recordDestroyPendingListeners) {
recordDestroyPendingHandler.onDestroyPending( name );
}
}
});
this.destroy();
if (this.usages.decrementAndGet() <= 0) {
finishDiscard();
}
return this;
}

void finishDiscard () {
// This must be a synchronized block so that RecordHandler.getRecord will not continue until the
// record has been removed from the cache in onDestroyPending.
synchronized (this) {
// We only want one thread to do the discard one time so check the isDiscarded flag within
// this synchronized section.
if (!isDiscarded) {
this.whenReady(new RecordReadyListener() {
@Override
public void onRecordReady (String recordName, Record record) {
ackTimeoutRegistry.add(Topic.RECORD, Actions.UNSUBSCRIBE, name, deepstreamConfig.getSubscriptionTimeout());
connection.send(MessageBuilder.getMsg(Topic.RECORD, Actions.UNSUBSCRIBE, name));

for (RecordDestroyPendingListener recordDestroyPendingHandler : recordDestroyPendingListeners) {
recordDestroyPendingHandler.onDestroyPending(name);
}
}
});
this.destroy();
isDiscarded = true;
}
}
}

/**
* Delete the record. This is called when you want to remove the record entirely from deepstream, deleting it from storage
* and cache and telling all other users that it has been deleted. This in turn will force all clients to discard the record.<br/>
Expand Down Expand Up @@ -503,13 +521,20 @@ public void onRecordReady(String recordName, Record record) {
*/
@ObjectiveCName("whenReady:")
Record whenReady(RecordReadyListener recordReadyListener) {
if( this.isReady ) {
recordReadyListener.onRecordReady( this.name, this );
} else {
synchronized (this) {
readyLock.lock();
try {
if( this.isReady ) {
readyLock.unlock();
recordReadyListener.onRecordReady( this.name, this );
} else {
this.onceRecordReadyListeners.add( recordReadyListener );
}
}
finally {
if (readyLock.isHeldByCurrentThread()) {
readyLock.unlock();
}
}
return this;
}

Expand Down Expand Up @@ -541,9 +566,9 @@ private void handleWriteAcknowledgement(Message message) {
Object versions = gson.fromJson( val, JsonArray.class );
Object error = MessageParser.convertTyped(message.data[ 2 ], this.client, gson);
if( error != null ) {
this.recordSetNotifier.recieve((JsonArray) versions, new DeepstreamError((String) error));
this.recordSetNotifier.receive((JsonArray) versions, new DeepstreamError((String) error));
} else {
this.recordSetNotifier.recieve((JsonArray) versions, null);
this.recordSetNotifier.receive((JsonArray) versions, null);
}
}

Expand Down Expand Up @@ -778,12 +803,22 @@ private void onRead( Message message ) {
* and emits the ready event
*/
private void setReady() {
this.isReady = true;
ArrayList<RecordReadyListener> listCopy = null;

readyLock.lock();
try {
this.isReady = true;
// Capture the list inside the lock so we can execute the listeners outside the lock.
listCopy = new ArrayList<RecordReadyListener>(this.onceRecordReadyListeners);
this.onceRecordReadyListeners.clear();
}
finally {
readyLock.unlock();
}

for(RecordReadyListener recordReadyListener: this.onceRecordReadyListeners) {
for(RecordReadyListener recordReadyListener: listCopy) {
recordReadyListener.onRecordReady( this.name, this );
}
this.onceRecordReadyListeners.clear();
}

/**
Expand Down Expand Up @@ -901,8 +936,8 @@ void addRecordDestroyPendingListener(RecordDestroyPendingListener recordDestroyP
this.recordDestroyPendingListeners.add( recordDestroyPendingListener );
}

void incrementUsage() {
this.usages++;
int getAndIncrementUsage() {
return this.usages.getAndIncrement();
}

@ObjectiveCName("RecordRemoteUpdateHandler")
Expand Down
Loading