Skip to content
This repository was archived by the owner on Apr 6, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions src/main/java/io/deepstream/AnonymousRecord.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.deepstream;

import com.google.j2objc.annotations.ObjectiveCName;

import com.google.gson.JsonElement;
import com.google.j2objc.annotations.ObjectiveCName;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

/**
* An AnonymousRecord is a record without a predefined name. It
Expand Down Expand Up @@ -269,37 +270,63 @@ public AnonymousRecord removeRecordEventsListener(RecordEventsListener recordEve
}

/**
* Sets the underlying record the anonymous record is bound
* Synchronously sets the underlying record the anonymous record is bound
* to. Can be called multiple times.
*
* @param recordName The name of the underlying record to use
* @return The AnonymousRecord
*/
@ObjectiveCName("setName:")
public AnonymousRecord setName( String recordName ) {
this.unsubscribeRecord();
this.record = this.recordHandler.getRecord( recordName );
this.subscribeRecord();

final CountDownLatch readyLatch = new CountDownLatch(1);
record.whenReady(new Record.RecordReadyListener() {
@Override
public void onRecordReady(String recordName, Record record) {
readyLatch.countDown();
}
});

public AnonymousRecord setName( String recordName ){
try {
readyLatch.await();
} catch (InterruptedException e) {
return setNameAsync(recordName, null).get();
}catch(Exception e){
e.printStackTrace();
return null;
}
}

for( AnonymousRecordNameChangedListener anonymousRecordNameChangedCallback : this.anonymousRecordNameChangedCallbacks ) {
anonymousRecordNameChangedCallback.recordNameChanged( recordName, this );
}

return this;
/**
* Asynchronously sets the underlying record the anonymous record is bound
* to. Can be called multiple times.
*
* @param recordName The name of the underlying record to use
* @param listener Callback to be called after query is successfull, may be null
* @return The AnonymousRecord
*/
@ObjectiveCName("setNameAsync:")
public Future<AnonymousRecord> setNameAsync(final String recordName, final SetNameListener listener) {
return recordHandler.executor.submit(new Callable<AnonymousRecord>() {
@Override
public AnonymousRecord call() throws Exception {
AnonymousRecord.this.unsubscribeRecord();
AnonymousRecord.this.record = AnonymousRecord.this.recordHandler.getRecord(recordName);
AnonymousRecord.this.subscribeRecord();

final CountDownLatch readyLatch = new CountDownLatch(1);
record.whenReady(new Record.RecordReadyListener() {
@Override
public void onRecordReady(String recordName, Record record) {
readyLatch.countDown();
}
});

try {
readyLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

for (AnonymousRecordNameChangedListener anonymousRecordNameChangedCallback : AnonymousRecord.this.anonymousRecordNameChangedCallbacks) {
anonymousRecordNameChangedCallback.recordNameChanged(recordName, AnonymousRecord.this);
}

if(listener != null){
return AnonymousRecord.this;
}
return AnonymousRecord.this;
}
});
}

/**
Expand Down
28 changes: 24 additions & 4 deletions src/main/java/io/deepstream/Connection.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.deepstream;

import com.google.j2objc.annotations.ObjectiveCName;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.j2objc.annotations.ObjectiveCName;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -58,6 +59,23 @@ class Connection implements IConnection {
this.endpoint = createEndpoint();
}

/**
* Creates an endpoint and passed it to {@link Connection#Connection(String, DeepstreamConfig, DeepstreamClient, EndpointFactory, Endpoint)}
*
* @see Connection#Connection(String, DeepstreamConfig, DeepstreamClient, EndpointFactory, Endpoint)
*
* @param url The endpoint url
* @param options The options used to initialise the deepstream client
* @param endpointFactory The factory to create endpoints
* @param client The deepstream client
* @throws URISyntaxException An exception if an invalid url is passed in
*/
Connection(final String url, final DeepstreamConfig options, DeepstreamClient client, EndpointFactory endpointFactory, boolean networkAvailable) throws URISyntaxException {
this( url, options, client, endpointFactory, null );
this.globalConnectivityState = networkAvailable ? GlobalConnectivityState.CONNECTED : GlobalConnectivityState.DISCONNECTED;
this.endpoint = createEndpoint();
}

/**
* Creates a connection, that is responsible for handling all the connection related logic related to state
* and messages
Expand Down Expand Up @@ -260,6 +278,7 @@ else if( this.deliberateClose ) {
return;
}
this.tryReconnect();

}
}

Expand Down Expand Up @@ -326,13 +345,14 @@ else if( message.action == Actions.ACK ) {

@ObjectiveCName("setState:")
private void setState( ConnectionState connectionState ) {
System.out.println(connectionState);
this.connectionState = connectionState;

for (ConnectionStateListener connectStateListener : this.connectStateListeners) {
connectStateListener.connectionStateChanged(connectionState);
}

if( connectionState == ConnectionState.AWAITING_AUTHENTICATION && this.authParameters != null ) {
if( connectionState == ConnectionState.AWAITING_AUTHENTICATION && this.authParameters != null) {
this.sendAuthMessage();
}
}
Expand Down
129 changes: 96 additions & 33 deletions src/main/java/io/deepstream/DeepstreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* The main entry point for a DeepstreamClient. You can create a client directly using the constructors or use the
* {@link DeepstreamFactory#getClient()}, {@link DeepstreamFactory#getClient(String)} or
* {@link DeepstreamFactory#getClient(String, Properties)} to create one for you and hold them for future reference.
*/
public class DeepstreamClient extends DeepstreamClientAbstract {
private ExecutorService executor;

/**
* The getters for data-sync, such as {@link RecordHandler#getRecord(String)},
Expand Down Expand Up @@ -124,11 +129,24 @@ private DeepstreamClient(final String url, DeepstreamConfig deepstreamConfig) th
*/
@ObjectiveCName("init:deepstreamConfig:endpointFactory:")
public DeepstreamClient(final String url, DeepstreamConfig deepstreamConfig, EndpointFactory endpointFactory) throws URISyntaxException {
this.connection = new Connection(url, deepstreamConfig, this, endpointFactory);
this(url, deepstreamConfig, endpointFactory, false);
}

/**
* deepstream.io java client
* @param url URL to connect to. The protocol can be omited, e.g. <host>:<port>
* @param deepstreamConfig A map of options that extend the ones specified in DefaultConfig.properties
* @param endpointFactory An EndpointFactory that returns an Endpoint
* @param networkAvailable indicates whether network is available or not
* @throws URISyntaxException Thrown if the url in incorrect
*/
public DeepstreamClient(final String url, DeepstreamConfig deepstreamConfig, EndpointFactory endpointFactory, boolean networkAvailable) throws URISyntaxException {
this.executor = Executors.newSingleThreadExecutor();
this.connection = new Connection(url, deepstreamConfig, this, endpointFactory, networkAvailable);
this.event = new EventHandler(deepstreamConfig, this.connection, this);
this.rpc = new RpcHandler(deepstreamConfig, this.connection, this);
this.record = new RecordHandler(deepstreamConfig, this.connection, this);
this.presence = new PresenceHandler(deepstreamConfig, this.connection, this);
this.rpc = new RpcHandler(deepstreamConfig, this.connection, this, this.executor);
this.record = new RecordHandler(deepstreamConfig, this.connection, this, this.executor);
this.presence = new PresenceHandler(deepstreamConfig, this.connection, this, this.executor);
}

/**
Expand Down Expand Up @@ -189,12 +207,12 @@ public void setRuntimeErrorHandler( DeepstreamRuntimeErrorHandler deepstreamRunt
* @return The login result
*/
@ObjectiveCName("login")
public LoginResult login() {
public LoginResult login(){
return this.login(null);
}

/**
* Send authentication parameters to the client to fully open
* Synchronously sends authentication parameters to the client to fully open
* the connection.
*
* Please note: Authentication parameters are send over an already established
Expand All @@ -214,45 +232,92 @@ public LoginResult login() {
* login can be called multiple times until either the connection is authenticated or
* forcefully closed by the server since its maxAuthAttempts threshold has been exceeded
*
* This will block your calling thread
*
* @param authParams JSON.serializable authentication data
* @return The login result
*/
@ObjectiveCName("login:")
public LoginResult login(JsonElement authParams) {
final CountDownLatch loggedInLatch = new CountDownLatch(1);
final LoginResult[] loginResult = new LoginResult[1];

this.connection.authenticate(authParams, new LoginCallback() {
@Override
@ObjectiveCName("loginSuccess:")
public void loginSuccess(Object userData) {
loginResult[0] = new LoginResult(true, userData);
loggedInLatch.countDown();
}
public LoginResult login(final JsonElement authParams){
try {
return this.loginAsync(authParams, null).get();
}catch(Exception e){
e.printStackTrace();
return null;
}
}

/**
* Asynchronously sends authentication parameters to the client to fully open
* the connection and sends callback afterwards.
*
* Please note: Authentication parameters are send over an already established
* connection, rather than appended to the server URL. This means the parameters
* will be encrypted when used with a WSS / HTTPS connection. If the deepstream server
* on the other side has message logging enabled it will however be written to the logs in
* plain text. If additional security is a requirement it might therefor make sense to hash
* the password on the client.
*
* If the connection is not yet established the authentication parameter will be
* stored and send once it becomes available
*
* authParams can be any JSON serializable data structure and its up for the
* permission handler on the server to make sense of them, although something
* like { username: 'someName', password: 'somePass' } will probably make the most sense.
*
* login can be called multiple times until either the connection is authenticated or
* forcefully closed by the server since its maxAuthAttempts threshold has been exceeded
*
* This will not block your main thread. However you can block it by calling get().
*
* @param authParams JSON.serializable authentication data
* @param listener Callback to be called after login is successfull, may be null
* @return The login result
*/
@ObjectiveCName("loginAsync:")
public Future<LoginResult> loginAsync(final JsonElement authParams, final LoginResultListener listener) {
return executor.submit(new Callable<LoginResult>() {
@Override
@ObjectiveCName("loginFailed:data:")
public void loginFailed(Event errorEvent, Object data) {
loginResult[0] = new LoginResult(false, errorEvent, data);
loggedInLatch.countDown();
public LoginResult call() throws Exception {
final CountDownLatch loggedInLatch = new CountDownLatch(1);
final LoginResult[] loginResult = new LoginResult[1];

connection.authenticate(authParams, new LoginCallback() {
@Override
@ObjectiveCName("loginSuccess:")
public void loginSuccess(Object userData) {
loginResult[0] = new LoginResult(true, userData);
loggedInLatch.countDown();
}

@Override
@ObjectiveCName("loginFailed:data:")
public void loginFailed(Event errorEvent, Object data) {
loginResult[0] = new LoginResult(false, errorEvent, data);
loggedInLatch.countDown();
}
});

try {
loggedInLatch.await();
} catch (InterruptedException e) {
loginResult[0] = new LoginResult(false, null, "An issue occured during login");
}
if(listener != null) {
listener.loginCompleted(loginResult[0]);
}
return loginResult[0];
}
});

try {
loggedInLatch.await();
} catch (InterruptedException e) {
loginResult[0] = new LoginResult(false, null, "An issue occured during login");
}

return loginResult[0];
}

/**
* Closes the connection to the server.
* @return The deepstream client
*/
public DeepstreamClient close() {
this.connection.close(false);
this.executor.shutdown();
this.connection.close(true);
this.getAckTimeoutRegistry().close();
return this;
}
Expand Down Expand Up @@ -289,9 +354,7 @@ public ConnectionState getConnectionState() {
}

/**
* Sets global connectivity state and notifies current connections about it. When connectivity is {@link GlobalConnectivityState#DISCONNECTED)} connection will be closed and
* no reconnects will be attempted. If connectivity is set to {@link GlobalConnectivityState#CONNECTED)} and current {@link ConnectionState)} is {@link ConnectionState#CLOSED)}
* or {@link ConnectionState#ERROR)} then client will try reconnecting.
* Set global connectivity state.
* @param {GlobalConnectivityState} globalConnectivityState Current global connectivity state
*/
public void setGlobalConnectivityState(GlobalConnectivityState globalConnectivityState){
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/io/deepstream/DeepstreamClientAbstract.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.google.j2objc.annotations.ObjectiveCName;

import com.google.gson.JsonElement;

abstract class DeepstreamClientAbstract {
private UtilAckTimeoutRegistry utilAckTimeoutRegistry;
private DeepstreamRuntimeErrorHandler deepstreamRuntimeErrorHandler;
Expand All @@ -14,10 +12,7 @@ abstract class DeepstreamClientAbstract {
abstract DeepstreamClientAbstract removeConnectionChangeListener(ConnectionStateListener connectionStateListener);
abstract ConnectionState getConnectionState();

abstract LoginResult login();

@ObjectiveCName("login:")
abstract LoginResult login(JsonElement data);
abstract DeepstreamClientAbstract close();
abstract String getUid();

Expand Down
Loading