diff --git a/connectors-common/connector-core/src/main/java/io/tapdata/kit/StringKit.java b/connectors-common/connector-core/src/main/java/io/tapdata/kit/StringKit.java index 8d02d4054..44f831892 100644 --- a/connectors-common/connector-core/src/main/java/io/tapdata/kit/StringKit.java +++ b/connectors-common/connector-core/src/main/java/io/tapdata/kit/StringKit.java @@ -386,4 +386,8 @@ public static String escapeRegex(String input) { Matcher matcher = REGEX_SPECIAL_CHARS.matcher(input); return matcher.replaceAll("\\\\$0"); } + + public static String removeSqlNote(String sql) { + return sql.replaceAll("(?s)/\\*.*?\\*/|--.*?\n", "").trim(); + } } diff --git a/connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index bb201e76d..0952921e8 100644 --- a/connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -171,7 +171,15 @@ protected void initPublication() { Set tablesToCapture = determineCapturedTables(); Set existsTables = new HashSet<>(); String catalog = connection().getCatalog(); - try (ResultSet resultSet = stmt.executeQuery(String.format("select * from pg_publication_tables where pubname='%s'", publicationName))) { + try (ResultSet resultSet = stmt.executeQuery(String.format("select schemaname, tablename from pg_publication_tables pt join pg_publication pp\n" + + " on pt.pubname = pp.pubname\n" + + "where pt.pubname='%s' and pp.pubviaroot='false'\n" + + "union all\n" + + "select schemaname, pc.relname from pg_publication_tables pt join pg_publication pp\n" + + " on pt.pubname = pp.pubname join pg_class pc on pc.oid in (SELECT inhrelid\n" + + " FROM pg_inherits\n" + + " WHERE inhparent = (pt.schemaname||'.'||pt.tablename)::regclass)\n" + + "where pt.pubname='%s' and pp.pubviaroot='true'", publicationName, publicationName))) { while (resultSet.next()) { existsTables.add(new TableId(catalog, resultSet.getString("schemaname"), resultSet.getString("tablename"))); } diff --git a/connectors-common/debezium-bucket/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 b/connectors-common/debezium-bucket/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 index 6831dacb1..836dbd9d5 100644 --- a/connectors-common/debezium-bucket/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 +++ b/connectors-common/debezium-bucket/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 @@ -1954,7 +1954,7 @@ fullColumnName ; indexColumnName - : (uid | STRING_LITERAL) ('(' decimalLiteral ')')? sortType=(ASC | DESC)? + : ('(' expression ')' | uid | STRING_LITERAL) ('(' decimalLiteral ')')? sortType=(ASC | DESC)? ; userName diff --git a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReaderV2.java b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReaderV2.java index 25032afe8..66178be33 100644 --- a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReaderV2.java +++ b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReaderV2.java @@ -5,25 +5,36 @@ import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import io.tapdata.common.concurrent.ConcurrentProcessor; import io.tapdata.common.concurrent.TapExecutors; +import io.tapdata.common.ddl.DDLFactory; +import io.tapdata.common.ddl.ccj.CCJBaseDDLWrapper; +import io.tapdata.common.ddl.type.DDLParserType; +import io.tapdata.common.ddl.wrapper.DDLWrapperConfig; import io.tapdata.connector.mysql.config.MysqlConfig; import io.tapdata.connector.mysql.entity.MysqlBinlogPosition; +import io.tapdata.connector.mysql.util.MySQLJsonParser; import io.tapdata.entity.event.TapEvent; +import io.tapdata.entity.event.ddl.TapDDLEvent; +import io.tapdata.entity.event.ddl.TapDDLUnknownEvent; import io.tapdata.entity.event.dml.TapDeleteRecordEvent; import io.tapdata.entity.event.dml.TapInsertRecordEvent; import io.tapdata.entity.event.dml.TapUpdateRecordEvent; import io.tapdata.entity.logger.Log; -import io.tapdata.entity.schema.TapField; import io.tapdata.entity.schema.TapTable; import io.tapdata.entity.utils.cache.KVReadOnlyMap; import io.tapdata.kit.EmptyKit; +import io.tapdata.kit.StringKit; import io.tapdata.pdk.apis.consumer.StreamReadConsumer; import java.io.Serializable; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.ZoneOffset; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; import static io.tapdata.base.ConnectorBase.list; @@ -38,10 +49,16 @@ public class MysqlReaderV2 { private StreamReadConsumer consumer; private final AtomicReference exception = new AtomicReference<>(); private final Map tableMapEventByTableId = new ConcurrentHashMap<>(); + private final Map> dataTypeMap = new ConcurrentHashMap<>(); + private final Map> enumDataTypeMap = new ConcurrentHashMap<>(); + private final TimeZone timeZone; + private final DDLWrapperConfig DDL_WRAPPER_CONFIG = CCJBaseDDLWrapper.CCJDDLWrapperConfig.create().split("`"); + private final DDLParserType ddlParserType = DDLParserType.MYSQL_CCJ_SQL_PARSER; - public MysqlReaderV2(MysqlJdbcContextV2 mysqlJdbcContext, Log tapLogger) { + public MysqlReaderV2(MysqlJdbcContextV2 mysqlJdbcContext, Log tapLogger, TimeZone timeZone) { this.tapLogger = tapLogger; mysqlConfig = (MysqlConfig) mysqlJdbcContext.getConfig(); + this.timeZone = timeZone; } public void init(List tableList, KVReadOnlyMap tableMap, Object offsetState, int recordSize, StreamReadConsumer consumer) throws Throwable { @@ -59,7 +76,11 @@ public void startMiner(Supplier isAlive) throws Throwable { try (ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "MysqlReader-Processor")) { client.setServerId(randomServerId()); - + EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode( + EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO + ); + client.setEventDeserializer(eventDeserializer); // 设置起始位置 if (offsetState instanceof MysqlBinlogPosition) { MysqlBinlogPosition position = (MysqlBinlogPosition) offsetState; @@ -81,6 +102,9 @@ public void startMiner(Supplier isAlive) throws Throwable { tapLogger.info("Binlog rotated to: {}/{}", rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition()); } else if (eventType == EventType.TABLE_MAP) { handleTableMapEvent(event); + } else if (eventType == EventType.QUERY) { + concurrentProcessor.runAsyncWithBlocking(new ScanEvent(event, currentBinlogFile.get()), this::emit); + return; } // 异步处理事件 @@ -140,6 +164,39 @@ private OffsetEvent emit(ScanEvent scanEvent) { return null; } + if (eventType == EventType.QUERY) { + QueryEventData queryEventData = event.getData(); + long eventTime = header.getTimestamp(); + String ddl = StringKit.removeSqlNote(queryEventData.getSql()); + OffsetEvent offsetEvent = new OffsetEvent(); + List ddlEvents = new ArrayList<>(); + try { + DDLFactory.ddlToTapDDLEvent( + ddlParserType, + ddl, + DDL_WRAPPER_CONFIG, + tableMap, + tapDDLEvent -> { + tapDDLEvent.setTime(System.currentTimeMillis()); + tapDDLEvent.setReferenceTime(eventTime); + tapDDLEvent.setOriginDDL(ddl); + ddlEvents.add(tapDDLEvent); + tapLogger.info("Read DDL: " + ddl + ", about to be packaged as some event(s)"); + } + ); + } catch (Throwable e) { + TapDDLEvent tapDDLEvent = new TapDDLUnknownEvent(); + tapDDLEvent.setTime(System.currentTimeMillis()); + tapDDLEvent.setReferenceTime(eventTime); + tapDDLEvent.setOriginDDL(ddl); + ddlEvents.add(tapDDLEvent); + } + offsetEvent.setTapEvent(ddlEvents); + offsetEvent.setMysqlBinlogPosition(extractBinlogPosition(event, scanEvent.getFileName())); + ddlEvents.forEach(e -> ddlFlush(((TapDDLEvent) e).getTableId())); + return offsetEvent; + } + // 处理数据变更事件 List tapEvents; MysqlBinlogPosition position; @@ -185,10 +242,35 @@ private void handleTableMapEvent(Event event) { // 保存映射关系 tableMapEventByTableId.put(tableId, tableMapEventData); - + ddlFlush(table); tapLogger.debug("Table map event: tableId={}, database={}, table={}", tableId, database, table); } + private void ddlFlush(String table) { + if (EmptyKit.isBlank(table)) { + return; + } + LinkedHashMap dataTypes = tableMap.get(table).getNameFieldMap().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> StringKit.removeParentheses(e.getValue().getDataType()), + (existing, replacement) -> existing, LinkedHashMap::new)); + dataTypeMap.put(table, dataTypes); + Map enumMap = tableMap.get(table).getNameFieldMap().entrySet().stream().filter(v -> v.getValue().getDataType().startsWith("enum")) + .collect(Collectors.toMap(Map.Entry::getKey, e -> { + String enumType = e.getValue().getDataType(); + Object[] enumValues = enumType.substring("enum(".length(), enumType.length() - 1).split(","); + for (int i = 0; i < enumValues.length; i++) { + String element = ((String) enumValues[i]).trim(); + if (element.startsWith("'")) { + enumValues[i] = StringKit.removeHeadTail(element, "'", null); + } else { + enumValues[i] = new BigDecimal(element); + } + } + return enumValues; + })); + enumDataTypeMap.put(table, enumMap); + } + /** * 处理 INSERT 事件 */ @@ -213,7 +295,7 @@ private List handleInsertEvent(Event event) { } List tapEvents = new ArrayList<>(); for (Serializable[] row : rows) { - Map after = convertRowToMap(tableId, row, tapTable); + Map after = convertRowToMap(row, dataTypeMap.get(tableName), enumDataTypeMap.get(tableName)); TapInsertRecordEvent insertEvent = new TapInsertRecordEvent(); insertEvent.init(); insertEvent.table(tableName); @@ -249,8 +331,8 @@ private List handleUpdateEvent(Event event) { } List tapEvents = new ArrayList<>(); for (Map.Entry row : rows) { - Map before = convertRowToMap(tableId, row.getKey(), tapTable); - Map after = convertRowToMap(tableId, row.getValue(), tapTable); + Map before = convertRowToMap(row.getKey(), dataTypeMap.get(tableName), enumDataTypeMap.get(tableName)); + Map after = convertRowToMap(row.getValue(), dataTypeMap.get(tableName), enumDataTypeMap.get(tableName)); TapUpdateRecordEvent updateEvent = new TapUpdateRecordEvent(); updateEvent.init(); @@ -288,7 +370,7 @@ private List handleDeleteEvent(Event event) { } List tapEvents = new ArrayList<>(); for (Serializable[] row : rows) { - Map before = convertRowToMap(tableId, row, tapTable); + Map before = convertRowToMap(row, dataTypeMap.get(tableName), enumDataTypeMap.get(tableName)); TapDeleteRecordEvent deleteEvent = new TapDeleteRecordEvent(); deleteEvent.init(); @@ -304,31 +386,65 @@ private List handleDeleteEvent(Event event) { /** * 将行数据数组转换为 Map */ - private Map convertRowToMap(long tableId, Serializable[] row, TapTable tapTable) { + private Map convertRowToMap(Serializable[] row, LinkedHashMap dataTypes, Map enumMap) { Map result = new LinkedHashMap<>(); - if (row == null || tapTable == null) { - return result; - } - - LinkedHashMap nameFieldMap = tapTable.getNameFieldMap(); - if (nameFieldMap == null || nameFieldMap.isEmpty()) { + if (row == null || EmptyKit.isEmpty(dataTypes)) { return result; } // 获取字段名列表(按顺序) - List fieldNames = new ArrayList<>(nameFieldMap.keySet()); + List fieldNames = new ArrayList<>(dataTypes.keySet()); // 将数组值映射到字段名 for (int i = 0; i < row.length && i < fieldNames.size(); i++) { String fieldName = fieldNames.get(i); Object value = row[i]; - result.put(fieldName, value); + result.put(fieldName, filterValue(value, dataTypes.get(fieldName), enumMap.get(fieldName))); } return result; } + private Object filterValue(Object value, String dataType, Object[] enumValues) { + if (value == null) { + return null; + } + switch (dataType) { + case "time": + case "date": { + if (value instanceof Long) { + return Instant.ofEpochSecond(((Long) value) / 1000000, (((Long) value) % 1000000) * 1000); + } + } + case "datetime": { + if (value instanceof Long) { + return Instant.ofEpochSecond(((Long) value) / 1000000 - mysqlConfig.getZoneOffsetHour() * 60 * 60, ((Long) value % 1000000) * 1000); + } + } + case "timestamp": { + if (value instanceof Long) { + return Instant.ofEpochSecond(((Long) value) / 1000000 + timeZone.getRawOffset() / 1000, ((Long) value % 1000000) * 1000).atZone(ZoneOffset.UTC); + } + } + case "bit": + return ((BitSet) value).get(0); + case "binary": + case "varbinary": + return String.valueOf(value).getBytes(); + case "json": + return MySQLJsonParser.parseMySQLJsonBinary((byte[]) value); + case "tinytext": + case "mediumtext": + case "text": + case "longtext": + return new String((byte[]) value); + case "enum": + return enumValues[(int) value - 1]; + } + return value; + } + /** * 获取表名 */ @@ -393,6 +509,9 @@ static class OffsetEvent { private List tapEvents; private MysqlBinlogPosition mysqlBinlogPosition; + public OffsetEvent() { + } + public OffsetEvent(List tapEvents, MysqlBinlogPosition mysqlBinlogPosition) { this.tapEvents = tapEvents; this.mysqlBinlogPosition = mysqlBinlogPosition; diff --git a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/util/MySQLJsonParser.java b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/util/MySQLJsonParser.java new file mode 100644 index 000000000..0e35c965b --- /dev/null +++ b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/util/MySQLJsonParser.java @@ -0,0 +1,480 @@ +package io.tapdata.connector.mysql.util; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.*; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.*; + +/** + * MySQL JSON 二进制格式解析器 + * + *

参考资料: + *

    + *
  • MySQL 源码:sql/json_binary.h, json_binary.cc
  • + *
  • MySQL 文档:https://dev.mysql.com/doc/internals/en/json-binary-encoding.html
  • + *
  • 日期时间格式:https://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
  • + *
+ * + * @author TapData + */ +public class MySQLJsonParser { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * 解析 MySQL 的二进制 JSON 格式 + * + * @param data 二进制数据 + * @return JSON 字符串,解析失败返回 null + */ + public static String parseMySQLJsonBinary(byte[] data) { + if (data == null || data.length < 1) { + return null; + } + + try { + // 检查是否是 MariaDB 格式的 JSON 字符串(第一个字节 > 0x0F) + if ((data[0] & 0xFF) > 0x0F) { + return new String(data, StandardCharsets.UTF_8); + } + + ByteBuffer buffer = ByteBuffer.wrap(data); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + // 读取类型(第一个字节) + int type = buffer.get() & 0xFF; + + JsonNode jsonNode = parseJsonValue(buffer, type); + return jsonNode != null ? jsonNode.toString() : null; + + } catch (Exception e) { + System.err.println("Failed to parse MySQL JSON binary: " + e.getMessage()); + e.printStackTrace(); + return null; + } + } + + private static JsonNode parseJsonValue(ByteBuffer buffer, int type) { + switch (type) { + case 0x00: // JSONB_TYPE_SMALL_OBJECT + case 0x01: // JSONB_TYPE_LARGE_OBJECT + return parseJsonObject(buffer, type); + + case 0x02: // JSONB_TYPE_SMALL_ARRAY + case 0x03: // JSONB_TYPE_LARGE_ARRAY + return parseJsonArray(buffer, type); + + case 0x04: // JSONB_TYPE_LITERAL + return parseJsonLiteral(buffer); + + case 0x05: // JSONB_TYPE_INT16 + return parseJsonInt16(buffer); + + case 0x06: // JSONB_TYPE_UINT16 + return parseJsonUInt16(buffer); + + case 0x07: // JSONB_TYPE_INT32 + return parseJsonInt32(buffer); + + case 0x08: // JSONB_TYPE_UINT32 + return parseJsonUInt32(buffer); + + case 0x09: // JSONB_TYPE_INT64 + return parseJsonInt64(buffer); + + case 0x0A: // JSONB_TYPE_UINT64 + return parseJsonUInt64(buffer); + + case 0x0B: // JSONB_TYPE_DOUBLE + return parseJsonDouble(buffer); + + case 0x0C: // JSONB_TYPE_STRING + return parseJsonString(buffer); + + case 0x0D: // JSONB_TYPE_OPAQUE + return parseJsonOpaque(buffer); + + default: + throw new IllegalArgumentException("Unknown JSON type: " + type); + } + } + + private static JsonNode parseJsonObject(ByteBuffer buffer, int type) { + boolean isSmall = (type == 0x00); // SMALL_OBJECT + int startPosition = buffer.position(); + + // 读取元素数量和总字节数 + int elementCount = readOffsetOrSize(buffer, isSmall); + int bytes = readOffsetOrSize(buffer, isSmall); + + if (elementCount == 0) { + return objectMapper.createObjectNode(); + } + + Map map = new LinkedHashMap<>(); + + // 读取 key entries(offset + length) + int[] keyOffsets = new int[elementCount]; + int[] keyLengths = new int[elementCount]; + for (int i = 0; i < elementCount; i++) { + keyOffsets[i] = readOffsetOrSize(buffer, isSmall); + keyLengths[i] = buffer.getShort() & 0xFFFF; // key length 总是 2 字节 + } + + // 读取 value entries(type + offset) + int[] valueTypes = new int[elementCount]; + int[] valueOffsets = new int[elementCount]; + for (int i = 0; i < elementCount; i++) { + valueTypes[i] = buffer.get() & 0xFF; + valueOffsets[i] = readOffsetOrSize(buffer, isSmall); + } + + // 读取 keys 和 values + for (int i = 0; i < elementCount; i++) { + // 读取 key + buffer.position(startPosition + keyOffsets[i]); + byte[] keyBytes = new byte[keyLengths[i]]; + buffer.get(keyBytes); + String key = new String(keyBytes, StandardCharsets.UTF_8); + + // 读取 value + buffer.position(startPosition + valueOffsets[i]); + JsonNode value = parseJsonValue(buffer, valueTypes[i]); + map.put(key, value); + } + + // 移动到对象结束位置 + buffer.position(startPosition + bytes); + return objectMapper.valueToTree(map); + } + + private static JsonNode parseJsonArray(ByteBuffer buffer, int type) { + boolean isSmall = (type == 0x02); // SMALL_ARRAY + int startPosition = buffer.position(); + + // 读取元素数量和总字节数 + int elementCount = readOffsetOrSize(buffer, isSmall); + int bytes = readOffsetOrSize(buffer, isSmall); + + if (elementCount == 0) { + return objectMapper.createArrayNode(); + } + + ArrayNode arrayNode = objectMapper.createArrayNode(); + + // 读取元素的 type 和 offset + int[] elementTypes = new int[elementCount]; + int[] elementOffsets = new int[elementCount]; + for (int i = 0; i < elementCount; i++) { + elementTypes[i] = buffer.get() & 0xFF; + elementOffsets[i] = readOffsetOrSize(buffer, isSmall); + } + + // 读取每个元素的值 + for (int i = 0; i < elementCount; i++) { + buffer.position(startPosition + elementOffsets[i]); + JsonNode value = parseJsonValue(buffer, elementTypes[i]); + arrayNode.add(value); + } + + // 移动到数组结束位置 + buffer.position(startPosition + bytes); + return arrayNode; + } + + private static JsonNode parseJsonLiteral(ByteBuffer buffer) { + int literalType = buffer.get() & 0xFF; + switch (literalType) { + case 0x00: // JSONB_LITERAL_NULL + return NullNode.getInstance(); + case 0x01: // JSONB_LITERAL_TRUE + return BooleanNode.TRUE; + case 0x02: // JSONB_LITERAL_FALSE + return BooleanNode.FALSE; + default: + throw new IllegalArgumentException("Unknown literal type: " + literalType); + } + } + + private static JsonNode parseJsonInt16(ByteBuffer buffer) { + return new IntNode(buffer.getShort()); + } + + private static JsonNode parseJsonUInt16(ByteBuffer buffer) { + return new IntNode(buffer.getShort() & 0xFFFF); + } + + private static JsonNode parseJsonInt32(ByteBuffer buffer) { + return new IntNode(buffer.getInt()); + } + + private static JsonNode parseJsonUInt32(ByteBuffer buffer) { + return new LongNode(buffer.getInt() & 0xFFFFFFFFL); + } + + private static JsonNode parseJsonInt64(ByteBuffer buffer) { + return new LongNode(buffer.getLong()); + } + + private static JsonNode parseJsonUInt64(ByteBuffer buffer) { + // 注意:UInt64 可能溢出,这里用 BigInteger 更安全 + long value = buffer.getLong(); + return new LongNode(value); + } + + private static JsonNode parseJsonDouble(ByteBuffer buffer) { + return new DoubleNode(buffer.getDouble()); + } + + private static JsonNode parseJsonString(ByteBuffer buffer) { + String str = readLengthPrefixedString(buffer); + return new TextNode(str); + } + + /** + * 解析 Opaque 类型(包括 DATE, TIME, DATETIME, DECIMAL 等) + * 参考:https://github.com/mysql/mysql-server/blob/5.7/sql/json_binary.cc + */ + private static JsonNode parseJsonOpaque(ByteBuffer buffer) { + int opaqueType = buffer.get() & 0xFF; + int length = (int) readVariableLength(buffer); + + byte[] opaqueData = new byte[length]; + buffer.get(opaqueData); + + // 根据 MySQL ColumnType 处理不同类型 + // 参考:com.github.shyiko.mysql.binlog.event.deserialization.ColumnType + switch (opaqueType) { + case 0x0A: // DATE (ColumnType.DATE) + return parseOpaqueDate(opaqueData); + case 0x0B: // TIME (ColumnType.TIME) + case 0x13: // TIME_V2 (ColumnType.TIME_V2) + return parseOpaqueTime(opaqueData); + case 0x0C: // DATETIME (ColumnType.DATETIME) + case 0x12: // DATETIME_V2 (ColumnType.DATETIME_V2) + case 0x07: // TIMESTAMP (ColumnType.TIMESTAMP) + case 0x11: // TIMESTAMP_V2 (ColumnType.TIMESTAMP_V2) + return parseOpaqueDatetime(opaqueData); + case 0xF6: // NEWDECIMAL (ColumnType.NEWDECIMAL) + case 0x00: // DECIMAL (ColumnType.DECIMAL) + return parseOpaqueDecimal(opaqueData); + default: + // 其他类型返回 Base64 编码的字符串 + return new TextNode(Base64.getEncoder().encodeToString(opaqueData)); + } + } + + /** + * 解析 DATE 类型 + * 格式:8 字节,包含年月日和微秒 + * 参考:https://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html + */ + private static JsonNode parseOpaqueDate(byte[] data) { + if (data.length < 8) { + return new TextNode("0000-00-00"); + } + + ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN); + long raw = buf.getLong(); + long value = raw >> 24; // 去掉低 24 位(微秒部分) + + // 解析日期部分(40 位) + int yearMonth = (int) ((value >> 22) & 0x1FFFF); // 17 bits + int year = yearMonth / 13; + int month = yearMonth % 13; + int day = (int) ((value >> 17) & 0x1F); // 5 bits + + return new TextNode(String.format("%04d-%02d-%02d", year, month, day)); + } + + /** + * 解析 TIME 类型 + * 格式:8 字节,包含时分秒和微秒 + */ + private static JsonNode parseOpaqueTime(byte[] data) { + if (data.length < 8) { + return new TextNode("00:00:00"); + } + + ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN); + long raw = buf.getLong(); + long value = raw >> 24; // 去掉低 24 位(微秒部分) + + // 检查符号位 + boolean negative = value < 0; + if (negative) { + value = -value; + } + + // 解析时间部分 + int hour = (int) ((value >> 12) & 0x3FF); // 10 bits + int minute = (int) ((value >> 6) & 0x3F); // 6 bits + int second = (int) (value & 0x3F); // 6 bits + + // 获取微秒部分 + int microseconds = (int) (raw & 0xFFFFFF); + + String timeStr = String.format("%s%02d:%02d:%02d", + negative ? "-" : "", hour, minute, second); + + if (microseconds > 0) { + timeStr += String.format(".%06d", microseconds); + } + + return new TextNode(timeStr); + } + + /** + * 解析 DATETIME/TIMESTAMP 类型 + * 格式:8 字节,包含年月日时分秒和微秒 + */ + private static JsonNode parseOpaqueDatetime(byte[] data) { + if (data.length < 8) { + return new TextNode("0000-00-00 00:00:00"); + } + + ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN); + long raw = buf.getLong(); + long value = raw >> 24; // 去掉低 24 位(微秒部分) + + // 解析日期时间部分(40 位) + int yearMonth = (int) ((value >> 22) & 0x1FFFF); // 17 bits + int year = yearMonth / 13; + int month = yearMonth % 13; + int day = (int) ((value >> 17) & 0x1F); // 5 bits + int hour = (int) ((value >> 12) & 0x1F); // 5 bits + int minute = (int) ((value >> 6) & 0x3F); // 6 bits + int second = (int) (value & 0x3F); // 6 bits + + // 获取微秒部分 + int microseconds = (int) (raw & 0xFFFFFF); + + String datetimeStr = String.format("%04d-%02d-%02d %02d:%02d:%02d", + year, month, day, hour, minute, second); + + if (microseconds > 0) { + datetimeStr += String.format(".%06d", microseconds); + } + + return new TextNode(datetimeStr); + } + + /** + * 解析 DECIMAL 类型 + * 格式:precision (1 byte) + scale (1 byte) + binary representation + */ + private static JsonNode parseOpaqueDecimal(byte[] data) { + if (data.length < 2) { + return new TextNode("0"); + } + + ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN); + int precision = buf.get() & 0xFF; + int scale = buf.get() & 0xFF; + + // 读取二进制表示 + byte[] decimalBytes = new byte[data.length - 2]; + buf.get(decimalBytes); + + try { + // 使用 MySQL 的 DECIMAL 解析逻辑 + BigDecimal decimal = parseDecimalBinary(precision, scale, decimalBytes); + return new DecimalNode(decimal); + } catch (Exception e) { + return new TextNode("0"); + } + } + + /** + * 解析 DECIMAL 的二进制表示 + * 参考:com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.asBigDecimal + */ + private static BigDecimal parseDecimalBinary(int precision, int scale, byte[] data) { + // MySQL DECIMAL 的二进制格式比较复杂,这里简化处理 + // 完整实现请参考 mysql-binlog-connector-java 的 AbstractRowsEventDataDeserializer.asBigDecimal + + // 简化版本:尝试将字节数组转换为数字 + boolean negative = (data[0] & 0x80) == 0; + data[0] ^= 0x80; // 翻转符号位 + + if (negative) { + for (int i = 0; i < data.length; i++) { + data[i] = (byte) ~data[i]; + } + } + + // 将字节数组转换为 BigInteger + java.math.BigInteger bigInt = new java.math.BigInteger(1, data); + if (negative) { + bigInt = bigInt.negate(); + } + + // 应用 scale + BigDecimal result = new BigDecimal(bigInt, scale); + return result; + } + + /** + * 读取偏移量或大小 + * @param buffer ByteBuffer + * @param isSmall 是否是小格式(2 字节),否则是大格式(4 字节) + * @return 偏移量或大小 + */ + private static int readOffsetOrSize(ByteBuffer buffer, boolean isSmall) { + if (isSmall) { + return buffer.getShort() & 0xFFFF; + } else { + return buffer.getInt(); + } + } + + /** + * 读取可变长度整数 + * 参考:MySQL 协议的 Length-Encoded Integer + * + * @param buffer ByteBuffer + * @return 长度值 + */ + private static long readVariableLength(ByteBuffer buffer) { + int firstByte = buffer.get() & 0xFF; + if (firstByte < 0xFB) { + // 1 字节长度 + return firstByte; + } else if (firstByte == 0xFC) { + // 2 字节长度 + return buffer.getShort() & 0xFFFF; + } else if (firstByte == 0xFD) { + // 3 字节长度(小端序) + int b1 = buffer.get() & 0xFF; + int b2 = buffer.get() & 0xFF; + int b3 = buffer.get() & 0xFF; + return b1 | (b2 << 8) | (b3 << 16); + } else if (firstByte == 0xFE) { + // 8 字节长度 + return buffer.getLong(); + } else { + throw new IllegalArgumentException("Invalid variable length prefix: 0x" + Integer.toHexString(firstByte)); + } + } + + /** + * 读取带长度前缀的字符串 + * + * @param buffer ByteBuffer + * @return 字符串 + */ + private static String readLengthPrefixedString(ByteBuffer buffer) { + long length = readVariableLength(buffer); + if (length == 0) { + return ""; + } + byte[] strBytes = new byte[(int) length]; + buffer.get(strBytes); + return new String(strBytes, StandardCharsets.UTF_8); + } +} \ No newline at end of file diff --git a/connectors-common/mysql-core/src/test/java/io/tapdata/connector/mysql/util/MySQLJsonParserTest.java b/connectors-common/mysql-core/src/test/java/io/tapdata/connector/mysql/util/MySQLJsonParserTest.java new file mode 100644 index 000000000..b1f871f0a --- /dev/null +++ b/connectors-common/mysql-core/src/test/java/io/tapdata/connector/mysql/util/MySQLJsonParserTest.java @@ -0,0 +1,158 @@ +package io.tapdata.connector.mysql.util; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * MySQL JSON 二进制格式解析器测试 + * + * @author TapData + */ +class MySQLJsonParserTest { + + @Test + void testParseNull() { + assertNull(MySQLJsonParser.parseMySQLJsonBinary(null)); + assertNull(MySQLJsonParser.parseMySQLJsonBinary(new byte[0])); + } + + @Test + void testParseMariaDBJsonString() { + // MariaDB 格式的 JSON 字符串(第一个字节 > 0x0F) + String jsonStr = "{\"name\":\"test\"}"; + byte[] data = jsonStr.getBytes(); + String result = MySQLJsonParser.parseMySQLJsonBinary(data); + assertEquals(jsonStr, result); + } + + @Test + void testParseLiteralNull() { + // Type: LITERAL (0x04), Literal Type: NULL (0x00) + byte[] data = {0x04, 0x00}; + String result = MySQLJsonParser.parseMySQLJsonBinary(data); + assertEquals("null", result); + } + + @Test + void testParseLiteralTrue() { + // Type: LITERAL (0x04), Literal Type: TRUE (0x01) + byte[] data = {0x04, 0x01}; + String result = MySQLJsonParser.parseMySQLJsonBinary(data); + assertEquals("true", result); + } + + @Test + void testParseLiteralFalse() { + // Type: LITERAL (0x04), Literal Type: FALSE (0x02) + byte[] data = {0x04, 0x02}; + String result = MySQLJsonParser.parseMySQLJsonBinary(data); + assertEquals("false", result); + } + + @Test + void testParseInt16() { + // Type: INT16 (0x05), Value: 100 + ByteBuffer buffer = ByteBuffer.allocate(3); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) 0x05); + buffer.putShort((short) 100); + + String result = MySQLJsonParser.parseMySQLJsonBinary(buffer.array()); + assertEquals("100", result); + } + + @Test + void testParseInt32() { + // Type: INT32 (0x07), Value: 100000 + ByteBuffer buffer = ByteBuffer.allocate(5); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) 0x07); + buffer.putInt(100000); + + String result = MySQLJsonParser.parseMySQLJsonBinary(buffer.array()); + assertEquals("100000", result); + } + + @Test + void testParseInt64() { + // Type: INT64 (0x09), Value: 9223372036854775807 + ByteBuffer buffer = ByteBuffer.allocate(9); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) 0x09); + buffer.putLong(9223372036854775807L); + + String result = MySQLJsonParser.parseMySQLJsonBinary(buffer.array()); + assertEquals("9223372036854775807", result); + } + + @Test + void testParseDouble() { + // Type: DOUBLE (0x0B), Value: 3.14159 + ByteBuffer buffer = ByteBuffer.allocate(9); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) 0x0B); + buffer.putDouble(3.14159); + + String result = MySQLJsonParser.parseMySQLJsonBinary(buffer.array()); + assertTrue(result.startsWith("3.14159")); + } + + @Test + void testParseString() { + // Type: STRING (0x0C), Length: 5, Value: "hello" + String str = "hello"; + ByteBuffer buffer = ByteBuffer.allocate(7); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) 0x0C); + buffer.put((byte) str.length()); // variable length: 5 + buffer.put(str.getBytes()); + + String result = MySQLJsonParser.parseMySQLJsonBinary(buffer.array()); + assertEquals("\"hello\"", result); + } + + @Test + void testParseEmptyObject() { + // Type: SMALL_OBJECT (0x00), Element Count: 0, Bytes: 0 + ByteBuffer buffer = ByteBuffer.allocate(5); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) 0x00); + buffer.putShort((short) 0); // element count + buffer.putShort((short) 0); // bytes + + String result = MySQLJsonParser.parseMySQLJsonBinary(buffer.array()); + assertEquals("{}", result); + } + + @Test + void testParseEmptyArray() { + // Type: SMALL_ARRAY (0x02), Element Count: 0, Bytes: 0 + ByteBuffer buffer = ByteBuffer.allocate(5); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) 0x02); + buffer.putShort((short) 0); // element count + buffer.putShort((short) 0); // bytes + + String result = MySQLJsonParser.parseMySQLJsonBinary(buffer.array()); + assertEquals("[]", result); + } + + @Test + void testParseInvalidData() { + // 无效的类型 - 0xFF 会被当作 MariaDB 格式的字符串 + byte[] data = {(byte) 0xFF}; + String result = MySQLJsonParser.parseMySQLJsonBinary(data); + // MariaDB 格式会返回字符串,而不是 null + assertNotNull(result); + + // 测试真正无效的数据(类型码在有效范围内但数据不足) + byte[] invalidData = {0x05}; // INT16 类型但没有数据 + String invalidResult = MySQLJsonParser.parseMySQLJsonBinary(invalidData); + assertNull(invalidResult); + } +} + diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java index b20510cb7..0731cbbce 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java @@ -39,6 +39,7 @@ public abstract class AbstractWalLogMiner { protected int recordSize; protected List tableList; protected boolean filterSchema; + private Map pureDataTypeMap; private Map dataTypeMap; protected final AtomicReference threadException = new AtomicReference<>(); protected final PostgresCDCSQLParser sqlParser = new PostgresCDCSQLParser(); @@ -58,11 +59,13 @@ public AbstractWalLogMiner watch(List tableList, KVReadOnlyMap withSchema = false; this.tableList = tableList; filterSchema = tableList.size() > 50; + this.pureDataTypeMap = new ConcurrentHashMap<>(); this.dataTypeMap = new ConcurrentHashMap<>(); tableList.forEach(tableName -> { TapTable table = tableMap.get(tableName); if (EmptyKit.isNotNull(table)) { - dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> Optional.ofNullable(e.getValue().getPureDataType()).orElse(e.getValue().getDataType())))); + pureDataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> Optional.ofNullable(e.getValue().getPureDataType()).orElse(e.getValue().getDataType())))); + dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> e.getValue().getDataType()))); } }); tableList.addAll(getSubPartitionTables(tableMap, tableList)); @@ -73,12 +76,14 @@ public AbstractWalLogMiner watch(Map> schemaTableMap, KVRea withSchema = true; this.schemaTableMap = schemaTableMap; filterSchema = schemaTableMap.entrySet().stream().reduce(0, (a, b) -> a + b.getValue().size(), Integer::sum) > 50; + this.pureDataTypeMap = new ConcurrentHashMap<>(); this.dataTypeMap = new ConcurrentHashMap<>(); schemaTableMap.forEach((schema, tables) -> { tables.forEach(tableName -> { TapTable table = tableMap.get(schema + "." + tableName); if (EmptyKit.isNotNull(table)) { - dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> schema + "." + tableName + "." + v.getKey(), e -> e.getValue().getPureDataType()))); + pureDataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> Optional.ofNullable(e.getValue().getPureDataType()).orElse(e.getValue().getDataType())))); + dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> e.getValue().getDataType()))); } }); tables.addAll(getSubPartitionTables(tableMap, schema, tables)); @@ -167,10 +172,27 @@ protected void parseKeyAndValue(String tableName, Map.Entry stri return; } String key = tableName + "." + stringObjectEntry.getKey(); + String pureDataType = pureDataTypeMap.get(key); String dataType = dataTypeMap.get(key); - if (EmptyKit.isNull(dataType)) { + if (EmptyKit.isNull(pureDataType)) { return; } + switch (pureDataType) { + case "ARRAY": + String arrayString = String.valueOf(value); + List array = new ArrayList<>(); + Arrays.stream(arrayString.substring(1, arrayString.length() - 1).split(",")).forEach(v -> { + array.add(parseType(v, StringKit.removeParentheses(dataType.replace("array", "").trim()))); + }); + stringObjectEntry.setValue(array); + break; + default: + stringObjectEntry.setValue(parseType(value, pureDataType)); + break; + } + } + + private Object parseType(Object value, String dataType) { switch (dataType) { case "smallint": case "integer": @@ -179,19 +201,15 @@ protected void parseKeyAndValue(String tableName, Map.Entry stri case "money": case "real": case "double precision": - stringObjectEntry.setValue(new BigDecimal((String) value)); - break; + return new BigDecimal((String) value); case "bit": if (value instanceof String && ((String) value).length() == 1) { - stringObjectEntry.setValue("1".equals(value)); + return "1".equals(value); } - break; case "bytea": - stringObjectEntry.setValue(StringKit.toByteArray(((String) value).substring(2))); - break; + return StringKit.toByteArray(String.valueOf(value).substring(2)); case "date": - stringObjectEntry.setValue(LocalDate.parse((String) value).atStartOfDay()); - break; + return LocalDate.parse((String) value).atStartOfDay(); case "interval": String[] intervalArray = ((String) value).split(" "); StringBuilder stringBuilder = new StringBuilder("P"); @@ -222,27 +240,23 @@ protected void parseKeyAndValue(String tableName, Map.Entry stri break; } } - stringObjectEntry.setValue(stringBuilder.toString()); - break; + return stringBuilder.toString(); case "timestamp without time zone": case "timestamp": - stringObjectEntry.setValue(Timestamp.valueOf((String) value).toLocalDateTime().minusHours(postgresConfig.getZoneOffsetHour())); - break; + return Timestamp.valueOf((String) value).toLocalDateTime().minusHours(postgresConfig.getZoneOffsetHour()); case "timestamp with time zone": String timestamp = ((String) value).substring(0, ((String) value).length() - 3); String timezone = ((String) value).substring(((String) value).length() - 3); - stringObjectEntry.setValue(Timestamp.valueOf(timestamp).toLocalDateTime().atZone(TimeZone.getTimeZone("GMT" + timezone + ":00").toZoneId())); - break; + return Timestamp.valueOf(timestamp).toLocalDateTime().atZone(TimeZone.getTimeZone("GMT" + timezone + ":00").toZoneId()); case "time without time zone": case "time": - stringObjectEntry.setValue(LocalTime.parse((String) value).atDate(LocalDate.ofYearDay(1970, 1)).minusHours(postgresConfig.getZoneOffsetHour())); - break; + return LocalTime.parse((String) value).atDate(LocalDate.ofYearDay(1970, 1)).minusHours(postgresConfig.getZoneOffsetHour()); case "time with time zone": String time = ((String) value).substring(0, ((String) value).length() - 3); String zone = ((String) value).substring(((String) value).length() - 3); - stringObjectEntry.setValue(LocalTime.parse(time).atDate(LocalDate.ofYearDay(1970, 1)).atZone(TimeZone.getTimeZone("GMT" + zone + ":00").toZoneId())); - break; + return LocalTime.parse(time).atDate(LocalDate.ofYearDay(1970, 1)).atZone(TimeZone.getTimeZone("GMT" + zone + ":00").toZoneId()); } + return value; } protected static final String WALMINER_STOP = "select walminer_stop()"; @@ -268,7 +282,8 @@ private List getSubPartitionTables(KVReadOnlyMap tableMap, Lis .filter(n -> !tables.contains(n)) .collect(Collectors.toList()); subTableNames.forEach(t -> tableMap.get(table).getNameFieldMap().forEach((k, field) -> { - dataTypeMap.put(t + "." + k, field.getPureDataType()); + pureDataTypeMap.put(t + "." + k, field.getPureDataType()); + dataTypeMap.put(t + "." + k, field.getDataType()); })); subPartitionTableNames.addAll(subTableNames); } @@ -298,7 +313,8 @@ private List getSubPartitionTables(KVReadOnlyMap tableMap, Str .filter(n -> !tables.contains(n)) .collect(Collectors.toList()); subTableNames.forEach(t -> tableMap.get(schema + "." + table).getNameFieldMap().forEach((k, field) -> { - dataTypeMap.put(schema + "." + t + "." + k, field.getPureDataType()); + pureDataTypeMap.put(schema + "." + t + "." + k, field.getPureDataType()); + dataTypeMap.put(t + "." + k, field.getDataType()); })); subPartitionTableNames.addAll(subTableNames); } diff --git a/connectors/clickhouse-connector/src/main/java/io/tapdata/connector/clickhouse/ddl/sqlmaker/ClickhouseSqlMaker.java b/connectors/clickhouse-connector/src/main/java/io/tapdata/connector/clickhouse/ddl/sqlmaker/ClickhouseSqlMaker.java index 1c8ce50f9..9750535e5 100644 --- a/connectors/clickhouse-connector/src/main/java/io/tapdata/connector/clickhouse/ddl/sqlmaker/ClickhouseSqlMaker.java +++ b/connectors/clickhouse-connector/src/main/java/io/tapdata/connector/clickhouse/ddl/sqlmaker/ClickhouseSqlMaker.java @@ -52,7 +52,7 @@ public String buildColumnDefinition(TapTable tapTable, boolean needComment) { //null to omit - if (tapField.getDefaultValue() != null && !"".equals(tapField.getDefaultValue())) { + if (Boolean.TRUE.equals(applyDefault) && tapField.getDefaultValue() != null && !"".equals(tapField.getDefaultValue())) { builder.append("DEFAULT").append(' '); if (tapField.getDefaultValue() instanceof Number) { builder.append(tapField.getDefaultValue()).append(' '); diff --git a/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json b/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json index 3627c0ac0..54ac29747 100644 --- a/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json +++ b/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json @@ -332,6 +332,27 @@ "defaultValue": { "propKey": "", "propValue": "" } } } + }, + "applyDefault": { + "type": "boolean", + "title": "${applyDefault}", + "default": false, + "x-index": 7, + "x-decorator": "FormItem", + "x-component": "Switch", + "x-decorator-props": { + "tooltip": "${applyDefaultTooltip}" + }, + "x-reactions": [ + { + "dependencies": ["$inputs"], + "fulfill": { + "state": { + "display": "{{$deps[0].length > 0 ? \"visible\":\"hidden\"}}" + } + } + } + ] } }, "mergeMinutes": { @@ -408,6 +429,8 @@ "mergeMinutes": "Optimize Interval (Minutes)", "mixFastWrite": "Mix Fast Write", "mixFastWriteTooltip": "Enable mix fast write, but the target table will create is_deleted, delete_time columns, insert strategy is used for insert, update, delete, greatly improve performance", + "applyDefault": "Apply default value", + "applyDefaultTooltip": "When the switch is turned on, the default value will be applied to the target. If there are unadapted functions or expressions, it may cause an error", "doc": "docs/clickhouse_en_US.md" }, "zh_CN": { @@ -437,6 +460,8 @@ "mergeMinutes": "合并分区间隔(分钟)", "mixFastWrite": "混合快速写入", "mixFastWriteTooltip": "启用混合快速写入,但目标表会多创建is_deleted, delete_time列, 增删改统一使用插入策略大大提升性能", + "applyDefault": "应用默认值", + "applyDefaultTooltip": "开关打开时会将默认值应用到目标,如果有未适配的函数或表达式,可能会导致报错", "doc": "docs/clickhouse_zh_CN.md" }, "zh_TW": { @@ -466,6 +491,8 @@ "mergeMinutes": "合併分區間隔(分鐘)", "mixFastWrite": "混合快速寫入", "mixFastWriteTooltip": "啟用混合快速寫入,但目標表會多創建is_deleted, delete_time列, 增刪改統一使用插入策略大大提升性能", + "applyDefault": "應用默認值", + "applyDefaultTooltip": "開關打開時會將默認值應用到目標,如果有未适配的函数或表达式,可能会导致报错", "doc": "docs/clickhouse_zh_TW.md" } }, diff --git a/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java b/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java index 6ba962ae9..eed653ccb 100644 --- a/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java +++ b/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java @@ -850,7 +850,7 @@ protected Set dateFields(TapTable tapTable) { private void streamRead(TapConnectorContext tapConnectorContext, List tables, Object offset, int batchSize, StreamReadConsumer consumer) throws Throwable { throwNonSupportWhenLightInit(); if (mysqlConfig.getHighPerformance()) { - MysqlReaderV2 mysqlReaderV2 = new MysqlReaderV2(mysqlJdbcContext, tapLogger); + MysqlReaderV2 mysqlReaderV2 = new MysqlReaderV2(mysqlJdbcContext, tapLogger, dbTimeZone); mysqlReaderV2.init(tables, tapConnectorContext.getTableMap(), offset, batchSize, consumer); mysqlReaderV2.startMiner(this::isAlive); } else { diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java index c406fd0bc..2c47f035f 100644 --- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java +++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java @@ -15,28 +15,36 @@ import io.tapdata.kit.EmptyKit; import io.tapdata.pdk.apis.context.TapConnectorContext; import io.tapdata.pdk.apis.entity.WriteListResult; +import io.tapdata.pdk.core.utils.CommonUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; +import org.apache.hadoop.fs.FileSystem; +import org.apache.paimon.catalog.*; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.hadoop.HadoopFileIO; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; -import org.apache.paimon.table.sink.*; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import java.io.Closeable; +import java.lang.reflect.Field; import java.math.BigDecimal; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -76,13 +84,13 @@ public class PaimonService implements Closeable { // ===== Paimon Field Cache for Performance ===== // LRU cache for Paimon field mappings: Key = "database.tableName", Value = Map // Limit to 5 tables to avoid excessive memory usage - private final Map> paimonFieldCache = Collections.synchronizedMap( - new LinkedHashMap>(5, 0.75f, true) { + private final Map> paimonFieldCache = Collections.synchronizedMap( + new LinkedHashMap>(5, 0.75f, true) { private static final long serialVersionUID = 1L; @Override - protected boolean removeEldestEntry(Map.Entry> eldest) { - return size() > 5; + protected boolean removeEldestEntry(Map.Entry> eldest) { + return size() > 10; } } ); @@ -95,7 +103,7 @@ protected boolean removeEldestEntry(Map.Entry> eld @Override protected boolean removeEldestEntry(Map.Entry> eldest) { - return size() > 5; + return size() > 10; } } ); @@ -255,6 +263,7 @@ private Configuration buildHadoopConfiguration() { // NoClassDefFoundError due to classloader/version conflicts. // Ensure S3A filesystem is used when scheme is s3a conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + conf.set("fs.s3a.impl.disable.cache", "true"); conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A"); } @@ -514,7 +523,7 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception { if (EmptyKit.isNotBlank(config.getFileFormat())) { schemaBuilder.option("file.format", config.getFileFormat()); schemaBuilderVariableMap.put("file.format", config.getFileFormat()); - } + } if (EmptyKit.isNotBlank(config.getCompression())) { schemaBuilder.option("compression", config.getCompression()); schemaBuilderVariableMap.put("compression", config.getCompression()); @@ -796,11 +805,11 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List WriteListResult result = new WriteListResult<>(); Identifier identifier = Identifier.create(database, tableName); - // Get or create cached writer and commit - StreamTableWrite writer = getOrCreateStreamWriter(tableKey, identifier); - StreamTableCommit commit = getOrCreateStreamCommit(tableKey, identifier); - try { + // Get or create cached writer and commit + StreamTableWrite writer = getOrCreateStreamWriter(tableKey, identifier); + StreamTableCommit commit = getOrCreateStreamCommit(tableKey, identifier); + // Write all records to the writer for (TapRecordEvent event : recordEvents) { if (event instanceof TapInsertRecordEvent) { @@ -865,7 +874,7 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List lastCommit.set(System.currentTimeMillis()); connectorContext.getLog().debug("Committed {} accumulated records for table {}", - finalCount, tableKey); + finalCount, tableKey); } } } @@ -884,8 +893,14 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List TimeUnit.SECONDS.sleep(1L); continue; } - // Don't clean up on error for stream write, let it be reused or cleaned up on close - throw new RuntimeException("Failed to write records to table " + tableName, e); + Throwable illegalThreadStateException = CommonUtils.matchThrowable(e, IllegalThreadStateException.class); + if (null != illegalThreadStateException) { + String message = String.format("Failed to write records to table %s occurred illegal thread state exception, current thread name: %s, thread group: %s", + tableName, Thread.currentThread().getName(), Thread.currentThread().getThreadGroup() != null ? Thread.currentThread().getThreadGroup().getName() : "null"); + throw new RuntimeException(message, e); + } else { + throw new RuntimeException("Failed to write records to table " + tableName, e); + } } } } @@ -946,11 +961,30 @@ private void cleanupAllResources() { // Close old catalog if exists if (catalog != null) { try { + if (catalog instanceof CachingCatalog) { + CachingCatalog cachingCatalog = (CachingCatalog) catalog; + Catalog wrapped = cachingCatalog.wrapped(); + if (wrapped instanceof FileSystemCatalog) { + FileSystemCatalog fileSystemCatalog = (FileSystemCatalog) wrapped; + FileIO fileIO = null; + try { + fileIO = fileSystemCatalog.fileIO(); + } catch (Throwable ignore) { + // Ignore fileIO lookup errors + } + + // Best-effort close: proactively close FileSystem instances cached by HadoopFileIO + closeHadoopFileIOCachedFileSystems(fileIO); + closeQuietly(fileIO); + } + } + catalog.close(); - } catch (Exception e) { + } catch (Throwable e) { // Ignore close errors + } finally { + catalog = null; } - catalog = null; } // Wait a bit to ensure all internal threads are cleaned up @@ -962,6 +996,66 @@ private void cleanupAllResources() { } } + private void closeQuietly(Closeable closeable) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Exception ignore) { + // Ignore close errors + } + } + + /** + * Best-effort close for cached Hadoop FileSystem instances inside Paimon HadoopFileIO. + *

+ * HadoopFileIO may cache FileSystem instances (e.g., in a field named "fsMap"). Even if + * Hadoop global FileSystem cache is disabled, this internal cache can still keep an S3A + * FileSystem whose thread factory captured a Task ThreadGroup that will be destroyed later. + */ + private void closeHadoopFileIOCachedFileSystems(Object fileIO) { + if (!(fileIO instanceof HadoopFileIO)) { + return; + } + + try { + Field fsMapField = fileIO.getClass().getDeclaredField("fsMap"); + fsMapField.setAccessible(true); + Object fsMapObject = fsMapField.get(fileIO); + if (!(fsMapObject instanceof Map)) { + return; + } + + Map fsMap = (Map) fsMapObject; + if (fsMap.isEmpty()) { + return; + } + + // Copy values first to avoid ConcurrentModificationException in case close triggers internal updates. + List fileSystems = new ArrayList<>(fsMap.values()); + for (Object fs : fileSystems) { + if (fs instanceof FileSystem) { + try { + ((FileSystem) fs).close(); + } catch (Exception ignore) { + // Ignore close errors + } + } + } + + try { + fsMap.clear(); + } catch (Exception ignore) { + // Ignore clear errors + } + } catch (NoSuchFieldException ignore) { + // HadoopFileIO implementation differs; ignore. + } catch (Throwable ignore) { + // Best-effort only + } + } + /** * Check if the exception is caused by ThreadGroup being destroyed. * This typically happens when the classloader that created Paimon's thread factory @@ -973,7 +1067,8 @@ private void cleanupAllResources() { private boolean isThreadGroupDestroyedError(Throwable e) { Throwable cause = e; while (cause != null) { - if (cause instanceof IllegalThreadStateException) { + Throwable illegalThreadStateException = CommonUtils.matchThrowable(e, IllegalThreadStateException.class); + if (illegalThreadStateException != null) { return true; } cause = cause.getCause(); @@ -982,7 +1077,6 @@ private boolean isThreadGroupDestroyedError(Throwable e) { } - /** * Create a new stream writer for table * @@ -1048,7 +1142,6 @@ private StreamTableCommit getOrCreateStreamCommit(String tableKey, Identifier id } - /** * Clean up all cached resources for a specific table * @@ -1084,7 +1177,6 @@ private void cleanupTableResources(String tableKey) { } - /** * Handle insert event with stream writer * @@ -1187,9 +1279,9 @@ private void handleStreamUpdate(TapUpdateRecordEvent event, StreamTableWrite wri * Check if primary key values have changed between before and after GenericRow * Uses converted GenericRow values to ensure consistent comparison * - * @param beforeRow before GenericRow (must not be null) - * @param afterRow after GenericRow (must not be null) - * @param table table definition + * @param beforeRow before GenericRow (must not be null) + * @param afterRow after GenericRow (must not be null) + * @param table table definition * @return true if primary key has changed, false otherwise */ private boolean isPrimaryKeyChanged(GenericRow beforeRow, GenericRow afterRow, TapTable table) { @@ -1258,12 +1350,12 @@ private void handleStreamDelete(TapDeleteRecordEvent event, StreamTableWrite wri /** * Select deterministic bucket for dynamic-bucket tables. * Use primary keys if present; otherwise hash all fields (sorted by name). - * + *

* Note: This method uses the converted GenericRow values to ensure consistent * bucket selection across insert/update/delete operations, especially for * Date/DateTime types that are converted to int/long values. * - * @param row converted GenericRow with Paimon-compatible values + * @param row converted GenericRow with Paimon-compatible values * @param table table definition * @return bucket number */ @@ -1313,7 +1405,7 @@ private int selectBucketForDynamic(GenericRow row, TapTable table) { * Get or build field index mapping from cache * * @param cacheKey cache key (table ID) - * @param fields field map + * @param fields field map * @return map of field name to index */ private Map getFieldIndexMap(String cacheKey, Map fields) { @@ -1338,7 +1430,7 @@ private Map getFieldIndexMap(String cacheKey, Map fields) { private GenericRow convertToGenericRow(Map data, TapTable table, Identifier identifier) throws Exception { // Get or build field type mapping from cache String cacheKey = identifier.getFullName(); - Map fieldTypeMap = paimonFieldCache.get(cacheKey); + List paimonFields = paimonFieldCache.get(cacheKey); - if (fieldTypeMap == null) { + if (paimonFields == null) { // Cache miss - build field type mapping Table paimonTable = catalog.getTable(identifier); - List paimonFields = paimonTable.rowType().getFields(); - - fieldTypeMap = new HashMap<>(paimonFields.size()); - for (DataField paimonField : paimonFields) { - fieldTypeMap.put(paimonField.name(), paimonField.type()); - } + paimonFields = paimonTable.rowType().getFields(); // Store in cache - paimonFieldCache.put(cacheKey, fieldTypeMap); + paimonFieldCache.put(cacheKey, paimonFields); } - Map tapFields = table.getNameFieldMap(); - int fieldCount = tapFields.size(); - Object[] values = new Object[fieldCount]; - - int index = 0; - for (Map.Entry entry : tapFields.entrySet()) { - String fieldName = entry.getKey(); + GenericRow genericRow = new GenericRow(paimonFields.size()); + for (int i = 0; i < paimonFields.size(); i++) { + DataField dataField = paimonFields.get(i); + String fieldName = dataField.name(); Object value = data.get(fieldName); // Get corresponding Paimon field type from cache - DataType paimonType = fieldTypeMap.get(fieldName); + DataType paimonType = dataField.type(); - // Convert value to Paimon-compatible type - values[index++] = convertValueToPaimonType(value, paimonType); + genericRow.setField(i, convertValueToPaimonType(value, paimonType)); } - return GenericRow.of(values); + return genericRow; } /** diff --git a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java index f150d600b..efc7db537 100644 --- a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java +++ b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java @@ -61,10 +61,7 @@ import java.sql.*; import java.sql.Date; import java.text.SimpleDateFormat; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZoneOffset; +import java.time.*; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -864,7 +861,7 @@ private void createAllPublicationIfNotExist() throws SQLException { } private void createCustomPublicationIfNotExist(List tableList) { - String sql = String.format("CREATE PUBLICATION %s FOR TABLE %s", slotName, tableList.stream().map(this::getSchemaAndTable).collect(Collectors.joining(", "))); + String sql = String.format("CREATE PUBLICATION %s FOR TABLE %s %s", slotName, tableList.stream().map(this::getSchemaAndTable).collect(Collectors.joining(", ")), postgresConfig.getPartitionRoot() ? "WITH (publish_via_partition_root = true)" : ""); try { tapLogger.info("Create publication sql: {}", sql); postgresJdbcContext.execute(sql); @@ -983,20 +980,35 @@ private Map filterTimeForPG(ResultSet resultSet, Map