From 620459225a9cf31f4bd937ad7252f4afc6a8fd0a Mon Sep 17 00:00:00 2001 From: Jingyu Date: Thu, 27 Nov 2025 13:09:45 +0800 Subject: [PATCH 1/2] Skip thread-context checks when retrieving results from completed VertxFuture instances --- .../io/bosonnetwork/vertx/VertxFuture.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java b/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java index e5e01a4..776fc74 100644 --- a/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java +++ b/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java @@ -684,6 +684,15 @@ public boolean isCompletedExceptionally() { */ @Override public T get() throws InterruptedException, ExecutionException { + if (future.isComplete()) { + if (future.succeeded()) + return future.result(); + else if (future.failed()) + throw new ExecutionException(future.cause()); + else + throw new InterruptedException("Context closed"); + } + if (Context.isOnVertxThread() || Context.isOnEventLoopThread()) throw new IllegalStateException("Cannot not be called on vertx thread or event loop thread"); @@ -716,6 +725,15 @@ else if (future.failed()) */ @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (future.isComplete()) { + if (future.succeeded()) + return future.result(); + else if (future.failed()) + throw new ExecutionException(future.cause()); + else + throw new InterruptedException("Context closed"); + } + if (Context.isOnVertxThread() || Context.isOnEventLoopThread()) throw new IllegalStateException("Cannot not be called on vertx thread or event loop thread"); From bdc1df5fac834003b771025c246830342285b24a Mon Sep 17 00:00:00 2001 From: Jingyu Date: Wed, 3 Dec 2025 17:15:56 +0800 Subject: [PATCH 2/2] Refactor database layer to use new abstract database API and unified schema migration --- api/pom.xml | 6 +- .../main/java/io/bosonnetwork/PeerInfo.java | 2 +- 
.../io/bosonnetwork/database/Pagination.java | 2 +- .../service/ClientAuthenticator.java | 2 +- .../io/bosonnetwork/service/Federation.java | 2 +- .../database/PostgresqlServer.java | 45 ++ .../database/VersionedSchemaTests.java | 35 +- dht/pom.xml | 398 ++++++----- .../impl/SimpleNodeConfiguration.java | 2 +- .../kademlia/storage/DataStorage.java | 24 +- .../kademlia/storage/DatabaseStorage.java | 667 +++++++----------- .../kademlia/storage/InMemoryStorage.java | 4 +- .../kademlia/storage/PostgresStorage.java | 223 +----- .../kademlia/storage/SQLiteStorage.java | 240 +------ .../kademlia/storage/SqlDialect.java | 142 ++++ .../db/postgres/1_initial_schema.sql | 43 ++ .../resources/db/sqlite/1_initial_schema.sql | 43 ++ .../kademlia/storage/DataStorageTests.java | 76 +- .../kademlia/storage/PostgresqlServer.java | 45 ++ 19 files changed, 908 insertions(+), 1093 deletions(-) create mode 100644 api/src/test/java/io/bosonnetwork/database/PostgresqlServer.java create mode 100644 dht/src/main/java/io/bosonnetwork/kademlia/storage/SqlDialect.java create mode 100644 dht/src/main/resources/db/postgres/1_initial_schema.sql create mode 100644 dht/src/main/resources/db/sqlite/1_initial_schema.sql create mode 100644 dht/src/test/java/io/bosonnetwork/kademlia/storage/PostgresqlServer.java diff --git a/api/pom.xml b/api/pom.xml index e445c23..6779383 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -150,6 +150,11 @@ vertx-junit5 test + + org.testcontainers + testcontainers-postgresql + test + io.vertx vertx-pg-client @@ -160,7 +165,6 @@ vertx-jdbc-client test - org.xerial sqlite-jdbc diff --git a/api/src/main/java/io/bosonnetwork/PeerInfo.java b/api/src/main/java/io/bosonnetwork/PeerInfo.java index 8bb0503..1482523 100644 --- a/api/src/main/java/io/bosonnetwork/PeerInfo.java +++ b/api/src/main/java/io/bosonnetwork/PeerInfo.java @@ -48,7 +48,7 @@ public class PeerInfo { */ public static final Object ATTRIBUTE_PEER_ID = new Object(); - private final Id publicKey; // Peer ID + private 
final Id publicKey; // Peer ID private final byte[] privateKey; // Private key to sign the peer info private final Id nodeId; // The node that provide the service peer private final Id origin; // The node that announces the peer diff --git a/api/src/main/java/io/bosonnetwork/database/Pagination.java b/api/src/main/java/io/bosonnetwork/database/Pagination.java index 44ee3f0..9ac25da 100644 --- a/api/src/main/java/io/bosonnetwork/database/Pagination.java +++ b/api/src/main/java/io/bosonnetwork/database/Pagination.java @@ -71,7 +71,7 @@ public String toSql() { if (offset == 0 && limit == 0) return ""; // caller may omit OFFSET/LIMIT completely - return " OFFSET " + offset + " LIMIT " + limit; + return " LIMIT " + limit + " OFFSET " + offset; } /** diff --git a/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java b/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java index 608575d..d684b4e 100644 --- a/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java +++ b/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java @@ -7,5 +7,5 @@ public interface ClientAuthenticator { CompletableFuture authenticateUser(Id userId, byte[] nonce, byte[] signature); - CompletableFuture authenticateDevice(Id userId, Id deviceId, byte[] nonce, byte[] signature); + CompletableFuture authenticateDevice(Id userId, Id deviceId, byte[] nonce, byte[] signature, String address); } \ No newline at end of file diff --git a/api/src/main/java/io/bosonnetwork/service/Federation.java b/api/src/main/java/io/bosonnetwork/service/Federation.java index 6afd184..95111e0 100644 --- a/api/src/main/java/io/bosonnetwork/service/Federation.java +++ b/api/src/main/java/io/bosonnetwork/service/Federation.java @@ -23,5 +23,5 @@ default CompletableFuture getNode(Id nodeId) { // public CompletableFuture> getAllServices(Id nodeId); - public CompletableFuture getService(Id peerId); + public CompletableFuture getService(Id nodeId, Id peerId); } \ No newline at end of 
file diff --git a/api/src/test/java/io/bosonnetwork/database/PostgresqlServer.java b/api/src/test/java/io/bosonnetwork/database/PostgresqlServer.java new file mode 100644 index 0000000..d64b8bb --- /dev/null +++ b/api/src/test/java/io/bosonnetwork/database/PostgresqlServer.java @@ -0,0 +1,45 @@ +package io.bosonnetwork.database; + +import org.testcontainers.postgresql.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; + +public class PostgresqlServer { + private PostgreSQLContainer container; + + private PostgresqlServer(PostgreSQLContainer container) { + this.container = container; + } + + public static PostgresqlServer start(String database, String username, String password) { + DockerImageName image = DockerImageName + .parse("postgres:18-alpine"); + + PostgreSQLContainer container = new PostgreSQLContainer(image) + .withDatabaseName(database) + .withUsername(username) + .withPassword(password); + + return new PostgresqlServer(container).start(); + } + + private PostgresqlServer start() { + container.start(); + return this; + } + + public void stop() { + if (container != null) { + container.stop(); + container = null; + } + } + + public String getDatabaseUrl() { + return "postgresql://" + + container.getUsername() + ":" + + container.getPassword() + "@" + + container.getHost() + ":" + + container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT) + "/" + + container.getDatabaseName(); + } +} \ No newline at end of file diff --git a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java index 309b828..18dbb09 100644 --- a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java +++ b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java @@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.jdbcclient.JDBCConnectOptions; import 
io.vertx.jdbcclient.JDBCPool; @@ -36,34 +37,44 @@ public class VersionedSchemaTests { private static final List databases = new ArrayList<>(); + private static PostgresqlServer pgServer; + private static SqlClient postgres; + private static SqlClient sqlite; + @BeforeAll static void setup(Vertx vertx, VertxTestContext context) throws Exception { + FileUtils.deleteFile(testDir); Files.createDirectories(testDir); - var sqliteURL = "jdbc:sqlite:" + testDir.resolve("test.db"); - JDBCConnectOptions sqliteConnectOptions = new JDBCConnectOptions() - .setJdbcUrl(sqliteURL); - // Single connection recommended for SQLite - PoolOptions sqlitePoolOptions = new PoolOptions().setMaxSize(1); - SqlClient sqliteClient = JDBCPool.pool(vertx, sqliteConnectOptions, sqlitePoolOptions); - databases.add(Arguments.of("sqlite", sqliteClient)); + pgServer = PostgresqlServer.start("migration", "test", "secret"); - var postgresURL = "postgresql://jingyu:secret@localhost:5432/test"; + var postgresURL = pgServer.getDatabaseUrl(); PgConnectOptions pgConnectOptions = PgConnectOptions.fromUri(postgresURL); PoolOptions pgPoolOptions = new PoolOptions().setMaxSize(8); - SqlClient pgClient = PgBuilder.pool() + postgres = PgBuilder.pool() .with(pgPoolOptions) .connectingTo(pgConnectOptions) .using(vertx) .build(); - // databases.add(Arguments.of("postgres", pgClient)); + databases.add(Arguments.of("postgres", postgres)); + + var sqliteURL = "jdbc:sqlite:" + testDir.resolve("test.db"); + JDBCConnectOptions sqliteConnectOptions = new JDBCConnectOptions() + .setJdbcUrl(sqliteURL); + // Single connection recommended for SQLite + PoolOptions sqlitePoolOptions = new PoolOptions().setMaxSize(1); + sqlite = JDBCPool.pool(vertx, sqliteConnectOptions, sqlitePoolOptions); + databases.add(Arguments.of("sqlite", sqlite)); context.completeNow(); } @AfterAll - static void teardown() throws Exception { - FileUtils.deleteFile(testRoot); + static void teardown(VertxTestContext context) throws Exception { + 
Future.all(postgres.close(), sqlite.close()).onComplete(ar -> { + pgServer.stop(); + context.completeNow(); + }); } static Stream testDatabaseProvider() { diff --git a/dht/pom.xml b/dht/pom.xml index cd00450..d565aeb 100644 --- a/dht/pom.xml +++ b/dht/pom.xml @@ -1,201 +1,209 @@ - - 4.0.0 + + 4.0.0 + + + io.bosonnetwork + boson-parent + 3-SNAPSHOT + + - io.bosonnetwork - boson-parent - 3-SNAPSHOT - - - - io.bosonnetwork - boson-dht - 2.0.8-SNAPSHOT - jar - - Boson DHT - - Boson Kademlia DHT node. - - https://github.com/bosonnetwork/Boson.Core - - - - MIT License - https://github.com/bosonnetwork/Boson.Core/blob/master/LICENSE - repo - - - - - - boson-network-dev - Boson Network - support@bosonnetwork.io - BosonNetwork - https://github.com/bosonnetwork - - architect - developer - - - https://avatars.githubusercontent.com/u/152134507 - - - - - - scm:git:git@github.com:bosonnetwork/Boson.Core.git - scm:git:git@github.com:bosonnetwork/Boson.Core.git - git@github.com:bosonnetwork/Boson.Core.git - - - + boson-dht + 2.0.8-SNAPSHOT + jar + + Boson DHT + + Boson Kademlia DHT node. 
+ + https://github.com/bosonnetwork/Boson.Core + + + + MIT License + https://github.com/bosonnetwork/Boson.Core/blob/master/LICENSE + repo + + + + + + boson-network-dev + Boson Network + support@bosonnetwork.io + BosonNetwork + https://github.com/bosonnetwork + + architect + developer + + + https://avatars.githubusercontent.com/u/152134507 + + + + + + scm:git:git@github.com:bosonnetwork/Boson.Core.git + scm:git:git@github.com:bosonnetwork/Boson.Core.git + git@github.com:bosonnetwork/Boson.Core.git + + + + + + io.bosonnetwork + boson-dependencies + 2.0.8-SNAPSHOT + pom + import + + + + - - io.bosonnetwork - boson-dependencies - 2.0.8-SNAPSHOT - pom - import - + + + io.bosonnetwork + boson-api + + + + io.vertx + vertx-core + + + io.vertx + vertx-sql-client-templates + + + io.vertx + vertx-pg-client + + + io.vertx + vertx-jdbc-client + + + io.vertx + vertx-web + + + io.vertx + vertx-web-validation + + + io.netty + netty-resolver-dns-native-macos + osx-aarch_64 + + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.dataformat + jackson-dataformat-cbor + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + + com.github.ben-manes.caffeine + caffeine + + + + org.xerial + sqlite-jdbc + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + io.vertx + vertx-micrometer-metrics + + + io.micrometer + micrometer-registry-prometheus + + + + + org.junit.jupiter + junit-jupiter + test + + + io.vertx + vertx-junit5 + test + + + org.testcontainers + testcontainers-postgresql + + + net.datafaker + datafaker + test + - - - - - - io.bosonnetwork - boson-api - - - - io.vertx - vertx-core - - - io.vertx - vertx-pg-client - - - io.vertx - vertx-jdbc-client - - - io.vertx - vertx-web - - - io.vertx - vertx-web-validation - - - io.netty - netty-resolver-dns-native-macos - osx-aarch_64 - - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - 
jackson-databind - - - com.fasterxml.jackson.dataformat - jackson-dataformat-cbor - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - - - - com.github.ben-manes.caffeine - caffeine - - - - org.xerial - sqlite-jdbc - - - - org.slf4j - slf4j-api - - - ch.qos.logback - logback-classic - - - - io.vertx - vertx-micrometer-metrics - - - io.micrometer - micrometer-registry-prometheus - - - - - org.junit.jupiter - junit-jupiter - test - - - io.vertx - vertx-junit5 - test - - - net.datafaker - datafaker - test - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - org.apache.maven.plugins - maven-source-plugin - - - - - - org.apache.maven.plugins - maven-gpg-plugin - - - - org.apache.maven.plugins - maven-surefire-plugin - - 1 - true - - none - 1 - - - - + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + + + org.apache.maven.plugins + maven-gpg-plugin + + + + org.apache.maven.plugins + maven-surefire-plugin + + 1 + true + + none + 1 + + + + \ No newline at end of file diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java b/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java index 8a2e234..4dcbff8 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java @@ -35,7 +35,7 @@ public SimpleNodeConfiguration(NodeConfiguration config) { this.port = config.port(); this.privateKey = config.privateKey(); this.dataPath = config.dataPath(); - this.storageURL = config.storageURL() != null ? config.storageURL() : InMemoryStorage.STORAGE_URL; + this.storageURL = config.storageURL() != null ? config.storageURL() : InMemoryStorage.STORAGE_URI; this.bootstrapNodes = new ArrayList<>(config.bootstrapNodes() != null ? 
config.bootstrapNodes() : Collections.emptyList()); this.enableSpamThrottling = config.enableSpamThrottling(); this.enableSuspiciousNodeDetector = config.enableSuspiciousNodeDetector(); diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java index 3456bc6..1b53787 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java @@ -266,22 +266,22 @@ public interface DataStorage { */ Future removePeers(Id id); - static boolean supports(String url) { - // now only support inmemory, sqlite and postgres - return url.equals(InMemoryStorage.STORAGE_URL) || url.startsWith(SQLiteStorage.STORAGE_URL_PREFIX) || - url.startsWith(PostgresStorage.STORAGE_URL_PREFIX); + static boolean supports(String uri) { + // now only support in-memory, sqlite and postgres + return uri.equals(InMemoryStorage.STORAGE_URI) || uri.startsWith(SQLiteStorage.STORAGE_URI_PREFIX) || + uri.startsWith(PostgresStorage.STORAGE_URI_PREFIX); } - static DataStorage create(String url) { - Objects.requireNonNull(url, "url"); + static DataStorage create(String uri) { + Objects.requireNonNull(uri, "url"); - if (url.equals(InMemoryStorage.STORAGE_URL)) + if (uri.equals(InMemoryStorage.STORAGE_URI)) return new InMemoryStorage(); - if (url.startsWith(SQLiteStorage.STORAGE_URL_PREFIX)) - return new SQLiteStorage(url); - if (url.startsWith(PostgresStorage.STORAGE_URL_PREFIX)) - return new PostgresStorage(url); + if (uri.startsWith(SQLiteStorage.STORAGE_URI_PREFIX)) + return new SQLiteStorage(uri); + if (uri.startsWith(PostgresStorage.STORAGE_URI_PREFIX)) + return new PostgresStorage(uri); - throw new IllegalArgumentException("Unsupported storage: " + url); + throw new IllegalArgumentException("Unsupported storage: " + uri); } } \ No newline at end of file diff --git 
a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java index 9c65642..6638f01 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java @@ -22,192 +22,63 @@ package io.bosonnetwork.kademlia.storage; +import java.nio.file.Path; +import java.util.HashMap; import java.util.List; -import java.util.function.Function; +import java.util.Map; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.SqlClient; -import io.vertx.sqlclient.SqlConnection; -import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.templates.SqlTemplate; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import io.bosonnetwork.BosonException; import io.bosonnetwork.Id; import io.bosonnetwork.PeerInfo; import io.bosonnetwork.Value; +import io.bosonnetwork.database.VersionedSchema; +import io.bosonnetwork.database.VertxDatabase; import io.bosonnetwork.kademlia.exceptions.ImmutableSubstitutionFail; import io.bosonnetwork.kademlia.exceptions.SequenceNotExpected; import io.bosonnetwork.kademlia.exceptions.SequenceNotMonotonic; -public abstract class DatabaseStorage implements DataStorage { - protected static final int SCHEMA_VERSION = 5; - - protected final String connectionUri; - +public abstract class DatabaseStorage implements DataStorage, VertxDatabase { protected long valueExpiration; protected long peerInfoExpiration; protected int schemaVersion; - protected SqlClient client; - - private static final Logger log = LoggerFactory.getLogger(DatabaseStorage.class); - - protected DatabaseStorage(String connectionUri) { - this.connectionUri = connectionUri; - } - - protected Future executeSequentially(SqlConnection connection, List 
statements, int index) { - if (index >= statements.size()) - return Future.succeededFuture(); - - // Execute current statement and recurse to next - String sql = statements.get(index); - log.trace("Executing schema statement: {}", sql); - return connection.preparedQuery(sql) - .execute() - .compose(result -> executeSequentially(connection, statements, index + 1)) // Move to next statement - .recover(e -> { - log.error("Schema statement failed: {}", sql, e); - BosonException error = new DataStorageException("Create database schema failed: " + sql, e); - return Future.failedFuture(error); - }); - } - - protected Future createSchema(List statements) { - return pool().withTransaction(connection -> executeSequentially(connection, statements, 0)); - } - - protected Pool pool() { - Pool pool = client instanceof Pool p ? p : null; - if (pool == null) // This should never happen - throw new IllegalStateException("SqlClient is not a Pool instance"); - - return pool; - } - - // Returns the first row mapped via the given function, or null if the row set is empty. - protected static T findUniqueOrNull(RowSet rows, Function mapper) { - return mapper.apply(rows.size() == 0 ? 
null : rows.iterator().next()); - } - - protected static List findMany(RowSet rows, Function mapper) { - return rows.stream().map(mapper).toList(); - } - - protected abstract void setupSqlClient(Vertx vertx, String connectionUri); - - protected abstract List createSchemaStatements(); - - protected abstract String selectSchemaVersion(); - - protected abstract String insertSchemaVersion(); - - protected abstract String selectValueById(); - - protected abstract String selectValuesByPersistentAndAnnouncedBefore(); - - protected abstract String selectValuesByPersistentAndAnnouncedBeforePaginated(); - - protected abstract String selectAllValues(); - - protected abstract String selectAllValuesPaginated(); - - protected abstract String upsertValue(); - - protected abstract String updateValueAnnouncedById(); - - protected abstract String deleteValueById(); - - protected abstract String deleteNonPersistentValuesAnnouncedBefore(); - - protected abstract String selectPeerByIdAndNodeId(); - - protected abstract String selectPeersById(); - - protected abstract String selectPeersByPersistentAndAnnouncedBefore(); - - protected abstract String selectPeersByPersistentAndAnnouncedBeforePaginated(); - - protected abstract String selectAllPeers(); - - protected abstract String selectAllPeersPaginated(); - - protected abstract String upsertPeer(); + protected abstract Logger getLogger(); - protected abstract String updatePeerAnnouncedByIdAndNodeId(); + protected abstract void init(Vertx vertx); - protected abstract String deletePeerByIdAndNodeId(); + protected abstract Path getSchemaPath(); - protected abstract String deletePeersById(); - - protected abstract String deleteNonPersistentPeersAnnouncedBefore(); - - protected Future getOrInitSchemaVersion() { - log.debug("Checking schema version..."); - return client.preparedQuery(selectSchemaVersion()) - .execute() - .compose(rows -> { - if (rows.size() > 0) { - int version = findUniqueOrNull(rows, DatabaseStorage::rowToInteger); - 
log.info("Detected existing schema version: {}", version); - return Future.succeededFuture(version); - } else { - log.info("No schema version found, setting version {}", SCHEMA_VERSION); - return client.preparedQuery(insertSchemaVersion()) - .execute(Tuple.of(SCHEMA_VERSION)) - .map(v -> SCHEMA_VERSION); - } - }); - } + protected abstract SqlDialect getDialect(); @Override public Future initialize(Vertx vertx, long valueExpiration, long peerInfoExpiration) { - if (client != null) - return Future.failedFuture(new DataStorageException("Storage already initialized")); - - log.info("Initializing storage with connection URI: {}", connectionUri); + init(vertx); this.valueExpiration = valueExpiration; this.peerInfoExpiration = peerInfoExpiration; - setupSqlClient(vertx, connectionUri); - - log.info("Creating database schema..."); - return createSchema(createSchemaStatements()) - .compose(unused -> getOrInitSchemaVersion()) - .andThen(ar -> { + VersionedSchema schema = VersionedSchema.init(getClient(), getSchemaPath()); + return schema.migrate().andThen(ar -> { if (ar.succeeded()) { - schemaVersion = ar.result(); - log.info("Database schema created successfully, version: {}", schemaVersion); + schemaVersion = schema.getCurrentVersion().version(); + getLogger().info("Database is ready, current schema version: {}", schemaVersion); } else { - log.error("Database schema creation failed", ar.cause()); + getLogger().error("Schema migration failed, current schema version: {}", + schema.getCurrentVersion().version(), ar.cause()); } - }).recover(cause -> - Future.failedFuture(new DataStorageException("Database operation failed", cause)) + }).map(v -> schema.getCurrentVersion().version()) + .recover(cause -> + Future.failedFuture(new DataStorageException("Database operation failed", cause)) ); } - @Override - public Future close() { - if (client == null) { - log.info("Storage already closed"); - return Future.succeededFuture(); - } - log.info("Closing storage..."); - Future future = 
client.close(); - client = null; - return future.recover(cause -> - Future.failedFuture(new DataStorageException("Database operation failed", cause)) - ); - } - @Override public int getSchemaVersion() { return schemaVersion; @@ -217,19 +88,20 @@ public int getSchemaVersion() { public Future purge() { long now = System.currentTimeMillis(); - log.info("Purging expired values and peers..."); - return pool().withTransaction(connection -> - connection.preparedQuery(deleteNonPersistentValuesAnnouncedBefore()) - .execute(Tuple.of(now - valueExpiration)) - .compose(unused -> - connection.preparedQuery(deleteNonPersistentPeersAnnouncedBefore()) - .execute(Tuple.of(now - peerInfoExpiration)) + getLogger().info("Purging expired values and peers..."); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().deleteNonPersistentValuesAnnouncedBefore()) + .execute(Map.of("updatedBefore", now - valueExpiration)) + .compose(r -> + SqlTemplate.forUpdate(c, getDialect().deleteNonPersistentPeersAnnouncedBefore()) + .execute(Map.of("updatedBefore", now - peerInfoExpiration)) + .map((Void) null) ) ).andThen(ar -> { if (ar.succeeded()) - log.info("Purge completed successfully"); + getLogger().info("Purge completed successfully"); else - log.error("Failed to purge expired values and peers", ar.cause()); + getLogger().error("Failed to purge expired values and peers", ar.cause()); }).recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) ).mapEmpty(); @@ -247,54 +119,44 @@ public Future putValue(Value value, boolean persistent) { @Override public Future putValue(Value value, boolean persistent, int expectedSequenceNumber) { - log.debug("Putting value with id: {}, persistent: {}, expectedSequenceNumber: {}", + getLogger().debug("Putting value with id: {}, persistent: {}, expectedSequenceNumber: {}", value.getId(), persistent, expectedSequenceNumber); - log.debug("Trying to check the existing value with id: {}", value.getId()); + 
getLogger().debug("Trying to check the existing value with id: {}", value.getId()); return getValue(value.getId()).compose(existing -> { if (existing != null) { // Immutable check if (existing.isMutable() != value.isMutable()) { - log.warn("Rejecting value {}: cannot replace mismatched mutable/immutable", value.getId()); + getLogger().warn("Rejecting value {}: cannot replace mismatched mutable/immutable", value.getId()); return Future.failedFuture(new ImmutableSubstitutionFail("Cannot replace mismatched mutable/immutable value")); } if (value.getSequenceNumber() < existing.getSequenceNumber()) { - log.warn("Rejecting value {}: sequence number not monotonic", value.getId()); + getLogger().warn("Rejecting value {}: sequence number not monotonic", value.getId()); return Future.failedFuture(new SequenceNotMonotonic("Sequence number less than current")); } if (expectedSequenceNumber >= 0 && existing.getSequenceNumber() > expectedSequenceNumber) { - log.warn("Rejecting value {}: sequence number not expected", value.getId()); + getLogger().warn("Rejecting value {}: sequence number not expected", value.getId()); return Future.failedFuture(new SequenceNotExpected("Sequence number not expected")); } if (existing.hasPrivateKey() && !value.hasPrivateKey()) { // Skip update if the existing value is owned by this node and the new value is not. - // Should not throw NotOwnerException, just silently ignores to avoid disrupting valid operations. - log.info("Skipping to update value for id {}: owned by this node", value.getId()); + // Should not throw NotOwnerException, just silently ignore to avoid disrupting valid operations. + getLogger().info("Skipping to update value for id {}: owned by this node", value.getId()); return Future.succeededFuture(existing); } } - long now = System.currentTimeMillis(); - return client.preparedQuery(upsertValue()) - .execute(Tuple.of(value.getId().bytes(), - persistent, - value.getPublicKey() != null ? 
value.getPublicKey().bytes() : null, - value.getPrivateKey(), - value.getRecipient() != null ? value.getRecipient().bytes() : null, - value.getNonce(), - value.getSignature(), - value.getSequenceNumber(), - value.getData(), - now, - now)) - .map(v -> value); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().upsertValue()) + .execute(valueToMap(value, persistent)) + .map(v -> value)); }).andThen(ar -> { if (ar.succeeded()) - log.debug("Put value with id: {} successfully", value.getId()); + getLogger().debug("Put value with id: {} successfully", value.getId()); else - log.error("Failed to put value with id: {}", value.getId(), ar.cause()); + getLogger().error("Failed to put value with id: {}", value.getId(), ar.cause()); }).recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) ); @@ -302,19 +164,19 @@ public Future putValue(Value value, boolean persistent, int expectedSeque @Override public Future getValue(Id id) { - log.debug("Getting value with id: {}", id); - return client.preparedQuery(selectValueById()) - .execute(Tuple.of(id.bytes())) - .map(rows -> findUniqueOrNull(rows, DatabaseStorage::rowToValue)) + getLogger().debug("Getting value with id: {}", id); + return SqlTemplate.forQuery(getClient(), getDialect().selectValueById()) + .execute(Map.of("id", id.bytes())) + .map(rows -> findUnique(rows, DatabaseStorage::rowToValue)) .andThen(ar -> { if (ar.succeeded()) { if (ar.result() != null) - log.debug("Got value with id: {}", id); + getLogger().debug("Got value with id: {}", id); else //noinspection LoggingSimilarMessage - log.debug("No value found with id: {}", id); + getLogger().debug("No value found with id: {}", id); } else { - log.error("Failed to get value with id: {}", id, ar.cause()); + getLogger().error("Failed to get value with id: {}", id, ar.cause()); } }).recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -323,7 +185,7 @@ public Future 
getValue(Id id) { @Override public Future> getValues() { - return client.preparedQuery(selectAllValues()) + return query(getDialect().selectAllValues()) .execute() .map(rows -> findMany(rows, DatabaseStorage::rowToValue)) .recover(cause -> @@ -333,8 +195,8 @@ public Future> getValues() { @Override public Future> getValues(int offset, int limit) { - return client.preparedQuery(selectAllValuesPaginated()) - .execute(Tuple.of(limit, offset)) + return SqlTemplate.forQuery(getClient(), getDialect().selectAllValuesPaginated()) + .execute(Map.of("limit", limit, "offset", offset)) .map(rows -> findMany(rows, DatabaseStorage::rowToValue)) .recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -343,8 +205,8 @@ public Future> getValues(int offset, int limit) { @Override public Future> getValues(boolean persistent, long announcedBefore) { - return client.preparedQuery(selectValuesByPersistentAndAnnouncedBefore()) - .execute(Tuple.of(persistent, announcedBefore)) + return SqlTemplate.forQuery(getClient(), getDialect().selectValuesByPersistentAndAnnouncedBefore()) + .execute(Map.of("persistent", persistent, "updatedBefore", announcedBefore)) .map(rows -> findMany(rows, DatabaseStorage::rowToValue)) .recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -353,8 +215,12 @@ public Future> getValues(boolean persistent, long announcedBefore) { @Override public Future> getValues(boolean persistent, long announcedBefore, int offset, int limit) { - return client.preparedQuery(selectValuesByPersistentAndAnnouncedBeforePaginated()) - .execute(Tuple.of(persistent, announcedBefore, limit, offset)) + return SqlTemplate.forQuery(getClient(), getDialect().selectValuesByPersistentAndAnnouncedBeforePaginated()) + .execute(Map.of( + "persistent", persistent, + "updatedBefore", announcedBefore, + "limit", limit, + "offset", offset)) .map(rows -> findMany(rows, DatabaseStorage::rowToValue)) .recover(cause 
-> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -363,44 +229,46 @@ public Future> getValues(boolean persistent, long announcedBefore, i @Override public Future updateValueAnnouncedTime(Id id) { - log.debug("Updating value announced time with id: {}", id); + getLogger().debug("Updating value announced time with id: {}", id); long now = System.currentTimeMillis(); - return client.preparedQuery(updateValueAnnouncedById()) - .execute(Tuple.of(now, id.bytes())) - .map(v -> v.rowCount() > 0 ? now : 0L) - .andThen(ar -> { - if (ar.succeeded()) { - if (ar.result() != 0) - log.debug("Updated value announced time with id: {}", id); - else - //noinspection LoggingSimilarMessage - log.debug("No value found with id: {}", id); - } else { - log.error("Failed to update value announced time with id: {}", id, ar.cause()); - } - }).recover(cause -> - Future.failedFuture(new DataStorageException("Database operation failed", cause)) - ); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().updateValueAnnouncedById()) + .execute(Map.of("id", id.bytes(), "updated", now)) + .map(r -> r.rowCount() > 0 ? 
now : 0L) + ).andThen(ar -> { + if (ar.succeeded()) { + if (ar.result() != 0) + getLogger().debug("Updated value announced time with id: {}", id); + else + //noinspection LoggingSimilarMessage + getLogger().debug("No value found with id: {}", id); + } else { + getLogger().error("Failed to update value announced time with id: {}", id, ar.cause()); + } + }).recover(cause -> + Future.failedFuture(new DataStorageException("Database operation failed", cause)) + ); } @Override public Future removeValue(Id id) { - log.debug("Removing value with id: {}", id); - return client.preparedQuery(deleteValueById()) - .execute(Tuple.of(id.bytes())) - .map(rowSet -> rowSet.rowCount() >= 1) - .andThen(ar -> { - if (ar.succeeded()) { - if (ar.result()) - log.debug("Removed value with id: {}", id); - else - log.debug("No value found with id: {}", id); - } else { - log.error("Failed to remove value with id: {}", id, ar.cause()); - } - }).recover(cause -> - Future.failedFuture(new DataStorageException("Database operation failed", cause)) - ); + getLogger().debug("Removing value with id: {}", id); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().deleteValueById()) + .execute(Map.of("id", id.bytes())) + .map(this::hasEffectedRows) + ).andThen(ar -> { + if (ar.succeeded()) { + if (ar.result()) + getLogger().debug("Removed value with id: {}", id); + else + getLogger().debug("No value found with id: {}", id); + } else { + getLogger().error("Failed to remove value with id: {}", id, ar.cause()); + } + }).recover(cause -> + Future.failedFuture(new DataStorageException("Database operation failed", cause)) + ); } @Override @@ -410,92 +278,46 @@ public Future putPeer(PeerInfo peerInfo) { @Override public Future putPeer(PeerInfo peerInfo, boolean persistent) { - return putPeer(client, peerInfo, persistent); - } - - protected Future putPeer(SqlClient sqlClient, PeerInfo peerInfo, boolean persistent) { - log.debug("Putting peer with id: {} @ {}, persistent: {}", peerInfo.getId(), 
peerInfo.getNodeId(), persistent); - log.debug("Trying to check the existing peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId()); - return getPeer(sqlClient, peerInfo.getId(), peerInfo.getNodeId()).compose(existing -> { + getLogger().debug("Putting peer with id: {} @ {}, persistent: {}", peerInfo.getId(), peerInfo.getNodeId(), persistent); + getLogger().debug("Trying to check the existing peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId()); + return getPeer(peerInfo.getId(), peerInfo.getNodeId()).compose(existing -> { if (existing != null && existing.hasPrivateKey() && !peerInfo.hasPrivateKey()) { // Skip update if the existing peer info is owned by this node and the new peer info is not. - // Should not throw NotOwnerException, just silently ignores to avoid disrupting valid operations. - log.info("Skipping to update peer for id {} @ {}: owned by this node", peerInfo.getId(), peerInfo.getNodeId()); + // Should not throw NotOwnerException, just silently ignore to avoid disrupting valid operations. + getLogger().info("Skipping to update peer for id {} @ {}: owned by this node", peerInfo.getId(), peerInfo.getNodeId()); return Future.succeededFuture(peerInfo); } - long now = System.currentTimeMillis(); - return sqlClient.preparedQuery(upsertPeer()) - .execute(Tuple.of(peerInfo.getId().bytes(), - peerInfo.getNodeId().bytes(), - persistent, - peerInfo.getPrivateKey(), - peerInfo.getOrigin() != null ? 
peerInfo.getOrigin().bytes() : null, - peerInfo.getPort(), - peerInfo.getAlternativeURI(), - peerInfo.getSignature(), - now, - now)) - .map(v -> peerInfo); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().upsertPeer()) + .execute(peerToMap(peerInfo, persistent)) + .map(v -> peerInfo)); }).andThen(ar -> { if (ar.succeeded()) - log.debug("Put peer with id: {} @ {} successfully", peerInfo.getId(), peerInfo.getNodeId()); + getLogger().debug("Put peer with id: {} @ {} successfully", peerInfo.getId(), peerInfo.getNodeId()); else - log.error("Failed to put peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId(), ar.cause()); + getLogger().error("Failed to put peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId(), ar.cause()); }).recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) ); } - /* noinspection - protected Future putPeersSequentially(SqlClient sqlClient, List peerInfos, int index) { - return index >= peerInfos.size() ? - Future.succeededFuture() : - putPeer(sqlClient, peerInfos.get(index), false).compose(r -> - putPeersSequentially(sqlClient, peerInfos, index + 1) - ); - } - @Override public Future> putPeers(List peerInfos) { if (peerInfos.isEmpty()) return Future.succeededFuture(peerInfos); - return pool().withTransaction(connection -> - putPeersSequentially(connection, peerInfos, 0) - .map(v -> peerInfos) - ); - } - */ - - @Override - public Future> putPeers(List peerInfos) { - if (peerInfos.isEmpty()) - return Future.succeededFuture(peerInfos); + List> params = peerInfos.stream().map(p -> peerToMap(p, false)).toList(); - long now = System.currentTimeMillis(); - List tuples = peerInfos.stream().map(peerInfo -> Tuple.of( - peerInfo.getId().bytes(), - peerInfo.getNodeId().bytes(), - false, - peerInfo.getPrivateKey(), - peerInfo.getOrigin() != null ? 
peerInfo.getOrigin().bytes() : null, - peerInfo.getPort(), - peerInfo.getAlternativeURI(), - peerInfo.getSignature(), - now, - now - )).toList(); - - return pool().withTransaction(connection -> - connection.preparedQuery(upsertPeer()) - .executeBatch(tuples) + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().upsertPeer()) + .executeBatch(params) .map(v -> peerInfos) ).andThen(ar -> { if (ar.succeeded()) - log.debug("Put {} peers successfully", peerInfos.size()); + getLogger().debug("Put {} peers successfully", peerInfos.size()); else - log.error("Failed to put peers", ar.cause()); + getLogger().error("Failed to put peers", ar.cause()); }).recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) ); @@ -503,23 +325,19 @@ public Future> putPeers(List peerInfos) { @Override public Future getPeer(Id id, Id nodeId) { - return getPeer(client, id, nodeId); - } - - protected Future getPeer(SqlClient sqlClient, Id id, Id nodeId) { - log.debug("Getting peer with id: {} @ {}", id, nodeId); - return sqlClient.preparedQuery(selectPeerByIdAndNodeId()) - .execute(Tuple.of(id.bytes(), nodeId.bytes())) - .map(rows -> findUniqueOrNull(rows, DatabaseStorage::rowToPeer)) + getLogger().debug("Getting peer with id: {} @ {}", id, nodeId); + return SqlTemplate.forQuery(getClient(), getDialect().selectPeerByIdAndNodeId()) + .execute(Map.of("id", id.bytes(), "nodeId", nodeId.bytes())) + .map(rows -> findUnique(rows, DatabaseStorage::rowToPeer)) .andThen(ar -> { if (ar.succeeded()) { if (ar.result() != null) - log.debug("Got peer with id: {} @ {}", id, nodeId); + getLogger().debug("Got peer with id: {} @ {}", id, nodeId); else //noinspection LoggingSimilarMessage - log.debug("No peer found with id: {} @ {}", id, nodeId); + getLogger().debug("No peer found with id: {} @ {}", id, nodeId); } else { - log.error("Failed to get peer with id: {} @ {}", id, nodeId, ar.cause()); + getLogger().error("Failed to get peer with id: {} @ {}", id, 
nodeId, ar.cause()); } }).recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -528,19 +346,19 @@ protected Future getPeer(SqlClient sqlClient, Id id, Id nodeId) { @Override public Future> getPeers(Id id) { - log.debug("Getting peers with id: {}", id); - return client.preparedQuery(selectPeersById()) - .execute(Tuple.of(id.bytes())) + getLogger().debug("Getting peers with id: {}", id); + return SqlTemplate.forQuery(getClient(), getDialect().selectPeersById()) + .execute(Map.of("id", id.bytes())) .map(rows -> findMany(rows, DatabaseStorage::rowToPeer)) .andThen(ar -> { if (ar.succeeded()) { if (!ar.result().isEmpty()) - log.debug("Got peers with id: {}", id); + getLogger().debug("Got peers with id: {}", id); else //noinspection LoggingSimilarMessage - log.debug("No peers found with id: {}", id); + getLogger().debug("No peers found with id: {}", id); } else { - log.error("Failed to get peers with id: {}", id, ar.cause()); + getLogger().error("Failed to get peers with id: {}", id, ar.cause()); } }).recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -549,7 +367,7 @@ public Future> getPeers(Id id) { @Override public Future> getPeers() { - return client.preparedQuery(selectAllPeers()) + return query(getDialect().selectAllPeers()) .execute() .map(rows -> findMany(rows, DatabaseStorage::rowToPeer)) .recover(cause -> @@ -559,8 +377,8 @@ public Future> getPeers() { @Override public Future> getPeers(int offset, int limit) { - return client.preparedQuery(selectAllPeersPaginated()) - .execute(Tuple.of(limit, offset)) + return SqlTemplate.forQuery(getClient(), getDialect().selectAllPeersPaginated()) + .execute(Map.of("limit", limit, "offset", offset)) .map(rows -> findMany(rows, DatabaseStorage::rowToPeer)) .recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -569,8 +387,8 @@ public Future> getPeers(int offset, int limit) { 
@Override public Future> getPeers(boolean persistent, long announcedBefore) { - return client.preparedQuery(selectPeersByPersistentAndAnnouncedBefore()) - .execute(Tuple.of(persistent, announcedBefore)) + return SqlTemplate.forQuery(getClient(), getDialect().selectPeersByPersistentAndAnnouncedBefore()) + .execute(Map.of("persistent", persistent, "updatedBefore", announcedBefore)) .map(rows -> findMany(rows, DatabaseStorage::rowToPeer)) .recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -579,8 +397,12 @@ public Future> getPeers(boolean persistent, long announcedBefore) @Override public Future> getPeers(boolean persistent, long announcedBefore, int offset, int limit) { - return client.preparedQuery(selectPeersByPersistentAndAnnouncedBeforePaginated()) - .execute(Tuple.of(persistent, announcedBefore, limit, offset)) + return SqlTemplate.forQuery(getClient(), getDialect().selectPeersByPersistentAndAnnouncedBeforePaginated()) + .execute(Map.of( + "persistent", persistent, + "updatedBefore", announcedBefore, + "limit", limit, + "offset", offset)) .map(rows -> findMany(rows, DatabaseStorage::rowToPeer)) .recover(cause -> Future.failedFuture(new DataStorageException("Database operation failed", cause)) @@ -589,138 +411,139 @@ public Future> getPeers(boolean persistent, long announcedBefore, @Override public Future updatePeerAnnouncedTime(Id id, Id nodeId) { - log.debug("Updating peer announced time with id: {} @ {}", id, nodeId); + getLogger().debug("Updating peer announced time with id: {} @ {}", id, nodeId); long now = System.currentTimeMillis(); - return client.preparedQuery(updatePeerAnnouncedByIdAndNodeId()) - .execute(Tuple.of(now, id.bytes(), nodeId.bytes())) - .map(v -> v.rowCount() > 0 ? 
now : 0L) - .andThen(ar -> { - if (ar.succeeded()) { - if (ar.result() != 0) - log.debug("Updated peer announced time with id: {} @ {}", id, nodeId); - else - log.debug("No peer found with id: {} @ {}", id, nodeId); - } else { - log.error("Failed to update peer announced time with id: {} @ {}", id, nodeId, ar.cause()); - } - }).recover(cause -> - Future.failedFuture(new DataStorageException("Database operation failed", cause)) - ); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().updatePeerAnnouncedByIdAndNodeId()) + .execute(Map.of("id", id.bytes(), "nodeId", nodeId.bytes(), "updated", now)) + .map(r -> r.rowCount() > 0 ? now : 0L) + ).andThen(ar -> { + if (ar.succeeded()) { + if (ar.result() != 0) + getLogger().debug("Updated peer announced time with id: {} @ {}", id, nodeId); + else + getLogger().debug("No peer found with id: {} @ {}", id, nodeId); + } else { + getLogger().error("Failed to update peer announced time with id: {} @ {}", id, nodeId, ar.cause()); + } + }).recover(cause -> + Future.failedFuture(new DataStorageException("Database operation failed", cause)) + ); } @Override public Future removePeer(Id id, Id nodeId) { - log.debug("Removing peer with id: {} @ {}", id, nodeId); - return client.preparedQuery(deletePeerByIdAndNodeId()) - .execute(Tuple.of(id.bytes(), nodeId.bytes())) - .map(rowSet -> rowSet.rowCount() >= 1) - .andThen(ar -> { - if (ar.succeeded()) { - if (ar.result()) - log.debug("Removed peer with id: {} @ {}", id, nodeId); - else - //noinspection LoggingSimilarMessage - log.debug("No peer found with id: {} @ {}", id, nodeId); - } else { - log.error("Failed to remove peer with id: {} @ {}", id, nodeId, ar.cause()); - } - }).recover(cause -> - Future.failedFuture(new DataStorageException("Database operation failed", cause)) - ); + getLogger().debug("Removing peer with id: {} @ {}", id, nodeId); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().deletePeerByIdAndNodeId()) + .execute(Map.of("id", 
id.bytes(), "nodeId", nodeId.bytes())) + .map(this::hasEffectedRows) + ).andThen(ar -> { + if (ar.succeeded()) { + if (ar.result()) + getLogger().debug("Removed peer with id: {} @ {}", id, nodeId); + else + //noinspection LoggingSimilarMessage + getLogger().debug("No peer found with id: {} @ {}", id, nodeId); + } else { + getLogger().error("Failed to remove peer with id: {} @ {}", id, nodeId, ar.cause()); + } + }).recover(cause -> + Future.failedFuture(new DataStorageException("Database operation failed", cause)) + ); } @Override public Future removePeers(Id id) { - log.debug("Removing peers with id: {}", id); - return client.preparedQuery(deletePeersById()) - .execute(Tuple.of(id.bytes())) - .map(rowSet -> rowSet.rowCount() >= 1) - .andThen(ar -> { - if (ar.succeeded()) { - if (ar.result()) - log.debug("Removed peers with id: {}", id); - else - log.debug("No peers found with id: {}", id); - } else { - log.error("Failed to remove peers with id: {}", id, ar.cause()); - } - }).recover(cause -> - Future.failedFuture(new DataStorageException("Database operation failed", cause)) - ); - } - - @SuppressWarnings("SameParameterValue") - protected static int rowToInteger(Row row, int defaultValue) { - return row != null ? 
row.getInteger(0) : defaultValue; + getLogger().debug("Removing peers with id: {}", id); + return withTransaction(c -> + SqlTemplate.forUpdate(c, getDialect().deletePeersById()) + .execute(Map.of("id", id.bytes())) + .map(this::hasEffectedRows) + ).andThen(ar -> { + if (ar.succeeded()) { + if (ar.result()) + getLogger().debug("Removed peers with id: {}", id); + else + getLogger().debug("No peers found with id: {}", id); + } else { + getLogger().error("Failed to remove peers with id: {}", id, ar.cause()); + } + }).recover(cause -> + Future.failedFuture(new DataStorageException("Database operation failed", cause)) + ); } - protected static int rowToInteger(Row row) { - return rowToInteger(row, 0); + protected static Map valueToMap(Value value, boolean persistent) { + Map map = new HashMap<>(); + map.put("id", value.getId().bytes()); + map.put("publicKey", value.getPublicKey() != null ? value.getPublicKey().bytes() : null); + map.put("privateKey", value.getPrivateKey()); + map.put("recipient", value.getRecipient() != null ? value.getRecipient().bytes() : null); + map.put("nonce", value.getNonce()); + map.put("signature", value.getSignature()); + map.put("sequenceNumber", value.getSequenceNumber()); + map.put("data", value.getData()); + map.put("persistent", persistent); + long now = System.currentTimeMillis(); + map.put("created", now); + map.put("updated", now); + return map; } protected static Value rowToValue(Row row) { - if (row == null) - return null; - - // Column indices: - // 0: id (BLOB, NOT NULL) - // 1: persistent (BOOLEAN, NOT NULL) - // 2: publicKey (BLOB, nullable) - // 3: privateKey (BLOB, nullable) - // 4: recipient (BLOB, nullable) - // 5: nonce (BLOB, nullable) - // 6: signature (BLOB, nullable) - // 7: sequenceNumber (INTEGER, NOT NULL) - // 8: data (BLOB, nullable) - // 9: created (BIGINT, NOT NULL) - // 10: updated (BIGINT, NOT NULL) - - // skip 0:id - // skip 1:persistent - Buffer buffer = row.getBuffer(2); - Id publicKey = buffer == null ? 
null : Id.of(buffer.getBytes()); - buffer = row.getBuffer(3); + Id publicKey = getId(row, "public_key"); + Buffer buffer = row.getBuffer("private_key"); byte[] privateKey = buffer == null ? null : buffer.getBytes(); - buffer = row.getBuffer(4); - Id recipient = buffer == null ? null : Id.of(buffer.getBytes()); - buffer = row.getBuffer(5); + Id recipient = getId(row, "recipient"); + buffer = row.getBuffer("nonce"); byte[] nonce = buffer == null ? null : buffer.getBytes(); - buffer = row.getBuffer(6); + buffer = row.getBuffer("signature"); byte[] signature = buffer == null ? null : buffer.getBytes(); - int sequenceNumber = row.getInteger(7); // NOT NULL - buffer = row.getBuffer(8); + int sequenceNumber = row.getInteger("sequence_number"); // NOT NULL + buffer = row.getBuffer("data"); byte[] data = buffer == null ? null : buffer.getBytes(); return Value.of(publicKey, privateKey, recipient, nonce, sequenceNumber, signature, data); } + protected static Map peerToMap(PeerInfo peerInfo, boolean persistent) { + Map map = new HashMap<>(); + map.put("id", peerInfo.getId().bytes()); + map.put("nodeId", peerInfo.getNodeId().bytes()); + map.put("privateKey", peerInfo.getPrivateKey()); + map.put("origin", peerInfo.getOrigin() != null ? 
peerInfo.getOrigin().bytes() : null); + map.put("port", peerInfo.getPort()); + map.put("alternativeUri", peerInfo.getAlternativeURI()); + map.put("signature", peerInfo.getSignature()); + map.put("persistent", persistent); + long now = System.currentTimeMillis(); + map.put("created", now); + map.put("updated", now); + return map; + } + protected static PeerInfo rowToPeer(Row row) { - if (row == null) - return null; - - // Column indices: - // 0: id (BLOB, NOT NULL) - // 1: nodeId (BLOB, NOT NULL) - // 2: persistent (BOOLEAN, NOT NULL) - // 3: privateKey (BLOB, nullable) - // 4: origin (BLOB, nullable) - // 5: port (INTEGER, NOT NULL) - // 6: alternativeURI (TEXT, nullable) - // 7: signature (BLOB, nullable) - // 8: created (BIGINT, NOT NULL) - // 9: updated (BIGINT, NOT NULL) - - Id id = Id.of(row.getBuffer(0).getBytes()); // NOT NULL - Id nodeId = Id.of(row.getBuffer(1).getBytes()); // NOT NULL - Buffer buffer = row.getBuffer(3); + Id id = getId(row, "id"); + Id nodeId = getId(row, "node_id"); + Buffer buffer = row.getBuffer("private_key"); byte[] privateKey = buffer == null ? null : buffer.getBytes(); - buffer = row.getBuffer(4); - Id origin = buffer == null ? null : Id.of(buffer.getBytes()); - int port = row.getInteger(5); // NOT NULL - String alternativeURI = row.getString(6); // Nullable - buffer = row.getBuffer(7); + Id origin = getId(row, "origin"); + int port = row.getInteger("port"); + String alternativeURI = row.getString("alternative_uri"); + buffer = row.getBuffer("signature"); byte[] signature = buffer == null ? null : buffer.getBytes(); return PeerInfo.of(id, privateKey, nodeId, origin, port, alternativeURI, signature); } + + private static Id getId(Row row, String column) { + Buffer buf = row.getBuffer(column); + return buf == null ? 
null : Id.of(buf.getBytes()); + } + + @Override + public Future close() { + return getClient().close(); + } } \ No newline at end of file diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/InMemoryStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/InMemoryStorage.java index 08744d2..77c4370 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/InMemoryStorage.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/InMemoryStorage.java @@ -42,9 +42,9 @@ import io.bosonnetwork.kademlia.exceptions.SequenceNotMonotonic; public class InMemoryStorage implements DataStorage { - public static final String STORAGE_URL = "inmemory"; + public static final String STORAGE_URI = "inmemory"; - private static final int SCHEMA_VERSION = 5; + private static final int SCHEMA_VERSION = 1; private static final int DEFAULT_MAP_CAPACITY = 32; diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java index f994c4c..2a294bf 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/PostgresStorage.java @@ -22,79 +22,34 @@ package io.bosonnetwork.kademlia.storage; -import java.util.List; +import java.net.URL; +import java.nio.file.Path; import io.vertx.core.Vertx; import io.vertx.pgclient.PgBuilder; import io.vertx.pgclient.PgConnectOptions; +import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.SqlClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PostgresStorage extends DatabaseStorage implements DataStorage { - protected static final String STORAGE_URL_PREFIX = "postgresql://"; + protected static final String STORAGE_URI_PREFIX = "postgresql://"; - private static final List SCHEMA = List.of( - // Schema version - """ - CREATE TABLE IF NOT EXISTS schema_version ( - id INTEGER NOT NULL PRIMARY 
KEY CHECK (id = 1), - version INTEGER NOT NULL - ) - """, - // Table values - """ - CREATE TABLE IF NOT EXISTS valores ( - id BYTEA NOT NULL PRIMARY KEY, - persistent BOOLEAN NOT NULL DEFAULT FALSE, - publicKey BYTEA, - privateKey BYTEA, - recipient BYTEA, - nonce BYTEA, - signature BYTEA, - sequenceNumber INTEGER NOT NULL DEFAULT 0, - data BYTEA NOT NULL, - created BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, - updated BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000 - ) - """, - // Partial index for persistent + announced queries - "CREATE INDEX IF NOT EXISTS idx_valores_persistent_true_updated ON valores (updated DESC) WHERE persistent = TRUE", - // Partial index for non-persistent + updated queries - "CREATE INDEX IF NOT EXISTS idx_valores_persistent_false_updated ON valores (updated DESC) WHERE persistent = FALSE", - // Full index for all values - "CREATE INDEX IF NOT EXISTS idx_valores_updated ON valores (updated DESC)", - // Table peers - """ - CREATE TABLE IF NOT EXISTS peers ( - id BYTEA NOT NULL, - nodeId BYTEA NOT NULL, - persistent BOOLEAN NOT NULL DEFAULT FALSE, - privateKey BYTEA, - origin BYTEA, - port INTEGER NOT NULL, - alternativeURI VARCHAR(512), - signature BYTEA NOT NULL, - created BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, - updated BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, - PRIMARY KEY (id, nodeId) - ) - """, - // Partial index for persistent + announced queries - "CREATE INDEX IF NOT EXISTS idx_peers_persistent_true_updated ON peers (updated DESC) WHERE persistent = TRUE", - // Partial index for non-persistent + updated queries - "CREATE INDEX IF NOT EXISTS idx_peers_persistent_false_updated ON peers (updated DESC) WHERE persistent = FALSE", - // Full index IF NOT EXISTS for all values - "CREATE INDEX IF NOT EXISTS idx_peers_updated ON peers (updated DESC)", - // Initialize the schema version - "INSERT INTO schema_version (id, version) VALUES (1, " + SCHEMA_VERSION + ") ON CONFLICT (id) DO 
NOTHING" - ); + private final String connectionUri; + private Pool client; + private SqlDialect sqlDialect; + + private static final Logger log = LoggerFactory.getLogger(PostgresStorage.class); protected PostgresStorage(String connectionUri) { - super(connectionUri); + this.connectionUri = connectionUri; } // postgresql://[user[:password]@][host][:port][,...][/dbname][?param1=value1&...] @Override - protected void setupSqlClient(Vertx vertx, String connectionUri) { + protected void init(Vertx vertx) { PgConnectOptions connectOptions = PgConnectOptions.fromUri(connectionUri); PoolOptions poolOptions = new PoolOptions().setMaxSize(8); client = PgBuilder.pool() @@ -102,156 +57,30 @@ protected void setupSqlClient(Vertx vertx, String connectionUri) { .connectingTo(connectOptions) .using(vertx) .build(); + sqlDialect = new SqlDialect() {}; } @Override - protected List createSchemaStatements() { - return SCHEMA; - } - - @Override - protected String selectSchemaVersion() { - return "SELECT version FROM schema_version WHERE id = 1"; - } - - @Override - protected String insertSchemaVersion() { - return "INSERT INTO schema_version (id, version) VALUES (1, $1)"; - } - - @Override - protected String selectValueById() { - return "SELECT * FROM valores WHERE id = $1"; - } - - @Override - protected String selectValuesByPersistentAndAnnouncedBefore() { - return "SELECT * FROM valores WHERE persistent = $1 AND updated <= $2 ORDER BY updated DESC, id"; - } - - @Override - protected String selectValuesByPersistentAndAnnouncedBeforePaginated() { - return "SELECT * FROM valores WHERE persistent = $1 AND updated <= $2 ORDER BY updated DESC, id LIMIT $3 OFFSET $4"; - } - - @Override - protected String selectAllValues() { - return "SELECT * FROM valores ORDER BY updated DESC, id"; - } - - @Override - protected String selectAllValuesPaginated() { - return "SELECT * FROM valores ORDER BY updated DESC, id LIMIT $1 OFFSET $2"; - } - - /* noinspection - privateKey = CASE - WHEN 
excluded.privateKey IS NOT NULL THEN excluded.privateKey - ELSE peers.privateKey - END, - */ - - @Override - protected String upsertValue() { - return """ - INSERT INTO valores ( - id, persistent, publicKey, privateKey, recipient, nonce, signature, - sequenceNumber, data, created, updated - ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 - ) ON CONFLICT(id) DO UPDATE SET - persistent = excluded.persistent, - publicKey = excluded.publicKey, - privateKey = excluded.privateKey, - recipient = excluded.recipient, - nonce = excluded.nonce, - signature = excluded.signature, - sequenceNumber = excluded.sequenceNumber, - data = excluded.data, - updated = excluded.updated - """; - } - - @Override - protected String updateValueAnnouncedById() { - return "UPDATE valores SET updated = $1 WHERE id = $2"; - } - - @Override - protected String deleteValueById() { - return "DELETE FROM valores WHERE id = $1"; - } + protected Path getSchemaPath() { + URL schemaPath = getClass().getClassLoader().getResource("db/postgres"); + if (schemaPath == null || schemaPath.getPath() == null) + throw new IllegalStateException("Migration path not exists"); - @Override - protected String deleteNonPersistentValuesAnnouncedBefore() { - return "DELETE FROM valores WHERE persistent = FALSE AND updated < $1"; - } - - @Override - protected String selectPeerByIdAndNodeId() { - return "SELECT * FROM peers WHERE id = $1 AND nodeId = $2"; - } - - @Override - protected String selectPeersById() { - return "SELECT * FROM peers WHERE id = $1 ORDER BY updated DESC, nodeId"; - } - - @Override - protected String selectPeersByPersistentAndAnnouncedBefore() { - return "SELECT * FROM peers WHERE persistent = $1 AND updated <= $2 ORDER BY updated DESC, id, nodeId"; - } - - @Override - protected String selectPeersByPersistentAndAnnouncedBeforePaginated() { - return "SELECT * FROM peers WHERE persistent = $1 AND updated <= $2 ORDER BY updated DESC, id, nodeId LIMIT $3 OFFSET $4"; - } - - @Override - protected 
String selectAllPeers() { - return "SELECT * FROM peers ORDER BY updated DESC, id, nodeId"; - } - - @Override - protected String selectAllPeersPaginated() { - return "SELECT * FROM peers ORDER BY updated DESC, id, nodeId LIMIT $1 OFFSET $2"; - } - - @Override - protected String upsertPeer() { - return """ - INSERT INTO peers ( - id, nodeId, persistent, privateKey, origin, port, - alternativeURI, signature, created, updated - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - ON CONFLICT(id, nodeId) DO UPDATE SET - persistent = excluded.persistent, - privateKey = excluded.privateKey, - origin = excluded.origin, - port = excluded.port, - alternativeURI = excluded.alternativeURI, - signature = excluded.signature, - updated = excluded.updated - """; - } - - @Override - protected String updatePeerAnnouncedByIdAndNodeId() { - return "UPDATE peers SET updated = $1 WHERE id = $2 AND nodeId = $3"; + return Path.of(schemaPath.getPath()); } @Override - protected String deletePeerByIdAndNodeId() { - return "DELETE FROM peers WHERE id = $1 AND nodeId = $2"; + public SqlClient getClient() { + return client; } @Override - protected String deletePeersById() { - return "DELETE FROM peers WHERE id = $1"; + protected SqlDialect getDialect() { + return sqlDialect; } @Override - protected String deleteNonPersistentPeersAnnouncedBefore() { - return "DELETE FROM peers WHERE persistent = FALSE AND updated < $1"; + protected Logger getLogger() { + return log; } } \ No newline at end of file diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java index 21427d3..88a758d 100644 --- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SQLiteStorage.java @@ -22,239 +22,77 @@ package io.bosonnetwork.kademlia.storage; -import java.util.List; +import java.net.URL; +import java.nio.file.Path; import io.vertx.core.Vertx; 
import io.vertx.jdbcclient.JDBCPool; +import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.SqlClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.sqlite.SQLiteDataSource; public class SQLiteStorage extends DatabaseStorage implements DataStorage { - protected static final String STORAGE_URL_PREFIX = "jdbc:sqlite:"; + protected static final String STORAGE_URI_PREFIX = "jdbc:sqlite:"; - private static final List SCHEMA = List.of( - // Schema version - """ - CREATE TABLE IF NOT EXISTS schema_version ( - id INTEGER NOT NULL PRIMARY KEY CHECK (id = 1), - version INTEGER NOT NULL - ) - """, - // Table values - """ - CREATE TABLE IF NOT EXISTS valores ( - id BLOB NOT NULL PRIMARY KEY, - persistent BOOLEAN NOT NULL DEFAULT FALSE, - publicKey BLOB, - privateKey BLOB, - recipient BLOB, - nonce BLOB, - signature BLOB, - sequenceNumber INTEGER NOT NULL DEFAULT 0, - data BLOB NOT NULL, - created INTEGER NOT NULL DEFAULT (CAST(unixepoch('subsec') * 1000 AS INTEGER)), - updated INTEGER NOT NULL DEFAULT (CAST(unixepoch('subsec') * 1000 AS INTEGER)) - ) WITHOUT ROWID - """, - // Partial index for persistent + announced queries - "CREATE INDEX IF NOT EXISTS idx_valores_persistent_true_updated ON valores (updated DESC) WHERE persistent = TRUE", - // Partial index for non-persistent + updated queries - "CREATE INDEX IF NOT EXISTS idx_valores_persistent_false_updated ON valores (updated DESC) WHERE persistent = FALSE", - // Full index for all values - "CREATE INDEX IF NOT EXISTS idx_valores_updated ON valores (updated DESC)", - // Table peers - """ - CREATE TABLE IF NOT EXISTS peers ( - id BLOB NOT NULL, - nodeId BLOB NOT NULL, - persistent BOOLEAN NOT NULL DEFAULT FALSE, - privateKey BLOB, - origin BLOB, - port INTEGER NOT NULL, - alternativeURI TEXT, - signature BLOB NOT NULL, - created INTEGER NOT NULL DEFAULT (CAST(unixepoch('subsec') * 1000 AS INTEGER)), - updated INTEGER NOT NULL DEFAULT 
(CAST(unixepoch('subsec') * 1000 AS INTEGER)), - PRIMARY KEY (id, nodeId) - ) WITHOUT ROWID - """, - // Partial index for persistent + announced queries - "CREATE INDEX IF NOT EXISTS idx_peers_persistent_true_updated ON peers (updated DESC) WHERE persistent = TRUE", - // Partial index for non-persistent + updated queries - "CREATE INDEX IF NOT EXISTS idx_peers_persistent_false_updated ON peers (updated DESC) WHERE persistent = FALSE", - // Full index for all values - "CREATE INDEX IF NOT EXISTS idx_peers_updated ON peers (updated DESC)", - // Initialize the schema version - "INSERT INTO schema_version (id, version) VALUES (1, " + SCHEMA_VERSION + ") ON CONFLICT (id) DO NOTHING" - ); + private final String connectionUri; + private Pool client; + private SqlDialect sqlDialect; + + private static final Logger log = LoggerFactory.getLogger(SQLiteStorage.class); protected SQLiteStorage(String connectionUri) { - super(connectionUri); + this.connectionUri = connectionUri; } @Override - protected void setupSqlClient(Vertx vertx, String connectionUri) { - /*/ + protected void init(Vertx vertx) { // Vert.x 5.x style - JDBCConnectOptions connectOptions = new JDBCConnectOptions() - .setJdbcUrl(connectionUri); + SQLiteDataSource dataSource = new SQLiteDataSource(); + dataSource.setUrl(connectionUri); + dataSource.setJournalMode("WAL"); + dataSource.setEnforceForeignKeys(true); + dataSource.setBusyTimeout(5000); + dataSource.setLockingMode("NORMAL"); + dataSource.setSharedCache(false); + dataSource.setFullSync(true); + // Single connection recommended for SQLite PoolOptions poolOptions = new PoolOptions().setMaxSize(1); - client = JDBCPool.pool(vertx, connectOptions, poolOptions); - */ + client = JDBCPool.pool(vertx, dataSource, poolOptions); + sqlDialect = new SqlDialect() {}; + /*/ // Vert.x 4.x style SQLiteDataSource ds = new SQLiteDataSource(); ds.setUrl(connectionUri); client = JDBCPool.pool(vertx, ds); + */ } @Override - protected List createSchemaStatements() { - return 
SCHEMA; - } - - @Override - protected String selectSchemaVersion() { - return "SELECT version FROM schema_version WHERE id = 1"; - } - - @Override - protected String insertSchemaVersion() { - return "INSERT INTO schema_version (id, version) VALUES (1, ?)"; - } - - @Override - protected String selectValueById() { - return "SELECT * FROM valores WHERE id = ?"; - } - - @Override - protected String selectValuesByPersistentAndAnnouncedBefore() { - return "SELECT * FROM valores WHERE persistent = ? AND updated <= ? ORDER BY updated DESC, id"; - } - - @Override - protected String selectValuesByPersistentAndAnnouncedBeforePaginated() { - return "SELECT * FROM valores WHERE persistent = ? AND updated <= ? ORDER BY updated DESC, id LIMIT ? OFFSET ?"; - } - - @Override - protected String selectAllValues() { - return "SELECT * FROM valores ORDER BY updated DESC, id"; - } - - @Override - protected String selectAllValuesPaginated() { - return "SELECT * FROM valores ORDER BY updated DESC, id LIMIT ? OFFSET ?"; - } - - /* noinspection - privateKey = CASE - WHEN excluded.privateKey IS NOT NULL THEN excluded.privateKey - ELSE peers.privateKey - END, - */ - - @Override - protected String upsertValue() { - return """ - INSERT INTO valores ( - id, persistent, publicKey, privateKey, recipient, nonce, signature, - sequenceNumber, data, created, updated - ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? - ) ON CONFLICT(id) DO UPDATE SET - persistent = excluded.persistent, - publicKey = excluded.publicKey, - privateKey = excluded.privateKey, - recipient = excluded.recipient, - nonce = excluded.nonce, - signature = excluded.signature, - sequenceNumber = excluded.sequenceNumber, - data = excluded.data, - updated = excluded.updated - """; - } - - @Override - protected String updateValueAnnouncedById() { - return "UPDATE valores SET updated = ? 
WHERE id = ?"; - } - - protected String deleteValueById() { - return "DELETE FROM valores WHERE id = ?"; - } - - @Override - protected String deleteNonPersistentValuesAnnouncedBefore() { - return "DELETE FROM valores WHERE persistent = FALSE AND updated < ?"; - } - - @Override - protected String selectPeerByIdAndNodeId() { - return "SELECT * FROM peers WHERE id = ? AND nodeId = ?"; - } - - @Override - protected String selectPeersById() { - return "SELECT * FROM peers WHERE id = ? ORDER BY updated DESC, nodeId"; - } - - @Override - protected String selectPeersByPersistentAndAnnouncedBefore() { - return "SELECT * FROM peers WHERE persistent = ? AND updated <= ? ORDER BY updated DESC, id, nodeId"; - } - - @Override - protected String selectPeersByPersistentAndAnnouncedBeforePaginated() { - return "SELECT * FROM peers WHERE persistent = ? AND updated <= ? ORDER BY updated DESC, id, nodeId LIMIT ? OFFSET ?"; - } - - @Override - protected String selectAllPeers() { - return "SELECT * FROM peers ORDER BY updated DESC, id, nodeId"; - } + protected Path getSchemaPath() { + URL schemaPath = getClass().getClassLoader().getResource("db/sqlite"); + if (schemaPath == null || schemaPath.getPath() == null) + throw new IllegalStateException("Migration path not exists"); - @Override - protected String selectAllPeersPaginated() { - return "SELECT * FROM peers ORDER BY updated DESC, id, nodeId LIMIT ? OFFSET ?"; - } - - @Override - protected String upsertPeer() { - return """ - INSERT INTO peers ( - id, nodeId, persistent, privateKey, origin, port, - alternativeURI, signature, created, updated - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- ON CONFLICT(id, nodeId) DO UPDATE SET - persistent = excluded.persistent, - privateKey = excluded.privateKey, - origin = excluded.origin, - port = excluded.port, - alternativeURI = excluded.alternativeURI, - signature = excluded.signature, - updated = excluded.updated - """; - } - - @Override - protected String updatePeerAnnouncedByIdAndNodeId() { - return "UPDATE peers SET updated = ? WHERE id = ? AND nodeId = ?"; + return Path.of(schemaPath.getPath()); } @Override - protected String deletePeerByIdAndNodeId() { - return "DELETE FROM peers WHERE id = ? AND nodeId = ?"; + public SqlClient getClient() { + return client; } @Override - protected String deletePeersById() { - return "DELETE FROM peers WHERE id = ?"; + protected SqlDialect getDialect() { + return sqlDialect; } @Override - protected String deleteNonPersistentPeersAnnouncedBefore() { - return "DELETE FROM peers WHERE persistent = FALSE AND updated < ?"; + protected Logger getLogger() { + return log; } } \ No newline at end of file diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/SqlDialect.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SqlDialect.java new file mode 100644 index 0000000..d0a82a1 --- /dev/null +++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/SqlDialect.java @@ -0,0 +1,142 @@ +package io.bosonnetwork.kademlia.storage; + +public interface SqlDialect { + default String upsertValue() { + return """ + INSERT INTO valores ( + id, public_key, private_key, recipient, nonce, signature, + sequence_number, data, persistent, created, updated + ) VALUES ( + #{id}, #{publicKey}, #{privateKey}, #{recipient}, #{nonce}, #{signature}, + #{sequenceNumber}, #{data}, #{persistent}, #{created}, #{updated} + ) ON CONFLICT(id) DO UPDATE SET + public_key = excluded.public_key, + private_key = CASE + WHEN excluded.private_key IS NOT NULL + THEN excluded.private_key + ELSE valores.private_key + END, + recipient = excluded.recipient, + nonce = excluded.nonce, + signature = 
/**
 * SQL statements used by the database-backed storage, written with Vert.x
 * SQL-template named parameters ({@code #{name}}). Every method has a
 * default shared by the SQLite and PostgreSQL dialects; an implementation
 * overrides a method only when its database needs different SQL.
 */
public interface SqlDialect {
	// ---------------------------------------------------------------- values

	/**
	 * Inserts a value, or updates it in place on an id conflict. The stored
	 * private key is never cleared by an upsert: it is replaced only when the
	 * incoming row carries a non-null private key.
	 */
	default String upsertValue() {
		return """
			INSERT INTO valores (
				id, public_key, private_key, recipient, nonce, signature,
				sequence_number, data, persistent, created, updated
			) VALUES (
				#{id}, #{publicKey}, #{privateKey}, #{recipient}, #{nonce}, #{signature},
				#{sequenceNumber}, #{data}, #{persistent}, #{created}, #{updated}
			) ON CONFLICT(id) DO UPDATE SET
				public_key = excluded.public_key,
				private_key = CASE
					WHEN excluded.private_key IS NOT NULL
					THEN excluded.private_key
					ELSE valores.private_key
				END,
				recipient = excluded.recipient,
				nonce = excluded.nonce,
				signature = excluded.signature,
				sequence_number = excluded.sequence_number,
				data = excluded.data,
				persistent = excluded.persistent,
				updated = excluded.updated
			""";
	}

	/** Looks up a single value by its id. */
	default String selectValueById() {
		return "SELECT * FROM valores WHERE id = #{id}";
	}

	/** Values filtered by persistence flag, announced at or before a cutoff. */
	default String selectValuesByPersistentAndAnnouncedBefore() {
		return """
			SELECT * FROM valores
			WHERE persistent = #{persistent} AND updated <= #{updatedBefore}
			ORDER BY updated DESC, id
			""";
	}

	/** Paginated variant of {@link #selectValuesByPersistentAndAnnouncedBefore()}. */
	default String selectValuesByPersistentAndAnnouncedBeforePaginated() {
		return """
			SELECT * FROM valores
			WHERE persistent = #{persistent} AND updated <= #{updatedBefore}
			ORDER BY updated DESC, id
			LIMIT #{limit} OFFSET #{offset}
			""";
	}

	/** All values, newest announcement first. */
	default String selectAllValues() {
		return "SELECT * FROM valores ORDER BY updated DESC, id";
	}

	/** Paginated variant of {@link #selectAllValues()}. */
	default String selectAllValuesPaginated() {
		return "SELECT * FROM valores ORDER BY updated DESC, id LIMIT #{limit} OFFSET #{offset}";
	}

	/** Refreshes a value's announcement timestamp. */
	default String updateValueAnnouncedById() {
		return "UPDATE valores SET updated = #{updated} WHERE id = #{id}";
	}

	/** Removes a value by id. */
	default String deleteValueById() {
		return "DELETE FROM valores WHERE id = #{id}";
	}

	/** Expires non-persistent values last announced before a cutoff. */
	default String deleteNonPersistentValuesAnnouncedBefore() {
		return "DELETE FROM valores WHERE persistent = FALSE AND updated < #{updatedBefore}";
	}

	// ----------------------------------------------------------------- peers

	/**
	 * Inserts a peer announcement, or updates it in place on an
	 * (id, node_id) conflict. As with values, a stored private key is only
	 * replaced by a non-null incoming one, never cleared.
	 */
	default String upsertPeer() {
		return """
			INSERT INTO peers (
				id, node_id, private_key, origin, port, alternative_uri, signature,
				persistent, created, updated
			) VALUES (
				#{id}, #{nodeId}, #{privateKey}, #{origin}, #{port}, #{alternativeUri}, #{signature},
				#{persistent}, #{created}, #{updated}
			) ON CONFLICT(id, node_id) DO UPDATE SET
				private_key = CASE
					WHEN excluded.private_key IS NOT NULL
					THEN excluded.private_key
					ELSE peers.private_key
				END,
				origin = excluded.origin,
				port = excluded.port,
				alternative_uri = excluded.alternative_uri,
				signature = excluded.signature,
				persistent = excluded.persistent,
				updated = excluded.updated
			""";
	}

	/** Looks up one announcement by peer id and announcing node id. */
	default String selectPeerByIdAndNodeId() {
		return "SELECT * FROM peers WHERE id = #{id} AND node_id = #{nodeId}";
	}

	/** All announcements for a peer id, newest first. */
	default String selectPeersById() {
		return "SELECT * FROM peers WHERE id = #{id} ORDER BY updated DESC, node_id";
	}

	/** Peers filtered by persistence flag, announced at or before a cutoff. */
	default String selectPeersByPersistentAndAnnouncedBefore() {
		return """
			SELECT * FROM peers
			WHERE persistent = #{persistent} AND updated <= #{updatedBefore}
			ORDER BY updated DESC, id, node_id
			""";
	}

	/** Paginated variant of {@link #selectPeersByPersistentAndAnnouncedBefore()}. */
	default String selectPeersByPersistentAndAnnouncedBeforePaginated() {
		return """
			SELECT * FROM peers
			WHERE persistent = #{persistent} AND updated <= #{updatedBefore}
			ORDER BY updated DESC, id, node_id
			LIMIT #{limit} OFFSET #{offset}
			""";
	}

	/** All peer announcements, newest first. */
	default String selectAllPeers() {
		return "SELECT * FROM peers ORDER BY updated DESC, id, node_id";
	}

	/** Paginated variant of {@link #selectAllPeers()}. */
	default String selectAllPeersPaginated() {
		return "SELECT * FROM peers ORDER BY updated DESC, id, node_id LIMIT #{limit} OFFSET #{offset}";
	}

	/** Refreshes a peer announcement's timestamp. */
	default String updatePeerAnnouncedByIdAndNodeId() {
		return "UPDATE peers SET updated = #{updated} WHERE id = #{id} AND node_id = #{nodeId}";
	}

	/** Removes one announcement of a peer. */
	default String deletePeerByIdAndNodeId() {
		return "DELETE FROM peers WHERE id = #{id} AND node_id = #{nodeId}";
	}

	/** Removes every announcement of a peer. */
	default String deletePeersById() {
		return "DELETE FROM peers WHERE id = #{id}";
	}

	/** Expires non-persistent peers last announced before a cutoff. */
	default String deleteNonPersistentPeersAnnouncedBefore() {
		return "DELETE FROM peers WHERE persistent = FALSE AND updated < #{updatedBefore}";
	}
}
-- Initial schema for the Boson DHT node (PostgreSQL dialect).
-- "created"/"updated" columns hold unix epoch milliseconds.

-- Stored DHT values.
CREATE TABLE IF NOT EXISTS valores (
    id              BYTEA NOT NULL PRIMARY KEY,
    public_key      BYTEA DEFAULT NULL,
    private_key     BYTEA DEFAULT NULL,
    recipient       BYTEA DEFAULT NULL,
    nonce           BYTEA DEFAULT NULL,
    signature       BYTEA DEFAULT NULL,
    sequence_number INTEGER NOT NULL DEFAULT 0,
    data            BYTEA NOT NULL,
    persistent      BOOLEAN NOT NULL DEFAULT FALSE,
    created         BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM NOW()) * 1000),
    updated         BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM NOW()) * 1000)
);

-- Partial index: persistent values by last announcement.
CREATE INDEX IF NOT EXISTS idx_valores_persistent_true_updated ON valores (updated DESC) WHERE persistent = TRUE;
-- Partial index: non-persistent (expirable) values by last update.
CREATE INDEX IF NOT EXISTS idx_valores_persistent_false_updated ON valores (updated DESC) WHERE persistent = FALSE;
-- Full index over all values by last update.
CREATE INDEX IF NOT EXISTS idx_valores_updated ON valores (updated DESC);

-- Peer announcements; one row per (peer id, announcing node).
CREATE TABLE IF NOT EXISTS peers (
    id              BYTEA NOT NULL,
    node_id         BYTEA NOT NULL,
    private_key     BYTEA DEFAULT NULL,
    origin          BYTEA DEFAULT NULL,
    port            INTEGER NOT NULL,
    alternative_uri VARCHAR(512) DEFAULT NULL,
    signature       BYTEA NOT NULL,
    persistent      BOOLEAN NOT NULL DEFAULT FALSE,
    created         BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM NOW()) * 1000),
    updated         BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM NOW()) * 1000),
    PRIMARY KEY (id, node_id)
);

-- Partial index: persistent peers by last announcement.
CREATE INDEX IF NOT EXISTS idx_peers_persistent_true_updated ON peers (updated DESC) WHERE persistent = TRUE;
-- Partial index: non-persistent (expirable) peers by last update.
CREATE INDEX IF NOT EXISTS idx_peers_persistent_false_updated ON peers (updated DESC) WHERE persistent = FALSE;
-- Full index over all peers by last update.
CREATE INDEX IF NOT EXISTS idx_peers_updated ON peers (updated DESC);
-- Initial schema for the Boson DHT node (SQLite dialect).
-- "created"/"updated" columns hold unix epoch milliseconds.

-- Stored DHT values. WITHOUT ROWID: the id BLOB is the clustered key.
CREATE TABLE IF NOT EXISTS valores (
    id              BLOB NOT NULL PRIMARY KEY,
    public_key      BLOB DEFAULT NULL,
    private_key     BLOB DEFAULT NULL,
    recipient       BLOB DEFAULT NULL,
    nonce           BLOB DEFAULT NULL,
    signature       BLOB DEFAULT NULL,
    sequence_number INTEGER NOT NULL DEFAULT 0,
    data            BLOB NOT NULL,
    persistent      BOOLEAN NOT NULL DEFAULT FALSE,
    created         INTEGER NOT NULL DEFAULT (CAST(unixepoch('subsec') * 1000 AS INTEGER)),
    updated         INTEGER NOT NULL DEFAULT (CAST(unixepoch('subsec') * 1000 AS INTEGER))
) WITHOUT ROWID;

-- Partial index: persistent values by last announcement.
CREATE INDEX IF NOT EXISTS idx_valores_persistent_true_updated ON valores (updated DESC) WHERE persistent = TRUE;
-- Partial index: non-persistent (expirable) values by last update.
CREATE INDEX IF NOT EXISTS idx_valores_persistent_false_updated ON valores (updated DESC) WHERE persistent = FALSE;
-- Full index over all values by last update.
CREATE INDEX IF NOT EXISTS idx_valores_updated ON valores (updated DESC);

-- Peer announcements; one row per (peer id, announcing node).
CREATE TABLE IF NOT EXISTS peers (
    id              BLOB NOT NULL,
    node_id         BLOB NOT NULL,
    private_key     BLOB DEFAULT NULL,
    origin          BLOB DEFAULT NULL,
    port            INTEGER NOT NULL,
    alternative_uri VARCHAR(512) DEFAULT NULL,
    signature       BLOB NOT NULL,
    persistent      BOOLEAN NOT NULL DEFAULT FALSE,
    created         INTEGER NOT NULL DEFAULT (CAST(unixepoch('subsec') * 1000 AS INTEGER)),
    updated         INTEGER NOT NULL DEFAULT (CAST(unixepoch('subsec') * 1000 AS INTEGER)),
    PRIMARY KEY (id, node_id)
) WITHOUT ROWID;

-- Partial index: persistent peers by last announcement.
CREATE INDEX IF NOT EXISTS idx_peers_persistent_true_updated ON peers (updated DESC) WHERE persistent = TRUE;
-- Partial index: non-persistent (expirable) peers by last update.
CREATE INDEX IF NOT EXISTS idx_peers_persistent_false_updated ON peers (updated DESC) WHERE persistent = FALSE;
-- Full index over all peers by last update.
CREATE INDEX IF NOT EXISTS idx_peers_updated ON peers (updated DESC);
a/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java b/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java index cc8f9bf..bc86539 100644 --- a/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java +++ b/dht/src/test/java/io/bosonnetwork/kademlia/storage/DataStorageTests.java @@ -54,7 +54,7 @@ public class DataStorageTests { private static final Path testRoot = Path.of(System.getProperty("java.io.tmpdir"), "boson"); private static final Path testDir = Path.of(testRoot.toString(), "dht", "DataStorageTests"); - private static final int CURRENT_SCHEMA_VERSION = 5; + private static final int CURRENT_SCHEMA_VERSION = 1; private static final Faker faker = new Faker(); @@ -75,6 +75,7 @@ public class DataStorageTests { private static long announced1; private static long announced2; + private static PostgresqlServer pgServer; private static final List dataStorages = new ArrayList<>(); @BeforeAll @@ -88,28 +89,29 @@ static void setupDataStorage(Vertx vertx, VertxTestContext context) { var futures = new ArrayList>(); - inMemoryStorage = new InMemoryStorage(); - var future1 = inMemoryStorage.initialize(vertx, valueExpiration, peerInfoExpiration).onComplete(context.succeeding(version -> { + pgServer = PostgresqlServer.start("boson_node", "test", "secret"); + var postgresqlURI = pgServer.getDatabaseUrl(); + postgresStorage = new PostgresStorage(postgresqlURI); + var future1 = postgresStorage.initialize(vertx, valueExpiration, peerInfoExpiration).onComplete(context.succeeding(version -> { context.verify(() -> assertEquals(CURRENT_SCHEMA_VERSION, version)); - dataStorages.add(Arguments.of("InMemoryStorage", inMemoryStorage)); + dataStorages.add(Arguments.of("PostgresStorage", postgresStorage)); })); futures.add(future1); - var sqliteURL = "jdbc:sqlite:" + testDir.resolve("storage.db"); - sqliteStorage = new SQLiteStorage(sqliteURL); + var sqliteURI = "jdbc:sqlite:" + testDir.resolve("storage.db"); + sqliteStorage = new 
SQLiteStorage(sqliteURI); var future2 = sqliteStorage.initialize(vertx, valueExpiration, peerInfoExpiration).onComplete(context.succeeding(version -> { context.verify(() -> assertEquals(CURRENT_SCHEMA_VERSION, version)); dataStorages.add(Arguments.of("SQLiteStorage", sqliteStorage)); })); futures.add(future2); - var postgresqlURL = "postgresql://jingyu:secret@localhost:5432/test"; - postgresStorage = new PostgresStorage(postgresqlURL); - var future3 = postgresStorage.initialize(vertx, valueExpiration, peerInfoExpiration).onComplete(context.succeeding(version -> { + inMemoryStorage = new InMemoryStorage(); + var future3 = inMemoryStorage.initialize(vertx, valueExpiration, peerInfoExpiration).onComplete(context.succeeding(version -> { context.verify(() -> assertEquals(CURRENT_SCHEMA_VERSION, version)); - dataStorages.add(Arguments.of("PostgresStorage", postgresStorage)); + dataStorages.add(Arguments.of("InMemoryStorage", inMemoryStorage)); })); - // futures.add(future3); + futures.add(future3); Future.all(futures).onSuccess(unused -> { try { @@ -125,6 +127,23 @@ static void setupDataStorage(Vertx vertx, VertxTestContext context) { }).onComplete(context.succeedingThenComplete()); } + @AfterAll + static void tearDown(Vertx vertx, VertxTestContext context) { + var futures = new ArrayList>(); + + for (var storageArg : dataStorages) { + var args = storageArg.get(); + DataStorage storage1 = (DataStorage) args[1]; + var future = storage1.close(); + futures.add(future); + } + + Future.all(futures).onComplete(context.succeeding(result -> { + pgServer.stop(); + context.completeNow(); + })); + } + private static List generateValues(int count) throws Exception { var values = new ArrayList(count); for (int i = 0; i < count; i++) { @@ -201,27 +220,6 @@ private static Map> generateMultiPeerInfos(int count, int siz return multiPeers; } - @AfterAll - static void tearDown(Vertx vertx, VertxTestContext context) { - var futures = new ArrayList>(); - - for (var storageArg : dataStorages) 
{ - var args = storageArg.get(); - DataStorage storage1 = (DataStorage) args[1]; - var future = storage1.close(); - futures.add(future); - } - - Future.all(futures).onComplete(context.succeeding(result -> { - try { - FileUtils.deleteFile(testRoot); - } catch (IOException ignore) { - } - - context.completeNow(); - })); - } - static Stream testStoragesProvider() { return dataStorages.stream(); } @@ -1307,18 +1305,4 @@ void testRemovePeer(String name, DataStorage storage, Vertx vertx, VertxTestCont })); })).onComplete(context.succeedingThenComplete()); } - - @ParameterizedTest(name = "{0}") - @MethodSource("testStoragesProvider") - @Order(201) - void testInitializeAgain(String name, DataStorage storage, Vertx vertx, VertxTestContext context) { - storage.initialize(vertx, valueExpiration, peerInfoExpiration).andThen(ar -> { - context.verify(() -> { - assertTrue(ar.failed()); - assertInstanceOf(DataStorageException.class, ar.cause()); - }); - - context.completeNow(); - }); - } } \ No newline at end of file diff --git a/dht/src/test/java/io/bosonnetwork/kademlia/storage/PostgresqlServer.java b/dht/src/test/java/io/bosonnetwork/kademlia/storage/PostgresqlServer.java new file mode 100644 index 0000000..d0f0491 --- /dev/null +++ b/dht/src/test/java/io/bosonnetwork/kademlia/storage/PostgresqlServer.java @@ -0,0 +1,45 @@ +package io.bosonnetwork.kademlia.storage; + +import org.testcontainers.postgresql.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; + +public class PostgresqlServer { + private PostgreSQLContainer container; + + private PostgresqlServer(PostgreSQLContainer container) { + this.container = container; + } + + public static PostgresqlServer start(String database, String username, String password) { + DockerImageName image = DockerImageName + .parse("postgres:18-alpine"); + + PostgreSQLContainer container = new PostgreSQLContainer(image) + .withDatabaseName(database) + .withUsername(username) + .withPassword(password); + + return new 
PostgresqlServer(container).start(); + } + + private PostgresqlServer start() { + container.start(); + return this; + } + + public void stop() { + if (container != null) { + container.stop(); + container = null; + } + } + + public String getDatabaseUrl() { + return "postgresql://" + + container.getUsername() + ":" + + container.getPassword() + "@" + + container.getHost() + ":" + + container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT) + "/" + + container.getDatabaseName(); + } +} \ No newline at end of file