getList(String key) {
diff --git a/api/src/main/java/io/bosonnetwork/utils/Json.java b/api/src/main/java/io/bosonnetwork/utils/Json.java
index b7bb84f..91606bc 100644
--- a/api/src/main/java/io/bosonnetwork/utils/Json.java
+++ b/api/src/main/java/io/bosonnetwork/utils/Json.java
@@ -890,7 +890,11 @@ public static String toString(Object object, JsonContext context) {
* @throws IllegalArgumentException if the object cannot be serialized
*/
public static String toString(Object object) {
- return toString(object, null);
+ try {
+ return objectMapper().writeValueAsString(object);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("object can not be serialized", e);
+ }
}
/**
diff --git a/api/src/main/java/io/bosonnetwork/utils/TimeUtils.java b/api/src/main/java/io/bosonnetwork/utils/TimeUtils.java
index 38b5a6e..9beb66b 100644
--- a/api/src/main/java/io/bosonnetwork/utils/TimeUtils.java
+++ b/api/src/main/java/io/bosonnetwork/utils/TimeUtils.java
@@ -31,6 +31,7 @@
/**
* Data and time related utility functions.
*/
+@Deprecated
public class TimeUtils {
/**
* Parse human friendly duration from a text string.
@@ -49,6 +50,7 @@ public class TimeUtils {
* @return the parsed {@code Duration} object.
* @throws DateTimeParseException if the text cannot be parsed to a duration.
*/
+ @Deprecated
public static Duration parseDuration(CharSequence duration) throws DateTimeParseException {
int idx = duration.length() - 1;
final char specifier = duration.charAt(idx);
diff --git a/api/src/main/java/io/bosonnetwork/vertx/Pump.java b/api/src/main/java/io/bosonnetwork/vertx/Pump.java
index f468fa2..4fd884f 100644
--- a/api/src/main/java/io/bosonnetwork/vertx/Pump.java
+++ b/api/src/main/java/io/bosonnetwork/vertx/Pump.java
@@ -1,3 +1,14 @@
+/*
+ * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+
/*
* Copyright (c) 2023 - bosonnetwork.io
*
@@ -27,209 +38,209 @@
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.VertxException;
-import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
+// NOTE: This is a modified version of Vert.x's PipeImpl class.
+// Avoid reformatting to preserve diff clarity for easier comparison and
+// merging with upstream changes.
+
/**
- * Pumper that reads data from a {@link ReadStream} and writes it to a {@link WriteStream}.
+ * Pump works like a pipe, pumping data from a {@code ReadStream} to a {@code WriteStream}.
*
- * It handles back-pressure by pausing the source when the destination's write queue is full
- * and resuming it when it's drained. It also supports optional byte limits and configurable
- * behavior for closing the destination stream on completion or failure.
+ * This class is a copy of Vert.x's {@code PipeImpl} class with an additional observer handler.
*/
-public class Pump implements Pipe {
- private final Promise result;
- private final ReadStream src;
- private boolean endOnSuccess = true;
- private boolean endOnFailure = true;
- private long bytesLimit = 0;
- private long bytesPumped = 0;
- private WriteStream dst;
-
- /**
- * Creates a new Pump for the given source stream.
- *
- * @param src the source stream to read from
- */
- public Pump(ReadStream src) {
- this.src = src;
- this.result = Promise.promise();
-
- // Set handlers now
- src.endHandler(result::tryComplete);
- src.exceptionHandler(result::tryFail);
- }
-
- /**
- * Sets a limit on the maximum number of bytes to pump.
- * If the limit is exceeded, the pump fails with a {@link WriteException}.
- *
- * @param bytesLimit the maximum bytes to pump (must be >= 0)
- * @return a reference to this pump for chaining
- * @throws IllegalArgumentException if bytesLimit is negative
- */
- public synchronized Pump bytesLimit(long bytesLimit) {
- if (bytesLimit < 0)
- throw new IllegalArgumentException("bytesLimit must be >= 0");
-
- this.bytesLimit = bytesLimit;
- return this;
- }
-
- /**
- * Configures whether to end the destination stream when the source stream fails.
- *
- * @param end true to end the destination on failure, false otherwise
- * @return a reference to this pump for chaining
- */
- @Override
- public synchronized Pump endOnFailure(boolean end) {
- endOnFailure = end;
- return this;
- }
-
- /**
- * Configures whether to end the destination stream when the source stream completes successfully.
- *
- * @param end true to end the destination on success, false otherwise
- * @return a reference to this pump for chaining
- */
- @Override
- public synchronized Pump endOnSuccess(boolean end) {
- endOnSuccess = end;
- return this;
- }
-
- /**
- * Configures whether to end the destination stream when the source stream completes (success or failure).
- *
- * @param end true to end the destination on completion, false otherwise
- * @return a reference to this pump for chaining
- */
- @Override
- public synchronized Pump endOnComplete(boolean end) {
- endOnSuccess = end;
- endOnFailure = end;
- return this;
- }
-
- private void handleWriteResult(AsyncResult ack) {
- if (ack.failed()) {
- result.tryFail(new WriteException(ack.cause()));
- }
- }
-
- /**
- * Starts the pumping process to the specified destination.
- *
- * @param ws the destination write stream
- * @return a future that completes when the pumping is finished
- * @throws NullPointerException if the destination is null
- * @throws IllegalStateException if a destination has already been set
- */
- @Override
- public Future to(WriteStream ws) {
- Promise promise = Promise.promise();
- if (ws == null) {
- throw new NullPointerException();
- }
- synchronized (this) {
- if (dst != null) {
- throw new IllegalStateException();
- }
- dst = ws;
- }
- Handler drainHandler = v -> src.resume();
- src.handler(item -> {
- bytesPumped += item.length();
- if (bytesLimit > 0 && bytesPumped > bytesLimit) {
- result.tryFail(new WriteException("Pumped bytes limit exceeded"));
- return;
- }
-
- ws.write(item).onComplete(this::handleWriteResult);
- if (ws.writeQueueFull()) {
- src.pause();
- ws.drainHandler(drainHandler);
- }
- });
- src.resume();
- result.future().onComplete(ar -> {
- try {
- src.handler(null);
- } catch (Exception ignore) {
- }
- try {
- src.exceptionHandler(null);
- } catch (Exception ignore) {
- }
- try {
- src.endHandler(null);
- } catch (Exception ignore) {
- }
- if (ar.succeeded()) {
- handleSuccess(promise);
- } else {
- Throwable err = ar.cause();
- if (err instanceof WriteException) {
- src.resume();
- err = err.getCause();
- }
- handleFailure(err, promise);
- }
- });
- return promise.future();
- }
-
- private void handleSuccess(Promise promise) {
- if (endOnSuccess) {
- dst.end().onComplete(promise);
- } else {
- promise.complete();
- }
- }
-
- private void handleFailure(Throwable cause, Promise completionHandler) {
- if (endOnFailure){
- dst
- .end()
- .transform(ar -> Future.failedFuture(cause))
- .onComplete(completionHandler);
- } else {
- completionHandler.fail(cause);
- }
- }
-
- /**
- * Closes the pump, detaching handlers from the source and destination.
- * This will fail the result future if it's not already completed.
- */
- public void close() {
- synchronized (this) {
- src.exceptionHandler(null);
- src.handler(null);
- if (dst != null) {
- dst.drainHandler(null);
- dst.exceptionHandler(null);
- }
- }
- VertxException err = new VertxException("Pipe closed", true);
- if (result.tryFail(err)) {
- src.resume();
- }
- }
-
- private static class WriteException extends VertxException {
- private static final long serialVersionUID = 5995527496327696671L;
-
- private WriteException(String message) {
- super(message, true);
- }
-
- private WriteException(Throwable cause) {
- super(cause, true);
- }
- }
+public class Pump implements Pipe {
+
+ private final Promise result;
+ private final ReadStream src;
+ private boolean endOnSuccess = true;
+ private boolean endOnFailure = true;
+ private Handler observer;
+ private WriteStream dst;
+
+ private Pump(ReadStream src) {
+ this.src = src;
+ this.result = Promise.promise();
+
+ // Set handlers now
+ src.endHandler(result::tryComplete);
+ src.exceptionHandler(result::tryFail);
+ }
+
+ /**
+ * Create a new {@code Pump} with the given source {@code ReadStream}.
+ *
+ * @param src the source read stream
+ * @param the type of items being pumped
+ * @return the pump instance
+ */
+ public static Pump from(ReadStream src) {
+ return new Pump<>(src);
+ }
+
+ /**
+ * Set to {@code true} to end the destination {@code WriteStream} when the source {@code ReadStream} fails.
+ *
+ * The default value is {@code true}.
+ *
+ * @param end {@code true} to end the destination on failure
+ * @return a reference to this, so the API can be used fluently
+ */
+ @Override
+ public synchronized Pipe endOnFailure(boolean end) {
+ endOnFailure = end;
+ return this;
+ }
+
+ /**
+ * Set to {@code true} to end the destination {@code WriteStream} when the source {@code ReadStream} ends.
+ *
+ * The default value is {@code true}.
+ *
+ * @param end {@code true} to end the destination on success
+ * @return a reference to this, so the API can be used fluently
+ */
+ @Override
+ public synchronized Pipe endOnSuccess(boolean end) {
+ endOnSuccess = end;
+ return this;
+ }
+
+ /**
+ * Set to {@code true} to end the destination {@code WriteStream} when the source {@code ReadStream} ends or fails.
+ *
+ * @param end {@code true} to end the destination on completion
+ * @return a reference to this, so the API can be used fluently
+ */
+ @Override
+ public synchronized Pipe endOnComplete(boolean end) {
+ endOnSuccess = end;
+ endOnFailure = end;
+ return this;
+ }
+
+ /**
+ * Set an observer {@code Handler} to be notified of the elements pumped.
+ *
+ * @param observer the observer handler
+ * @return a reference to this, so the API can be used fluently
+ */
+ public synchronized Pipe observer(Handler observer) {
+ this.observer = observer;
+ return this;
+ }
+
+ private void handleWriteResult(AsyncResult ack) {
+ if (ack.failed()) {
+ result.tryFail(new WriteException(ack.cause()));
+ }
+ }
+
+ /**
+ * Start pumping to the given destination {@code WriteStream}.
+ *
+ * @param ws the destination write stream
+ * @return a future that completes when the pumping is complete
+ */
+ @Override
+ public Future to(WriteStream ws) {
+ Promise promise = Promise.promise();
+ if (ws == null) {
+ throw new NullPointerException();
+ }
+ synchronized (Pump.this) {
+ if (dst != null) {
+ throw new IllegalStateException();
+ }
+ dst = ws;
+ }
+ Handler drainHandler = v -> src.resume();
+ src.handler(item -> {
+ if (observer != null) {
+ try {
+ observer.handle(item);
+ } catch (Throwable t) {
+ result.tryFail(t);
+ return;
+ }
+ }
+
+ ws.write(item).onComplete(this::handleWriteResult);
+ if (ws.writeQueueFull()) {
+ src.pause();
+ ws.drainHandler(drainHandler);
+ }
+ });
+ src.resume();
+ result.future().onComplete(ar -> {
+ try {
+ src.handler(null);
+ } catch (Exception ignore) {
+ }
+ try {
+ src.exceptionHandler(null);
+ } catch (Exception ignore) {
+ }
+ try {
+ src.endHandler(null);
+ } catch (Exception ignore) {
+ }
+ if (ar.succeeded()) {
+ handleSuccess(promise);
+ } else {
+ Throwable err = ar.cause();
+ if (err instanceof WriteException) {
+ src.resume();
+ err = err.getCause();
+ }
+ handleFailure(err, promise);
+ }
+ });
+ return promise.future();
+ }
+
+ private void handleSuccess(Promise promise) {
+ if (endOnSuccess) {
+ dst.end().onComplete(promise);
+ } else {
+ promise.complete();
+ }
+ }
+
+ private void handleFailure(Throwable cause, Promise completionHandler) {
+ if (endOnFailure){
+ dst
+ .end()
+ .transform(ar -> Future.failedFuture(cause))
+ .onComplete(completionHandler);
+ } else {
+ completionHandler.fail(cause);
+ }
+ }
+
+ /**
+ * Close the pump.
+ */
+ public void close() {
+ synchronized (this) {
+ src.exceptionHandler(null);
+ src.handler(null);
+ if (dst != null) {
+ dst.drainHandler(null);
+ dst.exceptionHandler(null);
+ }
+ }
+ VertxException err = new VertxException("Pipe closed", true);
+ if (result.tryFail(err)) {
+ src.resume();
+ }
+ }
+
+ private static class WriteException extends VertxException {
+ private WriteException(Throwable cause) {
+ super(cause, true);
+ }
+ }
}
\ No newline at end of file
diff --git a/api/src/main/java/io/bosonnetwork/web/CompactWebTokenAuth.java b/api/src/main/java/io/bosonnetwork/web/CompactWebTokenAuth.java
index ea0ae4c..37c935f 100644
--- a/api/src/main/java/io/bosonnetwork/web/CompactWebTokenAuth.java
+++ b/api/src/main/java/io/bosonnetwork/web/CompactWebTokenAuth.java
@@ -54,14 +54,14 @@
* designed for efficiency. The token structure is partially inspired by JWT but simplified
* and using CBOR/base64url encoding.
*
- * Token Format
+ * Token Format
*
* token = payload.signature
* payload = base64url(CBOR(claims))
* signature = base64url(ED25519Signature(SHA256(payload)))
*
*
- * Claims Definition
+ * Claims Definition
* Server issued token claims:
*
* - jti: Token ID (nonce)
@@ -116,6 +116,14 @@ public interface UserRepository {
*/
Future> getAssociated(Id subject, Id associated);
+ /**
+ * Creates a {@code UserRepository} instance using the provided {@code ClientAuthenticator}.
+ * The returned repository facilitates access to user-related data and entities
+ * by leveraging the given authenticator for security purposes.
+ *
+ * @param authenticator the {@code ClientAuthenticator} instance used to validate client authentication
+ * @return a {@code UserRepository} implementation that utilizes the given {@code ClientAuthenticator}
+ */
static UserRepository fromClientAuthenticator(ClientAuthenticator authenticator) {
return new AuthenticatorUserRepo(authenticator);
}
@@ -276,6 +284,14 @@ public static CompactWebTokenAuth create(Identity identity, UserRepository userR
maxServerIssuedTokenLifetime, maxClientIssuedTokenLifetime, leeway);
}
+ /**
+ * Creates a new instance of CompactWebTokenAuth with the specified identity, user repository,
+ * and default configuration values for token lifetimes and leeway.
+ *
+ * @param identity the identity associated with the authentication process
+ * @param userRepository the repository used for managing user data
+ * @return a new instance of CompactWebTokenAuth
+ */
public static CompactWebTokenAuth create(Identity identity, UserRepository userRepository) {
return new CompactWebTokenAuth(identity, userRepository,
MAX_SERVER_ISSUED_TOKEN_LIFETIME, MAX_CLIENT_ISSUED_TOKEN_LIFETIME, DEFAULT_LEEWAY);
diff --git a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
index f7f2e6a..050ad85 100644
--- a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
+++ b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
@@ -46,17 +46,27 @@ static void setup(Vertx vertx, VertxTestContext context) throws Exception {
FileUtils.deleteFile(testDir);
Files.createDirectories(testDir);
- pgServer = PostgresqlServer.start("migration", "test", "secret");
-
- var postgresURL = pgServer.getDatabaseUrl();
- PgConnectOptions pgConnectOptions = PgConnectOptions.fromUri(postgresURL);
- PoolOptions pgPoolOptions = new PoolOptions().setMaxSize(8);
- postgres = PgBuilder.pool()
- .with(pgPoolOptions)
- .connectingTo(pgConnectOptions)
- .using(vertx)
- .build();
- databases.add(Arguments.of("postgres", postgres));
+ try {
+ pgServer = PostgresqlServer.start("migration", "test", "secret");
+ } catch (Exception e) {
+ System.err.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
+ System.err.println("Start PostgreSQL container failed: " + e.getMessage());
+ System.err.println("Check your Docker installation.");
+ System.err.println("Skipping Postgres tests.");
+ System.err.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
+ }
+
+ if (pgServer != null) {
+ var postgresURL = pgServer.getDatabaseUrl();
+ PgConnectOptions pgConnectOptions = PgConnectOptions.fromUri(postgresURL);
+ PoolOptions pgPoolOptions = new PoolOptions().setMaxSize(8);
+ postgres = PgBuilder.pool()
+ .with(pgPoolOptions)
+ .connectingTo(pgConnectOptions)
+ .using(vertx)
+ .build();
+ databases.add(Arguments.of("postgres", postgres));
+ }
var sqliteURL = "jdbc:sqlite:" + testDir.resolve("test.db");
JDBCConnectOptions sqliteConnectOptions = new JDBCConnectOptions()
@@ -71,8 +81,11 @@ static void setup(Vertx vertx, VertxTestContext context) throws Exception {
@AfterAll
static void teardown(VertxTestContext context) throws Exception {
- Future.all(postgres.close(), sqlite.close()).onComplete(ar -> {
- pgServer.stop();
+ Future f1 = postgres != null ? postgres.close().onComplete(ar -> pgServer.stop())
+ : Future.succeededFuture();
+ Future f2 = sqlite != null ? sqlite.close() : Future.succeededFuture();
+
+ Future.all(f1, f2).onComplete(ar -> {
context.completeNow();
});
}
diff --git a/cmds/src/main/java/io/bosonnetwork/launcher/Main.java b/cmds/src/main/java/io/bosonnetwork/launcher/Main.java
index c448aed..ecf4f58 100644
--- a/cmds/src/main/java/io/bosonnetwork/launcher/Main.java
+++ b/cmds/src/main/java/io/bosonnetwork/launcher/Main.java
@@ -137,7 +137,7 @@ private static void loadService(ServiceConfig serviceConfig) {
Path dataPath = config.dataDir() == null ? null : config.dataDir().resolve(svc.getId()).toAbsolutePath();
ServiceContext ctx = new DefaultServiceContext(vertx, node,
ClientAuthenticator.allowAll(), ClientAuthorizer.noop(),
- FederationAuthenticator.allowAll(), null, serviceConfig.configuration, dataPath);
+ FederationAuthenticator.allowAll(), null, null, serviceConfig.configuration, dataPath);
svc.init(ctx);
System.out.format("Service %s[%s] is loaded.\n", svc.getName(), serviceConfig.className);
diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java
index bc86539..36b8b48 100644
--- a/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java
+++ b/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java
@@ -89,14 +89,25 @@ static void setupDataStorage(Vertx vertx, VertxTestContext context) {
var futures = new ArrayList>();
- pgServer = PostgresqlServer.start("boson_node", "test", "secret");
- var postgresqlURI = pgServer.getDatabaseUrl();
- postgresStorage = new PostgresStorage(postgresqlURI);
- var future1 = postgresStorage.initialize(vertx, valueExpiration, peerInfoExpiration).onComplete(context.succeeding(version -> {
- context.verify(() -> assertEquals(CURRENT_SCHEMA_VERSION, version));
- dataStorages.add(Arguments.of("PostgresStorage", postgresStorage));
- }));
- futures.add(future1);
+ try {
+ pgServer = PostgresqlServer.start("boson_node", "test", "secret");
+ } catch (Exception e) {
+ System.err.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
+ System.err.println("Start PostgreSQL container failed: " + e.getMessage());
+ System.err.println("Check your Docker installation.");
+ System.err.println("Skipping Postgres tests.");
+ System.err.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
+ }
+
+ if (pgServer != null) {
+ var postgresqlURI = pgServer.getDatabaseUrl();
+ postgresStorage = new PostgresStorage(postgresqlURI);
+ var future1 = postgresStorage.initialize(vertx, valueExpiration, peerInfoExpiration).onComplete(context.succeeding(version -> {
+ context.verify(() -> assertEquals(CURRENT_SCHEMA_VERSION, version));
+ dataStorages.add(Arguments.of("PostgresStorage", postgresStorage));
+ }));
+ futures.add(future1);
+ }
var sqliteURI = "jdbc:sqlite:" + testDir.resolve("storage.db");
sqliteStorage = new SQLiteStorage(sqliteURI);
@@ -139,7 +150,8 @@ static void tearDown(Vertx vertx, VertxTestContext context) {
}
Future.all(futures).onComplete(context.succeeding(result -> {
- pgServer.stop();
+ if (pgServer != null)
+ pgServer.stop();
context.completeNow();
}));
}