Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@


public class TransactionPayloadEventData implements EventData {
private int payloadSize;
private int uncompressedSize;
private long payloadSize;
private long uncompressedSize;
private int compressionType;
private byte[] payload;
private ArrayList<Event> uncompressedEvents;
Expand All @@ -18,19 +18,19 @@ public void setUncompressedEvents(ArrayList<Event> uncompressedEvents) {
this.uncompressedEvents = uncompressedEvents;
}

public int getPayloadSize() {
public long getPayloadSize() {
return payloadSize;
}

public void setPayloadSize(int payloadSize) {
public void setPayloadSize(long payloadSize) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

payloadSize can stay an int too i think

this.payloadSize = payloadSize;
}

public int getUncompressedSize() {
public long getUncompressedSize() {
return uncompressedSize;
}

public void setUncompressedSize(int uncompressedSize) {
public void setUncompressedSize(long uncompressedSize) {
this.uncompressedSize = uncompressedSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStream;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;

/**
* @author <a href="mailto:somesh.malviya@booking.com">Somesh Malviya</a>
* @author <a href="mailto:debjeet.sarkar@booking.com">Debjeet Sarkar</a>
* @author <a href="mailto:pratik.pandey@booking.com">Pratik Pandey</a>
*/
public class TransactionPayloadEventDataDeserializer implements EventDataDeserializer<TransactionPayloadEventData> {
public static final int OTW_PAYLOAD_HEADER_END_MARK = 0;
Expand All @@ -49,9 +49,9 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
if (fieldType == OTW_PAYLOAD_HEADER_END_MARK) {
break;
}
// Read the size of the field
// Read the size of the field (use readPackedLong to support large field sizes)
if (inputStream.available() >= 1) {
fieldLen = inputStream.readPackedInteger();
fieldLen = inputStream.readPackedInt();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

}
switch (fieldType) {
case OTW_PAYLOAD_SIZE_FIELD:
Expand All @@ -64,7 +64,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
break;
case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
// Fetch the uncompressed size
eventData.setUncompressedSize(inputStream.readPackedInteger());
eventData.setUncompressedSize(inputStream.readPackedLong());
break;
default:
// Ignore unrecognized field
Expand All @@ -76,27 +76,31 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
// Default the uncompressed to the payload size
eventData.setUncompressedSize(eventData.getPayloadSize());
}
// set the payload to the rest of the input buffer

eventData.setPayload(inputStream.read(eventData.getPayloadSize()));

// Decompress the payload
byte[] src = eventData.getPayload();
byte[] dst = ByteBuffer.allocate(eventData.getUncompressedSize()).array();
Zstd.decompressByteArray(dst, 0, dst.length, src, 0, src.length);
// Use streaming decompression to handle uncompressed sizes up to 4GB
// This avoids hitting Java's 2GB array limit by processing events as they're decompressed
ArrayList<Event> decompressedEvents = getDecompressedEvents(eventData);

eventData.setUncompressedEvents(decompressedEvents);

return eventData;
}

// Read and store events from decompressed byte array into input stream
private static ArrayList<Event> getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException {
ArrayList<Event> decompressedEvents = new ArrayList<>();
EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer();
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst);

Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
while(internalEvent != null) {
decompressedEvents.add(internalEvent);
internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
}

eventData.setUncompressedEvents(decompressedEvents);
try (ZstdInputStream zstdInputStream = new ZstdInputStream(new java.io.ByteArrayInputStream(eventData.getPayload()))) {
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(zstdInputStream);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you talk more about the need to move to using input streams here? the old code looked much simpler, did it overflow the output buffer?


return eventData;
Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
while(internalEvent != null) {
decompressedEvents.add(internalEvent);
internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
}
}
return decompressedEvents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import org.testng.SkipException;
import org.testng.annotations.Test;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

import static org.testng.Assert.*;

/**
* @author <a href="mailto:pratik.pandey@booking.com">Pratik Pandey</a>
* Integration test for TRANSACTION_PAYLOAD event deserialization.
* Requires MySQL 8.0.20+ with binlog_transaction_compression enabled.
* Run Using: MYSQL_VERSION=8.0 mvn test -Dtest=TransactionPayloadIntegrationTest
*/
public class TransactionPayloadIntegrationTest extends AbstractIntegrationTest {

    /**
     * Configures the one-time MySQL server with zstd binlog transaction
     * compression so the server emits TRANSACTION_PAYLOAD events.
     */
    @Override
    protected MysqlOnetimeServerOptions getOptions() {
        MysqlOnetimeServerOptions options = super.getOptions();
        // Enable transaction compression (requires MySQL 8.0.20+)
        options.extraParams = "--binlog-transaction-compression=ON --binlog-transaction-compression-level-zstd=3";
        return options;
    }

    /**
     * End-to-end test: commits one transaction whose uncompressed size exceeds
     * 1GB (approaching 2GB) and asserts the resulting TRANSACTION_PAYLOAD event
     * is decompressed and deserialized correctly via streaming decompression.
     */
    @Test
    public void testVeryLargeTransactionNear2GB() throws Exception {
        // This test simulates a transaction with uncompressed size approaching 2GB
        // to validate streaming decompression handles the upper limits correctly

        // Expected behavior:
        // - Uncompressed size can exceed 2GB (uses streaming decompression)
        // - Compressed size must stay under 2GB (Java array limit)

        if (!mysqlVersion.atLeast(8, 0)) {
            throw new SkipException("Transaction compression requires MySQL 8.0.20+");
        }

        CapturingEventListener capturingEventListener = new CapturingEventListener();
        client.registerEventListener(capturingEventListener);
        // NOTE(review): eventListener is unregistered and immediately re-registered,
        // presumably so it is notified AFTER capturingEventListener — confirm that
        // listener ordering is actually what this is relying on.
        client.unregisterEventListener(eventListener);
        client.registerEventListener(eventListener);

        try {
            // Create table with large BLOB column to generate big transactions
            master.execute(new BinaryLogClientIntegrationTest.Callback<Statement>() {
                public void execute(Statement statement) throws SQLException {
                    statement.execute("drop table if exists very_large_txn_test");
                    // LONGTEXT can store up to 4GB, perfect for our test
                    statement.execute("create table very_large_txn_test (" +
                        "id int primary key, " +
                        "data1 LONGTEXT, " +
                        "data2 LONGTEXT, " +
                        "data3 LONGTEXT)");
                }
            });
            // Wait for the DROP + CREATE query events, then clear counters so the
            // payload wait below only sees the big transaction.
            eventListener.waitForAtLeast(EventType.QUERY, 2, BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT);
            eventListener.reset();

            // Generate large repeating data that compresses well
            // We want uncompressed to be ~2-3GB but compressed to stay under 2GB
            final StringBuilder largeChunk = new StringBuilder();
            final String pattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz!@#$%^&*()_+REPEATING_PATTERN_";

            // Create a 1MB chunk of repeating data (compresses very well)
            int targetChunkSize = 1024 * 1024; // 1MB target
            int iterations = targetChunkSize / pattern.length(); // Calculate exact iterations needed
            for (int i = 0; i < iterations; i++) {
                largeChunk.append(pattern);
            }
            final String chunk = largeChunk.toString();

            System.out.println("Starting very large transaction test...");
            System.out.println("Chunk size: " + chunk.length() + " bytes");

            long startTime = System.currentTimeMillis();

            // Insert rows with large data
            // With 3 columns of 1MB each per row, and good compression:
            // - ~700 rows = ~2GB uncompressed
            // - Should compress to ~200-500MB depending on compression ratio
            final int numRows = 800;
            System.out.println("Inserting " + numRows + " rows with ~3MB each...");

            master.execute(new BinaryLogClientIntegrationTest.Callback<Statement>() {
                public void execute(Statement statement) throws SQLException {
                    // Single explicit transaction so all inserts land in ONE
                    // compressed TRANSACTION_PAYLOAD event.
                    statement.execute("BEGIN");
                    for (int i = 0; i < numRows; i++) {
                        // chunk contains no quote characters, so direct string
                        // interpolation into the INSERT is safe here.
                        String sql = String.format(
                            "insert into very_large_txn_test values(%d, '%s', '%s', '%s')",
                            i, chunk, chunk, chunk
                        );
                        statement.execute(sql);

                        // Progress logging every 50 rows.
                        if (i % 50 == 0 && i > 0) {
                            System.out.println("  Inserted " + i + " rows (" +
                                ((long) i * 3 * chunk.length() / (1024 * 1024)) + " MB uncompressed)...");
                        }
                    }
                    System.out.println("Committing transaction...");
                    statement.execute("COMMIT");
                }
            });

            long insertTime = System.currentTimeMillis() - startTime;
            System.out.println("Transaction committed in " + (insertTime / 1000) + " seconds");

            // Wait for transaction payload event (give it more time for large transactions)
            // NOTE(review): if DEFAULT_TIMEOUT is already in milliseconds, multiplying
            // by 1000 yields DEFAULT_TIMEOUT seconds — far more than the "30 seconds"
            // the original comment claimed. Confirm the unit of DEFAULT_TIMEOUT.
            long largeTimeout = BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT * 1000; // 30 seconds
            eventListener.waitFor(EventType.TRANSACTION_PAYLOAD, 1, largeTimeout);

            // Verify the large payload was handled correctly
            List<TransactionPayloadEventData> payloadEvents =
                capturingEventListener.getEvents(TransactionPayloadEventData.class);

            assertTrue(payloadEvents.size() > 0, "Should have captured TRANSACTION_PAYLOAD event");

            TransactionPayloadEventData payloadEventData = payloadEvents.get(0);
            assertNotNull(payloadEventData, "TRANSACTION_PAYLOAD event data should not be null");

            long uncompressedSize = payloadEventData.getUncompressedSize();
            long compressedSize = payloadEventData.getPayloadSize();

            // Validate sizes: compressed must fit in a byte[], uncompressed is
            // expected to exceed 1GB to exercise the streaming path.
            assertTrue(compressedSize > 0, "Compressed size should be > 0");
            assertTrue(compressedSize < Integer.MAX_VALUE,
                "Compressed size must be < 2GB (Java array limit): " + compressedSize);
            assertTrue(uncompressedSize > 1024L * 1024 * 1024,
                "Uncompressed size should be > 1GB: " + (uncompressedSize / (1024 * 1024)) + " MB");

            // Verify compression ratio
            double compressionRatio = (double) uncompressedSize / compressedSize;
            assertTrue(compressionRatio > 2.0,
                "Should have good compression ratio (>2x) for repetitive data, got: " +
                String.format("%.2fx", compressionRatio));

            // Verify all events were decompressed successfully via streaming
            List<Event> uncompressedEvents = payloadEventData.getUncompressedEvents();
            assertNotNull(uncompressedEvents, "Should have uncompressed events");
            assertFalse(uncompressedEvents.isEmpty(), "Should have decompressed events successfully");

            // Count WriteRowsEventData events
            int writeRowsCount = 0;
            for (Event event : uncompressedEvents) {
                if (event.getData() instanceof WriteRowsEventData) {
                    writeRowsCount++;
                }
            }
            assertTrue(writeRowsCount > 0, "Should have WriteRowsEventData in the payload");

            long totalTime = System.currentTimeMillis() - startTime;

            // Summary report for manual inspection of the run.
            System.out.println("\n=== Very Large Transaction Test Results ===");
            System.out.printf("Rows inserted: %d%n", numRows);
            System.out.printf("Estimated uncompressed data: ~%d MB%n",
                ((long) numRows * 3 * chunk.length()) / (1024 * 1024));
            System.out.printf("Actual uncompressed size: %d MB%n",
                uncompressedSize / (1024 * 1024));
            System.out.printf("Compressed size: %.2f MB (%.1f%% of limit)%n",
                compressedSize / (1024.0 * 1024.0),
                (compressedSize * 100.0 / Integer.MAX_VALUE));
            System.out.printf("Compression ratio: %.2fx%n", compressionRatio);
            System.out.printf("Events decompressed: %d%n", uncompressedEvents.size());
            System.out.printf("Write events: %d%n", writeRowsCount);
            System.out.println("===========================================\n");

        } finally {
            // Always detach the capturing listener so it cannot leak into other tests.
            client.unregisterEventListener(capturingEventListener);
        }
    }
}