diff --git a/api/pom.xml b/api/pom.xml
index e445c23..6779383 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -150,6 +150,11 @@
vertx-junit5
test
+
+ org.testcontainers
+ testcontainers-postgresql
+ test
+
io.vertx
vertx-pg-client
@@ -160,7 +165,6 @@
vertx-jdbc-client
test
-
org.xerial
sqlite-jdbc
diff --git a/api/src/main/java/io/bosonnetwork/PeerInfo.java b/api/src/main/java/io/bosonnetwork/PeerInfo.java
index 8bb0503..1482523 100644
--- a/api/src/main/java/io/bosonnetwork/PeerInfo.java
+++ b/api/src/main/java/io/bosonnetwork/PeerInfo.java
@@ -48,7 +48,7 @@ public class PeerInfo {
*/
public static final Object ATTRIBUTE_PEER_ID = new Object();
- private final Id publicKey; // Peer ID
+ private final Id publicKey; // Peer ID
private final byte[] privateKey; // Private key to sign the peer info
private final Id nodeId; // The node that provide the service peer
private final Id origin; // The node that announces the peer
diff --git a/api/src/main/java/io/bosonnetwork/database/Pagination.java b/api/src/main/java/io/bosonnetwork/database/Pagination.java
index 44ee3f0..9ac25da 100644
--- a/api/src/main/java/io/bosonnetwork/database/Pagination.java
+++ b/api/src/main/java/io/bosonnetwork/database/Pagination.java
@@ -71,7 +71,7 @@ public String toSql() {
if (offset == 0 && limit == 0)
return ""; // caller may omit OFFSET/LIMIT completely
- return " OFFSET " + offset + " LIMIT " + limit;
+ return " LIMIT " + limit + " OFFSET " + offset;
}
/**
diff --git a/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java b/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java
index 608575d..d684b4e 100644
--- a/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java
+++ b/api/src/main/java/io/bosonnetwork/service/ClientAuthenticator.java
@@ -7,5 +7,5 @@
public interface ClientAuthenticator {
CompletableFuture authenticateUser(Id userId, byte[] nonce, byte[] signature);
- CompletableFuture authenticateDevice(Id userId, Id deviceId, byte[] nonce, byte[] signature);
+ CompletableFuture authenticateDevice(Id userId, Id deviceId, byte[] nonce, byte[] signature, String address);
}
\ No newline at end of file
diff --git a/api/src/main/java/io/bosonnetwork/service/Federation.java b/api/src/main/java/io/bosonnetwork/service/Federation.java
index 6afd184..95111e0 100644
--- a/api/src/main/java/io/bosonnetwork/service/Federation.java
+++ b/api/src/main/java/io/bosonnetwork/service/Federation.java
@@ -23,5 +23,5 @@ default CompletableFuture extends FederatedNode> getNode(Id nodeId) {
// public CompletableFuture> getAllServices(Id nodeId);
- public CompletableFuture extends FederatedService> getService(Id peerId);
+ public CompletableFuture extends FederatedService> getService(Id nodeId, Id peerId);
}
\ No newline at end of file
diff --git a/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java b/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java
index e5e01a4..776fc74 100644
--- a/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java
+++ b/api/src/main/java/io/bosonnetwork/vertx/VertxFuture.java
@@ -684,6 +684,15 @@ public boolean isCompletedExceptionally() {
*/
@Override
public T get() throws InterruptedException, ExecutionException {
+ if (future.isComplete()) {
+ if (future.succeeded())
+ return future.result();
+ else if (future.failed())
+ throw new ExecutionException(future.cause());
+ else
+ throw new InterruptedException("Context closed");
+ }
+
if (Context.isOnVertxThread() || Context.isOnEventLoopThread())
throw new IllegalStateException("Cannot not be called on vertx thread or event loop thread");
@@ -716,6 +725,15 @@ else if (future.failed())
*/
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (future.isComplete()) {
+ if (future.succeeded())
+ return future.result();
+ else if (future.failed())
+ throw new ExecutionException(future.cause());
+ else
+ throw new InterruptedException("Context closed");
+ }
+
if (Context.isOnVertxThread() || Context.isOnEventLoopThread())
throw new IllegalStateException("Cannot not be called on vertx thread or event loop thread");
diff --git a/api/src/test/java/io/bosonnetwork/database/PostgresqlServer.java b/api/src/test/java/io/bosonnetwork/database/PostgresqlServer.java
new file mode 100644
index 0000000..d64b8bb
--- /dev/null
+++ b/api/src/test/java/io/bosonnetwork/database/PostgresqlServer.java
@@ -0,0 +1,45 @@
+package io.bosonnetwork.database;
+
+import org.testcontainers.postgresql.PostgreSQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class PostgresqlServer {
+ private PostgreSQLContainer container;
+
+ private PostgresqlServer(PostgreSQLContainer container) {
+ this.container = container;
+ }
+
+ public static PostgresqlServer start(String database, String username, String password) {
+ DockerImageName image = DockerImageName
+ .parse("postgres:18-alpine");
+
+ PostgreSQLContainer container = new PostgreSQLContainer(image)
+ .withDatabaseName(database)
+ .withUsername(username)
+ .withPassword(password);
+
+ return new PostgresqlServer(container).start();
+ }
+
+ private PostgresqlServer start() {
+ container.start();
+ return this;
+ }
+
+ public void stop() {
+ if (container != null) {
+ container.stop();
+ container = null;
+ }
+ }
+
+ public String getDatabaseUrl() {
+ return "postgresql://" +
+ container.getUsername() + ":" +
+ container.getPassword() + "@" +
+ container.getHost() + ":" +
+ container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT) + "/" +
+ container.getDatabaseName();
+ }
+}
\ No newline at end of file
diff --git a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
index 309b828..18dbb09 100644
--- a/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
+++ b/api/src/test/java/io/bosonnetwork/database/VersionedSchemaTests.java
@@ -9,6 +9,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.jdbcclient.JDBCConnectOptions;
import io.vertx.jdbcclient.JDBCPool;
@@ -36,34 +37,44 @@ public class VersionedSchemaTests {
private static final List databases = new ArrayList<>();
+ private static PostgresqlServer pgServer;
+ private static SqlClient postgres;
+ private static SqlClient sqlite;
+
@BeforeAll
static void setup(Vertx vertx, VertxTestContext context) throws Exception {
+ FileUtils.deleteFile(testDir);
Files.createDirectories(testDir);
- var sqliteURL = "jdbc:sqlite:" + testDir.resolve("test.db");
- JDBCConnectOptions sqliteConnectOptions = new JDBCConnectOptions()
- .setJdbcUrl(sqliteURL);
- // Single connection recommended for SQLite
- PoolOptions sqlitePoolOptions = new PoolOptions().setMaxSize(1);
- SqlClient sqliteClient = JDBCPool.pool(vertx, sqliteConnectOptions, sqlitePoolOptions);
- databases.add(Arguments.of("sqlite", sqliteClient));
+ pgServer = PostgresqlServer.start("migration", "test", "secret");
- var postgresURL = "postgresql://jingyu:secret@localhost:5432/test";
+ var postgresURL = pgServer.getDatabaseUrl();
PgConnectOptions pgConnectOptions = PgConnectOptions.fromUri(postgresURL);
PoolOptions pgPoolOptions = new PoolOptions().setMaxSize(8);
- SqlClient pgClient = PgBuilder.pool()
+ postgres = PgBuilder.pool()
.with(pgPoolOptions)
.connectingTo(pgConnectOptions)
.using(vertx)
.build();
- // databases.add(Arguments.of("postgres", pgClient));
+ databases.add(Arguments.of("postgres", postgres));
+
+ var sqliteURL = "jdbc:sqlite:" + testDir.resolve("test.db");
+ JDBCConnectOptions sqliteConnectOptions = new JDBCConnectOptions()
+ .setJdbcUrl(sqliteURL);
+ // Single connection recommended for SQLite
+ PoolOptions sqlitePoolOptions = new PoolOptions().setMaxSize(1);
+ sqlite = JDBCPool.pool(vertx, sqliteConnectOptions, sqlitePoolOptions);
+ databases.add(Arguments.of("sqlite", sqlite));
context.completeNow();
}
@AfterAll
- static void teardown() throws Exception {
- FileUtils.deleteFile(testRoot);
+ static void teardown(VertxTestContext context) throws Exception {
+ Future.all(postgres.close(), sqlite.close()).onComplete(ar -> {
+ pgServer.stop();
+ context.completeNow();
+ });
}
static Stream testDatabaseProvider() {
diff --git a/dht/pom.xml b/dht/pom.xml
index cd00450..d565aeb 100644
--- a/dht/pom.xml
+++ b/dht/pom.xml
@@ -1,201 +1,209 @@
-
- 4.0.0
+
+ 4.0.0
+
+
+ io.bosonnetwork
+ boson-parent
+ 3-SNAPSHOT
+
+
-
io.bosonnetwork
- boson-parent
- 3-SNAPSHOT
-
-
-
- io.bosonnetwork
- boson-dht
- 2.0.8-SNAPSHOT
- jar
-
- Boson DHT
-
- Boson Kademlia DHT node.
-
- https://github.com/bosonnetwork/Boson.Core
-
-
-
- MIT License
- https://github.com/bosonnetwork/Boson.Core/blob/master/LICENSE
- repo
-
-
-
-
-
- boson-network-dev
- Boson Network
- support@bosonnetwork.io
- BosonNetwork
- https://github.com/bosonnetwork
-
- architect
- developer
-
-
- https://avatars.githubusercontent.com/u/152134507
-
-
-
-
-
- scm:git:git@github.com:bosonnetwork/Boson.Core.git
- scm:git:git@github.com:bosonnetwork/Boson.Core.git
- git@github.com:bosonnetwork/Boson.Core.git
-
-
-
+ boson-dht
+ 2.0.8-SNAPSHOT
+ jar
+
+ Boson DHT
+
+ Boson Kademlia DHT node.
+
+ https://github.com/bosonnetwork/Boson.Core
+
+
+
+ MIT License
+ https://github.com/bosonnetwork/Boson.Core/blob/master/LICENSE
+ repo
+
+
+
+
+
+ boson-network-dev
+ Boson Network
+ support@bosonnetwork.io
+ BosonNetwork
+ https://github.com/bosonnetwork
+
+ architect
+ developer
+
+
+ https://avatars.githubusercontent.com/u/152134507
+
+
+
+
+
+ scm:git:git@github.com:bosonnetwork/Boson.Core.git
+ scm:git:git@github.com:bosonnetwork/Boson.Core.git
+ git@github.com:bosonnetwork/Boson.Core.git
+
+
+
+
+
+ io.bosonnetwork
+ boson-dependencies
+ 2.0.8-SNAPSHOT
+ pom
+ import
+
+
+
+
-
- io.bosonnetwork
- boson-dependencies
- 2.0.8-SNAPSHOT
- pom
- import
-
+
+
+ io.bosonnetwork
+ boson-api
+
+
+
+ io.vertx
+ vertx-core
+
+
+ io.vertx
+ vertx-sql-client-templates
+
+
+ io.vertx
+ vertx-pg-client
+
+
+ io.vertx
+ vertx-jdbc-client
+
+
+ io.vertx
+ vertx-web
+
+
+ io.vertx
+ vertx-web-validation
+
+
+ io.netty
+ netty-resolver-dns-native-macos
+ osx-aarch_64
+
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-cbor
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-yaml
+
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+
+
+
+ org.xerial
+ sqlite-jdbc
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+ io.vertx
+ vertx-micrometer-metrics
+
+
+ io.micrometer
+ micrometer-registry-prometheus
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ io.vertx
+ vertx-junit5
+ test
+
+
+ org.testcontainers
+ testcontainers-postgresql
+
+
+ net.datafaker
+ datafaker
+ test
+
-
-
-
-
-
- io.bosonnetwork
- boson-api
-
-
-
- io.vertx
- vertx-core
-
-
- io.vertx
- vertx-pg-client
-
-
- io.vertx
- vertx-jdbc-client
-
-
- io.vertx
- vertx-web
-
-
- io.vertx
- vertx-web-validation
-
-
- io.netty
- netty-resolver-dns-native-macos
- osx-aarch_64
-
-
-
- com.fasterxml.jackson.core
- jackson-core
-
-
- com.fasterxml.jackson.core
- jackson-databind
-
-
- com.fasterxml.jackson.dataformat
- jackson-dataformat-cbor
-
-
- com.fasterxml.jackson.dataformat
- jackson-dataformat-yaml
-
-
-
- com.github.ben-manes.caffeine
- caffeine
-
-
-
- org.xerial
- sqlite-jdbc
-
-
-
- org.slf4j
- slf4j-api
-
-
- ch.qos.logback
- logback-classic
-
-
-
- io.vertx
- vertx-micrometer-metrics
-
-
- io.micrometer
- micrometer-registry-prometheus
-
-
-
-
- org.junit.jupiter
- junit-jupiter
- test
-
-
- io.vertx
- vertx-junit5
- test
-
-
- net.datafaker
- datafaker
- test
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
-
- org.apache.maven.plugins
- maven-source-plugin
-
-
-
-
-
- org.apache.maven.plugins
- maven-gpg-plugin
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
- 1
- true
-
- none
- 1
-
-
-
-
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ 1
+ true
+
+ none
+ 1
+
+
+
+
\ No newline at end of file
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java b/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java
index 8a2e234..4dcbff8 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/impl/SimpleNodeConfiguration.java
@@ -35,7 +35,7 @@ public SimpleNodeConfiguration(NodeConfiguration config) {
this.port = config.port();
this.privateKey = config.privateKey();
this.dataPath = config.dataPath();
- this.storageURL = config.storageURL() != null ? config.storageURL() : InMemoryStorage.STORAGE_URL;
+ this.storageURL = config.storageURL() != null ? config.storageURL() : InMemoryStorage.STORAGE_URI;
this.bootstrapNodes = new ArrayList<>(config.bootstrapNodes() != null ? config.bootstrapNodes() : Collections.emptyList());
this.enableSpamThrottling = config.enableSpamThrottling();
this.enableSuspiciousNodeDetector = config.enableSuspiciousNodeDetector();
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java
index 3456bc6..1b53787 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DataStorage.java
@@ -266,22 +266,22 @@ public interface DataStorage {
*/
Future removePeers(Id id);
- static boolean supports(String url) {
- // now only support inmemory, sqlite and postgres
- return url.equals(InMemoryStorage.STORAGE_URL) || url.startsWith(SQLiteStorage.STORAGE_URL_PREFIX) ||
- url.startsWith(PostgresStorage.STORAGE_URL_PREFIX);
+ static boolean supports(String uri) {
+ // now only support in-memory, sqlite and postgres
+ return uri.equals(InMemoryStorage.STORAGE_URI) || uri.startsWith(SQLiteStorage.STORAGE_URI_PREFIX) ||
+ uri.startsWith(PostgresStorage.STORAGE_URI_PREFIX);
}
- static DataStorage create(String url) {
- Objects.requireNonNull(url, "url");
+ static DataStorage create(String uri) {
+ Objects.requireNonNull(uri, "uri");
- if (url.equals(InMemoryStorage.STORAGE_URL))
+ if (uri.equals(InMemoryStorage.STORAGE_URI))
return new InMemoryStorage();
- if (url.startsWith(SQLiteStorage.STORAGE_URL_PREFIX))
- return new SQLiteStorage(url);
- if (url.startsWith(PostgresStorage.STORAGE_URL_PREFIX))
- return new PostgresStorage(url);
+ if (uri.startsWith(SQLiteStorage.STORAGE_URI_PREFIX))
+ return new SQLiteStorage(uri);
+ if (uri.startsWith(PostgresStorage.STORAGE_URI_PREFIX))
+ return new PostgresStorage(uri);
- throw new IllegalArgumentException("Unsupported storage: " + url);
+ throw new IllegalArgumentException("Unsupported storage: " + uri);
}
}
\ No newline at end of file
diff --git a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java
index 9c65642..6638f01 100644
--- a/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java
+++ b/dht/src/main/java/io/bosonnetwork/kademlia/storage/DatabaseStorage.java
@@ -22,192 +22,63 @@
package io.bosonnetwork.kademlia.storage;
+import java.nio.file.Path;
+import java.util.HashMap;
import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
-import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row;
-import io.vertx.sqlclient.RowSet;
-import io.vertx.sqlclient.SqlClient;
-import io.vertx.sqlclient.SqlConnection;
-import io.vertx.sqlclient.Tuple;
+import io.vertx.sqlclient.templates.SqlTemplate;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import io.bosonnetwork.BosonException;
import io.bosonnetwork.Id;
import io.bosonnetwork.PeerInfo;
import io.bosonnetwork.Value;
+import io.bosonnetwork.database.VersionedSchema;
+import io.bosonnetwork.database.VertxDatabase;
import io.bosonnetwork.kademlia.exceptions.ImmutableSubstitutionFail;
import io.bosonnetwork.kademlia.exceptions.SequenceNotExpected;
import io.bosonnetwork.kademlia.exceptions.SequenceNotMonotonic;
-public abstract class DatabaseStorage implements DataStorage {
- protected static final int SCHEMA_VERSION = 5;
-
- protected final String connectionUri;
-
+public abstract class DatabaseStorage implements DataStorage, VertxDatabase {
protected long valueExpiration;
protected long peerInfoExpiration;
protected int schemaVersion;
- protected SqlClient client;
-
- private static final Logger log = LoggerFactory.getLogger(DatabaseStorage.class);
-
- protected DatabaseStorage(String connectionUri) {
- this.connectionUri = connectionUri;
- }
-
- protected Future executeSequentially(SqlConnection connection, List statements, int index) {
- if (index >= statements.size())
- return Future.succeededFuture();
-
- // Execute current statement and recurse to next
- String sql = statements.get(index);
- log.trace("Executing schema statement: {}", sql);
- return connection.preparedQuery(sql)
- .execute()
- .compose(result -> executeSequentially(connection, statements, index + 1)) // Move to next statement
- .recover(e -> {
- log.error("Schema statement failed: {}", sql, e);
- BosonException error = new DataStorageException("Create database schema failed: " + sql, e);
- return Future.failedFuture(error);
- });
- }
-
- protected Future createSchema(List statements) {
- return pool().withTransaction(connection -> executeSequentially(connection, statements, 0));
- }
-
- protected Pool pool() {
- Pool pool = client instanceof Pool p ? p : null;
- if (pool == null) // This should never happen
- throw new IllegalStateException("SqlClient is not a Pool instance");
-
- return pool;
- }
-
- // Returns the first row mapped via the given function, or null if the row set is empty.
- protected static T findUniqueOrNull(RowSet rows, Function mapper) {
- return mapper.apply(rows.size() == 0 ? null : rows.iterator().next());
- }
-
- protected static List findMany(RowSet rows, Function mapper) {
- return rows.stream().map(mapper).toList();
- }
-
- protected abstract void setupSqlClient(Vertx vertx, String connectionUri);
-
- protected abstract List createSchemaStatements();
-
- protected abstract String selectSchemaVersion();
-
- protected abstract String insertSchemaVersion();
-
- protected abstract String selectValueById();
-
- protected abstract String selectValuesByPersistentAndAnnouncedBefore();
-
- protected abstract String selectValuesByPersistentAndAnnouncedBeforePaginated();
-
- protected abstract String selectAllValues();
-
- protected abstract String selectAllValuesPaginated();
-
- protected abstract String upsertValue();
-
- protected abstract String updateValueAnnouncedById();
-
- protected abstract String deleteValueById();
-
- protected abstract String deleteNonPersistentValuesAnnouncedBefore();
-
- protected abstract String selectPeerByIdAndNodeId();
-
- protected abstract String selectPeersById();
-
- protected abstract String selectPeersByPersistentAndAnnouncedBefore();
-
- protected abstract String selectPeersByPersistentAndAnnouncedBeforePaginated();
-
- protected abstract String selectAllPeers();
-
- protected abstract String selectAllPeersPaginated();
-
- protected abstract String upsertPeer();
+ protected abstract Logger getLogger();
- protected abstract String updatePeerAnnouncedByIdAndNodeId();
+ protected abstract void init(Vertx vertx);
- protected abstract String deletePeerByIdAndNodeId();
+ protected abstract Path getSchemaPath();
- protected abstract String deletePeersById();
-
- protected abstract String deleteNonPersistentPeersAnnouncedBefore();
-
- protected Future getOrInitSchemaVersion() {
- log.debug("Checking schema version...");
- return client.preparedQuery(selectSchemaVersion())
- .execute()
- .compose(rows -> {
- if (rows.size() > 0) {
- int version = findUniqueOrNull(rows, DatabaseStorage::rowToInteger);
- log.info("Detected existing schema version: {}", version);
- return Future.succeededFuture(version);
- } else {
- log.info("No schema version found, setting version {}", SCHEMA_VERSION);
- return client.preparedQuery(insertSchemaVersion())
- .execute(Tuple.of(SCHEMA_VERSION))
- .map(v -> SCHEMA_VERSION);
- }
- });
- }
+ protected abstract SqlDialect getDialect();
@Override
public Future initialize(Vertx vertx, long valueExpiration, long peerInfoExpiration) {
- if (client != null)
- return Future.failedFuture(new DataStorageException("Storage already initialized"));
-
- log.info("Initializing storage with connection URI: {}", connectionUri);
+ init(vertx);
this.valueExpiration = valueExpiration;
this.peerInfoExpiration = peerInfoExpiration;
- setupSqlClient(vertx, connectionUri);
-
- log.info("Creating database schema...");
- return createSchema(createSchemaStatements())
- .compose(unused -> getOrInitSchemaVersion())
- .andThen(ar -> {
+ VersionedSchema schema = VersionedSchema.init(getClient(), getSchemaPath());
+ return schema.migrate().andThen(ar -> {
if (ar.succeeded()) {
- schemaVersion = ar.result();
- log.info("Database schema created successfully, version: {}", schemaVersion);
+ schemaVersion = schema.getCurrentVersion().version();
+ getLogger().info("Database is ready, current schema version: {}", schemaVersion);
} else {
- log.error("Database schema creation failed", ar.cause());
+ getLogger().error("Schema migration failed, current schema version: {}",
+ schema.getCurrentVersion().version(), ar.cause());
}
- }).recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ }).map(v -> schema.getCurrentVersion().version())
+ .recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
);
}
- @Override
- public Future close() {
- if (client == null) {
- log.info("Storage already closed");
- return Future.succeededFuture();
- }
- log.info("Closing storage...");
- Future future = client.close();
- client = null;
- return future.recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
- }
-
@Override
public int getSchemaVersion() {
return schemaVersion;
@@ -217,19 +88,20 @@ public int getSchemaVersion() {
public Future purge() {
long now = System.currentTimeMillis();
- log.info("Purging expired values and peers...");
- return pool().withTransaction(connection ->
- connection.preparedQuery(deleteNonPersistentValuesAnnouncedBefore())
- .execute(Tuple.of(now - valueExpiration))
- .compose(unused ->
- connection.preparedQuery(deleteNonPersistentPeersAnnouncedBefore())
- .execute(Tuple.of(now - peerInfoExpiration))
+ getLogger().info("Purging expired values and peers...");
+ return withTransaction(c ->
+ SqlTemplate.forUpdate(c, getDialect().deleteNonPersistentValuesAnnouncedBefore())
+ .execute(Map.of("updatedBefore", now - valueExpiration))
+ .compose(r ->
+ SqlTemplate.forUpdate(c, getDialect().deleteNonPersistentPeersAnnouncedBefore())
+ .execute(Map.of("updatedBefore", now - peerInfoExpiration))
+ .map((Void) null)
)
).andThen(ar -> {
if (ar.succeeded())
- log.info("Purge completed successfully");
+ getLogger().info("Purge completed successfully");
else
- log.error("Failed to purge expired values and peers", ar.cause());
+ getLogger().error("Failed to purge expired values and peers", ar.cause());
}).recover(cause ->
Future.failedFuture(new DataStorageException("Database operation failed", cause))
).mapEmpty();
@@ -247,54 +119,44 @@ public Future putValue(Value value, boolean persistent) {
@Override
public Future putValue(Value value, boolean persistent, int expectedSequenceNumber) {
- log.debug("Putting value with id: {}, persistent: {}, expectedSequenceNumber: {}",
+ getLogger().debug("Putting value with id: {}, persistent: {}, expectedSequenceNumber: {}",
value.getId(), persistent, expectedSequenceNumber);
- log.debug("Trying to check the existing value with id: {}", value.getId());
+ getLogger().debug("Trying to check the existing value with id: {}", value.getId());
return getValue(value.getId()).compose(existing -> {
if (existing != null) {
// Immutable check
if (existing.isMutable() != value.isMutable()) {
- log.warn("Rejecting value {}: cannot replace mismatched mutable/immutable", value.getId());
+ getLogger().warn("Rejecting value {}: cannot replace mismatched mutable/immutable", value.getId());
return Future.failedFuture(new ImmutableSubstitutionFail("Cannot replace mismatched mutable/immutable value"));
}
if (value.getSequenceNumber() < existing.getSequenceNumber()) {
- log.warn("Rejecting value {}: sequence number not monotonic", value.getId());
+ getLogger().warn("Rejecting value {}: sequence number not monotonic", value.getId());
return Future.failedFuture(new SequenceNotMonotonic("Sequence number less than current"));
}
if (expectedSequenceNumber >= 0 && existing.getSequenceNumber() > expectedSequenceNumber) {
- log.warn("Rejecting value {}: sequence number not expected", value.getId());
+ getLogger().warn("Rejecting value {}: sequence number not expected", value.getId());
return Future.failedFuture(new SequenceNotExpected("Sequence number not expected"));
}
if (existing.hasPrivateKey() && !value.hasPrivateKey()) {
// Skip update if the existing value is owned by this node and the new value is not.
- // Should not throw NotOwnerException, just silently ignores to avoid disrupting valid operations.
- log.info("Skipping to update value for id {}: owned by this node", value.getId());
+ // Should not throw NotOwnerException, just silently ignore to avoid disrupting valid operations.
+ getLogger().info("Skipping to update value for id {}: owned by this node", value.getId());
return Future.succeededFuture(existing);
}
}
- long now = System.currentTimeMillis();
- return client.preparedQuery(upsertValue())
- .execute(Tuple.of(value.getId().bytes(),
- persistent,
- value.getPublicKey() != null ? value.getPublicKey().bytes() : null,
- value.getPrivateKey(),
- value.getRecipient() != null ? value.getRecipient().bytes() : null,
- value.getNonce(),
- value.getSignature(),
- value.getSequenceNumber(),
- value.getData(),
- now,
- now))
- .map(v -> value);
+ return withTransaction(c ->
+ SqlTemplate.forUpdate(c, getDialect().upsertValue())
+ .execute(valueToMap(value, persistent))
+ .map(v -> value));
}).andThen(ar -> {
if (ar.succeeded())
- log.debug("Put value with id: {} successfully", value.getId());
+ getLogger().debug("Put value with id: {} successfully", value.getId());
else
- log.error("Failed to put value with id: {}", value.getId(), ar.cause());
+ getLogger().error("Failed to put value with id: {}", value.getId(), ar.cause());
}).recover(cause ->
Future.failedFuture(new DataStorageException("Database operation failed", cause))
);
@@ -302,19 +164,19 @@ public Future putValue(Value value, boolean persistent, int expectedSeque
@Override
public Future getValue(Id id) {
- log.debug("Getting value with id: {}", id);
- return client.preparedQuery(selectValueById())
- .execute(Tuple.of(id.bytes()))
- .map(rows -> findUniqueOrNull(rows, DatabaseStorage::rowToValue))
+ getLogger().debug("Getting value with id: {}", id);
+ return SqlTemplate.forQuery(getClient(), getDialect().selectValueById())
+ .execute(Map.of("id", id.bytes()))
+ .map(rows -> findUnique(rows, DatabaseStorage::rowToValue))
.andThen(ar -> {
if (ar.succeeded()) {
if (ar.result() != null)
- log.debug("Got value with id: {}", id);
+ getLogger().debug("Got value with id: {}", id);
else
//noinspection LoggingSimilarMessage
- log.debug("No value found with id: {}", id);
+ getLogger().debug("No value found with id: {}", id);
} else {
- log.error("Failed to get value with id: {}", id, ar.cause());
+ getLogger().error("Failed to get value with id: {}", id, ar.cause());
}
}).recover(cause ->
Future.failedFuture(new DataStorageException("Database operation failed", cause))
@@ -323,7 +185,7 @@ public Future getValue(Id id) {
@Override
public Future> getValues() {
- return client.preparedQuery(selectAllValues())
+ return query(getDialect().selectAllValues())
.execute()
.map(rows -> findMany(rows, DatabaseStorage::rowToValue))
.recover(cause ->
@@ -333,8 +195,8 @@ public Future> getValues() {
@Override
public Future> getValues(int offset, int limit) {
- return client.preparedQuery(selectAllValuesPaginated())
- .execute(Tuple.of(limit, offset))
+ return SqlTemplate.forQuery(getClient(), getDialect().selectAllValuesPaginated())
+ .execute(Map.of("limit", limit, "offset", offset))
.map(rows -> findMany(rows, DatabaseStorage::rowToValue))
.recover(cause ->
Future.failedFuture(new DataStorageException("Database operation failed", cause))
@@ -343,8 +205,8 @@ public Future> getValues(int offset, int limit) {
@Override
public Future> getValues(boolean persistent, long announcedBefore) {
- return client.preparedQuery(selectValuesByPersistentAndAnnouncedBefore())
- .execute(Tuple.of(persistent, announcedBefore))
+ return SqlTemplate.forQuery(getClient(), getDialect().selectValuesByPersistentAndAnnouncedBefore())
+ .execute(Map.of("persistent", persistent, "updatedBefore", announcedBefore))
.map(rows -> findMany(rows, DatabaseStorage::rowToValue))
.recover(cause ->
Future.failedFuture(new DataStorageException("Database operation failed", cause))
@@ -353,8 +215,12 @@ public Future> getValues(boolean persistent, long announcedBefore) {
@Override
public Future> getValues(boolean persistent, long announcedBefore, int offset, int limit) {
- return client.preparedQuery(selectValuesByPersistentAndAnnouncedBeforePaginated())
- .execute(Tuple.of(persistent, announcedBefore, limit, offset))
+ return SqlTemplate.forQuery(getClient(), getDialect().selectValuesByPersistentAndAnnouncedBeforePaginated())
+ .execute(Map.of(
+ "persistent", persistent,
+ "updatedBefore", announcedBefore,
+ "limit", limit,
+ "offset", offset))
.map(rows -> findMany(rows, DatabaseStorage::rowToValue))
.recover(cause ->
Future.failedFuture(new DataStorageException("Database operation failed", cause))
@@ -363,44 +229,46 @@ public Future> getValues(boolean persistent, long announcedBefore, i
@Override
public Future updateValueAnnouncedTime(Id id) {
- log.debug("Updating value announced time with id: {}", id);
+ getLogger().debug("Updating value announced time with id: {}", id);
long now = System.currentTimeMillis();
- return client.preparedQuery(updateValueAnnouncedById())
- .execute(Tuple.of(now, id.bytes()))
- .map(v -> v.rowCount() > 0 ? now : 0L)
- .andThen(ar -> {
- if (ar.succeeded()) {
- if (ar.result() != 0)
- log.debug("Updated value announced time with id: {}", id);
- else
- //noinspection LoggingSimilarMessage
- log.debug("No value found with id: {}", id);
- } else {
- log.error("Failed to update value announced time with id: {}", id, ar.cause());
- }
- }).recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ return withTransaction(c ->
+ SqlTemplate.forUpdate(c, getDialect().updateValueAnnouncedById())
+ .execute(Map.of("id", id.bytes(), "updated", now))
+ .map(r -> r.rowCount() > 0 ? now : 0L)
+ ).andThen(ar -> {
+ if (ar.succeeded()) {
+ if (ar.result() != 0)
+ getLogger().debug("Updated value announced time with id: {}", id);
+ else
+ //noinspection LoggingSimilarMessage
+ getLogger().debug("No value found with id: {}", id);
+ } else {
+ getLogger().error("Failed to update value announced time with id: {}", id, ar.cause());
+ }
+ }).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
public Future<Boolean> removeValue(Id id) {
- log.debug("Removing value with id: {}", id);
- return client.preparedQuery(deleteValueById())
- .execute(Tuple.of(id.bytes()))
- .map(rowSet -> rowSet.rowCount() >= 1)
- .andThen(ar -> {
- if (ar.succeeded()) {
- if (ar.result())
- log.debug("Removed value with id: {}", id);
- else
- log.debug("No value found with id: {}", id);
- } else {
- log.error("Failed to remove value with id: {}", id, ar.cause());
- }
- }).recover(cause ->
- Future.failedFuture(new DataStorageException("Database operation failed", cause))
- );
+ getLogger().debug("Removing value with id: {}", id);
+ return withTransaction(c ->
+ SqlTemplate.forUpdate(c, getDialect().deleteValueById())
+ .execute(Map.of("id", id.bytes()))
+ .map(this::hasEffectedRows)
+ ).andThen(ar -> {
+ if (ar.succeeded()) {
+ if (ar.result())
+ getLogger().debug("Removed value with id: {}", id);
+ else
+ getLogger().debug("No value found with id: {}", id);
+ } else {
+ getLogger().error("Failed to remove value with id: {}", id, ar.cause());
+ }
+ }).recover(cause ->
+ Future.failedFuture(new DataStorageException("Database operation failed", cause))
+ );
}
@Override
@@ -410,92 +278,46 @@ public Future<PeerInfo> putPeer(PeerInfo peerInfo) {
@Override
public Future<PeerInfo> putPeer(PeerInfo peerInfo, boolean persistent) {
- return putPeer(client, peerInfo, persistent);
- }
-
- protected Future<PeerInfo> putPeer(SqlClient sqlClient, PeerInfo peerInfo, boolean persistent) {
- log.debug("Putting peer with id: {} @ {}, persistent: {}", peerInfo.getId(), peerInfo.getNodeId(), persistent);
- log.debug("Trying to check the existing peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId());
- return getPeer(sqlClient, peerInfo.getId(), peerInfo.getNodeId()).compose(existing -> {
+ getLogger().debug("Putting peer with id: {} @ {}, persistent: {}", peerInfo.getId(), peerInfo.getNodeId(), persistent);
+ getLogger().debug("Trying to check the existing peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId());
+ return getPeer(peerInfo.getId(), peerInfo.getNodeId()).compose(existing -> {
if (existing != null && existing.hasPrivateKey() && !peerInfo.hasPrivateKey()) {
// Skip update if the existing peer info is owned by this node and the new peer info is not.
- // Should not throw NotOwnerException, just silently ignores to avoid disrupting valid operations.
- log.info("Skipping to update peer for id {} @ {}: owned by this node", peerInfo.getId(), peerInfo.getNodeId());
+ // Should not throw NotOwnerException, just silently ignore to avoid disrupting valid operations.
+ getLogger().info("Skipping to update peer for id {} @ {}: owned by this node", peerInfo.getId(), peerInfo.getNodeId());
return Future.succeededFuture(peerInfo);
}
- long now = System.currentTimeMillis();
- return sqlClient.preparedQuery(upsertPeer())
- .execute(Tuple.of(peerInfo.getId().bytes(),
- peerInfo.getNodeId().bytes(),
- persistent,
- peerInfo.getPrivateKey(),
- peerInfo.getOrigin() != null ? peerInfo.getOrigin().bytes() : null,
- peerInfo.getPort(),
- peerInfo.getAlternativeURI(),
- peerInfo.getSignature(),
- now,
- now))
- .map(v -> peerInfo);
+ return withTransaction(c ->
+ SqlTemplate.forUpdate(c, getDialect().upsertPeer())
+ .execute(peerToMap(peerInfo, persistent))
+ .map(v -> peerInfo));
}).andThen(ar -> {
if (ar.succeeded())
- log.debug("Put peer with id: {} @ {} successfully", peerInfo.getId(), peerInfo.getNodeId());
+ getLogger().debug("Put peer with id: {} @ {} successfully", peerInfo.getId(), peerInfo.getNodeId());
else
- log.error("Failed to put peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId(), ar.cause());
+ getLogger().error("Failed to put peer with id: {} @ {}", peerInfo.getId(), peerInfo.getNodeId(), ar.cause());
}).recover(cause ->
Future.failedFuture(new DataStorageException("Database operation failed", cause))
);
}
- /* noinspection
- protected Future<Void> putPeersSequentially(SqlClient sqlClient, List<PeerInfo> peerInfos, int index) {
- return index >= peerInfos.size() ?
- Future.succeededFuture() :
- putPeer(sqlClient, peerInfos.get(index), false).compose(r ->
- putPeersSequentially(sqlClient, peerInfos, index + 1)
- );
- }
-
@Override
public Future<List<PeerInfo>> putPeers(List<PeerInfo> peerInfos) {
if (peerInfos.isEmpty())
return Future.succeededFuture(peerInfos);
- return pool().withTransaction(connection ->
- putPeersSequentially(connection, peerInfos, 0)
- .map(v -> peerInfos)
- );
- }
- */
-
- @Override
- public Future<List<PeerInfo>> putPeers(List<PeerInfo> peerInfos) {
- if (peerInfos.isEmpty())
- return Future.succeededFuture(peerInfos);
+ List