Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.fasterxml.jackson.core.Base64Variants;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;

Expand Down Expand Up @@ -66,6 +67,7 @@ public IdDeserializer(Class<?> vc) {
*/
@Override
public Id deserialize(JsonParser p, DeserializationContext ctx) throws IOException {
return DataFormat.isBinary(p) ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
return DataFormat.isBinary(p) || p.currentToken() != JsonToken.VALUE_STRING ?
Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.fasterxml.jackson.core.Base64Variants;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;

Expand Down Expand Up @@ -65,7 +66,8 @@ public InetAddressDeserializer(Class<?> vc) {
*/
@Override
public InetAddress deserialize(JsonParser p, DeserializationContext ctx) throws IOException {
return DataFormat.isBinary(p) ? InetAddress.getByAddress(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) :
return DataFormat.isBinary(p) || p.currentToken() != JsonToken.VALUE_STRING ?
InetAddress.getByAddress(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) :
InetAddress.getByName(p.getText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ public NodeInfo deserialize(JsonParser p, DeserializationContext ctx) throws IOE
int port = 0;

// id
if (p.nextToken() != JsonToken.VALUE_NULL)
id = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
JsonToken token = p.nextToken();
if (token != JsonToken.VALUE_NULL)
id = binaryFormat || token != JsonToken.VALUE_STRING ?
Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());

// address
// text format: IP address string or hostname string
// binary format: binary ip address or host name string
JsonToken token = p.nextToken();
token = p.nextToken();
if (token == JsonToken.VALUE_STRING)
host = p.getText();
else if (token == JsonToken.VALUE_EMBEDDED_OBJECT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public PeerInfo deserialize(JsonParser p, DeserializationContext ctx) throws IOE
switch (fieldName) {
case "id":
if (token != JsonToken.VALUE_NULL)
publicKey = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
publicKey = binaryFormat || token != JsonToken.VALUE_STRING ?
Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
break;
case "n":
if (token != JsonToken.VALUE_NULL)
Expand All @@ -104,7 +105,8 @@ public PeerInfo deserialize(JsonParser p, DeserializationContext ctx) throws IOE
break;
case "o":
if (token != JsonToken.VALUE_NULL)
nodeId = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
nodeId = binaryFormat || token != JsonToken.VALUE_STRING ?
Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
break;
case "os":
if (token != JsonToken.VALUE_NULL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ public Value deserialize(JsonParser p, DeserializationContext ctx) throws IOExce
switch (fieldName) {
case "k":
if (token != JsonToken.VALUE_NULL)
publicKey = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
publicKey = binaryFormat || token != JsonToken.VALUE_STRING ?
Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
break;
case "rec":
if (token != JsonToken.VALUE_NULL)
recipient = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
recipient = binaryFormat || token != JsonToken.VALUE_STRING ?
Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText());
break;
case "n":
if (token != JsonToken.VALUE_NULL)
Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/io/bosonnetwork/service/Federation.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,14 @@ default CompletableFuture<? extends FederatedNode> getNode(Id nodeId) {
* or completes exceptionally/with null if the service cannot be located
*/
public CompletableFuture<List<? extends ServiceInfo>> getServices(Id peerId, Id nodeId);

/**
* Retrieves a list of services associated with a specific peer identified by its ID.
*
* @param peerId the unique identifier of the peer whose services are to be queried
* @return a {@link CompletableFuture} that completes with a list of {@link ServiceInfo} objects
* representing the services associated with the specified peer, or completes exceptionally
* if an error occurs while retrieving the services
*/
public CompletableFuture<List<? extends ServiceInfo>> getServices(Id peerId);
}
100 changes: 100 additions & 0 deletions api/src/main/java/io/bosonnetwork/vertx/ObservableReadStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.bosonnetwork.vertx;

import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;

/**
* A ReadStream wrapper that observes each element before forwarding it.
* <p>
* If the {@code observeHandler} throws an exception:
* - the stream is paused
* - no further elements are forwarded
* - the exception handler is invoked
* </p>
* Note: the underlying stream may still invoke {@code endHandler}
* after termination. Consumers should treat {@code exceptionHandler}
* as the authoritative failure signal.
*/
public class ObservableReadStream<T> implements ReadStream<T> {
private final ReadStream<T> delegate;
private final Handler<T> observeHandler;
private volatile boolean terminated;
private Handler<Throwable> exceptionHandler;

/**
* Constructs an ObservableReadStream that wraps a given ReadStream and observes each element
* before forwarding it, using the provided observeHandler.
*
* @param delegate the underlying ReadStream to wrap and delegate operations to
* @param observeHandler the handler invoked to observe each element before it is forwarded
* to the final consumer; if this handler throws an exception, the stream
* is paused, no further elements are forwarded, and the exception
* handler (if set) is invoked
*/
public ObservableReadStream(ReadStream<T> delegate, Handler<T> observeHandler) {
this.delegate = delegate;
this.observeHandler = observeHandler;
}

@Override
public ObservableReadStream<T> exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
delegate.exceptionHandler(handler);
return this;
}

@Override
public ObservableReadStream<T> handler(Handler<T> handler) {
if (terminated)
return this;

if (handler == null) {
delegate.handler(null);
return this;
}

delegate.handler(element -> {
if (observeHandler != null) {
try {
observeHandler.handle(element);
} catch (Throwable t) {
delegate.pause();
terminated = true;
if (exceptionHandler != null)
exceptionHandler.handle(t);

return;
}
}

handler.handle(element);
});

return this;
}

@Override
public ObservableReadStream<T> pause() {
delegate.pause();
return this;
}

@Override
public ObservableReadStream<T> resume() {
if (!terminated)
delegate.resume();
return this;
}

@Override
public ObservableReadStream<T> fetch(long amount) {
delegate.fetch(amount);
return this;
}

@Override
public ObservableReadStream<T> endHandler(Handler<Void> endHandler) {
delegate.endHandler(endHandler);
return this;
}
}
Loading
Loading