Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 136 additions & 5 deletions src/main/java/network/p2p/MessageClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,28 @@
import model.codec.EncodedEntity;
import model.lightchain.Identifier;
import modules.codec.JsonEncoder;
import network.p2p.proto.GetReply;
import network.p2p.proto.GetRequest;
import network.p2p.proto.Message;
import network.p2p.proto.MessengerGrpc;
import network.p2p.proto.PutMessage;
import network.p2p.proto.StorageGrpc;

/**
* Client side of gRPC that is responsible for sending messages from this node.
*/
public class MessageClient {
private final MessengerGrpc.MessengerStub asyncStub;
private final StorageGrpc.StorageStub storageAsyncStub;

/**
* Constructor.
*/
public MessageClient(Channel channel) {

asyncStub = MessengerGrpc.newStub(channel);
storageAsyncStub = StorageGrpc.newStub(channel);

}

/**
Expand Down Expand Up @@ -73,11 +81,11 @@ public void onCompleted() {

EncodedEntity encodedEntity = encoder.encode(entity);
Message message = Message.newBuilder()
.setChannel(channel)
.setPayload(ByteString.copyFrom(encodedEntity.getBytes()))
.setType(encodedEntity.getType())
.addTargetIds(ByteString.copyFrom(target.getBytes()))
.build();
.setChannel(channel)
.setPayload(ByteString.copyFrom(encodedEntity.getBytes()))
.setType(encodedEntity.getType())
.addTargetIds(ByteString.copyFrom(target.getBytes()))
.build();
requestObserver.onNext(message);

if (finishLatch.getCount() == 0) {
Expand All @@ -100,4 +108,127 @@ public void onCompleted() {
System.err.println("deliver can not finish within 1 minutes");
}
}

/**
* Implements logic to asynchronously put entity to the target.
*/
public void put(Entity entity, String channel) throws InterruptedException {
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<Empty> responseObserver = new StreamObserver<Empty>() {
@Override
public void onNext(Empty value) {

}

@Override
public void onError(Throwable t) {
System.err.println("put failed: " + Status.fromThrowable(t));
finishLatch.countDown();
}

@Override
public void onCompleted() {
finishLatch.countDown();
}
};

StreamObserver<PutMessage> requestObserver = storageAsyncStub.put(responseObserver);

try {
JsonEncoder encoder = new JsonEncoder();

EncodedEntity encodedEntity = encoder.encode(entity);
PutMessage putMessage = PutMessage.newBuilder()
.setChannel(channel)
.setPayload(ByteString.copyFrom(encodedEntity.getBytes()))
.setType(encodedEntity.getType())
.build();
requestObserver.onNext(putMessage);

if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}

} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}

// Mark the end of requests
requestObserver.onCompleted();

// Receiving happens asynchronously
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
System.err.println("put can not finish within 1 minutes");
}
}

/**
* Implements logic to asynchronously get entity from the target.
*/
public Entity get(Identifier identifier, String channel) throws InterruptedException {

final CountDownLatch finishLatch = new CountDownLatch(1);
final Entity[] entity = {null};
StreamObserver<GetRequest> requestObserver =
storageAsyncStub.get(new StreamObserver<GetReply>() {

@Override
public void onNext(GetReply response) {

JsonEncoder encoder = new JsonEncoder();
EncodedEntity e = new EncodedEntity(response.getPayload().toByteArray(), response.getType());
try {
// puts the incoming entity onto the distributedStorageComponent
entity[0] = encoder.decode(e);
} catch (ClassNotFoundException ex) {
// TODO: replace with fatal log
System.err.println("could not decode incoming GetReply response");
ex.printStackTrace();
}

}

@Override
public void onError(Throwable t) {
finishLatch.countDown();
}

@Override
public void onCompleted() {
finishLatch.countDown();
}

});

try {

GetRequest request = GetRequest
.newBuilder()
.setIdentifier(ByteString.copyFrom(identifier.getBytes()))
.setChannel(channel)
.build();

requestObserver.onNext(request);

} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();

// Receiving happens asynchronously
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
System.err.println("get can not finish within 1 minutes");
}

return entity[0];

}

}
131 changes: 128 additions & 3 deletions src/main/java/network/p2p/MessageServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,28 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import model.Entity;
import model.codec.EncodedEntity;
import model.lightchain.Identifier;
import modules.codec.JsonEncoder;
import network.p2p.proto.GetReply;
import network.p2p.proto.GetRequest;
import network.p2p.proto.Message;
import network.p2p.proto.MessengerGrpc;
import network.p2p.proto.PutMessage;
import network.p2p.proto.StorageGrpc;
import protocol.Engine;

