diff --git a/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java b/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java
index 5435135..7289586 100644
--- a/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java
+++ b/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java
@@ -31,6 +31,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -52,11 +53,13 @@
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
 import org.apache.accumulo.core.client.admin.compaction.TooManyDeletesSelector;
 import org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer;
 import org.apache.accumulo.core.clientImpl.Namespace;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
@@ -114,6 +117,7 @@
 import org.apache.accumulo.proxy.thrift.UnknownScanner;
 import org.apache.accumulo.proxy.thrift.UnknownWriter;
 import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.util.PortUtils;
 import org.apache.accumulo.test.constraints.MaxMutationSize;
 import org.apache.accumulo.test.constraints.NumericValueConstraint;
@@ -2315,6 +2319,78 @@ public void testCompactionSelector() throws Exception {
     assertEquals(1, countFiles(tableNames[1]), messagePrefix + tableNames[2]);
   }

+  /**
+   * Returns the combined size of all the files in a table.
+   */
+  private long getFileSizes(ServerContext ctx, String tableName) {
+    TableId tableId = TableId.of(ctx.tableOperations().tableIdMap().get(tableName));
+    try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) {
+      return tabletsMetadata.stream().flatMap(tm -> tm.getFiles().stream()).mapToLong(stf -> {
+        try {
+          return FileSystem.getLocal(new Configuration()).getFileStatus(stf.getPath()).getLen();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }).sum();
+    }
+  }
+
+  /**
+   * Tests the functionality of the CompactionConfigurer by exercising one of its
+   * implementations, the CompressionConfigurer.
+   */
+  @Test
+  public void testCompactionConfigurer() throws Exception {
+    // Delete the shared test table to start fresh
+    client.deleteTable(sharedSecret, tableName);
+    // Create two tables
+    final String[] tableNames = getUniqueNameArray(2);
+    for (String tableName : tableNames) {
+      client.createTable(sharedSecret, tableName, true, TimeType.MILLIS);
+      client.setTableProperty(sharedSecret, tableName, "table.file.compress.type", "none");
+    }
+
+    // Create data to add to the tables
+    Map<ByteBuffer,List<ColumnUpdate>> mutation = new HashMap<>();
+    byte[] data = new byte[100000];
+    Arrays.fill(data, (byte) 65);
+    for (int i = 0; i < 10; i++) {
+      String row = String.format("%09d", i);
+      ColumnUpdate columnUpdate = new ColumnUpdate(s2bb("big"), s2bb("files"));
+      columnUpdate.setDeleteCell(false);
+      columnUpdate.setValue(data);
+      mutation.put(s2bb(row), List.of(columnUpdate));
+    }
+    for (String tableName : tableNames) {
+      client.updateAndFlush(sharedSecret, tableName, mutation);
+      client.flushTable(sharedSecret, tableName, null, null, true);
+    }
+
+    // Check the sizes of the files before compaction
+    for (String tableName : tableNames) {
+      long sizes = getFileSizes(getCluster().getServerContext(), tableName);
+      assertTrue(sizes > data.length * 10.0 && sizes < data.length * 11.0);
+    }
+
+    // Create a PluginConfig for the CompressionConfigurer
+    PluginConfig configurerCompact = new PluginConfig(CompressionConfigurer.class.getName(),
+        Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, data.length + "",
+            CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz"));
+
+    // Compact the tables, one with the configurer and one without
+    client.compactTable(sharedSecret, tableNames[0], null, null, null, true, true, null,
+        configurerCompact);
+    client.compactTable(sharedSecret, tableNames[1], null, null, null, true, true, null, null);
+
+    // Check that the file sizes are as expected: with compression, this highly
+    // repetitive data should produce significantly smaller files
+    long sizes1 = getFileSizes(getCluster().getServerContext(), tableNames[0]);
+    long sizes2 = getFileSizes(getCluster().getServerContext(), tableNames[1]);
+    assertTrue(sizes1 < data.length);
+    assertTrue(sizes1 < sizes2, "Size1 is " + sizes1 + ", size2 is " + sizes2);
+    assertTrue(sizes2 > data.length * 10.0 && sizes2 < data.length * 11.0);
+  }
+
   @Test
   public void namespaceOperations() throws Exception {
     // default namespace and accumulo namespace
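
Note: the test above drives compaction through the proxy's compactTable() call, which takes the
proxy Thrift PluginConfig type. For comparison, here is a minimal sketch of the same
CompressionConfigurer setup through the core AccumuloClient API. CompressionConfigurer, its two
option keys, and CompactionConfig.setConfigurer are the real classes used in the diff; the
client and tableName arguments and the 100 KB threshold are illustrative assumptions.

import java.util.Map;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.PluginConfig;
import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;

class CompressionCompactionExample {
  // Compact tableName, switching the compaction output to gzip whenever the summed
  // size of the compaction's input files exceeds 100 KB; smaller compactions keep
  // the table's configured compression.
  static void compactWithGzipForLargeFiles(AccumuloClient client, String tableName)
      throws Exception {
    PluginConfig configurer = new PluginConfig(CompressionConfigurer.class.getName(),
        Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, "100000",
            CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz"));
    client.tableOperations().compact(tableName,
        new CompactionConfig().setWait(true).setConfigurer(configurer));
  }
}

Unlike setting table.file.compress.type table-wide, a configurer passed this way only affects
the files produced by that one compaction, which is why the test sets the table default to
"none" and lets the configurer upgrade the large output to gzip.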