diff --git a/api/src/main/java/io/bosonnetwork/DefaultNodeConfiguration.java b/api/src/main/java/io/bosonnetwork/DefaultNodeConfiguration.java index 315829b..e552576 100644 --- a/api/src/main/java/io/bosonnetwork/DefaultNodeConfiguration.java +++ b/api/src/main/java/io/bosonnetwork/DefaultNodeConfiguration.java @@ -299,7 +299,7 @@ public static DefaultNodeConfiguration fromMap(Map map) { config.host4 = m.getString("host4", config.host4); config.host6 = m.getString("host6", config.host6); - if (config.host4 == null || config.host4.isEmpty() && config.host6 == null || config.host6.isEmpty()) + if ((config.host4 == null || config.host4.isEmpty()) && (config.host6 == null || config.host6.isEmpty())) throw new IllegalArgumentException("Missing host4 or host6"); config.port = m.getPort("port", config.port); @@ -367,7 +367,7 @@ public Map toMap() { map.put("host6", host6); map.put("port", port); - map.put("privateKey", privateKey); + map.put("privateKey", Base58.encode(privateKey.bytes())); if (dataDir != null) map.put("dataDir", dataDir); diff --git a/api/src/main/java/io/bosonnetwork/database/VertxDatabase.java b/api/src/main/java/io/bosonnetwork/database/VertxDatabase.java index e9bf4ff..555661f 100644 --- a/api/src/main/java/io/bosonnetwork/database/VertxDatabase.java +++ b/api/src/main/java/io/bosonnetwork/database/VertxDatabase.java @@ -274,6 +274,10 @@ default boolean hasEffectedRows(SqlResult result) { * @return a future completing when the connection is closed */ default Future close() { - return getClient().close(); + SqlClient client = getClient(); + if (client == null) + return Future.succeededFuture(); + + return client.close(); } } \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/utils/AddressUtils.java b/api/src/main/java/io/bosonnetwork/utils/AddressUtils.java index 259e261..1550aa7 100644 --- a/api/src/main/java/io/bosonnetwork/utils/AddressUtils.java +++ b/api/src/main/java/io/bosonnetwork/utils/AddressUtils.java @@ -32,9 +32,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; -import java.net.ProtocolFamily; import java.net.SocketException; -import java.net.StandardProtocolFamily; import java.net.URL; import java.net.UnknownHostException; import java.util.ArrayList; @@ -58,7 +56,7 @@ public class AddressUtils { // IPv4 Bogon ranges (excludes ranges covered by InetAddress methods) // Reference: RFC 1918, RFC 6890 private static final String[] IPV4_BOGON_RANGES = { - // "0.0.0.0/8", // Any local + // "0.0.0.0/8", // Any local // "10.0.0.0/8", // Site local "100.64.0.0/10", // Private network - shared address space (RFC 6598) // "127.0.0.0/8", // Loopback @@ -555,10 +553,9 @@ else if (type == Inet6Address.class) * @throws IllegalArgumentException if the type is not supported */ public static InetAddress getDefaultRouteAddress(Class type) { - InetAddress target = null; - ProtocolFamily family = type == Inet6Address.class ? StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; - try (DatagramSocket socket = new DatagramSocket()) { + InetAddress target; + if (type == Inet4Address.class) target = InetAddress.getByAddress(new byte[]{8, 8, 8, 8}); else if (type == Inet6Address.class) diff --git a/api/src/main/java/io/bosonnetwork/utils/ConfigMap.java b/api/src/main/java/io/bosonnetwork/utils/ConfigMap.java index 08e3460..01be59c 100644 --- a/api/src/main/java/io/bosonnetwork/utils/ConfigMap.java +++ b/api/src/main/java/io/bosonnetwork/utils/ConfigMap.java @@ -31,6 +31,8 @@ import java.util.Objects; import java.util.Set; +import io.bosonnetwork.Id; + /** * A wrapper around a {@code Map} that provides type-safe configuration value retrieval. *

@@ -375,7 +377,6 @@ else if (val instanceof String s) return port; } - /** * Retrieves a valid port number for the specified key, or returns a default value if the key is not present. *

@@ -396,6 +397,49 @@ public int getPort(String key, int def) { return port; } + /** + * Retrieves the Id associated with the specified key from the map. + * The key must not be null, and the corresponding value in the map must be a valid String + * representation of an Id. If the key is not present or the value is invalid, an exception + * is thrown. + * + * @param key the key whose associated Id is to be retrieved + * @return the Id associated with the specified key + * @throws NullPointerException if the key is null + * @throws IllegalArgumentException if the key is not present in the map, or if the + * value associated with the key is not a valid String representation of an Id + */ + public Id getId(String key) { + Objects.requireNonNull(key); + Object val = map.get(key); + if (val == null) + throw new IllegalArgumentException("Missing Id value - " + key); + else if (val instanceof String s) + try { + return Id.of(s); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid Id value - " + key + ": " + val); + } + else + throw new IllegalArgumentException("Invalid Id value - " + key + ": " + val); + } + + /** + * Retrieves the identifier associated with the specified key. If the key is not found + * in the map, the provided default identifier is returned. + * + * @param key the key whose associated Id is to be retrieved + * @param def the default identifier to return if the key is not present in the map + * @return the identifier associated with the given key, or the default identifier if the key is not found + * @throws NullPointerException if the key is null + * @throws IllegalArgumentException if the value associated with the key is not a valid String + * representation of an Id + */ + public Id getId(String key, Id def) { + Objects.requireNonNull(key); + return map.containsKey(key) ? getId(key) : def; + } + /** * Retrieves a nested configuration object for the specified key. *

diff --git a/api/src/main/java/io/bosonnetwork/utils/FileUtils.java b/api/src/main/java/io/bosonnetwork/utils/FileUtils.java index d3f0c45..a741b35 100644 --- a/api/src/main/java/io/bosonnetwork/utils/FileUtils.java +++ b/api/src/main/java/io/bosonnetwork/utils/FileUtils.java @@ -98,7 +98,7 @@ public static Path normalizePath(Path path) { } } - return path; + return path == null ? null : path.toAbsolutePath(); } /** diff --git a/api/src/main/java/io/bosonnetwork/utils/Pair.java b/api/src/main/java/io/bosonnetwork/utils/Pair.java index 8cc73c9..6b245d3 100644 --- a/api/src/main/java/io/bosonnetwork/utils/Pair.java +++ b/api/src/main/java/io/bosonnetwork/utils/Pair.java @@ -23,6 +23,7 @@ package io.bosonnetwork.utils; +import java.util.Objects; import java.util.function.Function; /** @@ -38,25 +39,25 @@ public class Pair { /** * Create a value pair object from the given values. * - * @param type for value a. - * @param type for value b. * @param a value a. * @param b value b. - * @return the new Pair object. */ - public static Pair of(C a, D b) { - return new Pair(a, b); + public Pair(A a, B b) { + this.a = a; + this.b = b; } /** * Create a value pair object from the given values. * + * @param type for value a. + * @param type for value b. * @param a value a. * @param b value b. + * @return the new Pair object. */ - public Pair(A a, B b) { - this.a = a; - this.b = b; + public static Pair of(A1 a, B1 b) { + return new Pair<>(a, b); } /** @@ -77,6 +78,22 @@ public B b() { return b; } + @Override + public int hashCode() { + return Objects.hash(a, b); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj instanceof Pair that) + return Objects.equals(this.a, that.a) && Objects.equals(this.b, that.b); + + return false; + } + @Override public String toString() { StringBuilder repr = new StringBuilder(); diff --git a/api/src/main/java/io/bosonnetwork/utils/Quadruple.java b/api/src/main/java/io/bosonnetwork/utils/Quadruple.java new file mode 100644 index 0000000..386c83a --- /dev/null +++ b/api/src/main/java/io/bosonnetwork/utils/Quadruple.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2023 - bosonnetwork.io + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.bosonnetwork.utils; + +import java.util.Objects; +import java.util.function.Function; + +/** + * This class is a simple holder for a quadruple of values. + *

+ * It follows the same conventions as {@link Pair} and {@link Triple}, but stores four values instead of two/three. + * + * @param type for value a. + * @param type for value b. + * @param type for value c. + * @param type for value d. + */ +public class Quadruple { + private final A a; + private final B b; + private final C c; + private final D d; + + /** + * Create a value quadruple object from the given values. + * + * @param a value a. + * @param b value b. + * @param c value c. + * @param d value d. + */ + public Quadruple(A a, B b, C c, D d) { + this.a = a; + this.b = b; + this.c = c; + this.d = d; + } + + /** + * Create a value quadruple object from the given values. + * + * @param type for value a. + * @param type for value b. + * @param type for value c. + * @param type for value d. + * @param a value a. + * @param b value b. + * @param c value c. + * @param d value d. + * @return the new Quadruple object. + */ + public static Quadruple of(A1 a, B1 b, C1 c, D1 d) { + return new Quadruple<>(a, b, c, d); + } + + /** + * Gets the value a from the quadruple object. + * + * @return the value a. + */ + public A a() { + return a; + } + + /** + * Gets the value b from the quadruple object. + * + * @return the value b. + */ + public B b() { + return b; + } + + /** + * Gets the value c from the quadruple object. + * + * @return the value c. + */ + public C c() { + return c; + } + + /** + * Gets the value d from the quadruple object. + * + * @return the value d. + */ + public D d() { + return d; + } + + @Override + public int hashCode() { + return Objects.hash(a, b, c, d); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj instanceof Quadruple that) + return Objects.equals(this.a, that.a) + && Objects.equals(this.b, that.b) + && Objects.equals(this.c, that.c) + && Objects.equals(this.d, that.d); + + return false; + } + + @Override + public String toString() { + StringBuilder repr = new StringBuilder(); + + Function valueOf = (v) -> + (v instanceof String) ? "\"" + v + "\"" : String.valueOf(v); + + repr.append("<") + .append(valueOf.apply(a)) + .append(", ") + .append(valueOf.apply(b)) + .append(", ") + .append(valueOf.apply(c)) + .append(", ") + .append(valueOf.apply(d)) + .append(">"); + + return repr.toString(); + } +} \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/utils/Triple.java b/api/src/main/java/io/bosonnetwork/utils/Triple.java new file mode 100644 index 0000000..5969aea --- /dev/null +++ b/api/src/main/java/io/bosonnetwork/utils/Triple.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2023 - bosonnetwork.io + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.bosonnetwork.utils; + +import java.util.Objects; +import java.util.function.Function; + +/** + * This class is a simple holder for a triple of values. + *

+ * It follows the same conventions as {@link Pair}, but stores three values instead of two. + * + * @param type for value a. + * @param type for value b. + * @param type for value c. + */ +public class Triple { + private final A a; + private final B b; + private final C c; + + /** + * Create a value triple object from the given values. + * + * @param a value a. + * @param b value b. + * @param c value c. + */ + public Triple(A a, B b, C c) { + this.a = a; + this.b = b; + this.c = c; + } + + /** + * Create a value triple object from the given values. + * + * @param type for value a. + * @param type for value b. + * @param type for value c. + * @param a value a. + * @param b value b. + * @param c value c. + * @return the new Triple object. + */ + public static Triple of(A1 a, B1 b, C1 c) { + return new Triple<>(a, b, c); + } + + /** + * Gets the value a from the triple object. + * + * @return the value a. + */ + public A a() { + return a; + } + + /** + * Gets the value b from the triple object. + * + * @return the value b. + */ + public B b() { + return b; + } + + /** + * Gets the value c from the triple object. + * + * @return the value c. + */ + public C c() { + return c; + } + + @Override + public int hashCode() { + return Objects.hash(a, b, c); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj instanceof Triple that) + return Objects.equals(this.a, that.a) && Objects.equals(this.b, that.b) && Objects.equals(this.c, that.c); + + return false; + } + + @Override + public String toString() { + StringBuilder repr = new StringBuilder(); + + Function valueOf = (v) -> + (v instanceof String) ? "\"" + v + "\"" : String.valueOf(v); + + repr.append("<") + .append(valueOf.apply(a)) + .append(", ") + .append(valueOf.apply(b)) + .append(", ") + .append(valueOf.apply(c)) + .append(">"); + + return repr.toString(); + } +} \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/vertx/BosonVerticle.java b/api/src/main/java/io/bosonnetwork/vertx/BosonVerticle.java index 6549521..572f205 100644 --- a/api/src/main/java/io/bosonnetwork/vertx/BosonVerticle.java +++ b/api/src/main/java/io/bosonnetwork/vertx/BosonVerticle.java @@ -211,7 +211,7 @@ public final Future deploy(Context context) throws Exception { /** * Internal helper method to simulate undeployment under Vert.x 5.x’s {@code Deployable} interface. *

- * This should not be called directly by user code. It is used by Vert.x internals or + * User code should not call this directly. It is used by Vert.x internals or * integration layers that work with Vert.x 5.x’s deployment model. *

* diff --git a/api/src/main/java/io/bosonnetwork/vertx/Pump.java b/api/src/main/java/io/bosonnetwork/vertx/Pump.java new file mode 100644 index 0000000..f468fa2 --- /dev/null +++ b/api/src/main/java/io/bosonnetwork/vertx/Pump.java @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2023 - bosonnetwork.io + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.bosonnetwork.vertx; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.VertxException; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +/** + * Pumper that reads data from a {@link ReadStream} and writes it to a {@link WriteStream}. + *

+ * It handles back-pressure by pausing the source when the destination's write queue is full + * and resuming it when it's drained. It also supports optional byte limits and configurable + * behavior for closing the destination stream on completion or failure. + */ +public class Pump implements Pipe { + private final Promise result; + private final ReadStream src; + private boolean endOnSuccess = true; + private boolean endOnFailure = true; + private long bytesLimit = 0; + private long bytesPumped = 0; + private WriteStream dst; + + /** + * Creates a new Pump for the given source stream. + * + * @param src the source stream to read from + */ + public Pump(ReadStream src) { + this.src = src; + this.result = Promise.promise(); + + // Set handlers now + src.endHandler(result::tryComplete); + src.exceptionHandler(result::tryFail); + } + + /** + * Sets a limit on the maximum number of bytes to pump. + * If the limit is exceeded, the pump fails with a {@link WriteException}. + * + * @param bytesLimit the maximum bytes to pump (must be >= 0) + * @return a reference to this pump for chaining + * @throws IllegalArgumentException if bytesLimit is negative + */ + public synchronized Pump bytesLimit(long bytesLimit) { + if (bytesLimit < 0) + throw new IllegalArgumentException("bytesLimit must be >= 0"); + + this.bytesLimit = bytesLimit; + return this; + } + + /** + * Configures whether to end the destination stream when the source stream fails. + * + * @param end true to end the destination on failure, false otherwise + * @return a reference to this pump for chaining + */ + @Override + public synchronized Pump endOnFailure(boolean end) { + endOnFailure = end; + return this; + } + + /** + * Configures whether to end the destination stream when the source stream completes successfully. + * + * @param end true to end the destination on success, false otherwise + * @return a reference to this pump for chaining + */ + @Override + public synchronized Pump endOnSuccess(boolean end) { + endOnSuccess = end; + return this; + } + + /** + * Configures whether to end the destination stream when the source stream completes (success or failure). + * + * @param end true to end the destination on completion, false otherwise + * @return a reference to this pump for chaining + */ + @Override + public synchronized Pump endOnComplete(boolean end) { + endOnSuccess = end; + endOnFailure = end; + return this; + } + + private void handleWriteResult(AsyncResult ack) { + if (ack.failed()) { + result.tryFail(new WriteException(ack.cause())); + } + } + + /** + * Starts the pumping process to the specified destination. + * + * @param ws the destination write stream + * @return a future that completes when the pumping is finished + * @throws NullPointerException if the destination is null + * @throws IllegalStateException if a destination has already been set + */ + @Override + public Future to(WriteStream ws) { + Promise promise = Promise.promise(); + if (ws == null) { + throw new NullPointerException(); + } + synchronized (this) { + if (dst != null) { + throw new IllegalStateException(); + } + dst = ws; + } + Handler drainHandler = v -> src.resume(); + src.handler(item -> { + bytesPumped += item.length(); + if (bytesLimit > 0 && bytesPumped > bytesLimit) { + result.tryFail(new WriteException("Pumped bytes limit exceeded")); + return; + } + + ws.write(item).onComplete(this::handleWriteResult); + if (ws.writeQueueFull()) { + src.pause(); + ws.drainHandler(drainHandler); + } + }); + src.resume(); + result.future().onComplete(ar -> { + try { + src.handler(null); + } catch (Exception ignore) { + } + try { + src.exceptionHandler(null); + } catch (Exception ignore) { + } + try { + src.endHandler(null); + } catch (Exception ignore) { + } + if (ar.succeeded()) { + handleSuccess(promise); + } else { + Throwable err = ar.cause(); + if (err instanceof WriteException) { + src.resume(); + err = err.getCause(); + } + handleFailure(err, promise); + } + }); + return promise.future(); + } + + private void handleSuccess(Promise promise) { + if (endOnSuccess) { + dst.end().onComplete(promise); + } else { + promise.complete(); + } + } + + private void handleFailure(Throwable cause, Promise completionHandler) { + if (endOnFailure){ + dst + .end() + .transform(ar -> Future.failedFuture(cause)) + .onComplete(completionHandler); + } else { + completionHandler.fail(cause); + } + } + + /** + * Closes the pump, detaching handlers from the source and destination. + * This will fail the result future if it's not already completed. + */ + public void close() { + synchronized (this) { + src.exceptionHandler(null); + src.handler(null); + if (dst != null) { + dst.drainHandler(null); + dst.exceptionHandler(null); + } + } + VertxException err = new VertxException("Pipe closed", true); + if (result.tryFail(err)) { + src.resume(); + } + } + + private static class WriteException extends VertxException { + private static final long serialVersionUID = 5995527496327696671L; + + private WriteException(String message) { + super(message, true); + } + + private WriteException(Throwable cause) { + super(cause, true); + } + } +} \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java b/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java index 2cc9163..4197e49 100644 --- a/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java +++ b/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java @@ -22,6 +22,9 @@ package io.bosonnetwork.vertx; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -858,6 +861,54 @@ public CompletionStage minimalCompletionStage() { return new MinimalStage<>(future); } + /** + * Returns a new VertxFuture that is completed when all the given futures complete. + * + * @param futures the futures to wait for + * @return a new VertxFuture that is completed when all the given futures complete + */ + public static VertxFuture allOf(VertxFuture... futures) { + List> vfs = Arrays.stream(futures).map(f -> f.future).toList(); + Future cf = Future.all(vfs).mapEmpty(); + return of(cf); + } + + /** + * Returns a new VertxFuture that is completed when all the given futures complete. + * + * @param futures the collection of futures to wait for + * @return a new VertxFuture that is completed when all the given futures complete + */ + public static VertxFuture allOf(Collection> futures) { + List> vfs = futures.stream().map(f -> f.future).toList(); + Future cf = Future.all(vfs).mapEmpty(); + return of(cf); + } + + /** + * Returns a new VertxFuture that is completed when any of the given futures succeed. + * + * @param futures the futures to wait for + * @return a new VertxFuture that is completed when any of the given futures succeed + */ + public static VertxFuture anyOf(VertxFuture... futures) { + List> vfs = Arrays.stream(futures).map(f -> f.future).toList(); + Future cf = Future.any(vfs).mapEmpty(); + return of(cf); + } + + /** + * Returns a new VertxFuture that is completed when any of the given futures succeed. + * + * @param futures the collection of futures to wait for + * @return a new VertxFuture that is completed when any of the given futures succeed + */ + public static VertxFuture anyOf(Collection> futures) { + List> vfs = futures.stream().map(f -> f.future).toList(); + Future cf = Future.any(vfs).mapEmpty(); + return of(cf); + } + /** * A reduced view of VertxFuture that exposes only {@link CompletionStage} operations, * disabling mutation methods such as {@code complete()}, {@code cancel()}, etc. diff --git a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java index 18dbb09..f7f2e6a 100644 --- a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java +++ b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java @@ -85,7 +85,7 @@ static Stream testDatabaseProvider() { @MethodSource("testDatabaseProvider") @Timeout(value = 2, timeUnit = TimeUnit.MINUTES) void testMigrate(String name, SqlClient client, VertxTestContext context) { - Path schemaPath = Path.of(getClass().getClassLoader().getResource("db/" + name).getPath()); + Path schemaPath = Path.of(getClass().getResource("/db/schema_test/" + name).getPath()); VersionedSchema schema = VersionedSchema.init(client, schemaPath); schema.migrate().onComplete(context.succeeding(v -> { diff --git a/api/src/test/resources/db/postgres/001_init_schema.sql b/api/src/test/resources/db/schema_test/postgres/001_init_schema.sql similarity index 100% rename from api/src/test/resources/db/postgres/001_init_schema.sql rename to api/src/test/resources/db/schema_test/postgres/001_init_schema.sql diff --git a/api/src/test/resources/db/postgres/002_add_index.sql b/api/src/test/resources/db/schema_test/postgres/002_add_index.sql similarity index 100% rename from api/src/test/resources/db/postgres/002_add_index.sql rename to api/src/test/resources/db/schema_test/postgres/002_add_index.sql diff --git a/api/src/test/resources/db/postgres/003_insert_sample_data.sql b/api/src/test/resources/db/schema_test/postgres/003_insert_sample_data.sql similarity index 100% rename from api/src/test/resources/db/postgres/003_insert_sample_data.sql rename to api/src/test/resources/db/schema_test/postgres/003_insert_sample_data.sql diff --git a/api/src/test/resources/db/postgres/004_add_profile_table.sql b/api/src/test/resources/db/schema_test/postgres/004_add_profile_table.sql similarity index 100% rename from api/src/test/resources/db/postgres/004_add_profile_table.sql rename to api/src/test/resources/db/schema_test/postgres/004_add_profile_table.sql diff --git a/api/src/test/resources/db/postgres/005_add_last_login_column.sql b/api/src/test/resources/db/schema_test/postgres/005_add_last_login_column.sql similarity index 100% rename from api/src/test/resources/db/postgres/005_add_last_login_column.sql rename to api/src/test/resources/db/schema_test/postgres/005_add_last_login_column.sql diff --git a/api/src/test/resources/db/postgres/006_normalize_email_case.sql b/api/src/test/resources/db/schema_test/postgres/006_normalize_email_case.sql similarity index 100% rename from api/src/test/resources/db/postgres/006_normalize_email_case.sql rename to api/src/test/resources/db/schema_test/postgres/006_normalize_email_case.sql diff --git a/api/src/test/resources/db/postgres/007_add_message_likes_table.sql b/api/src/test/resources/db/schema_test/postgres/007_add_message_likes_table.sql similarity index 100% rename from api/src/test/resources/db/postgres/007_add_message_likes_table.sql rename to api/src/test/resources/db/schema_test/postgres/007_add_message_likes_table.sql diff --git a/api/src/test/resources/db/postgres/008_add_view_user_messages_view.sql b/api/src/test/resources/db/schema_test/postgres/008_add_view_user_messages_view.sql similarity index 100% rename from api/src/test/resources/db/postgres/008_add_view_user_messages_view.sql rename to api/src/test/resources/db/schema_test/postgres/008_add_view_user_messages_view.sql diff --git a/api/src/test/resources/db/postgres/009_add_audit_log_table.sql b/api/src/test/resources/db/schema_test/postgres/009_add_audit_log_table.sql similarity index 100% rename from api/src/test/resources/db/postgres/009_add_audit_log_table.sql rename to api/src/test/resources/db/schema_test/postgres/009_add_audit_log_table.sql diff --git a/api/src/test/resources/db/postgres/010_add_trigger.sql b/api/src/test/resources/db/schema_test/postgres/010_add_trigger.sql similarity index 100% rename from api/src/test/resources/db/postgres/010_add_trigger.sql rename to api/src/test/resources/db/schema_test/postgres/010_add_trigger.sql diff --git a/api/src/test/resources/db/sqlite/001_init_schema.sql b/api/src/test/resources/db/schema_test/sqlite/001_init_schema.sql similarity index 100% rename from api/src/test/resources/db/sqlite/001_init_schema.sql rename to api/src/test/resources/db/schema_test/sqlite/001_init_schema.sql diff --git a/api/src/test/resources/db/sqlite/002_add_index.sql b/api/src/test/resources/db/schema_test/sqlite/002_add_index.sql similarity index 100% rename from api/src/test/resources/db/sqlite/002_add_index.sql rename to api/src/test/resources/db/schema_test/sqlite/002_add_index.sql diff --git a/api/src/test/resources/db/sqlite/003_insert_sample_data.sql b/api/src/test/resources/db/schema_test/sqlite/003_insert_sample_data.sql similarity index 100% rename from api/src/test/resources/db/sqlite/003_insert_sample_data.sql rename to api/src/test/resources/db/schema_test/sqlite/003_insert_sample_data.sql diff --git a/api/src/test/resources/db/sqlite/004_add_profile_table.sql b/api/src/test/resources/db/schema_test/sqlite/004_add_profile_table.sql similarity index 100% rename from api/src/test/resources/db/sqlite/004_add_profile_table.sql rename to api/src/test/resources/db/schema_test/sqlite/004_add_profile_table.sql diff --git a/api/src/test/resources/db/sqlite/005_add_last_login_column.sql b/api/src/test/resources/db/schema_test/sqlite/005_add_last_login_column.sql similarity index 100% rename from api/src/test/resources/db/sqlite/005_add_last_login_column.sql rename to api/src/test/resources/db/schema_test/sqlite/005_add_last_login_column.sql diff --git a/api/src/test/resources/db/sqlite/006_normalize_email_case.sql b/api/src/test/resources/db/schema_test/sqlite/006_normalize_email_case.sql similarity index 100% rename from api/src/test/resources/db/sqlite/006_normalize_email_case.sql rename to api/src/test/resources/db/schema_test/sqlite/006_normalize_email_case.sql diff --git a/api/src/test/resources/db/sqlite/007_add_message_likes_table.sql b/api/src/test/resources/db/schema_test/sqlite/007_add_message_likes_table.sql similarity index 100% rename from api/src/test/resources/db/sqlite/007_add_message_likes_table.sql rename to api/src/test/resources/db/schema_test/sqlite/007_add_message_likes_table.sql diff --git a/api/src/test/resources/db/sqlite/008_add_view_user_messages_view.sql b/api/src/test/resources/db/schema_test/sqlite/008_add_view_user_messages_view.sql similarity index 100% rename from api/src/test/resources/db/sqlite/008_add_view_user_messages_view.sql rename to api/src/test/resources/db/schema_test/sqlite/008_add_view_user_messages_view.sql diff --git a/api/src/test/resources/db/sqlite/009_add_audit_log_table.sql b/api/src/test/resources/db/schema_test/sqlite/009_add_audit_log_table.sql similarity index 100% rename from api/src/test/resources/db/sqlite/009_add_audit_log_table.sql rename to api/src/test/resources/db/schema_test/sqlite/009_add_audit_log_table.sql diff --git a/api/src/test/resources/db/sqlite/010_add_trigger.sql b/api/src/test/resources/db/schema_test/sqlite/010_add_trigger.sql similarity index 100% rename from api/src/test/resources/db/sqlite/010_add_trigger.sql rename to api/src/test/resources/db/schema_test/sqlite/010_add_trigger.sql diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java b/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java index c21bd1a..f085353 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java @@ -195,7 +195,7 @@ else if (current instanceof ListenerArray listeners) } @Override - public synchronized VertxFuture start() { + public VertxFuture start() { if (this.vertx != null) return VertxFuture.failedFuture(new IllegalStateException("Already started")); diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/impl/DHT.java b/dht/src/main/java/io/bosonnetwork/kademlia/impl/DHT.java index 99fdde2..255913a 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/impl/DHT.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/impl/DHT.java @@ -990,8 +990,10 @@ public Future findNode(Id id, LookupOption option) { runOnContext(v -> { NodeInfo node = routingTable.getEntry(id, true); - if (option == LookupOption.LOCAL) + if (option == LookupOption.LOCAL) { promise.complete(node); + return; + } if (node != null && option != LookupOption.CONSERVATIVE) { promise.complete(node); diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java index 2a294bf..6dca088 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java @@ -62,7 +62,7 @@ protected void init(Vertx vertx) { @Override protected Path getSchemaPath() { - URL schemaPath = getClass().getClassLoader().getResource("db/postgres"); + URL schemaPath = getClass().getResource("/db/kadnode/postgres"); if (schemaPath == null || schemaPath.getPath() == null) throw new IllegalStateException("Migration path not exists"); diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java index 88a758d..1b2f2a7 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java @@ -74,7 +74,7 @@ protected void init(Vertx vertx) { @Override protected Path getSchemaPath() { - URL schemaPath = getClass().getClassLoader().getResource("db/sqlite"); + URL schemaPath = getClass().getResource("/db/kadnode/sqlite"); if (schemaPath == null || schemaPath.getPath() == null) throw new IllegalStateException("Migration path not exists"); diff --git a/dht/src/main/resources/db/postgres/001_initial_schema.sql b/dht/src/main/resources/db/kadnode/postgres/001_initial_schema.sql similarity index 100% rename from dht/src/main/resources/db/postgres/001_initial_schema.sql rename to dht/src/main/resources/db/kadnode/postgres/001_initial_schema.sql diff --git a/dht/src/main/resources/db/sqlite/001_initial_schema.sql b/dht/src/main/resources/db/kadnode/sqlite/001_initial_schema.sql similarity index 100% rename from dht/src/main/resources/db/sqlite/001_initial_schema.sql rename to dht/src/main/resources/db/kadnode/sqlite/001_initial_schema.sql diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java index 6712156..78f8834 100644 --- a/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java +++ b/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java @@ -5,7 +5,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.io.IOException; @@ -16,10 +15,10 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; @@ -57,9 +56,14 @@ public class NodeAsyncTests { private static final Path testDir = Path.of(System.getProperty("java.io.tmpdir"), "boson", "NodeAsyncTests"); - private static InetAddress localAddr; + private static final InetAddress localAddr = AddressUtils.getDefaultRouteAddress(Inet4Address.class); + + private static final Vertx testingVertx =Vertx.vertx(new VertxOptions() + .setEventLoopPoolSize(32) + .setWorkerPoolSize(8) + .setBlockedThreadCheckIntervalUnit(TimeUnit.SECONDS) + .setBlockedThreadCheckInterval(120)); - private static Vertx vertx; private static KadNode bootstrap; private static final List testNodes = new ArrayList<>(TEST_NODES); @@ -67,7 +71,7 @@ private static VertxFuture startBootstrap() { System.out.println("\n\n\007🟢 Starting the bootstrap node ..."); var config = NodeConfiguration.builder() - .vertx(vertx) + .vertx(testingVertx) .address4(localAddr) .port(TEST_NODES_PORT_START - 1) .dataDir(testDir.resolve("nodes" + File.separator + "node-bootstrap")) @@ -120,7 +124,7 @@ private static VertxFuture createTestNode(int index) { System.out.format("\n\n\007🟢 Starting the node %d ...\n", index); var config = NodeConfiguration.builder() - .vertx(vertx) + .vertx(testingVertx) .address4(localAddr) .port(TEST_NODES_PORT_START + index) .dataDir(testDir.resolve("nodes" + File.separator + "node-" + index)) @@ -132,16 +136,24 @@ private static VertxFuture createTestNode(int index) { var node = new KadNode(config); testNodes.add(node); - Promise promise = Promise.promise(); + Promise promise = Promise.promise(); node.addConnectionStatusListener(new ConnectionStatusListener() { @Override public void connected(Network network) { System.out.printf("\007🟢 The node %d - %s is ready ...\n", index, node.getId()); - promise.complete(node); + promise.complete(); } }); - return node.start().thenApply(v -> node); + // WARNING: This currently causes teardown failures. + // Stopping the first node triggers undeployment of all test nodes. + // The root cause is still unknown and needs further investigation. + /*/ + node.start(); + return VertxFuture.of(promise.future()).thenApply(v -> node); + */ + // But this will work well. + return node.start().thenCompose(v -> VertxFuture.of(promise.future())).thenApply(v -> node); } private static VertxFuture startTestNodes() { @@ -174,74 +186,56 @@ private static VertxFuture dumpRoutingTable(String name, KadNode node) { } } - /*/ private static VertxFuture dumpRoutingTables() { - return dumpRoutingTable("node-bootstrap", bootstrap).thenCompose(v -> { - return executeSequentially(testNodes.size(), 0, index -> { - KadNode node = testNodes.get(index); - return dumpRoutingTable("node-" + index, node); - }); - }); - } - */ - - private static VertxFuture dumpRoutingTables() { - List> futures = new ArrayList<>(testNodes.size() + 1); - futures.add(dumpRoutingTable("node-bootstrap", bootstrap).toVertxFuture()); + List> futures = new ArrayList<>(testNodes.size() + 1); + futures.add(dumpRoutingTable("node-bootstrap", bootstrap)); for (int i = 0; i < testNodes.size(); i++) - futures.add(dumpRoutingTable("node-" + i, testNodes.get(i)).toVertxFuture()); + futures.add(dumpRoutingTable("node-" + i, testNodes.get(i))); - return VertxFuture.of(Future.all(futures).mapEmpty()); + return VertxFuture.allOf(futures); } // in Vert.x 4.5.x, not support asynchronous lifecycle on static @BeforeAll and @AfterAll methods. - // So we use synchronous method to setup and teardown to make it compatible with Vert.x 4.5.x and 5.0.x + // So we use synchronous method to do setup and teardown to make it compatible with Vert.x 4.5.x and 5.0.x @BeforeAll @Timeout(value = TEST_NODES + 1, timeUnit = TimeUnit.MINUTES) static void setup(VertxTestContext context) throws Exception { - localAddr = AddressUtils.getDefaultRouteAddress(Inet4Address.class); - - if (localAddr == null) - fail("No eligible address to run the test."); - else - System.out.println("🟢 local address: " + localAddr.getHostAddress()); - if (Files.exists(testDir)) FileUtils.deleteFile(testDir); Files.createDirectories(testDir); - vertx = Vertx.vertx(new VertxOptions() - .setEventLoopPoolSize(32) - .setWorkerPoolSize(8) - .setBlockedThreadCheckIntervalUnit(TimeUnit.SECONDS) - .setBlockedThreadCheckInterval(120)); - - startBootstrap().thenCompose(v -> startTestNodes()).toVertxFuture() - .onComplete(context.succeeding(v -> { - System.out.println("\n\n\007🟢 All the nodes are ready!!! starting to run the test cases"); - context.completeNow(); - })); + startBootstrap().thenCompose(v -> startTestNodes()).whenComplete((v, e) -> { + if (e == null) { + System.out.println("\n\n\007🟢 All the nodes are ready!!! starting to run the test cases"); + context.completeNow(); + } else { + context.failNow(e); + } + }); } @AfterAll @Timeout(value = TEST_NODES + 1, timeUnit = TimeUnit.SECONDS) - static void teardown(VertxTestContext context) throws Exception { - dumpRoutingTables().thenCompose(v -> { - return stopTestNodes(); - }).thenCompose(v -> { - return stopBootstrap(); - }).thenRun(() -> { - try { - FileUtils.deleteFile(testDir); - } catch (Exception e) { - fail(e); - } - - System.out.format("\n\n\007🟢 Test cases finished\n"); - }).toVertxFuture().onComplete(context.succeedingThenComplete()); + static void teardown(VertxTestContext context) { + dumpRoutingTables().thenCompose(v -> stopTestNodes()) + .thenCompose(v -> stopBootstrap()) + .thenRun(() -> { + try { + FileUtils.deleteFile(testDir); + } catch (Exception e) { + throw new CompletionException(e); + } + }).whenComplete((v, e) -> { + if (e == null) { + System.out.format("\n\n\007🟢 Test cases finished\n"); + context.completeNow(); + } else { + context.failNow(e); + } + }); } @Test @@ -250,7 +244,7 @@ void testNodeWithPresetKey(VertxTestContext context) { Id nodeId = Id.of(keypair.publicKey().bytes()); var config = NodeConfiguration.builder() - .vertx(vertx) + .vertx(testingVertx) .address4(localAddr) .port(TEST_NODES_PORT_START - 100) .privateKey(keypair.privateKey().bytes()) diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/rpc/RPCServerTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/rpc/RPCServerTests.java index 3ee0795..10a27e7 100644 --- a/dht/src/test/java/io/bosonnetwork/kademlia/rpc/RPCServerTests.java +++ b/dht/src/test/java/io/bosonnetwork/kademlia/rpc/RPCServerTests.java @@ -72,6 +72,7 @@ public class RPCServerTests { private static final Faker faker = new Faker(); + @SuppressWarnings("ConstantConditions") private static final String localAddr = AddressUtils.getDefaultRouteAddress(Inet4Address.class).getHostAddress(); private final static Map values = new HashMap<>();