diff --git a/pom.xml b/pom.xml index 7418f11..3fc1d3c 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.tidesdb tidesdb-java - 0.6.5 + 0.6.6 jar TidesDB Java diff --git a/src/main/c/com_tidesdb_TidesDB.c b/src/main/c/com_tidesdb_TidesDB.c index 87a3c89..bdaba21 100644 --- a/src/main/c/com_tidesdb_TidesDB.c +++ b/src/main/c/com_tidesdb_TidesDB.c @@ -1067,6 +1067,76 @@ JNIEXPORT jlong JNICALL Java_com_tidesdb_ColumnFamily_nativeSetCommitHook(JNIEnv return (jlong)(uintptr_t)ctx; } +JNIEXPORT void JNICALL Java_com_tidesdb_ColumnFamily_nativePurge(JNIEnv *env, jclass cls, + jlong handle) +{ + tidesdb_column_family_t *cf = (tidesdb_column_family_t *)(uintptr_t)handle; + int result = tidesdb_purge_cf(cf); + + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + } +} + +JNIEXPORT void JNICALL Java_com_tidesdb_ColumnFamily_nativeSyncWal(JNIEnv *env, jclass cls, + jlong handle) +{ + tidesdb_column_family_t *cf = (tidesdb_column_family_t *)(uintptr_t)handle; + int result = tidesdb_sync_wal(cf); + + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + } +} + +JNIEXPORT void JNICALL Java_com_tidesdb_TidesDB_nativePurge(JNIEnv *env, jclass cls, jlong handle) +{ + tidesdb_t *db = (tidesdb_t *)(uintptr_t)handle; + int result = tidesdb_purge(db); + + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + } +} + +JNIEXPORT jobject JNICALL Java_com_tidesdb_TidesDB_nativeGetDbStats(JNIEnv *env, jclass cls, + jlong handle) +{ + tidesdb_t *db = (tidesdb_t *)(uintptr_t)handle; + tidesdb_db_stats_t db_stats; + + int result = tidesdb_get_db_stats(db, &db_stats); + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + return NULL; + } + + jclass dbStatsClass = (*env)->FindClass(env, "com/tidesdb/DbStats"); + jmethodID constructor = + (*env)->GetMethodID(env, dbStatsClass, "", "(IJJJIIJIIJIJJJJ)V"); + + return (*env)->NewObject(env, dbStatsClass, constructor, + (jint)db_stats.num_column_families, + (jlong)db_stats.total_memory, + (jlong)db_stats.available_memory, + (jlong)db_stats.resolved_memory_limit, + (jint)db_stats.memory_pressure_level, + (jint)db_stats.flush_pending_count, + (jlong)db_stats.total_memtable_bytes, + (jint)db_stats.total_immutable_count, + (jint)db_stats.total_sstable_count, + (jlong)db_stats.total_data_size_bytes, + (jint)db_stats.num_open_sstables, + (jlong)db_stats.global_seq, + (jlong)db_stats.txn_memory_bytes, + (jlong)db_stats.compaction_queue_size, + (jlong)db_stats.flush_queue_size); +} + JNIEXPORT jdouble JNICALL Java_com_tidesdb_ColumnFamily_nativeRangeCost(JNIEnv *env, jclass cls, jlong handle, jbyteArray keyA, diff --git a/src/main/java/com/tidesdb/ColumnFamily.java b/src/main/java/com/tidesdb/ColumnFamily.java index cb2708b..962dfa7 100644 --- a/src/main/java/com/tidesdb/ColumnFamily.java +++ b/src/main/java/com/tidesdb/ColumnFamily.java @@ -126,6 +126,27 @@ public void updateRuntimeConfig(ColumnFamilyConfig config, boolean persistToDisk persistToDisk); } + /** + * Forces a synchronous flush and aggressive compaction for this column family. + * Unlike {@link #compact()} and {@link #flushMemtable()} (which are non-blocking), + * purge blocks until all flush and compaction I/O is complete. + * + * @throws TidesDBException if the purge fails + */ + public void purge() throws TidesDBException { + nativePurge(nativeHandle); + } + + /** + * Forces an immediate fsync of the active write-ahead log for this column family. + * Useful for explicit durability control when using SYNC_NONE or SYNC_INTERVAL modes. + * + * @throws TidesDBException if the WAL sync fails + */ + public void syncWal() throws TidesDBException { + nativeSyncWal(nativeHandle); + } + /** * Estimates the computational cost of iterating between two keys in this column family. * The returned value is an opaque double — meaningful only for comparison with other @@ -190,4 +211,6 @@ private static native void nativeUpdateRuntimeConfig(long handle, long writeBuff int syncMode, long syncIntervalUs, boolean persistToDisk) throws TidesDBException; private static native double nativeRangeCost(long handle, byte[] keyA, byte[] keyB) throws TidesDBException; private static native long nativeSetCommitHook(long handle, CommitHook hook, long oldCtxHandle) throws TidesDBException; + private static native void nativePurge(long handle) throws TidesDBException; + private static native void nativeSyncWal(long handle) throws TidesDBException; } diff --git a/src/main/java/com/tidesdb/DbStats.java b/src/main/java/com/tidesdb/DbStats.java new file mode 100644 index 0000000..67eb36c --- /dev/null +++ b/src/main/java/com/tidesdb/DbStats.java @@ -0,0 +1,220 @@ +/** + * + * Copyright (C) TidesDB + * + * Original Author: Alex Gaetano Padula + * + * Licensed under the Mozilla Public License, v. 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.mozilla.org/en-US/MPL/2.0/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tidesdb; + +/** + * Database-level aggregate statistics across the entire TidesDB instance. + */ +public class DbStats { + + private final int numColumnFamilies; + private final long totalMemory; + private final long availableMemory; + private final long resolvedMemoryLimit; + private final int memoryPressureLevel; + private final int flushPendingCount; + private final long totalMemtableBytes; + private final int totalImmutableCount; + private final int totalSstableCount; + private final long totalDataSizeBytes; + private final int numOpenSstables; + private final long globalSeq; + private final long txnMemoryBytes; + private final long compactionQueueSize; + private final long flushQueueSize; + + public DbStats(int numColumnFamilies, long totalMemory, long availableMemory, + long resolvedMemoryLimit, int memoryPressureLevel, int flushPendingCount, + long totalMemtableBytes, int totalImmutableCount, int totalSstableCount, + long totalDataSizeBytes, int numOpenSstables, long globalSeq, + long txnMemoryBytes, long compactionQueueSize, long flushQueueSize) { + this.numColumnFamilies = numColumnFamilies; + this.totalMemory = totalMemory; + this.availableMemory = availableMemory; + this.resolvedMemoryLimit = resolvedMemoryLimit; + this.memoryPressureLevel = memoryPressureLevel; + this.flushPendingCount = flushPendingCount; + this.totalMemtableBytes = totalMemtableBytes; + this.totalImmutableCount = totalImmutableCount; + this.totalSstableCount = totalSstableCount; + this.totalDataSizeBytes = totalDataSizeBytes; + this.numOpenSstables = numOpenSstables; + this.globalSeq = globalSeq; + this.txnMemoryBytes = txnMemoryBytes; + this.compactionQueueSize = compactionQueueSize; + this.flushQueueSize = flushQueueSize; + } + + /** + * Gets the number of column families. + * + * @return number of column families + */ + public int getNumColumnFamilies() { + return numColumnFamilies; + } + + /** + * Gets the system total memory. + * + * @return total memory in bytes + */ + public long getTotalMemory() { + return totalMemory; + } + + /** + * Gets the system available memory at open time. + * + * @return available memory in bytes + */ + public long getAvailableMemory() { + return availableMemory; + } + + /** + * Gets the resolved memory limit (auto or configured). + * + * @return resolved memory limit in bytes + */ + public long getResolvedMemoryLimit() { + return resolvedMemoryLimit; + } + + /** + * Gets the current memory pressure level. + * 0=normal, 1=elevated, 2=high, 3=critical. + * + * @return memory pressure level + */ + public int getMemoryPressureLevel() { + return memoryPressureLevel; + } + + /** + * Gets the number of pending flush operations (queued + in-flight). + * + * @return flush pending count + */ + public int getFlushPendingCount() { + return flushPendingCount; + } + + /** + * Gets the total bytes in active memtables across all column families. + * + * @return total memtable bytes + */ + public long getTotalMemtableBytes() { + return totalMemtableBytes; + } + + /** + * Gets the total immutable memtables across all column families. + * + * @return total immutable count + */ + public int getTotalImmutableCount() { + return totalImmutableCount; + } + + /** + * Gets the total SSTables across all column families and levels. + * + * @return total SSTable count + */ + public int getTotalSstableCount() { + return totalSstableCount; + } + + /** + * Gets the total data size (klog + vlog) across all column families. + * + * @return total data size in bytes + */ + public long getTotalDataSizeBytes() { + return totalDataSizeBytes; + } + + /** + * Gets the number of currently open SSTable file handles. + * + * @return number of open SSTables + */ + public int getNumOpenSstables() { + return numOpenSstables; + } + + /** + * Gets the current global sequence number. + * + * @return global sequence number + */ + public long getGlobalSeq() { + return globalSeq; + } + + /** + * Gets the bytes held by in-flight transactions. + * + * @return transaction memory bytes + */ + public long getTxnMemoryBytes() { + return txnMemoryBytes; + } + + /** + * Gets the number of pending compaction tasks. + * + * @return compaction queue size + */ + public long getCompactionQueueSize() { + return compactionQueueSize; + } + + /** + * Gets the number of pending flush tasks in queue. + * + * @return flush queue size + */ + public long getFlushQueueSize() { + return flushQueueSize; + } + + @Override + public String toString() { + return "DbStats{" + + "numColumnFamilies=" + numColumnFamilies + + ", totalMemory=" + totalMemory + + ", availableMemory=" + availableMemory + + ", resolvedMemoryLimit=" + resolvedMemoryLimit + + ", memoryPressureLevel=" + memoryPressureLevel + + ", flushPendingCount=" + flushPendingCount + + ", totalMemtableBytes=" + totalMemtableBytes + + ", totalImmutableCount=" + totalImmutableCount + + ", totalSstableCount=" + totalSstableCount + + ", totalDataSizeBytes=" + totalDataSizeBytes + + ", numOpenSstables=" + numOpenSstables + + ", globalSeq=" + globalSeq + + ", txnMemoryBytes=" + txnMemoryBytes + + ", compactionQueueSize=" + compactionQueueSize + + ", flushQueueSize=" + flushQueueSize + + '}'; + } +} diff --git a/src/main/java/com/tidesdb/TidesDB.java b/src/main/java/com/tidesdb/TidesDB.java index c112a49..5b23d0f 100644 --- a/src/main/java/com/tidesdb/TidesDB.java +++ b/src/main/java/com/tidesdb/TidesDB.java @@ -283,6 +283,29 @@ public void cloneColumnFamily(String sourceName, String destName) throws TidesDB nativeCloneColumnFamily(nativeHandle, sourceName, destName); } + /** + * Forces a synchronous flush and aggressive compaction for all column families, + * then drains both the global flush and compaction queues. + * This blocks until all work is complete. + * + * @throws TidesDBException if the purge fails + */ + public void purge() throws TidesDBException { + checkNotClosed(); + nativePurge(nativeHandle); + } + + /** + * Retrieves aggregate statistics across the entire database instance. + * + * @return database-level statistics + * @throws TidesDBException if the stats cannot be retrieved + */ + public DbStats getDbStats() throws TidesDBException { + checkNotClosed(); + return nativeGetDbStats(nativeHandle); + } + private void checkNotClosed() { if (closed) { throw new IllegalStateException("TidesDB instance is closed"); @@ -329,4 +352,8 @@ private static native void nativeCreateColumnFamily(long handle, String name, private static native void nativeRenameColumnFamily(long handle, String oldName, String newName) throws TidesDBException; private static native void nativeCloneColumnFamily(long handle, String sourceName, String destName) throws TidesDBException; + + private static native void nativePurge(long handle) throws TidesDBException; + + private static native DbStats nativeGetDbStats(long handle) throws TidesDBException; } diff --git a/src/test/java/com/tidesdb/TidesDBTest.java b/src/test/java/com/tidesdb/TidesDBTest.java index 3715eec..65f9a78 100644 --- a/src/test/java/com/tidesdb/TidesDBTest.java +++ b/src/test/java/com/tidesdb/TidesDBTest.java @@ -1136,6 +1136,186 @@ void testMultiColumnFamilyTransaction() throws TidesDBException { } } + @Test + @Order(31) + void testPurgeCf() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb29").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("test_cf", cfConfig); + + ColumnFamily cf = db.getColumnFamily("test_cf"); + + // Insert data + try (Transaction txn = db.beginTransaction()) { + for (int i = 0; i < 100; i++) { + txn.put(cf, ("key" + i).getBytes(), ("value" + i).getBytes()); + } + txn.commit(); + } + + // Purge the column family (synchronous flush + compaction) + cf.purge(); + + // Verify data still accessible after purge + try (Transaction txn = db.beginTransaction()) { + byte[] result = txn.get(cf, "key50".getBytes()); + assertNotNull(result); + assertArrayEquals("value50".getBytes(), result); + } + } + } + + @Test + @Order(32) + void testPurgeDb() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb30").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("cf1", cfConfig); + db.createColumnFamily("cf2", cfConfig); + + ColumnFamily cf1 = db.getColumnFamily("cf1"); + ColumnFamily cf2 = db.getColumnFamily("cf2"); + + // Insert data into both column families + try (Transaction txn = db.beginTransaction()) { + for (int i = 0; i < 50; i++) { + txn.put(cf1, ("key" + i).getBytes(), ("value" + i).getBytes()); + txn.put(cf2, ("key" + i).getBytes(), ("value" + i).getBytes()); + } + txn.commit(); + } + + // Purge entire database + db.purge(); + + // Verify data still accessible after purge + try (Transaction txn = db.beginTransaction()) { + byte[] result1 = txn.get(cf1, "key25".getBytes()); + assertNotNull(result1); + assertArrayEquals("value25".getBytes(), result1); + + byte[] result2 = txn.get(cf2, "key25".getBytes()); + assertNotNull(result2); + assertArrayEquals("value25".getBytes(), result2); + } + } + } + + @Test + @Order(33) + void testSyncWal() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb31").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.builder() + .syncMode(SyncMode.SYNC_NONE) + .build(); + db.createColumnFamily("test_cf", cfConfig); + + ColumnFamily cf = db.getColumnFamily("test_cf"); + + // Write some data + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, "key1".getBytes(), "value1".getBytes()); + txn.commit(); + } + + // Force WAL sync + cf.syncWal(); + + // Verify data accessible + try (Transaction txn = db.beginTransaction()) { + byte[] result = txn.get(cf, "key1".getBytes()); + assertNotNull(result); + assertArrayEquals("value1".getBytes(), result); + } + } + } + + @Test + @Order(34) + void testGetDbStats() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb32").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("cf1", cfConfig); + db.createColumnFamily("cf2", cfConfig); + + ColumnFamily cf1 = db.getColumnFamily("cf1"); + + // Insert some data + try (Transaction txn = db.beginTransaction()) { + for (int i = 0; i < 100; i++) { + txn.put(cf1, ("key" + i).getBytes(), ("value" + i).getBytes()); + } + txn.commit(); + } + + DbStats dbStats = db.getDbStats(); + assertNotNull(dbStats); + assertEquals(2, dbStats.getNumColumnFamilies()); + assertTrue(dbStats.getTotalMemory() > 0); + assertTrue(dbStats.getResolvedMemoryLimit() > 0); + assertTrue(dbStats.getMemoryPressureLevel() >= 0); + assertTrue(dbStats.getGlobalSeq() > 0); + assertTrue(dbStats.getTotalMemtableBytes() >= 0); + assertTrue(dbStats.getTotalSstableCount() >= 0); + assertTrue(dbStats.getTotalDataSizeBytes() >= 0); + } + } + + @Test + @Order(35) + void testGetDbStatsToString() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb33").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("test_cf", cfConfig); + + DbStats dbStats = db.getDbStats(); + assertNotNull(dbStats); + String str = dbStats.toString(); + assertTrue(str.contains("numColumnFamilies=")); + assertTrue(str.contains("totalMemory=")); + } + } + @Test @Order(21) void testTransactionResetNullIsolation() throws TidesDBException {