/**
Expand All @@ -37,6 +48,7 @@
public class MessageServer {
private final Server server;
private final HashMap<String, Engine> engineChannelTable;
public ConcurrentMap<String, Set<Entity>> distributedStorageComponent;

/**
* Create a MessageServer using ServerBuilder as a base.
Expand All @@ -45,10 +57,12 @@ public class MessageServer {
*/
public MessageServer(int port) {
server = ServerBuilder.forPort(port)
.addService(new MessengerImpl())
.build();
.addService(new MessengerImpl())
.addService(new StorageImpl())
.build();

this.engineChannelTable = new HashMap<>();
this.distributedStorageComponent = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -93,7 +107,7 @@ public void stop() throws InterruptedException {
}

/**
* Concrete implementation of the gRPC Serverside response methods.
* Concrete implementation of the gRPC Serverside response methods for Message functionality.
*/
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "meant to be externally mutable")
public class MessengerImpl extends MessengerGrpc.MessengerImplBase {
Expand Down Expand Up @@ -149,4 +163,115 @@ public void onCompleted() {
}
}

/**
* Concrete implementation of the gRPC Serverside response methods for Storage functionality.
*/
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "meant to be externally mutable")
public class StorageImpl extends StorageGrpc.StorageImplBase {

@Override
public StreamObserver<PutMessage> put(StreamObserver<Empty> responseObserver) {
return new StreamObserver<PutMessage>() {
@Override
@SuppressFBWarnings(value = "DM_EXIT", justification = "meant to fail VM safely upon error")
public void onNext(PutMessage putMessage) {

JsonEncoder encoder = new JsonEncoder();
EncodedEntity e = new EncodedEntity(putMessage.getPayload().toByteArray(), putMessage.getType());

if (engineChannelTable.containsKey(putMessage.getChannel())) {
try {

// TODO: replace with info log
System.out.println("Putting Entity");
System.out.println("ID: " + encoder.decode(e).id());
System.out.println("Channel: " + putMessage.getChannel());
System.out.println("Type: " + putMessage.getType());

// puts the incoming entity onto the distributedStorageComponent

if (distributedStorageComponent.containsKey(putMessage.getChannel())) {
distributedStorageComponent.get(putMessage.getChannel()).add(encoder.decode(e));
} else {
HashSet s = new HashSet<>();
s.add(encoder.decode(e));
distributedStorageComponent.put(putMessage.getChannel(), s);
}

} catch (ClassNotFoundException ex) {
// TODO: replace with fatal log
System.err.println("could not decode incoming put message");
ex.printStackTrace();
System.exit(1);
}
} else {
// TODO: replace with error log
System.err.println("no channel found for incoming put message: " + putMessage.getChannel());
}
}

@Override
public void onError(Throwable t) {
// TODO: replace with error log
System.err.println("encountered error in deliver: " + t);
}

@Override
public void onCompleted() {
responseObserver.onNext(com.google.protobuf.Empty.newBuilder().build());
responseObserver.onCompleted();
}
};
}

@Override
public StreamObserver<GetRequest> get(StreamObserver<GetReply> responseObserver) {
return new StreamObserver<GetRequest>() {
@Override
public void onNext(GetRequest request) {

Identifier id = new Identifier(request.getIdentifier().toByteArray());
JsonEncoder encoder = new JsonEncoder();

System.out.println("Getting Entity");
System.out.println("ID: " + id);

Entity entity = null;

if (distributedStorageComponent.containsKey(request.getChannel())) {

for (Entity e : distributedStorageComponent.get(request.getChannel())) {
if (e.id().comparedTo(id) == 0) {
entity = e;
EncodedEntity encodedEntity = encoder.encode(entity);

GetReply reply = GetReply
.newBuilder()
.setPayload(ByteString.copyFrom(encodedEntity.getBytes()))
.setType(encodedEntity.getType())
.build();

responseObserver.onNext(reply);
}
}

} else {
System.out.println("CHANNEL NOT FOUND");
}

}

@Override
public void onError(Throwable t) {
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

}

}
17 changes: 15 additions & 2 deletions src/main/java/network/p2p/P2pConduit.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public void unicast(Entity e, Identifier target) throws LightChainNetworkingExce
@Override
public void put(Entity e) throws LightChainDistributedStorageException {

try {
network.putEntity(e, this.channel);
} catch (InterruptedException | IllegalArgumentException ex) {
throw new LightChainDistributedStorageException();
}

}

/**
Expand All @@ -59,11 +65,18 @@ public void put(Entity e) throws LightChainDistributedStorageException {
*/
@Override
public Entity get(Identifier identifier) throws LightChainDistributedStorageException {
return null;

try {
return network.getEntity(identifier, this.channel);
} catch (InterruptedException | IllegalArgumentException ex) {
throw new LightChainDistributedStorageException();
}

}

@Override
public ArrayList<Entity> allEntities() throws LightChainDistributedStorageException {
return null;
return network.getAllEntities(channel);
}

}
Loading