From c182c6764fed0af7479b8a46699061a3b428a7a8 Mon Sep 17 00:00:00 2001 From: Jingyu Date: Thu, 15 Jan 2026 15:07:48 +0800 Subject: [PATCH 1/2] Improve the ReadStream handling --- .../io/bosonnetwork/service/Federation.java | 10 + .../vertx/ObservableReadStream.java | 100 +++++++ .../main/java/io/bosonnetwork/vertx/Pump.java | 246 ------------------ 3 files changed, 110 insertions(+), 246 deletions(-) create mode 100644 api/src/main/java/io/bosonnetwork/vertx/ObservableReadStream.java delete mode 100644 api/src/main/java/io/bosonnetwork/vertx/Pump.java diff --git a/api/src/main/java/io/bosonnetwork/service/Federation.java b/api/src/main/java/io/bosonnetwork/service/Federation.java index cfddd7e..67a7f94 100644 --- a/api/src/main/java/io/bosonnetwork/service/Federation.java +++ b/api/src/main/java/io/bosonnetwork/service/Federation.java @@ -76,4 +76,14 @@ default CompletableFuture getNode(Id nodeId) { * or completes exceptionally/with null if the service cannot be located */ public CompletableFuture> getServices(Id peerId, Id nodeId); + + /** + * Retrieves a list of services associated with a specific peer identified by its ID. + * + * @param peerId the unique identifier of the peer whose services are to be queried + * @return a {@link CompletableFuture} that completes with a list of {@link ServiceInfo} objects + * representing the services associated with the specified peer, or completes exceptionally + * if an error occurs while retrieving the services + */ + public CompletableFuture> getServices(Id peerId); } \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/vertx/ObservableReadStream.java b/api/src/main/java/io/bosonnetwork/vertx/ObservableReadStream.java new file mode 100644 index 0000000..771d517 --- /dev/null +++ b/api/src/main/java/io/bosonnetwork/vertx/ObservableReadStream.java @@ -0,0 +1,100 @@ +package io.bosonnetwork.vertx; + +import io.vertx.core.Handler; +import io.vertx.core.streams.ReadStream; + +/** + * A ReadStream wrapper that observes each element before forwarding it. + *

+ * If the {@code observeHandler} throws an exception: + * - the stream is paused + * - no further elements are forwarded + * - the exception handler is invoked + *

+ * Note: the underlying stream may still invoke {@code endHandler} + * after termination. Consumers should treat {@code exceptionHandler} + * as the authoritative failure signal. + */ +public class ObservableReadStream implements ReadStream { + private final ReadStream delegate; + private final Handler observeHandler; + private volatile boolean terminated; + private Handler exceptionHandler; + + /** + * Constructs an ObservableReadStream that wraps a given ReadStream and observes each element + * before forwarding it, using the provided observeHandler. + * + * @param delegate the underlying ReadStream to wrap and delegate operations to + * @param observeHandler the handler invoked to observe each element before it is forwarded + * to the final consumer; if this handler throws an exception, the stream + * is paused, no further elements are forwarded, and the exception + * handler (if set) is invoked + */ + public ObservableReadStream(ReadStream delegate, Handler observeHandler) { + this.delegate = delegate; + this.observeHandler = observeHandler; + } + + @Override + public ObservableReadStream exceptionHandler(Handler handler) { + exceptionHandler = handler; + delegate.exceptionHandler(handler); + return this; + } + + @Override + public ObservableReadStream handler(Handler handler) { + if (terminated) + return this; + + if (handler == null) { + delegate.handler(null); + return this; + } + + delegate.handler(element -> { + if (observeHandler != null) { + try { + observeHandler.handle(element); + } catch (Throwable t) { + delegate.pause(); + terminated = true; + if (exceptionHandler != null) + exceptionHandler.handle(t); + + return; + } + } + + handler.handle(element); + }); + + return this; + } + + @Override + public ObservableReadStream pause() { + delegate.pause(); + return this; + } + + @Override + public ObservableReadStream resume() { + if (!terminated) + delegate.resume(); + return this; + } + + @Override + public ObservableReadStream fetch(long amount) { + delegate.fetch(amount); + return this; + } + + @Override + public ObservableReadStream endHandler(Handler endHandler) { + delegate.endHandler(endHandler); + return this; + } +} \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/vertx/Pump.java b/api/src/main/java/io/bosonnetwork/vertx/Pump.java deleted file mode 100644 index 4fd884f..0000000 --- a/api/src/main/java/io/bosonnetwork/vertx/Pump.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ - -/* - * Copyright (c) 2023 - bosonnetwork.io - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.bosonnetwork.vertx; - -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Promise; -import io.vertx.core.VertxException; -import io.vertx.core.streams.Pipe; -import io.vertx.core.streams.ReadStream; -import io.vertx.core.streams.WriteStream; - -// NOTE: This is a modified version of Vert.x's PipeImpl class. -// Avoid reformatting to preserve diff clarity for easier comparison and -// merging with upstream changes. - -/** - * Pump works like a pipe, pumping data from a {@code ReadStream} to a {@code WriteStream}. - *

- * This class is a copy of Vert.x's {@code PipeImpl} class with an additional observer handler. - */ -public class Pump implements Pipe { - - private final Promise result; - private final ReadStream src; - private boolean endOnSuccess = true; - private boolean endOnFailure = true; - private Handler observer; - private WriteStream dst; - - private Pump(ReadStream src) { - this.src = src; - this.result = Promise.promise(); - - // Set handlers now - src.endHandler(result::tryComplete); - src.exceptionHandler(result::tryFail); - } - - /** - * Create a new {@code Pump} with the given source {@code ReadStream}. - * - * @param src the source read stream - * @param the type of items being pumped - * @return the pump instance - */ - public static Pump from(ReadStream src) { - return new Pump<>(src); - } - - /** - * Set to {@code true} to end the destination {@code WriteStream} when the source {@code ReadStream} fails. - *

- * The default value is {@code true}. - * - * @param end {@code true} to end the destination on failure - * @return a reference to this, so the API can be used fluently - */ - @Override - public synchronized Pipe endOnFailure(boolean end) { - endOnFailure = end; - return this; - } - - /** - * Set to {@code true} to end the destination {@code WriteStream} when the source {@code ReadStream} ends. - *

- * The default value is {@code true}. - * - * @param end {@code true} to end the destination on success - * @return a reference to this, so the API can be used fluently - */ - @Override - public synchronized Pipe endOnSuccess(boolean end) { - endOnSuccess = end; - return this; - } - - /** - * Set to {@code true} to end the destination {@code WriteStream} when the source {@code ReadStream} ends or fails. - * - * @param end {@code true} to end the destination on completion - * @return a reference to this, so the API can be used fluently - */ - @Override - public synchronized Pipe endOnComplete(boolean end) { - endOnSuccess = end; - endOnFailure = end; - return this; - } - - /** - * Set an observer {@code Handler} to be notified of the elements pumped. - * - * @param observer the observer handler - * @return a reference to this, so the API can be used fluently - */ - public synchronized Pipe observer(Handler observer) { - this.observer = observer; - return this; - } - - private void handleWriteResult(AsyncResult ack) { - if (ack.failed()) { - result.tryFail(new WriteException(ack.cause())); - } - } - - /** - * Start pumping to the given destination {@code WriteStream}. - * - * @param ws the destination write stream - * @return a future that completes when the pumping is complete - */ - @Override - public Future to(WriteStream ws) { - Promise promise = Promise.promise(); - if (ws == null) { - throw new NullPointerException(); - } - synchronized (Pump.this) { - if (dst != null) { - throw new IllegalStateException(); - } - dst = ws; - } - Handler drainHandler = v -> src.resume(); - src.handler(item -> { - if (observer != null) { - try { - observer.handle(item); - } catch (Throwable t) { - result.tryFail(t); - return; - } - } - - ws.write(item).onComplete(this::handleWriteResult); - if (ws.writeQueueFull()) { - src.pause(); - ws.drainHandler(drainHandler); - } - }); - src.resume(); - result.future().onComplete(ar -> { - try { - src.handler(null); - } catch (Exception ignore) { - } - try { - src.exceptionHandler(null); - } catch (Exception ignore) { - } - try { - src.endHandler(null); - } catch (Exception ignore) { - } - if (ar.succeeded()) { - handleSuccess(promise); - } else { - Throwable err = ar.cause(); - if (err instanceof WriteException) { - src.resume(); - err = err.getCause(); - } - handleFailure(err, promise); - } - }); - return promise.future(); - } - - private void handleSuccess(Promise promise) { - if (endOnSuccess) { - dst.end().onComplete(promise); - } else { - promise.complete(); - } - } - - private void handleFailure(Throwable cause, Promise completionHandler) { - if (endOnFailure){ - dst - .end() - .transform(ar -> Future.failedFuture(cause)) - .onComplete(completionHandler); - } else { - completionHandler.fail(cause); - } - } - - /** - * Close the pump. - */ - public void close() { - synchronized (this) { - src.exceptionHandler(null); - src.handler(null); - if (dst != null) { - dst.drainHandler(null); - dst.exceptionHandler(null); - } - } - VertxException err = new VertxException("Pipe closed", true); - if (result.tryFail(err)) { - src.resume(); - } - } - - private static class WriteException extends VertxException { - private WriteException(Throwable cause) { - super(cause, true); - } - } -} \ No newline at end of file From d513d4586877d417afd30952dfdf991f27448f3a Mon Sep 17 00:00:00 2001 From: Jingyu Date: Fri, 16 Jan 2026 15:53:25 +0800 Subject: [PATCH 2/2] Refine Id deserializer compatibility across binary and text encodings --- .../io/bosonnetwork/json/internal/IdDeserializer.java | 4 +++- .../json/internal/InetAddressDeserializer.java | 4 +++- .../bosonnetwork/json/internal/NodeInfoDeserializer.java | 8 +++++--- .../bosonnetwork/json/internal/PeerInfoDeserializer.java | 6 ++++-- .../io/bosonnetwork/json/internal/ValueDeserializer.java | 6 ++++-- .../kademlia/protocol/AnnouncePeerRequest.java | 6 ++++-- .../bosonnetwork/kademlia/protocol/FindValueResponse.java | 6 ++++-- .../bosonnetwork/kademlia/protocol/StoreValueRequest.java | 6 ++++-- 8 files changed, 31 insertions(+), 15 deletions(-) diff --git a/api/src/main/java/io/bosonnetwork/json/internal/IdDeserializer.java b/api/src/main/java/io/bosonnetwork/json/internal/IdDeserializer.java index 0edb2ee..d1130e4 100644 --- a/api/src/main/java/io/bosonnetwork/json/internal/IdDeserializer.java +++ b/api/src/main/java/io/bosonnetwork/json/internal/IdDeserializer.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.Base64Variants; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; @@ -66,6 +67,7 @@ public IdDeserializer(Class vc) { */ @Override public Id deserialize(JsonParser p, DeserializationContext ctx) throws IOException { - return DataFormat.isBinary(p) ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + return DataFormat.isBinary(p) || p.currentToken() != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); } } \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/json/internal/InetAddressDeserializer.java b/api/src/main/java/io/bosonnetwork/json/internal/InetAddressDeserializer.java index 0b07a73..d0b8174 100644 --- a/api/src/main/java/io/bosonnetwork/json/internal/InetAddressDeserializer.java +++ b/api/src/main/java/io/bosonnetwork/json/internal/InetAddressDeserializer.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.Base64Variants; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; @@ -65,7 +66,8 @@ public InetAddressDeserializer(Class vc) { */ @Override public InetAddress deserialize(JsonParser p, DeserializationContext ctx) throws IOException { - return DataFormat.isBinary(p) ? InetAddress.getByAddress(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : + return DataFormat.isBinary(p) || p.currentToken() != JsonToken.VALUE_STRING ? + InetAddress.getByAddress(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : InetAddress.getByName(p.getText()); } } \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/json/internal/NodeInfoDeserializer.java b/api/src/main/java/io/bosonnetwork/json/internal/NodeInfoDeserializer.java index fdecfcf..992488c 100644 --- a/api/src/main/java/io/bosonnetwork/json/internal/NodeInfoDeserializer.java +++ b/api/src/main/java/io/bosonnetwork/json/internal/NodeInfoDeserializer.java @@ -82,13 +82,15 @@ public NodeInfo deserialize(JsonParser p, DeserializationContext ctx) throws IOE int port = 0; // id - if (p.nextToken() != JsonToken.VALUE_NULL) - id = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + JsonToken token = p.nextToken(); + if (token != JsonToken.VALUE_NULL) + id = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); // address // text format: IP address string or hostname string // binary format: binary ip address or host name string - JsonToken token = p.nextToken(); + token = p.nextToken(); if (token == JsonToken.VALUE_STRING) host = p.getText(); else if (token == JsonToken.VALUE_EMBEDDED_OBJECT) diff --git a/api/src/main/java/io/bosonnetwork/json/internal/PeerInfoDeserializer.java b/api/src/main/java/io/bosonnetwork/json/internal/PeerInfoDeserializer.java index dae5b53..409f63a 100644 --- a/api/src/main/java/io/bosonnetwork/json/internal/PeerInfoDeserializer.java +++ b/api/src/main/java/io/bosonnetwork/json/internal/PeerInfoDeserializer.java @@ -92,7 +92,8 @@ public PeerInfo deserialize(JsonParser p, DeserializationContext ctx) throws IOE switch (fieldName) { case "id": if (token != JsonToken.VALUE_NULL) - publicKey = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + publicKey = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "n": if (token != JsonToken.VALUE_NULL) @@ -104,7 +105,8 @@ public PeerInfo deserialize(JsonParser p, DeserializationContext ctx) throws IOE break; case "o": if (token != JsonToken.VALUE_NULL) - nodeId = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + nodeId = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "os": if (token != JsonToken.VALUE_NULL) diff --git a/api/src/main/java/io/bosonnetwork/json/internal/ValueDeserializer.java b/api/src/main/java/io/bosonnetwork/json/internal/ValueDeserializer.java index 3c4e694..658410f 100644 --- a/api/src/main/java/io/bosonnetwork/json/internal/ValueDeserializer.java +++ b/api/src/main/java/io/bosonnetwork/json/internal/ValueDeserializer.java @@ -88,11 +88,13 @@ public Value deserialize(JsonParser p, DeserializationContext ctx) throws IOExce switch (fieldName) { case "k": if (token != JsonToken.VALUE_NULL) - publicKey = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + publicKey = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "rec": if (token != JsonToken.VALUE_NULL) - recipient = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + recipient = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "n": if (token != JsonToken.VALUE_NULL) diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/protocol/AnnouncePeerRequest.java b/dht/src/main/java/io/bosonnetwork/kademlia/protocol/AnnouncePeerRequest.java index b498a43..146f324 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/protocol/AnnouncePeerRequest.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/protocol/AnnouncePeerRequest.java @@ -191,7 +191,8 @@ public AnnouncePeerRequest deserialize(JsonParser p, DeserializationContext ctxt cas = p.getIntValue(); break; case "k": - peerId = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + peerId = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "n": nonce = p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL); @@ -201,7 +202,8 @@ public AnnouncePeerRequest deserialize(JsonParser p, DeserializationContext ctxt break; case "o": if (token != JsonToken.VALUE_NULL) - nodeId = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + nodeId = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "os": if (token != JsonToken.VALUE_NULL) diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/protocol/FindValueResponse.java b/dht/src/main/java/io/bosonnetwork/kademlia/protocol/FindValueResponse.java index fe58ee8..7342ab0 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/protocol/FindValueResponse.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/protocol/FindValueResponse.java @@ -171,11 +171,13 @@ public FindValueResponse deserialize(JsonParser p, DeserializationContext ctxt) break; case "k": if (token != JsonToken.VALUE_NULL) - publicKey = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + publicKey = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "rec": if (token != JsonToken.VALUE_NULL) - recipient = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + recipient = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "n": if (token != JsonToken.VALUE_NULL) diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/protocol/StoreValueRequest.java b/dht/src/main/java/io/bosonnetwork/kademlia/protocol/StoreValueRequest.java index 5af5f13..4586193 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/protocol/StoreValueRequest.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/protocol/StoreValueRequest.java @@ -165,11 +165,13 @@ public StoreValueRequest deserialize(JsonParser p, DeserializationContext ctxt) break; case "k": if (token != JsonToken.VALUE_NULL) - publicKey = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + publicKey = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "rec": if (token != JsonToken.VALUE_NULL) - recipient = binaryFormat ? Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); + recipient = binaryFormat || token != JsonToken.VALUE_STRING ? + Id.of(p.getBinaryValue(Base64Variants.MODIFIED_FOR_URL)) : Id.of(p.getText()); break; case "n": if (token != JsonToken.VALUE_NULL)