Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,8 @@ public static String escapeRegex(String input) {
Matcher matcher = REGEX_SPECIAL_CHARS.matcher(input);
return matcher.replaceAll("\\\\$0");
}

public static String removeSqlNote(String sql) {
return sql.replaceAll("(?s)/\\*.*?\\*/|--.*?\n", "").trim();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,15 @@ protected void initPublication() {
Set<TableId> tablesToCapture = determineCapturedTables();
Set<TableId> existsTables = new HashSet<>();
String catalog = connection().getCatalog();
try (ResultSet resultSet = stmt.executeQuery(String.format("select * from pg_publication_tables where pubname='%s'", publicationName))) {
try (ResultSet resultSet = stmt.executeQuery(String.format("select schemaname, tablename from pg_publication_tables pt join pg_publication pp\n" +
" on pt.pubname = pp.pubname\n" +
"where pt.pubname='%s' and pp.pubviaroot='false'\n" +
"union all\n" +
"select schemaname, pc.relname from pg_publication_tables pt join pg_publication pp\n" +
" on pt.pubname = pp.pubname join pg_class pc on pc.oid in (SELECT inhrelid\n" +
" FROM pg_inherits\n" +
" WHERE inhparent = (pt.schemaname||'.'||pt.tablename)::regclass)\n" +
"where pt.pubname='%s' and pp.pubviaroot='true'", publicationName, publicationName))) {
while (resultSet.next()) {
existsTables.add(new TableId(catalog, resultSet.getString("schemaname"), resultSet.getString("tablename")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ fullColumnName
;

indexColumnName
: (uid | STRING_LITERAL) ('(' decimalLiteral ')')? sortType=(ASC | DESC)?
: ('(' expression ')' | uid | STRING_LITERAL) ('(' decimalLiteral ')')? sortType=(ASC | DESC)?
;

userName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,36 @@
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.common.ddl.DDLFactory;
import io.tapdata.common.ddl.ccj.CCJBaseDDLWrapper;
import io.tapdata.common.ddl.type.DDLParserType;
import io.tapdata.common.ddl.wrapper.DDLWrapperConfig;
import io.tapdata.connector.mysql.config.MysqlConfig;
import io.tapdata.connector.mysql.entity.MysqlBinlogPosition;
import io.tapdata.connector.mysql.util.MySQLJsonParser;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.ddl.TapDDLEvent;
import io.tapdata.entity.event.ddl.TapDDLUnknownEvent;
import io.tapdata.entity.event.dml.TapDeleteRecordEvent;
import io.tapdata.entity.event.dml.TapInsertRecordEvent;
import io.tapdata.entity.event.dml.TapUpdateRecordEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.schema.TapField;
import io.tapdata.entity.schema.TapTable;
import io.tapdata.entity.utils.cache.KVReadOnlyMap;
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.StringKit;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static io.tapdata.base.ConnectorBase.list;

Expand All @@ -38,10 +49,16 @@ public class MysqlReaderV2 {
private StreamReadConsumer consumer;
private final AtomicReference<Exception> exception = new AtomicReference<>();
private final Map<Long, TableMapEventData> tableMapEventByTableId = new ConcurrentHashMap<>();
private final Map<String, LinkedHashMap<String, String>> dataTypeMap = new ConcurrentHashMap<>();
private final Map<String, Map<String, Object[]>> enumDataTypeMap = new ConcurrentHashMap<>();
private final TimeZone timeZone;
private final DDLWrapperConfig DDL_WRAPPER_CONFIG = CCJBaseDDLWrapper.CCJDDLWrapperConfig.create().split("`");
private final DDLParserType ddlParserType = DDLParserType.MYSQL_CCJ_SQL_PARSER;

public MysqlReaderV2(MysqlJdbcContextV2 mysqlJdbcContext, Log tapLogger) {
public MysqlReaderV2(MysqlJdbcContextV2 mysqlJdbcContext, Log tapLogger, TimeZone timeZone) {
this.tapLogger = tapLogger;
mysqlConfig = (MysqlConfig) mysqlJdbcContext.getConfig();
this.timeZone = timeZone;
}

public void init(List<String> tableList, KVReadOnlyMap<TapTable> tableMap, Object offsetState, int recordSize, StreamReadConsumer consumer) throws Throwable {
Expand All @@ -59,7 +76,11 @@ public void startMiner(Supplier<Boolean> isAlive) throws Throwable {

try (ConcurrentProcessor<ScanEvent, OffsetEvent> concurrentProcessor = TapExecutors.createSimple(8, 32, "MysqlReader-Processor")) {
client.setServerId(randomServerId());

EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO
);
client.setEventDeserializer(eventDeserializer);
// 设置起始位置
if (offsetState instanceof MysqlBinlogPosition) {
MysqlBinlogPosition position = (MysqlBinlogPosition) offsetState;
Expand All @@ -81,6 +102,9 @@ public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
tapLogger.info("Binlog rotated to: {}/{}", rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition());
} else if (eventType == EventType.TABLE_MAP) {
handleTableMapEvent(event);
} else if (eventType == EventType.QUERY) {
concurrentProcessor.runAsyncWithBlocking(new ScanEvent(event, currentBinlogFile.get()), this::emit);
return;
}

// 异步处理事件
Expand Down Expand Up @@ -140,6 +164,39 @@ private OffsetEvent emit(ScanEvent scanEvent) {
return null;
}

if (eventType == EventType.QUERY) {
QueryEventData queryEventData = event.getData();
long eventTime = header.getTimestamp();
String ddl = StringKit.removeSqlNote(queryEventData.getSql());
OffsetEvent offsetEvent = new OffsetEvent();
List<TapEvent> ddlEvents = new ArrayList<>();
try {
DDLFactory.ddlToTapDDLEvent(
ddlParserType,
ddl,
DDL_WRAPPER_CONFIG,
tableMap,
tapDDLEvent -> {
tapDDLEvent.setTime(System.currentTimeMillis());
tapDDLEvent.setReferenceTime(eventTime);
tapDDLEvent.setOriginDDL(ddl);
ddlEvents.add(tapDDLEvent);
tapLogger.info("Read DDL: " + ddl + ", about to be packaged as some event(s)");
}
);
} catch (Throwable e) {
TapDDLEvent tapDDLEvent = new TapDDLUnknownEvent();
tapDDLEvent.setTime(System.currentTimeMillis());
tapDDLEvent.setReferenceTime(eventTime);
tapDDLEvent.setOriginDDL(ddl);
ddlEvents.add(tapDDLEvent);
}
offsetEvent.setTapEvent(ddlEvents);
offsetEvent.setMysqlBinlogPosition(extractBinlogPosition(event, scanEvent.getFileName()));
ddlEvents.forEach(e -> ddlFlush(((TapDDLEvent) e).getTableId()));
return offsetEvent;
}

// 处理数据变更事件
List<TapEvent> tapEvents;
MysqlBinlogPosition position;
Expand Down Expand Up @@ -185,10 +242,35 @@ private void handleTableMapEvent(Event event) {

// 保存映射关系
tableMapEventByTableId.put(tableId, tableMapEventData);

ddlFlush(table);
tapLogger.debug("Table map event: tableId={}, database={}, table={}", tableId, database, table);
}

private void ddlFlush(String table) {
if (EmptyKit.isBlank(table)) {
return;
}
LinkedHashMap<String, String> dataTypes = tableMap.get(table).getNameFieldMap().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> StringKit.removeParentheses(e.getValue().getDataType()),
(existing, replacement) -> existing, LinkedHashMap::new));
dataTypeMap.put(table, dataTypes);
Map<String, Object[]> enumMap = tableMap.get(table).getNameFieldMap().entrySet().stream().filter(v -> v.getValue().getDataType().startsWith("enum"))
.collect(Collectors.toMap(Map.Entry::getKey, e -> {
String enumType = e.getValue().getDataType();
Object[] enumValues = enumType.substring("enum(".length(), enumType.length() - 1).split(",");
for (int i = 0; i < enumValues.length; i++) {
String element = ((String) enumValues[i]).trim();
if (element.startsWith("'")) {
enumValues[i] = StringKit.removeHeadTail(element, "'", null);
} else {
enumValues[i] = new BigDecimal(element);
}
}
return enumValues;
}));
enumDataTypeMap.put(table, enumMap);
}

/**
* 处理 INSERT 事件
*/
Expand All @@ -213,7 +295,7 @@ private List<TapEvent> handleInsertEvent(Event event) {
}
List<TapEvent> tapEvents = new ArrayList<>();
for (Serializable[] row : rows) {
Map<String, Object> after = convertRowToMap(tableId, row, tapTable);
Map<String, Object> after = convertRowToMap(row, dataTypeMap.get(tableName), enumDataTypeMap.get(tableName));
TapInsertRecordEvent insertEvent = new TapInsertRecordEvent();
insertEvent.init();
insertEvent.table(tableName);
Expand Down Expand Up @@ -249,8 +331,8 @@ private List<TapEvent> handleUpdateEvent(Event event) {
}
List<TapEvent> tapEvents = new ArrayList<>();
for (Map.Entry<Serializable[], Serializable[]> row : rows) {
Map<String, Object> before = convertRowToMap(tableId, row.getKey(), tapTable);
Map<String, Object> after = convertRowToMap(tableId, row.getValue(), tapTable);
Map<String, Object> before = convertRowToMap(row.getKey(), dataTypeMap.get(tableName), enumDataTypeMap.get(tableName));
Map<String, Object> after = convertRowToMap(row.getValue(), dataTypeMap.get(tableName), enumDataTypeMap.get(tableName));

TapUpdateRecordEvent updateEvent = new TapUpdateRecordEvent();
updateEvent.init();
Expand Down Expand Up @@ -288,7 +370,7 @@ private List<TapEvent> handleDeleteEvent(Event event) {
}
List<TapEvent> tapEvents = new ArrayList<>();
for (Serializable[] row : rows) {
Map<String, Object> before = convertRowToMap(tableId, row, tapTable);
Map<String, Object> before = convertRowToMap(row, dataTypeMap.get(tableName), enumDataTypeMap.get(tableName));

TapDeleteRecordEvent deleteEvent = new TapDeleteRecordEvent();
deleteEvent.init();
Expand All @@ -304,31 +386,65 @@ private List<TapEvent> handleDeleteEvent(Event event) {
/**
* 将行数据数组转换为 Map
*/
private Map<String, Object> convertRowToMap(long tableId, Serializable[] row, TapTable tapTable) {
private Map<String, Object> convertRowToMap(Serializable[] row, LinkedHashMap<String, String> dataTypes, Map<String, Object[]> enumMap) {
Map<String, Object> result = new LinkedHashMap<>();

if (row == null || tapTable == null) {
return result;
}

LinkedHashMap<String, TapField> nameFieldMap = tapTable.getNameFieldMap();
if (nameFieldMap == null || nameFieldMap.isEmpty()) {
if (row == null || EmptyKit.isEmpty(dataTypes)) {
return result;
}

// 获取字段名列表(按顺序)
List<String> fieldNames = new ArrayList<>(nameFieldMap.keySet());
List<String> fieldNames = new ArrayList<>(dataTypes.keySet());

// 将数组值映射到字段名
for (int i = 0; i < row.length && i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
Object value = row[i];
result.put(fieldName, value);
result.put(fieldName, filterValue(value, dataTypes.get(fieldName), enumMap.get(fieldName)));
}

return result;
}

private Object filterValue(Object value, String dataType, Object[] enumValues) {
if (value == null) {
return null;
}
switch (dataType) {
case "time":
case "date": {
if (value instanceof Long) {
return Instant.ofEpochSecond(((Long) value) / 1000000, (((Long) value) % 1000000) * 1000);
}
}
case "datetime": {
if (value instanceof Long) {
return Instant.ofEpochSecond(((Long) value) / 1000000 - mysqlConfig.getZoneOffsetHour() * 60 * 60, ((Long) value % 1000000) * 1000);
}
}
case "timestamp": {
if (value instanceof Long) {
return Instant.ofEpochSecond(((Long) value) / 1000000 + timeZone.getRawOffset() / 1000, ((Long) value % 1000000) * 1000).atZone(ZoneOffset.UTC);
}
}
case "bit":
return ((BitSet) value).get(0);
case "binary":
case "varbinary":
return String.valueOf(value).getBytes();
case "json":
return MySQLJsonParser.parseMySQLJsonBinary((byte[]) value);
case "tinytext":
case "mediumtext":
case "text":
case "longtext":
return new String((byte[]) value);
case "enum":
return enumValues[(int) value - 1];
}
return value;
}

/**
* 获取表名
*/
Expand Down Expand Up @@ -393,6 +509,9 @@ static class OffsetEvent {
private List<TapEvent> tapEvents;
private MysqlBinlogPosition mysqlBinlogPosition;

public OffsetEvent() {
}

public OffsetEvent(List<TapEvent> tapEvents, MysqlBinlogPosition mysqlBinlogPosition) {
this.tapEvents = tapEvents;
this.mysqlBinlogPosition = mysqlBinlogPosition;
Expand Down
Loading
Loading