Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@

import io.tapdata.common.postman.enums.PostParam;

import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

public class Url {
String raw;
List<String> host;
List<String> path;
List<Map<String,Object>> query;
List<Map<String,Object>> variable;
public Url copyOne(){
List<String> host = new ArrayList<>();
List<String> path = new ArrayList<>();
List<Map<String, Object>> query = new ArrayList<>();
List<Map<String, Object>> variable = new ArrayList<>();

public Url copyOne() {
Url url = new Url();
url.raw(this.raw);
url.host(this.host);
Expand All @@ -21,13 +28,15 @@ public Url copyOne(){
return url;
}

public static Url create(){
public static Url create() {
return new Url();
}
public static Url create(String url){

public static Url create(String url) {
return Url.create().raw(url);
}
public static Url create(Map<String,Object> map){

public static Url create(Map<String, Object> map) {
try {
if (Objects.isNull(map)) return Url.create();
Object rawObj = map.get(PostParam.RAW);
Expand All @@ -37,100 +46,120 @@ public static Url create(Map<String,Object> map){
Object variableObj = map.get(PostParam.VARIABLE);

String raw = Objects.isNull(rawObj) ? null : (String) rawObj;
List<String> host = Objects.isNull(hostObj) ? null : (List<String>) hostObj;
List<String> path = Objects.isNull(pathObj) ? null : (List<String>) pathObj;
List<Map<String,Object>> query = Objects.isNull(queryObj) ? null : (List<Map<String,Object>>) queryObj;
List<Map<String,Object>> variable = Objects.isNull(variableObj) ? null : (List<Map<String,Object>>) variableObj;
return Url.create().raw(raw).host(host).path(path).query(query).variable(variable);
}catch (Exception e){
Url url = Url.create().raw(raw);
if (hostObj instanceof Collection) {
url.host().addAll((Collection<String>) hostObj);
}
if (pathObj instanceof Collection) {
url.path().addAll((Collection<String>) pathObj);
}
if (queryObj instanceof Collection) {
url.query().addAll((Collection<Map<String, Object>>) queryObj);
}
if (variableObj instanceof Collection) {
url.variable().addAll((Collection<Map<String, Object>>) variableObj);
}
return url;
} catch (Exception e) {
return Url.create();
}
}
public static Url create(Object urlObj){
if ( urlObj instanceof String) {

public static Url create(Object urlObj) {
if (urlObj instanceof String) {
return Url.create((String) urlObj);
}else if(urlObj instanceof Map){
Map<String,Object> map = (Map<String, Object>) urlObj;
} else if (urlObj instanceof Map) {
Map<String, Object> map = (Map<String, Object>) urlObj;
return Url.create(map);
}
return Url.create();
}
public String raw(){

public String raw() {
return this.raw;
}
public Url raw(String raw){

public Url raw(String raw) {
this.raw = raw;
return this;
}
public List<String> host(){

public List<String> host() {
return this.host;
}
public Url host(List<String> host){
this.host = host;

public Url host(List<String> host) {
this.host = Optional.ofNullable(host).orElse(new ArrayList<>());
return this;
}
public List<String> path(){
return this.path;

public List<String> path() {
return Optional.ofNullable(this.path).orElse(new ArrayList<>());
}
public Url path(List<String> path){
this.path = path;

public Url path(List<String> path) {
this.path = Optional.ofNullable(path).orElse(new ArrayList<>());
return this;
}
public List<Map<String,Object>> query(){
return this.query;

public List<Map<String, Object>> query() {
return Optional.ofNullable(this.query).orElse(new ArrayList<>());
}
public Url query(List<Map<String,Object>> query){
this.query = query;

public Url query(List<Map<String, Object>> query) {
this.query = Optional.ofNullable(query).orElse(new ArrayList<>());
return this;
}
public List<Map<String,Object>> variable(){
return this.variable;

public List<Map<String, Object>> variable() {
return Optional.ofNullable(this.variable).orElse(new ArrayList<>());
}
public Url variable(List<Map<String,Object>> variable){
this.variable = variable;

public Url variable(List<Map<String, Object>> variable) {
this.variable = Optional.ofNullable(variable).orElse(new ArrayList<>());
return this;
}

public Url variableAssignment(Map<String, Object> params) {
final String[] rawBack = {this.raw};
List<Map<String,Object>> queryBack = new ArrayList<>();
Map<String, Map<String, Object>> varMap = Objects.isNull(this.variable) ? new HashMap<>(): this.variable.stream().collect(Collectors.toMap(var -> String.valueOf(var.get(PostParam.KEY)), var -> var, (v1, v2) -> v2));
Map<String,Map<String,Object>> queryMap = Objects.isNull(this.query) ? new HashMap<>(): this.query.stream().collect(Collectors.toMap(var -> String.valueOf(var.get(PostParam.KEY)), var -> var, (v1, v2) -> v2));
varMap.forEach((var,varValue)->{
Object attributeParamValue = Objects.isNull(params)? null: params.get(var);
if (attributeParamValue instanceof Map){
attributeParamValue = ((Map<String,Object>)attributeParamValue).get(PostParam.VALUE);
List<Map<String, Object>> queryBack = new ArrayList<>();
Map<String, Map<String, Object>> varMap = Objects.isNull(this.variable) ? new HashMap<>() : this.variable.stream().collect(Collectors.toMap(var -> String.valueOf(var.get(PostParam.KEY)), var -> var, (v1, v2) -> v2));
Map<String, Map<String, Object>> queryMap = Objects.isNull(this.query) ? new HashMap<>() : this.query.stream().collect(Collectors.toMap(var -> String.valueOf(var.get(PostParam.KEY)), var -> var, (v1, v2) -> v2));
varMap.forEach((var, varValue) -> {
Object attributeParamValue = Objects.isNull(params) ? null : params.get(var);
if (attributeParamValue instanceof Map) {
attributeParamValue = ((Map<String, Object>) attributeParamValue).get(PostParam.VALUE);
}
Object valueObj = varValue.get(PostParam.VALUE);
String val = Objects.isNull(valueObj)? "": String.valueOf(valueObj);
if (Objects.nonNull(attributeParamValue) ){
String val = Objects.isNull(valueObj) ? "" : String.valueOf(valueObj);
if (Objects.nonNull(attributeParamValue)) {
val = String.valueOf(attributeParamValue);
}
rawBack[0] = rawBack[0].replaceAll(":"+var,val);
rawBack[0] = rawBack[0].replaceAll(":" + var, val);
});
queryMap.forEach((var,varValue)->{
Object attributeParamValue = Objects.isNull(params)? null: params.get(var);
if (attributeParamValue instanceof Map){
attributeParamValue = ((Map<String,Object>)attributeParamValue).get(PostParam.VALUE);
queryMap.forEach((var, varValue) -> {
Object attributeParamValue = Objects.isNull(params) ? null : params.get(var);
if (attributeParamValue instanceof Map) {
attributeParamValue = ((Map<String, Object>) attributeParamValue).get(PostParam.VALUE);
}
Object valueObj = varValue.get(PostParam.VALUE);
String val = Objects.isNull(valueObj)? "": String.valueOf(valueObj);
if (Objects.nonNull(attributeParamValue)){
String val = Objects.isNull(valueObj) ? "" : String.valueOf(valueObj);
if (Objects.nonNull(attributeParamValue)) {
val = String.valueOf(attributeParamValue);
}
Map<String,Object> backMap = new HashMap<>();
backMap.put(PostParam.KEY,var);
backMap.put(PostParam.VALUE,val);
backMap.put(PostParam.DESCRIPTION,varValue.get(PostParam.DESCRIPTION));
Map<String, Object> backMap = new HashMap<>();
backMap.put(PostParam.KEY, var);
backMap.put(PostParam.VALUE, val);
backMap.put(PostParam.DESCRIPTION, varValue.get(PostParam.DESCRIPTION));
queryBack.add(backMap);
});
if (Objects.nonNull(params) && !params.isEmpty()) {
for (Map.Entry<String,Object> varEntry : params.entrySet()) {
for (Map.Entry<String, Object> varEntry : params.entrySet()) {
Object value = varEntry.getValue();
if (value instanceof Map){
value = ((Map<String,Object>)value).get(PostParam.VALUE);
if (value instanceof Map) {
value = ((Map<String, Object>) value).get(PostParam.VALUE);
}
rawBack[0] = rawBack[0].replaceAll("\\{\\{"+varEntry.getKey()+"}}",Objects.isNull(value)?"":String.valueOf(value));
rawBack[0] = rawBack[0].replaceAll("\\{\\{" + varEntry.getKey() + "}}", Objects.isNull(value) ? "" : String.valueOf(value));
}
}
return Url.create().query(queryBack).variable(this.variable).raw(rawBack[0]).host(this.host).path(this.path);
Expand Down
2 changes: 1 addition & 1 deletion connectors-common/connector-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dependency>
<groupId>io.tapdata</groupId>
<artifactId>tapdata-pdk-runner</artifactId>
<version>2.0-SNAPSHOT</version>
<version>2.5-SNAPSHOT</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
<dependency>
<groupId>io.tapdata</groupId>
<artifactId>tapdata-pdk-api</artifactId>
<version>2.0.1-SNAPSHOT</version>
<version>2.0.5-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion connectors-common/hive-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<properties>
<java.version>8</java.version>
<sql.core.version>1.0-SNAPSHOT</sql.core.version>
<tapdata.pdk.api.verison>2.0.0-SNAPSHOT</tapdata.pdk.api.verison>
<tapdata.pdk.api.verison>2.0.5-SNAPSHOT</tapdata.pdk.api.verison>

</properties>
<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;

import java.io.BufferedReader;
import java.io.IOException;
Expand Down Expand Up @@ -313,7 +314,7 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<Ta
}

@Override
public void streamConsume(List<String> tableList, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
public void streamConsume(List<String> tableList, StreamReadOneByOneConsumer eventsOffsetConsumer) {
throw new CoreException("The schemaRegister function is not supported as the source for the time being. ");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;

import javax.script.Invocable;
import javax.script.ScriptEngine;
Expand Down Expand Up @@ -641,13 +642,13 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<Ta
for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
makeMessage(consumerRecord, tableName, list::add);
if (list.size() >= eventBatchSize) {
syncEventSubmit(list, eventsOffsetConsumer);
eventsOffsetConsumer.accept(list, TapSimplify.list());
list = TapSimplify.list();
}
}
}
if (EmptyKit.isNotEmpty(list)) {
syncEventSubmit(list, eventsOffsetConsumer);
eventsOffsetConsumer.accept(list, TapSimplify.list());
}
} catch (Exception e) {
throwable.set(e);
Expand All @@ -669,37 +670,27 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<Ta
}
}

private synchronized void syncEventSubmit(List<TapEvent> eventList, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
eventsOffsetConsumer.accept(eventList, TapSimplify.list());
}

@Override
public void streamConsume(List<String> tableList, Object offset, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
public void streamConsume(List<String> tableList, Object offset, StreamReadOneByOneConsumer eventsOffsetConsumer) {
consuming.set(true);
int maxDelay = 500;
KafkaConfig kafkaConfig = (KafkaConfig) mqConfig;
ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration((kafkaConfig), connectorId, true);
try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfiguration.build())) {
KafkaOffset streamOffset = KafkaOffsetUtils.setConsumerByOffset(kafkaConsumer, tableList, offset, consuming);
try (BatchPusher<TapEvent> batchPusher = new BatchPusher<TapEvent>(
tapEvents -> eventsOffsetConsumer.accept(tapEvents, streamOffset.clone())
).batchSize(eventBatchSize).maxDelay(maxDelay)) {
// 将初始化的 offset 推送到目标,让指定时间的增量任务下次启动时拿到 offset
Optional.of(new HeartbeatEvent()).ifPresent(event -> {
event.setTime(System.currentTimeMillis());
batchPusher.add(event);
});
// 将初始化的 offset 推送到目标,让指定时间的增量任务下次启动时拿到 offset
Optional.of(new HeartbeatEvent()).ifPresent(event -> {
event.setTime(System.currentTimeMillis());
eventsOffsetConsumer.accept(event, streamOffset.clone());
});

// 消费数据
while (consuming.get()) {
ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2L));
if (consumerRecords.isEmpty()) {
batchPusher.checkAndSummit();
} else {
for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
streamOffset.addTopicOffset(consumerRecord); // 推进 offset
makeMessage(consumerRecord, consumerRecord.topic(), batchPusher::add);
}
// 消费数据
while (consuming.get()) {
ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2L));
if (!consumerRecords.isEmpty()) {
for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
streamOffset.addTopicOffset(consumerRecord); // 推进 offset
makeMessage(consumerRecord, consumerRecord.topic(), e -> eventsOffsetConsumer.accept(e, streamOffset.clone()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
*/
public class BatchPusher<T> implements AutoCloseable {

private int batchSize = 100;
private int maxDelay = 2000;
private long lastTime;
private final Consumer<List<T>> submitConsumer;
Expand All @@ -23,11 +22,6 @@ public BatchPusher(Consumer<List<T>> submitConsumer) {
this.submitConsumer = submitConsumer;
}

public BatchPusher<T> batchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}

public BatchPusher<T> maxDelay(int maxDelay) {
this.maxDelay = maxDelay;
return this;
Expand All @@ -39,7 +33,7 @@ public void add(T record) {
}

public void checkAndSummit() {
if (batchList.size() >= batchSize || (System.currentTimeMillis() - lastTime > maxDelay && !batchList.isEmpty())) {
if (System.currentTimeMillis() - lastTime > maxDelay && !batchList.isEmpty()) {
summit();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.tapdata.pdk.apis.entity.TestItem;
import io.tapdata.pdk.apis.entity.WriteListResult;
import io.tapdata.pdk.apis.functions.connection.ConnectionCheckItem;
import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;

import java.util.List;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -43,11 +44,11 @@ default void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<T
throw new UnsupportedOperationException();
}

default void streamConsume(List<String> tableList, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) throws Throwable {
default void streamConsume(List<String> tableList, StreamReadOneByOneConsumer eventsOffsetConsumer) throws Throwable {
throw new UnsupportedOperationException();
}

default void streamConsume(List<String> tableList, Object offset, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
default void streamConsume(List<String> tableList, Object offset, StreamReadOneByOneConsumer eventsOffsetConsumer) {
throw new UnsupportedOperationException();
}
}
2 changes: 1 addition & 1 deletion connectors-common/mysql-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<java.version>8</java.version>
<mysql.driver.version>8.0.33</mysql.driver.version>
<debezium.version>1.5.4.Final</debezium.version>
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.5-SNAPSHOT</tapdata.pdk.api.version>
<pdk-error-code.version>1.0-SNAPSHOT</pdk-error-code.version>
</properties>
<dependencyManagement>
Expand Down
Loading
Loading