diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java index f5014cc8..573b7712 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java @@ -4,8 +4,8 @@ public class TransactionPayloadEventData implements EventData { - private int payloadSize; - private int uncompressedSize; + private long payloadSize; + private long uncompressedSize; private int compressionType; private byte[] payload; private ArrayList uncompressedEvents; @@ -18,19 +18,19 @@ public void setUncompressedEvents(ArrayList uncompressedEvents) { this.uncompressedEvents = uncompressedEvents; } - public int getPayloadSize() { + public long getPayloadSize() { return payloadSize; } - public void setPayloadSize(int payloadSize) { + public void setPayloadSize(long payloadSize) { this.payloadSize = payloadSize; } - public int getUncompressedSize() { + public long getUncompressedSize() { return uncompressedSize; } - public void setUncompressedSize(int uncompressedSize) { + public void setUncompressedSize(long uncompressedSize) { this.uncompressedSize = uncompressedSize; } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java index a8e84876..618b5054 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java @@ -15,18 +15,18 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization; -import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdInputStream; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; /** * @author Somesh Malviya * @author Debjeet Sarkar + * @author Pratik Pandey */ public class TransactionPayloadEventDataDeserializer implements EventDataDeserializer { public static final int OTW_PAYLOAD_HEADER_END_MARK = 0; @@ -49,9 +49,9 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) if (fieldType == OTW_PAYLOAD_HEADER_END_MARK) { break; } - // Read the size of the field + // Read the size of the field (use readPackedLong to support large field sizes) if (inputStream.available() >= 1) { - fieldLen = inputStream.readPackedInteger(); + fieldLen = inputStream.readPackedInt(); } switch (fieldType) { case OTW_PAYLOAD_SIZE_FIELD: @@ -64,7 +64,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) break; case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD: // Fetch the uncompressed size - eventData.setUncompressedSize(inputStream.readPackedInteger()); + eventData.setUncompressedSize(inputStream.readPackedLong()); break; default: // Ignore unrecognized field @@ -76,27 +76,31 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) // Default the uncompressed to the payload size eventData.setUncompressedSize(eventData.getPayloadSize()); } - // set the payload to the rest of the input buffer + eventData.setPayload(inputStream.read(eventData.getPayloadSize())); - // Decompress the payload - byte[] src = eventData.getPayload(); - byte[] dst = ByteBuffer.allocate(eventData.getUncompressedSize()).array(); - Zstd.decompressByteArray(dst, 0, dst.length, src, 0, src.length); + // Use streaming decompression to handle uncompressed sizes up to 4GB + // This avoids hitting Java's 2GB array limit by processing events as they're decompressed + ArrayList decompressedEvents = getDecompressedEvents(eventData); + + eventData.setUncompressedEvents(decompressedEvents); + + return eventData; + } - // Read and store events from decompressed byte array into input stream + private static ArrayList getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException { ArrayList decompressedEvents = new ArrayList<>(); EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer(); - ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst); - Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); - while(internalEvent != null) { - decompressedEvents.add(internalEvent); - internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); - } - - eventData.setUncompressedEvents(decompressedEvents); + try (ZstdInputStream zstdInputStream = new ZstdInputStream(new java.io.ByteArrayInputStream(eventData.getPayload()))) { + ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(zstdInputStream); - return eventData; + Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); + while(internalEvent != null) { + decompressedEvents.add(internalEvent); + internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); + } + } + return decompressedEvents; } } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/TransactionPayloadIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/TransactionPayloadIntegrationTest.java new file mode 100644 index 00000000..a4afd19a --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/TransactionPayloadIntegrationTest.java @@ -0,0 +1,179 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import org.testng.SkipException; +import org.testng.annotations.Test; + +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import static org.testng.Assert.*; + +/** + * @author Pratik Pandey + * Integration test for TRANSACTION_PAYLOAD event deserialization. + * Requires MySQL 8.0.20+ with binlog_transaction_compression enabled. + * Run Using: MYSQL_VERSION=8.0 mvn test -Dtest=TransactionPayloadIntegrationTest + */ +public class TransactionPayloadIntegrationTest extends AbstractIntegrationTest { + + @Override + protected MysqlOnetimeServerOptions getOptions() { + MysqlOnetimeServerOptions options = super.getOptions(); + // Enable transaction compression (requires MySQL 8.0.20+) + options.extraParams = "--binlog-transaction-compression=ON --binlog-transaction-compression-level-zstd=3"; + return options; + } + + @Test + public void testVeryLargeTransactionNear2GB() throws Exception { + // This test simulates a transaction with uncompressed size approaching 2GB + // to validate streaming decompression handles the upper limits correctly + + // Expected behavior: + // - Uncompressed size can exceed 2GB (uses streaming decompression) + // - Compressed size must stay under 2GB (Java array limit) + + if (!mysqlVersion.atLeast(8, 0)) { + throw new SkipException("Transaction compression requires MySQL 8.0.20+"); + } + + CapturingEventListener capturingEventListener = new CapturingEventListener(); + client.registerEventListener(capturingEventListener); + client.unregisterEventListener(eventListener); + client.registerEventListener(eventListener); + + try { + // Create table with large BLOB column to generate big transactions + master.execute(new BinaryLogClientIntegrationTest.Callback() { + public void execute(Statement statement) throws SQLException { + statement.execute("drop table if exists very_large_txn_test"); + // LONGTEXT can store up to 4GB, perfect for our test + statement.execute("create table very_large_txn_test (" + + "id int primary key, " + + "data1 LONGTEXT, " + + "data2 LONGTEXT, " + + "data3 LONGTEXT)"); + } + }); + eventListener.waitForAtLeast(EventType.QUERY, 2, BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT); + eventListener.reset(); + + // Generate large repeating data that compresses well + // We want uncompressed to be ~2-3GB but compressed to stay under 2GB + final StringBuilder largeChunk = new StringBuilder(); + final String pattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz!@#$%^&*()_+REPEATING_PATTERN_"; + + // Create a 1MB chunk of repeating data (compresses very well) + int targetChunkSize = 1024 * 1024; // 1MB target + int iterations = targetChunkSize / pattern.length(); // Calculate exact iterations needed + for (int i = 0; i < iterations; i++) { + largeChunk.append(pattern); + } + final String chunk = largeChunk.toString(); + + System.out.println("Starting very large transaction test..."); + System.out.println("Chunk size: " + chunk.length() + " bytes"); + + long startTime = System.currentTimeMillis(); + + // Insert rows with large data + // With 3 columns of 1MB each per row, and good compression: + // - ~700 rows = ~2GB uncompressed + // - Should compress to ~200-500MB depending on compression ratio + final int numRows = 800; + System.out.println("Inserting " + numRows + " rows with ~3MB each..."); + + master.execute(new BinaryLogClientIntegrationTest.Callback() { + public void execute(Statement statement) throws SQLException { + statement.execute("BEGIN"); + for (int i = 0; i < numRows; i++) { + // Use prepared statement to handle large data efficiently + String sql = String.format( + "insert into very_large_txn_test values(%d, '%s', '%s', '%s')", + i, chunk, chunk, chunk + ); + statement.execute(sql); + + if (i % 50 == 0 && i > 0) { + System.out.println(" Inserted " + i + " rows (" + + ((long) i * 3 * chunk.length() / (1024 * 1024)) + " MB uncompressed)..."); + } + } + System.out.println("Committing transaction..."); + statement.execute("COMMIT"); + } + }); + + long insertTime = System.currentTimeMillis() - startTime; + System.out.println("Transaction committed in " + (insertTime / 1000) + " seconds"); + + // Wait for transaction payload event (give it more time for large transactions) + long largeTimeout = BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT * 1000; // 30 seconds + eventListener.waitFor(EventType.TRANSACTION_PAYLOAD, 1, largeTimeout); + + // Verify the large payload was handled correctly + List payloadEvents = + capturingEventListener.getEvents(TransactionPayloadEventData.class); + + assertTrue(payloadEvents.size() > 0, "Should have captured TRANSACTION_PAYLOAD event"); + + TransactionPayloadEventData payloadEventData = payloadEvents.get(0); + assertNotNull(payloadEventData, "TRANSACTION_PAYLOAD event data should not be null"); + + long uncompressedSize = payloadEventData.getUncompressedSize(); + long compressedSize = payloadEventData.getPayloadSize(); + + // Validate sizes + assertTrue(compressedSize > 0, "Compressed size should be > 0"); + assertTrue(compressedSize < Integer.MAX_VALUE, + "Compressed size must be < 2GB (Java array limit): " + compressedSize); + assertTrue(uncompressedSize > 1024L * 1024 * 1024, + "Uncompressed size should be > 1GB: " + (uncompressedSize / (1024 * 1024)) + " MB"); + + // Verify compression ratio + double compressionRatio = (double) uncompressedSize / compressedSize; + assertTrue(compressionRatio > 2.0, + "Should have good compression ratio (>2x) for repetitive data, got: " + + String.format("%.2fx", compressionRatio)); + + // Verify all events were decompressed successfully via streaming + List uncompressedEvents = payloadEventData.getUncompressedEvents(); + assertNotNull(uncompressedEvents, "Should have uncompressed events"); + assertFalse(uncompressedEvents.isEmpty(), "Should have decompressed events successfully"); + + // Count WriteRowsEventData events + int writeRowsCount = 0; + for (Event event : uncompressedEvents) { + if (event.getData() instanceof WriteRowsEventData) { + writeRowsCount++; + } + } + assertTrue(writeRowsCount > 0, "Should have WriteRowsEventData in the payload"); + + long totalTime = System.currentTimeMillis() - startTime; + + System.out.println("\n=== Very Large Transaction Test Results ==="); + System.out.printf("Rows inserted: %d%n", numRows); + System.out.printf("Estimated uncompressed data: ~%d MB%n", + ((long) numRows * 3 * chunk.length()) / (1024 * 1024)); + System.out.printf("Actual uncompressed size: %d MB%n", + uncompressedSize / (1024 * 1024)); + System.out.printf("Compressed size: %.2f MB (%.1f%% of limit)%n", + compressedSize / (1024.0 * 1024.0), + (compressedSize * 100.0 / Integer.MAX_VALUE)); + System.out.printf("Compression ratio: %.2fx%n", compressionRatio); + System.out.printf("Events decompressed: %d%n", uncompressedEvents.size()); + System.out.printf("Write events: %d%n", writeRowsCount); + System.out.printf("Total time: %.1f seconds%n", totalTime / 1000.0); + System.out.println("===========================================\n"); + + } finally { + client.unregisterEventListener(capturingEventListener); + } + } +}