getDatabaseProductName() {
}
/**
- * Creates a simple text query using the underlying client.
+ * Performs per-connection initialization before executing any SQL.
+ *
+ * <p>This method is invoked every time a {@link SqlConnection} is acquired
+ * (both for pooled and non-pooled clients), before any queries or
+ * transactions are executed.
+ *
+ * <p>Typical use cases include:
+ * <ul>
+ *   <li>Setting PostgreSQL {@code search_path}</li>
+ *   <li>Configuring session variables</li>
+ *   <li>Applying tenant- or schema-specific settings</li>
+ * </ul>
+ *
+ * <p>If this method fails, the associated operation or transaction will fail.
+ * Implementations should not suppress errors.
+ *
+ * <p>Implementations must be idempotent.
*
- * @param sql SQL text to execute
- * @return a Vert.x {@link Query} for the provided SQL
+ * @param connection the connection to prepare
+ * @return a future that completes when initialization has finished
*/
- default Query> query(String sql) {
- return getClient().query(sql);
+ default Future prepareConnection(SqlConnection connection) {
+ return Future.succeededFuture();
}
- /**
- * Creates a prepared query using the underlying client.
- *
- * @param sql SQL text with placeholders
- * @return a Vert.x {@link PreparedQuery} for the provided SQL
- */
- default PreparedQuery> preparedQuery(String sql) {
- return getClient().preparedQuery(sql);
+ private BiFunction> wrapped(Function function) {
+ return (c, t) -> prepareConnection(c).map(v -> function.apply(t));
+ }
+
+ private Function> wrappedAsync(Function> function) {
+ return c -> prepareConnection(c).compose(v -> function.apply(c));
}
/**
@@ -105,9 +118,9 @@ default PreparedQuery> preparedQuery(String sql) {
*/
default Future withTransaction(Function> function) {
if (getClient() instanceof Pool p) {
- return p.withTransaction(function);
- } else if (getClient() instanceof SqlConnection c) {
- return withTransaction(c, function);
+ return p.withTransaction(c -> wrappedAsync(function).apply(c));
+ } else if (getClient() instanceof SqlConnection connection) {
+ return withTransaction(connection, c -> wrappedAsync(function).apply(c));
} else {
return Future.failedFuture(new IllegalStateException("Client must be an instance of SqlConnection or Pool"));
}
@@ -141,11 +154,9 @@ private Future withTransaction(SqlConnection connection, Function Future withConnection(Function> function) {
if (getClient() instanceof SqlConnection c) {
- return function.apply(c);
+ return wrappedAsync(function).apply(c);
} else if (getClient() instanceof Pool p) {
- return p.getConnection().compose(c ->
- function.apply(c).onComplete(ar -> c.close())
- );
+ return p.withConnection(c -> wrappedAsync(function).apply(c));
} else {
return Future.failedFuture(new IllegalStateException("Client must be an instance of SqlConnection or Pool"));
}
@@ -264,7 +275,7 @@ default List findMany(RowSet rowSet, Function mapper) {
* @param result the SQL result to check
* @return true if at least one row was affected, false otherwise
*/
- default boolean hasEffectedRows(SqlResult> result) {
+ default boolean hasAffectedRows(SqlResult> result) {
return result.rowCount() > 0;
}
diff --git a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
index fe1cf2d..de7329f 100644
--- a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
+++ b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
@@ -104,9 +104,9 @@ static Stream testDatabaseProvider() {
@Timeout(value = 2, timeUnit = TimeUnit.MINUTES)
@Order(1)
void testMigrate(String name, SqlClient client, Vertx vertx, VertxTestContext context) {
- Path schemaPath = Path.of(getClass().getResource("/db/schema_test/" + name).getPath());
+ Path migrationPath = Path.of(getClass().getResource("/db/schema_test/" + name).getPath());
- VersionedSchema schema = VersionedSchema.init(vertx, client, schemaPath);
+ VersionedSchema schema = VersionedSchema.init(vertx, client, migrationPath);
schema.migrate().onComplete(context.succeeding(v -> {
context.verify(() -> {
var sv = schema.getCurrentVersion();
@@ -121,9 +121,9 @@ void testMigrate(String name, SqlClient client, Vertx vertx, VertxTestContext co
@Test
@Order(2)
void testMigrateWithSchemaFoo(Vertx vertx, VertxTestContext context) {
- Path schemaPath = Path.of(getClass().getResource("/db/schema_test/postgres").getPath());
+ Path migrationPath = Path.of(getClass().getResource("/db/schema_test/postgres").getPath());
- VersionedSchema schema = VersionedSchema.init(vertx, postgres, "foo", schemaPath);
+ VersionedSchema schema = VersionedSchema.init(vertx, postgres, "foo", migrationPath);
schema.migrate().onComplete(context.succeeding(v -> {
context.verify(() -> {
var sv = schema.getCurrentVersion();
@@ -138,9 +138,9 @@ void testMigrateWithSchemaFoo(Vertx vertx, VertxTestContext context) {
@Test
@Order(3)
void testMigrateWithSchemaBar(Vertx vertx, VertxTestContext context) {
- Path schemaPath = Path.of(getClass().getResource("/db/schema_test/postgres").getPath());
+ Path migrationPath = Path.of(getClass().getResource("/db/schema_test/postgres").getPath());
- VersionedSchema schema = VersionedSchema.init(vertx, postgres, "bar", schemaPath);
+ VersionedSchema schema = VersionedSchema.init(vertx, postgres, "bar", migrationPath);
schema.migrate().onComplete(context.succeeding(v -> {
context.verify(() -> {
var sv = schema.getCurrentVersion();
diff --git a/cmds/src/main/java/io/bosonnetwork/shell/Main.java b/cmds/src/main/java/io/bosonnetwork/shell/Main.java
index d73005c..939b742 100644
--- a/cmds/src/main/java/io/bosonnetwork/shell/Main.java
+++ b/cmds/src/main/java/io/bosonnetwork/shell/Main.java
@@ -232,10 +232,10 @@ private void parseArgs() throws IOException {
}
if (storageURL != null) {
- builder.storageURI(storageURL);
+ builder.database(storageURL);
} else {
if (builder.hasDataDir())
- builder.storageURI("jdbc:sqlite:" + builder.dataDir().resolve("node.db"));
+ builder.database("jdbc:sqlite:" + builder.dataDir().resolve("node.db"));
}
if (!builder.hasPrivateKey())
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java b/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java
index 5f40eb5..431621b 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/KadNode.java
@@ -133,11 +133,11 @@ private void checkConfig(NodeConfiguration config) {
}
}
- if (config.storageURI() != null) {
- if (!DataStorage.supports(config.storageURI()))
- throw new IllegalArgumentException("unsupported storage URL: " + config.storageURI());
+ if (config.databaseUri() != null) {
+ if (!DataStorage.supports(config.databaseUri()))
+ throw new IllegalArgumentException("unsupported storage URL: " + config.databaseUri());
} else {
- log.warn("No storage URL is configured, in-memory storage is used");
+ throw new IllegalArgumentException("No database is configured");
}
}
@@ -229,14 +229,14 @@ public void prepare(Vertx vertx, Context context) {
public Future deploy() {
tokenManager = new TokenManager();
- String storageURI = config.storageURI();
+ String storageURI = config.databaseUri();
// fix the sqlite database file location
if (storageURI.startsWith("jdbc:sqlite:")) {
Path dbFile = Path.of(storageURI.substring("jdbc:sqlite:".length()));
if (!dbFile.isAbsolute())
storageURI = "jdbc:sqlite:" + config.dataDir().resolve(dbFile).toAbsolutePath();
}
- storage = DataStorage.create(storageURI);
+ storage = DataStorage.create(storageURI, config.databasePoolSize(), config.databaseSchemaName());
// TODO: empty blacklist for now
blacklist = Blacklist.empty();
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java b/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java
index 3f04a98..6dd6544 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java
@@ -23,7 +23,9 @@ public class SimpleNodeConfiguration implements NodeConfiguration {
private final int port;
private final Signature.PrivateKey privateKey;
private final Path dataDir;
- private final String storageURL;
+ private final String databaseUri;
+ private final int databasePoolSize;
+ private final String databaseSchemaName;
private final ArrayList bootstrapNodes;
private final boolean enableSpamThrottling;
private final boolean enableSuspiciousNodeDetector;
@@ -37,7 +39,9 @@ public SimpleNodeConfiguration(NodeConfiguration config) {
this.privateKey = config.privateKey();
this.dataDir = config.dataDir() != null ? config.dataDir().toAbsolutePath() :
Path.of(System.getProperty("user.dir")).resolve("node");
- this.storageURL = config.storageURI() != null ? config.storageURI() : InMemoryStorage.STORAGE_URI;
+ this.databaseUri = config.databaseUri() != null ? config.databaseUri() : InMemoryStorage.STORAGE_URI;
+ this.databasePoolSize = config.databasePoolSize();
+ this.databaseSchemaName = config.databaseSchemaName();
this.bootstrapNodes = new ArrayList<>(config.bootstrapNodes() != null ? config.bootstrapNodes() : Collections.emptyList());
this.enableSpamThrottling = config.enableSpamThrottling();
this.enableSuspiciousNodeDetector = config.enableSuspiciousNodeDetector();
@@ -95,8 +99,18 @@ public Path dataDir() {
}
@Override
- public String storageURI() {
- return storageURL;
+ public String databaseUri() {
+ return databaseUri;
+ }
+
+ @Override
+ public int databasePoolSize() {
+ return databasePoolSize;
+ }
+
+ @Override
+ public String databaseSchemaName() {
+ return databaseSchemaName;
}
@Override
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java
index 1b53787..61fe856 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java
@@ -272,15 +272,15 @@ static boolean supports(String uri) {
uri.startsWith(PostgresStorage.STORAGE_URI_PREFIX);
}
- static DataStorage create(String uri) {
+ static DataStorage create(String uri, int poolSize, String schema) {
Objects.requireNonNull(uri, "url");
if (uri.equals(InMemoryStorage.STORAGE_URI))
return new InMemoryStorage();
if (uri.startsWith(SQLiteStorage.STORAGE_URI_PREFIX))
- return new SQLiteStorage(uri);
+ return new SQLiteStorage(uri, poolSize);
if (uri.startsWith(PostgresStorage.STORAGE_URI_PREFIX))
- return new PostgresStorage(uri);
+ return new PostgresStorage(uri, poolSize, schema);
throw new IllegalArgumentException("Unsupported storage: " + uri);
}
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java
index 5d12f2b..914d870 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java
@@ -53,7 +53,11 @@ public abstract class DatabaseStorage implements DataStorage, VertxDatabase {
protected abstract void init(Vertx vertx);
- protected abstract Path getSchemaPath();
+ protected abstract Path getMigrationPath();
+
+ protected String getSchema() {
+ return null;
+ }
protected abstract SqlDialect getDialect();
@@ -64,7 +68,7 @@ public Future initialize(Vertx vertx, long valueExpiration, long peerIn
this.valueExpiration = valueExpiration;
this.peerInfoExpiration = peerInfoExpiration;
- VersionedSchema schema = VersionedSchema.init(vertx, getClient(), getSchemaPath());
+ VersionedSchema schema = VersionedSchema.init(vertx, getClient(), getSchema(), getMigrationPath());
return schema.migrate().andThen(ar -> {
if (ar.succeeded()) {
schemaVersion = schema.getCurrentVersion().version();
@@ -165,66 +169,72 @@ public Future putValue(Value value, boolean persistent, int expectedSeque
@Override
public Future getValue(Id id) {
getLogger().debug("Getting value with id: {}", id);
- return SqlTemplate.forQuery(getClient(), getDialect().selectValueById())
- .execute(Map.of("id", id.bytes()))
- .map(rows -> findUnique(rows, DatabaseStorage::rowToValue))
- .andThen(ar -> {
- if (ar.succeeded()) {
- if (ar.result() != null)
- getLogger().debug("Got value with id: {}", id);
- else
- //noinspection LoggingSimilarMessage
- getLogger().debug("No value found with id: {}", id);
- } else {
- getLogger().error("Failed to get value with id: {}", id, ar.cause());
- }
- }).recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectValueById())
+ .execute(Map.of("id", id.bytes()))
+ .map(rows -> findUnique(rows, DatabaseStorage::rowToValue))
+ .andThen(ar -> {
+ if (ar.succeeded()) {
+ if (ar.result() != null)
+ getLogger().debug("Got value with id: {}", id);
+ else
+ //noinspection LoggingSimilarMessage
+ getLogger().debug("No value found with id: {}", id);
+ } else {
+ getLogger().error("Failed to get value with id: {}", id, ar.cause());
+ }
+ })
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getValues() {
- return query(getDialect().selectAllValues())
- .execute()
- .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ c.query(getDialect().selectAllValues())
+ .execute()
+ .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getValues(int offset, int limit) {
- return SqlTemplate.forQuery(getClient(), getDialect().selectAllValuesPaginated())
- .execute(Map.of("limit", limit, "offset", offset))
- .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectAllValuesPaginated())
+ .execute(Map.of("limit", limit, "offset", offset))
+ .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getValues(boolean persistent, long announcedBefore) {
- return SqlTemplate.forQuery(getClient(), getDialect().selectValuesByPersistentAndAnnouncedBefore())
- .execute(Map.of("persistent", persistent, "updatedBefore", announcedBefore))
- .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectValuesByPersistentAndAnnouncedBefore())
+ .execute(Map.of("persistent", persistent, "updatedBefore", announcedBefore))
+ .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getValues(boolean persistent, long announcedBefore, int offset, int limit) {
- return SqlTemplate.forQuery(getClient(), getDialect().selectValuesByPersistentAndAnnouncedBeforePaginated())
- .execute(Map.of(
- "persistent", persistent,
- "updatedBefore", announcedBefore,
- "limit", limit,
- "offset", offset))
- .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectValuesByPersistentAndAnnouncedBeforePaginated())
+ .execute(Map.of(
+ "persistent", persistent,
+ "updatedBefore", announcedBefore,
+ "limit", limit,
+ "offset", offset))
+ .map(rows -> findMany(rows, DatabaseStorage::rowToValue))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
@@ -256,7 +266,7 @@ public Future removeValue(Id id) {
return withTransaction(c ->
SqlTemplate.forUpdate(c, getDialect().deleteValueById())
.execute(Map.of("id", id.bytes()))
- .map(this::hasEffectedRows)
+ .map(this::hasAffectedRows)
).andThen(ar -> {
if (ar.succeeded()) {
if (ar.result())
@@ -326,87 +336,95 @@ public Future> putPeers(List peerInfos) {
@Override
public Future getPeer(Id id, Id nodeId) {
getLogger().debug("Getting peer with id: {} @ {}", id, nodeId);
- return SqlTemplate.forQuery(getClient(), getDialect().selectPeerByIdAndNodeId())
- .execute(Map.of("id", id.bytes(), "nodeId", nodeId.bytes()))
- .map(rows -> findUnique(rows, DatabaseStorage::rowToPeer))
- .andThen(ar -> {
- if (ar.succeeded()) {
- if (ar.result() != null)
- getLogger().debug("Got peer with id: {} @ {}", id, nodeId);
- else
- //noinspection LoggingSimilarMessage
- getLogger().debug("No peer found with id: {} @ {}", id, nodeId);
- } else {
- getLogger().error("Failed to get peer with id: {} @ {}", id, nodeId, ar.cause());
- }
- }).recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectPeerByIdAndNodeId())
+ .execute(Map.of("id", id.bytes(), "nodeId", nodeId.bytes()))
+ .map(rows -> findUnique(rows, DatabaseStorage::rowToPeer))
+ .andThen(ar -> {
+ if (ar.succeeded()) {
+ if (ar.result() != null)
+ getLogger().debug("Got peer with id: {} @ {}", id, nodeId);
+ else
+ //noinspection LoggingSimilarMessage
+ getLogger().debug("No peer found with id: {} @ {}", id, nodeId);
+ } else {
+ getLogger().error("Failed to get peer with id: {} @ {}", id, nodeId, ar.cause());
+ }
+ })
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getPeers(Id id) {
getLogger().debug("Getting peers with id: {}", id);
- return SqlTemplate.forQuery(getClient(), getDialect().selectPeersById())
- .execute(Map.of("id", id.bytes()))
- .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
- .andThen(ar -> {
- if (ar.succeeded()) {
- if (!ar.result().isEmpty())
- getLogger().debug("Got peers with id: {}", id);
- else
- //noinspection LoggingSimilarMessage
- getLogger().debug("No peers found with id: {}", id);
- } else {
- getLogger().error("Failed to get peers with id: {}", id, ar.cause());
- }
- }).recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectPeersById())
+ .execute(Map.of("id", id.bytes()))
+ .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
+ .andThen(ar -> {
+ if (ar.succeeded()) {
+ if (!ar.result().isEmpty())
+ getLogger().debug("Got peers with id: {}", id);
+ else
+ //noinspection LoggingSimilarMessage
+ getLogger().debug("No peers found with id: {}", id);
+ } else {
+ getLogger().error("Failed to get peers with id: {}", id, ar.cause());
+ }
+ })
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getPeers() {
- return query(getDialect().selectAllPeers())
- .execute()
- .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ c.query(getDialect().selectAllPeers())
+ .execute()
+ .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getPeers(int offset, int limit) {
- return SqlTemplate.forQuery(getClient(), getDialect().selectAllPeersPaginated())
- .execute(Map.of("limit", limit, "offset", offset))
- .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectAllPeersPaginated())
+ .execute(Map.of("limit", limit, "offset", offset))
+ .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getPeers(boolean persistent, long announcedBefore) {
- return SqlTemplate.forQuery(getClient(), getDialect().selectPeersByPersistentAndAnnouncedBefore())
- .execute(Map.of("persistent", persistent, "updatedBefore", announcedBefore))
- .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectPeersByPersistentAndAnnouncedBefore())
+ .execute(Map.of("persistent", persistent, "updatedBefore", announcedBefore))
+ .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future> getPeers(boolean persistent, long announcedBefore, int offset, int limit) {
- return SqlTemplate.forQuery(getClient(), getDialect().selectPeersByPersistentAndAnnouncedBeforePaginated())
- .execute(Map.of(
- "persistent", persistent,
- "updatedBefore", announcedBefore,
- "limit", limit,
- "offset", offset))
- .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
- .recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withConnection(c ->
+ SqlTemplate.forQuery(c, getDialect().selectPeersByPersistentAndAnnouncedBeforePaginated())
+ .execute(Map.of(
+ "persistent", persistent,
+ "updatedBefore", announcedBefore,
+ "limit", limit,
+ "offset", offset))
+ .map(rows -> findMany(rows, DatabaseStorage::rowToPeer))
+ ).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
@@ -437,7 +455,7 @@ public Future removePeer(Id id, Id nodeId) {
return withTransaction(c ->
SqlTemplate.forUpdate(c, getDialect().deletePeerByIdAndNodeId())
.execute(Map.of("id", id.bytes(), "nodeId", nodeId.bytes()))
- .map(this::hasEffectedRows)
+ .map(this::hasAffectedRows)
).andThen(ar -> {
if (ar.succeeded()) {
if (ar.result())
@@ -459,7 +477,7 @@ public Future removePeers(Id id) {
return withTransaction(c ->
SqlTemplate.forUpdate(c, getDialect().deletePeersById())
.execute(Map.of("id", id.bytes()))
- .map(this::hasEffectedRows)
+ .map(this::hasAffectedRows)
).andThen(ar -> {
if (ar.succeeded()) {
if (ar.result())
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java
index 6dca088..9421a26 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java
@@ -25,12 +25,14 @@
import java.net.URL;
import java.nio.file.Path;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgBuilder;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlClient;
+import io.vertx.sqlclient.SqlConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,20 +40,28 @@ public class PostgresStorage extends DatabaseStorage implements DataStorage {
protected static final String STORAGE_URI_PREFIX = "postgresql://";
private final String connectionUri;
+ private final int poolSize;
+ private final String schema;
private Pool client;
private SqlDialect sqlDialect;
private static final Logger log = LoggerFactory.getLogger(PostgresStorage.class);
- protected PostgresStorage(String connectionUri) {
+ protected PostgresStorage(String connectionUri, int poolSize, String schema) {
this.connectionUri = connectionUri;
+ this.poolSize = poolSize > 0 ? poolSize : 8;
+ this.schema = schema;
+ }
+
+ protected PostgresStorage(String connectionUri) {
+ this(connectionUri, 0, null);
}
// postgresql://[user[:password]@][host][:port][,...][/dbname][?param1=value1&...]
@Override
protected void init(Vertx vertx) {
PgConnectOptions connectOptions = PgConnectOptions.fromUri(connectionUri);
- PoolOptions poolOptions = new PoolOptions().setMaxSize(8);
+ PoolOptions poolOptions = new PoolOptions().setMaxSize(poolSize);
client = PgBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
@@ -61,12 +71,17 @@ protected void init(Vertx vertx) {
}
@Override
- protected Path getSchemaPath() {
- URL schemaPath = getClass().getResource("/db/kadnode/postgres");
- if (schemaPath == null || schemaPath.getPath() == null)
+ protected Path getMigrationPath() {
+ URL migrationResource = getClass().getResource("/db/kadnode/postgres");
+ if (migrationResource == null || migrationResource.getPath() == null)
throw new IllegalStateException("Migration path not exists");
- return Path.of(schemaPath.getPath());
+ return Path.of(migrationResource.getPath());
+ }
+
+ @Override
+ protected String getSchema() {
+ return schema;
}
@Override
@@ -74,6 +89,16 @@ public SqlClient getClient() {
return client;
}
+ @Override
+ public Future prepareConnection(SqlConnection connection) {
+ if (schema == null)
+ return Future.succeededFuture();
+ else
+ return connection.query("SET search_path TO " + schema)
+ .execute()
+ .mapEmpty();
+ }
+
@Override
protected SqlDialect getDialect() {
return sqlDialect;
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java
index 1b2f2a7..55e6313 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java
@@ -38,13 +38,19 @@ public class SQLiteStorage extends DatabaseStorage implements DataStorage {
protected static final String STORAGE_URI_PREFIX = "jdbc:sqlite:";
private final String connectionUri;
+ private final int poolSize;
private Pool client;
private SqlDialect sqlDialect;
private static final Logger log = LoggerFactory.getLogger(SQLiteStorage.class);
- protected SQLiteStorage(String connectionUri) {
+ protected SQLiteStorage(String connectionUri, int poolSize) {
this.connectionUri = connectionUri;
+ this.poolSize = poolSize > 0 ? poolSize : 1;
+ }
+
+ protected SQLiteStorage(String connectionUri) {
+ this(connectionUri, 0);
}
@Override
@@ -60,7 +66,7 @@ protected void init(Vertx vertx) {
dataSource.setFullSync(true);
// Single connection recommended for SQLite
- PoolOptions poolOptions = new PoolOptions().setMaxSize(1);
+ PoolOptions poolOptions = new PoolOptions().setMaxSize(poolSize);
client = JDBCPool.pool(vertx, dataSource, poolOptions);
sqlDialect = new SqlDialect() {};
@@ -73,12 +79,12 @@ protected void init(Vertx vertx) {
}
@Override
- protected Path getSchemaPath() {
- URL schemaPath = getClass().getResource("/db/kadnode/sqlite");
- if (schemaPath == null || schemaPath.getPath() == null)
+ protected Path getMigrationPath() {
+ URL migrationResource = getClass().getResource("/db/kadnode/sqlite");
+ if (migrationResource == null || migrationResource.getPath() == null)
throw new IllegalStateException("Migration path not exists");
- return Path.of(schemaPath.getPath());
+ return Path.of(migrationResource.getPath());
}
@Override
diff --git a/dht/src/main/resources/node.yaml b/dht/src/main/resources/node.yaml
index f759105..d93a40d 100644
--- a/dht/src/main/resources/node.yaml
+++ b/dht/src/main/resources/node.yaml
@@ -62,14 +62,32 @@ privateKey: 0x751a9612f9bd80e6e37a77a704dc2a99dbfb162c35cb138ca46eaacd656de9bedf
# - Windows: %ProgramData%\boson\node
dataDir: ~/.local/share/boson/node
-# Storage backend used by the node.
-# Supported values:
+# Database configuration.
+#
+# Supported database URIs:
# - PostgreSQL: postgresql://user:password@host:port/database
# - SQLite: jdbc:sqlite:/path/to/sqlite.db
#
# For lightweight or embedded deployments, SQLite is recommended.
-# For super node deployments, prefer PostgreSQL.
-storageURL: jdbc:sqlite:node.db
+# For super node or high-concurrency deployments, prefer PostgreSQL.
+#
+# Configuration fields:
+# uri:
+# Database connection URI.
+#
+# poolSize:
+# Size of the database connection pool.
+# A value of 0 means "use the database implementation default".
+#
+# schema:
+# Database schema (namespace) name.
+# This option is only applicable to PostgreSQL.
+# For other databases (e.g., SQLite), this field is ignored.
+#
+database:
+ uri: jdbc:sqlite:node.db
+ # poolSize: 0
+ # schema: kademlia
# Bootstrap nodes used when joining the DHT.
# Each bootstrap node is a node info triple:
diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/InstantTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/InstantTests.java
deleted file mode 100644
index 830d1e8..0000000
--- a/dht/src/test/java/io/bosonnetwork/kademlia/InstantTests.java
+++ /dev/null
@@ -1,236 +0,0 @@
-package io.bosonnetwork.kademlia;
-
-import java.util.concurrent.TimeUnit;
-
-import io.micrometer.core.instrument.DistributionSummary;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Timer;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.DeploymentOptions;
-import io.vertx.core.Promise;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxOptions;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.datagram.DatagramSocket;
-import io.vertx.core.datagram.DatagramSocketOptions;
-import io.vertx.core.http.HttpServerOptions;
-import io.vertx.core.net.SocketAddress;
-import io.vertx.micrometer.MicrometerMetricsOptions;
-import io.vertx.micrometer.VertxPrometheusOptions;
-import io.vertx.micrometer.backends.BackendRegistries;
-import org.slf4j.Logger;
-
-import io.bosonnetwork.crypto.Random;
-
-public class InstantTests {
- private static final int CLIENT_INSTANCES = 30;
- private static final int TOTAL_MESSAGES = 10000;
-
- static class EchoServer extends AbstractVerticle {
- private static final int PORT = 1234;
- private static final String HOST = "0.0.0.0";
-
- private static final Logger log = org.slf4j.LoggerFactory.getLogger(EchoServer.class);
-
- private DatagramSocket socket;
-
- private MeterRegistry registry;
- private Timer sendTimer;
- private DistributionSummary packetSizeSummary;
-
- private int count = 0;
- private long begin = 0;
- private long end = 0;
-
- @Override
- public void start(Promise startPromise) throws Exception {
- registry = BackendRegistries.getDefaultNow();
- sendTimer = Timer.builder("boson_datagram_send_time")
- .description("Time to send a packet")
- .tag("module", "DHT")
- .publishPercentiles(0.5, 0.95, 0.99)
- .register(registry);
- packetSizeSummary = DistributionSummary.builder("boson_datagram_packet_size_bytes")
- .description("Size of sent packets")
- .tag("module", "DHT")
- .publishPercentiles(0.5, 0.95, 0.99)
- .register(registry);
-
- socket = vertx.createDatagramSocket(new DatagramSocketOptions()
- .setSendBufferSize(1024 * 1024)
- .setReceiveBufferSize(1024 * 1024)
- .setTrafficClass(0x10));
-
- // Set up the packet handler
- socket.handler(packet -> {
- if (count == 0)
- begin = System.currentTimeMillis();
-
- SocketAddress sender = packet.sender();
- echo(packet.data(), packet.sender());
- });
-
- socket.exceptionHandler(e -> log.error("Socket exception", e));
-
- // Bind the socket to the specified host and port
- socket.listen(PORT, HOST).onComplete(ar -> {
- if (ar.succeeded()) {
- log.info("UDP Echo Server listening on {}:{}", HOST, PORT);
- startPromise.complete();
- } else {
- log.error("Failed to bind server on {}:{}", HOST, PORT, ar.cause());
- startPromise.fail(ar.cause());
- }
- });
- }
-
- @Override
- public void stop(Promise stopPromise) throws Exception {
- socket.close().onComplete(ar -> {
- log.info("UDP Echo Server stopped");
- stopPromise.complete();
- });
- }
-
- private void echo(Buffer data, SocketAddress addr) {
- context.runOnContext(v -> {
- long startTime = System.nanoTime();
- packetSizeSummary.record(data.length());
-
- socket.send(data, addr.port(), addr.host()).onComplete(ar -> {
- sendTimer.record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
- if (ar.succeeded()) {
- ++count;
- //log.info("Echoed packet to {}, total echoed: {}", addr, count);
- //if (count % 1000 == 0)
- // log.info("Echoed {} packets", count);
-
- if (count == CLIENT_INSTANCES * TOTAL_MESSAGES) {
- end = System.currentTimeMillis();
- System.out.println(">>>>>>>>>>>>>>>> " + (end - begin) + " ms");
- }
- } else {
- log.error("Failed to send packet to {}", addr, ar.cause());
- }
- });
- });
- }
-
- public static void main(String[] args) {
- VertxOptions options = new VertxOptions().setMetricsOptions(
- new MicrometerMetricsOptions()
- .setPrometheusOptions(new VertxPrometheusOptions()
- .setEnabled(true)
- //.setPublishQuantiles(true)
- .setStartEmbeddedServer(true)
- .setEmbeddedServerOptions(new HttpServerOptions().setPort(8080))
- .setEmbeddedServerEndpoint("/metrics"))
- .setEnabled(true));
-
- Vertx vertx = Vertx.vertx(options);
- vertx.deployVerticle(new EchoServer()).onComplete(ar -> {
- if (ar.succeeded()) {
- log.info("Echo server deployed successfully");
- } else {
- log.error("Failed to deploy Echo server", ar.cause());
- }
- });
- }
- }
-
- public static class EchoClient extends AbstractVerticle {
- private static final int SEND_DELAY_MS = 1;
-
- private static final String SERVER_HOST = "127.0.0.1";
- private static final int SERVER_PORT = 1234;
- private static final Logger log = org.slf4j.LoggerFactory.getLogger(EchoClient.class);
-
- private DatagramSocket socket;
- private int totalSent = 0;
- private int totalReceived = 0;
-
- public EchoClient() {
- }
-
- @Override
- public void start(Promise startPromise) throws Exception {
- socket = vertx.createDatagramSocket(new DatagramSocketOptions()
- .setSendBufferSize(1024 * 1024)
- .setReceiveBufferSize(1024 * 1024));
-
- socket.handler(packet -> {
- ++totalReceived;
- //log.info("Received response from {}, total received: {}/{}", packet.sender(), totalReceived, TOTAL_MESSAGES);
- if (totalReceived == TOTAL_MESSAGES) {
- log.info("Finished receiving messages! Total received {} messages", totalReceived);
- undeployIfFinished();
- }
- });
-
- socket.exceptionHandler(e -> log.error("Socket exception", e));
-
- log.info("UDP Echo client {} started", deploymentID());
-
- context.runOnContext(this::sendMessage);
- startPromise.complete();
- }
-
- @Override
- public void stop(Promise stopPromise) throws Exception {
- socket.close().onComplete(ar -> {
- vertx.close();
- log.info("UDP Echo client stopped");
- stopPromise.complete();
- });
- }
-
- private void sendMessage(Void arg) {
- byte[] message = Random.randomBytes(Random.random().nextInt(32, 1024));
-
- socket.send(Buffer.buffer(message), SERVER_PORT, SERVER_HOST).onComplete(ar -> {
- if (ar.succeeded()) {
- ++totalSent;
- //log.info("Message sent successfully to server {}/{}", totalSent, TOTAL_MESSAGES);
-
- if (totalSent < TOTAL_MESSAGES) {
- vertx.setTimer(SEND_DELAY_MS, id -> context.runOnContext(this::sendMessage));
- } else {
- log.info("Finished sending messages! Total sent {} messages", totalSent);
- }
- } else {
- log.error("Failed to send message", ar.cause());
- }
- });
- }
-
- private void undeployIfFinished() {
- vertx.sharedData().getLocalCounter("ECHO_CLIENT_FINISHED").onSuccess(counter -> {
- counter.incrementAndGet().onSuccess(v -> {
- if (v == CLIENT_INSTANCES)
- vertx.undeploy(deploymentID());
- }).onFailure(e -> {
- log.error("Failed to increment counter", e);
- });
- }).onFailure(e -> {
- log.error("Failed to get shared counter", e);
- });
- }
-
- public static void main(String[] args) {
- Vertx vertx = Vertx.vertx();
-
- DeploymentOptions options = new DeploymentOptions()
- .setInstances(CLIENT_INSTANCES);
-
- vertx.deployVerticle(EchoClient.class, options).onComplete(ar -> {
- if (ar.succeeded()) {
- log.info("Echo client[{} instances] deployed successfully", CLIENT_INSTANCES);
- } else {
- log.error("Failed to deploy Echo client", ar.cause());
- vertx.close();
- }
- });
- }
- }
-}
\ No newline at end of file
diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java
index 78f8834..18ae575 100644
--- a/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java
+++ b/dht/src/test/java/io/bosonnetwork/kademlia/NodeAsyncTests.java
@@ -75,7 +75,7 @@ private static VertxFuture startBootstrap() {
.address4(localAddr)
.port(TEST_NODES_PORT_START - 1)
.dataDir(testDir.resolve("nodes" + File.separator + "node-bootstrap"))
- .storageURI("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-bootstrap" + File.separator + "storage.db"))
+ .database("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-bootstrap" + File.separator + "storage.db"))
.enableDeveloperMode()
.build();
@@ -128,7 +128,7 @@ private static VertxFuture createTestNode(int index) {
.address4(localAddr)
.port(TEST_NODES_PORT_START + index)
.dataDir(testDir.resolve("nodes" + File.separator + "node-" + index))
- .storageURI("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-" + index + File.separator + "storage.db"))
+ .database("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-" + index + File.separator + "storage.db"))
.addBootstrap(bootstrap.getNodeInfo().getV4())
.enableDeveloperMode()
.build();
diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/NodeSyncTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/NodeSyncTests.java
index e36f235..bbdcc48 100644
--- a/dht/src/test/java/io/bosonnetwork/kademlia/NodeSyncTests.java
+++ b/dht/src/test/java/io/bosonnetwork/kademlia/NodeSyncTests.java
@@ -59,7 +59,7 @@ private static void startBootstrap() throws Exception {
.address4(localAddr)
.port(TEST_NODES_PORT_START - 1)
.dataDir(testDir.resolve("nodes" + File.separator + "node-bootstrap"))
- .storageURI("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-bootstrap" + File.separator + "storage.db"))
+ .database("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-bootstrap" + File.separator + "storage.db"))
.enableDeveloperMode()
.build();
@@ -81,7 +81,7 @@ private static void startTestNodes() throws Exception {
.address4(localAddr)
.port(TEST_NODES_PORT_START + i)
.dataDir(testDir.resolve("nodes" + File.separator + "node-" + i))
- .storageURI("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-" + i + File.separator + "storage.db"))
+ .database("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-" + i + File.separator + "storage.db"))
.addBootstrap(bootstrap.getNodeInfo().getV4())
.enableDeveloperMode()
.build();
diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/SybilTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/SybilTests.java
index 2fd6afa..a9a6641 100644
--- a/dht/src/test/java/io/bosonnetwork/kademlia/SybilTests.java
+++ b/dht/src/test/java/io/bosonnetwork/kademlia/SybilTests.java
@@ -80,7 +80,7 @@ void setUp() throws Exception {
.port(39001)
.generatePrivateKey()
.dataDir(testDir.resolve("nodes" + File.separator + "node-target"))
- .storageURI("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-target" + File.separator + "storage.db"))
+ .database("jdbc:sqlite:" + testDir.resolve("nodes" + File.separator + "node-target" + File.separator + "storage.db"))
.enableDeveloperMode()
.build());
target.start().get();
diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/TestNodeLauncher.java b/dht/src/test/java/io/bosonnetwork/kademlia/TestNodeLauncher.java
new file mode 100644
index 0000000..2a63e0b
--- /dev/null
+++ b/dht/src/test/java/io/bosonnetwork/kademlia/TestNodeLauncher.java
@@ -0,0 +1,90 @@
+package io.bosonnetwork.kademlia;
+
+import java.io.InputStream;
+import java.net.Inet4Address;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Objects;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+
+import io.bosonnetwork.ConnectionStatusListener;
+import io.bosonnetwork.DefaultNodeConfiguration;
+import io.bosonnetwork.Network;
+import io.bosonnetwork.Node;
+import io.bosonnetwork.NodeConfiguration;
+import io.bosonnetwork.crypto.Signature;
+import io.bosonnetwork.utils.AddressUtils;
+import io.bosonnetwork.utils.Json;
+
+public class TestNodeLauncher {
+ private static final Path dataPath = Path.of(System.getProperty("java.io.tmpdir"), "boson", "KademliaNode");
+ private static Vertx vertx;
+ private static Signature.KeyPair nodeKey;
+ private static Node node;
+
+ private static NodeConfiguration loadConfiguration() throws Exception {
+ try (InputStream s = TestNodeLauncher.class.getResourceAsStream("/testNode.yaml")) {
+ Map map = Json.yamlMapper().readValue(s, Json.mapType());
+		// Override host4 with this machine's default-route IPv4 address
+ if (map.containsKey("host4"))
+ map.put("host4", Objects.requireNonNull(AddressUtils.getDefaultRouteAddress(Inet4Address.class)).getHostAddress());
+
+		// Override dataDir with the per-run temporary data directory
+ map.put("dataDir", dataPath.toAbsolutePath().toString());
+
+ return DefaultNodeConfiguration.fromMap(map);
+ } catch (Exception e) {
+ System.err.println("Failed to load configuration file: " + e.getMessage());
+ throw e;
+ }
+ }
+
+ public static void main(String[] args) {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (node != null) {
+ System.out.println("Shutting down the Boson Kademlia node ...");
+ node.stop().thenRun(() ->
+ System.out.println("Boson node stopped.")
+ ).join();
+
+ // Cannot chain vertx.close() to the above future because closing Vert.x will terminate its event loop,
+ // preventing any pending future handlers from executing.
+ System.out.print("Shutting down Vert.x gracefully...");
+ vertx.close().toCompletionStage().toCompletableFuture().join();
+ System.out.println("Done!");
+ }
+ }));
+
+ vertx = Vertx.vertx(new VertxOptions()
+ .setWorkerPoolSize(4)
+ .setEventLoopPoolSize(4)
+ .setPreferNativeTransport(true));
+
+ try {
+ NodeConfiguration config = loadConfiguration();
+ node = Node.kadNode(config);
+ node.addConnectionStatusListener(new ConnectionStatusListener() {
+ @Override
+ public void connected(Network network) {
+ System.out.println("Kademlia node connected to " + network);
+ }
+
+ @Override
+ public void disconnected(Network network) {
+ System.out.println("Kademlia node disconnected from " + network);
+ }
+ });
+
+ System.out.println("Starting the Boson Kademlia node ...");
+ node.start().thenRun(() -> {
+ System.out.printf("Started the Boson Kademlia node %s at %s:%d\n",
+ node.getId(), config.host4(), config.port());
+ }).join();
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ vertx.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/dht/src/test/resources/testNode.yaml b/dht/src/test/resources/testNode.yaml
new file mode 100644
index 0000000..b63bfef
--- /dev/null
+++ b/dht/src/test/resources/testNode.yaml
@@ -0,0 +1,11 @@
+host4: 192.168.8.10
+port: 39001
+# node id: GMDVFJ5zdvS88Do5TbPAHnMHMsZt282A84KjfptVXQAb
+privateKey: 253vmGfhqrrGrzqzxu9NBPGgZFA1iHqyeEJMMZem7ebBn8nApYgFX8diYpLFdb34vdPMutt1eAeW2tvWEWwJH9nP
+dataDir: .
+database:
+ uri: jdbc:sqlite:node.db
+enableSpamThrottling: true
+enableSuspiciousNodeDetector: true
+enableDeveloperMode: true
+enableMetrics: false
\ No newline at end of file