From ddc60db5beb56b66ab790519ddd6a38a3589f075 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Sat, 26 Apr 2025 18:58:51 +0200 Subject: [PATCH 01/12] support kafka connect transforms --- pulsar-io/kafka-connect-adaptor/pom.xml | 13 ++++ .../connect/AbstractKafkaConnectSource.java | 2 +- .../io/kafka/connect/KafkaConnectSource.java | 61 ++++++++++++++++++- .../kafka/connect/KafkaConnectSourceTest.java | 42 +++++++++++++ 4 files changed, 114 insertions(+), 4 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index b12665eb4785a..a29b5618bf80a 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -166,6 +166,19 @@ + + org.apache.kafka + connect-transforms + ${kafka-client.version} + test + + + jose4j + org.bitbucket.b_c + + + + ${project.groupId} testmocks diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index a7bf32d9bc77f..ad2e3d8001b82 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -176,7 +176,7 @@ public synchronized Record read() throws Exception { } if (currentBatch.hasNext()) { AbstractKafkaSourceRecord processRecord = processSourceRecord(currentBatch.next()); - if (processRecord.isEmpty()) { + if (processRecord == null || processRecord.isEmpty()) { outstandingRecords.decrementAndGet(); continue; } else { diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index f2ee8a8e6cafe..f3fb8ebd8191e 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -21,7 +21,9 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import io.confluent.connect.avro.AvroData; +import java.util.ArrayList; import java.util.Base64; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -31,6 +33,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.Transformation; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -51,6 +54,8 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource> transformations = new ArrayList<>(); + public void open(Map config, SourceContext sourceContext) throws Exception { if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) { jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString()); @@ -60,17 +65,67 @@ public void open(Map config, SourceContext sourceContext) throws } log.info("jsonWithEnvelope: {}", jsonWithEnvelope); + initTransforms(config); super.open(config, sourceContext); } + private void initTransforms(Map config) { + transformations.clear(); + Object transformsListObj = config.get("transforms"); + if (transformsListObj != null) { + String transformsList = transformsListObj.toString(); + for (String transformName : transformsList.split(",")) { + transformName = transformName.trim(); + String prefix = "transforms." + transformName + "."; + String typeKey = prefix + "type"; + Object classNameObj = config.get(typeKey); + if (classNameObj == null) { + continue; + } + String className = classNameObj.toString(); + try { + @SuppressWarnings("unchecked") + Class> clazz = + (Class>) Class.forName(className); + Transformation transform = clazz.getDeclaredConstructor().newInstance(); + java.util.Map transformConfig = config.entrySet().stream() + .filter(e -> e.getKey().startsWith(prefix)) + .collect(java.util.stream.Collectors.toMap( + e -> e.getKey().substring(prefix.length()), + java.util.Map.Entry::getValue + )); + log.info("transform config: {}", transformConfig); + transform.configure(transformConfig); + transformations.add(transform); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate SMT: " + className, e); + } + } + } + } + + private static final AvroData avroData = new AvroData(1000); public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord srcRecord) { - KafkaSourceRecord record = new KafkaSourceRecord(srcRecord); - offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset()); + SourceRecord transformedRecord = applyTransforms(srcRecord); + if (transformedRecord == null) { + return null; + } + KafkaSourceRecord record = new KafkaSourceRecord(transformedRecord); + offsetWriter.offset(transformedRecord.sourcePartition(), transformedRecord.sourceOffset()); return record; } - private static final AvroData avroData = new AvroData(1000); + public SourceRecord applyTransforms(SourceRecord record) { + SourceRecord current = record; + for (Transformation transform : transformations) { + if (current == null) { + break; + } + current = transform.apply(current); + } + return current; + } public class KafkaSourceRecord extends AbstractKafkaSourceRecord> implements KVRecord { diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java index 8852ba02b040f..aa1a9d8192145 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java @@ -21,8 +21,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertEquals; import java.io.File; import java.io.OutputStream; import java.nio.file.Files; @@ -31,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.file.FileStreamSourceConnector; import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.source.SourceRecord; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.schema.KeyValue; @@ -101,6 +104,45 @@ public void testOpenAndReadTaskDirect() throws Exception { testOpenAndReadTask(config); } + @Test + void testCastTransformation() throws Exception{ + // Config for Cast SMT to cast "myField" to int32 + Map config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, + "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put("transforms", "Cast"); + config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value"); + config.put("transforms.Cast.spec", "myField:int32"); + + // Instantiate the source and initialize transformations + kafkaConnectSource = new KafkaConnectSource(); + kafkaConnectSource.open(config, context); + + // Test source record + Map value = new HashMap<>(); + value.put("myField", "42"); + SourceRecord record = new SourceRecord( + null, + null, + "test-topic", + null, + null, // key schema + null, // key + null, // value schema + value // value + ); + + // Apply transforms + SourceRecord transformed = kafkaConnectSource.applyTransforms(record); + + // Assert "myField" is now an Integer with value 42 + @SuppressWarnings("unchecked") + Map transformedValue = (Map) transformed.value(); + assertNotNull(transformedValue); + assertEquals(42, transformedValue.get("myField")); + assertTrue(transformedValue.get("myField") instanceof Integer); + } + private Map getConfig() { Map config = new HashMap<>(); From f59cd00978f36da77e364b702d4b0bb471b6f751 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Mon, 28 Apr 2025 18:11:56 +0200 Subject: [PATCH 02/12] include connect-transforms in the kafka-connect-adaptor bundle --- pulsar-io/kafka-connect-adaptor/pom.xml | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index a29b5618bf80a..b70996b77b11a 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -110,6 +110,18 @@ + + org.apache.kafka + connect-transforms + ${kafka-client.version} + + + jose4j + org.bitbucket.b_c + + + + ${project.groupId} @@ -166,19 +178,6 @@ - - org.apache.kafka - connect-transforms - ${kafka-client.version} - test - - - jose4j - org.bitbucket.b_c - - - - ${project.groupId} testmocks From cc67a37c0c878864506a68c81d9faac62f3bbd77 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Mon, 28 Apr 2025 18:55:43 +0200 Subject: [PATCH 03/12] support kafka connect predicates --- .../io/kafka/connect/KafkaConnectSource.java | 62 ++++++++++++++++++- 1 file changed, 59 insertions(+), 3 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index f3fb8ebd8191e..358d39e6d1763 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -23,16 +23,19 @@ import io.confluent.connect.avro.AvroData; import java.util.ArrayList; import java.util.Base64; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.predicates.Predicate; import org.apache.kafka.connect.transforms.Transformation; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.KeyValue; @@ -54,7 +57,8 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource> transformations = new ArrayList<>(); + private Map> predicates = new HashMap<>(); +private List, Transformation>> transformations = new ArrayList<>(); public void open(Map config, SourceContext sourceContext) throws Exception { if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) { @@ -65,10 +69,45 @@ public void open(Map config, SourceContext sourceContext) throws } log.info("jsonWithEnvelope: {}", jsonWithEnvelope); + initPredicates(config); initTransforms(config); super.open(config, sourceContext); } + private void initPredicates(Map config) { + Object predicatesListObj = config.get("predicates"); + if (predicatesListObj != null) { + String predicatesList = predicatesListObj.toString(); + for (String predicateName : predicatesList.split(",")) { + predicateName = predicateName.trim(); + String prefix = "predicates." + predicateName + "."; + String typeKey = prefix + "type"; + Object classNameObj = config.get(typeKey); + if (classNameObj == null) { + continue; + } + String className = classNameObj.toString(); + try { + @SuppressWarnings("unchecked") + Class> clazz = + (Class>) Class.forName(className); + Predicate predicate = clazz.getDeclaredConstructor().newInstance(); + java.util.Map predicateConfig = config.entrySet().stream() + .filter(e -> e.getKey().startsWith(prefix)) + .collect(java.util.stream.Collectors.toMap( + e -> e.getKey().substring(prefix.length()), + java.util.Map.Entry::getValue + )); + log.info("predicate config: {}", predicateConfig); + predicate.configure(predicateConfig); + predicates.put(predicateName, predicate); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate predicate: " + className, e); + } + } + } + } + private void initTransforms(Map config) { transformations.clear(); Object transformsListObj = config.get("transforms"); @@ -95,8 +134,17 @@ private void initTransforms(Map config) { java.util.Map.Entry::getValue )); log.info("transform config: {}", transformConfig); + String predicateName = (String) transformConfig.get("predicate"); + Predicate predicate = null; + if (predicateName != null) { + predicate = predicates.get(predicateName); + if (predicate == null) { + log.warn("Transform {} references non-existent predicate: {}", + transformName, predicateName); + } + } transform.configure(transformConfig); - transformations.add(transform); + transformations.add(Pair.of(predicate, transform)); } catch (Exception e) { throw new RuntimeException("Failed to instantiate SMT: " + className, e); } @@ -118,10 +166,18 @@ public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord src public SourceRecord applyTransforms(SourceRecord record) { SourceRecord current = record; - for (Transformation transform : transformations) { + for (Pair, Transformation> pair : transformations) { if (current == null) { break; } + + Predicate predicate = pair.getLeft(); + Transformation transform = pair.getRight(); + + if (predicate != null && !predicate.test(current)) { + continue; + } + current = transform.apply(current); } return current; From 08a8331f56ca7729d78fbd3e591e4f48040ffcf4 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Mon, 28 Apr 2025 19:33:25 +0200 Subject: [PATCH 04/12] support negated predicates --- .../io/kafka/connect/KafkaConnectSource.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index 358d39e6d1763..57a615033055b 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -58,7 +58,13 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource> predicates = new HashMap<>(); -private List, Transformation>> transformations = new ArrayList<>(); + + private record PredicatedTransform( + Predicate predicate, + Transformation transform, + boolean negated + ) {} + private List transformations = new ArrayList<>(); public void open(Map config, SourceContext sourceContext) throws Exception { if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) { @@ -135,6 +141,7 @@ private void initTransforms(Map config) { )); log.info("transform config: {}", transformConfig); String predicateName = (String) transformConfig.get("predicate"); + boolean negated = (boolean) transformConfig.getOrDefault("negated", false); Predicate predicate = null; if (predicateName != null) { predicate = predicates.get(predicateName); @@ -144,7 +151,7 @@ private void initTransforms(Map config) { } } transform.configure(transformConfig); - transformations.add(Pair.of(predicate, transform)); + transformations.add(new PredicatedTransform(predicate, transform, negated)); } catch (Exception e) { throw new RuntimeException("Failed to instantiate SMT: " + className, e); } @@ -166,19 +173,16 @@ public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord src public SourceRecord applyTransforms(SourceRecord record) { SourceRecord current = record; - for (Pair, Transformation> pair : transformations) { + for (PredicatedTransform pt : transformations) { if (current == null) { break; } - Predicate predicate = pair.getLeft(); - Transformation transform = pair.getRight(); - - if (predicate != null && !predicate.test(current)) { + if (pt.predicate != null && (pt.negated != pt.predicate.test(current))) { continue; } - current = transform.apply(current); + current = pt.transform.apply(current); } return current; } From 1af9274eaa50d768881450c6adb1265c67b9f3e9 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Mon, 28 Apr 2025 19:50:43 +0200 Subject: [PATCH 05/12] address style errors --- .../pulsar/io/kafka/connect/KafkaConnectSource.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index 57a615033055b..a8071d52cc288 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -30,13 +30,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.transforms.predicates.Predicate; import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.predicates.Predicate; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -58,7 +57,7 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource> predicates = new HashMap<>(); - + private record PredicatedTransform( Predicate predicate, Transformation transform, @@ -177,11 +176,11 @@ public SourceRecord applyTransforms(SourceRecord record) { if (current == null) { break; } - + if (pt.predicate != null && (pt.negated != pt.predicate.test(current))) { continue; } - + current = pt.transform.apply(current); } return current; From 67f8ca7cacb458b94dff842f664ee5b7d554c756 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Mon, 28 Apr 2025 19:51:09 +0200 Subject: [PATCH 06/12] unit tests for predicates --- .../kafka/connect/KafkaConnectSourceTest.java | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java index aa1a9d8192145..558ccb73550d8 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java @@ -143,6 +143,93 @@ void testCastTransformation() throws Exception{ assertTrue(transformedValue.get("myField") instanceof Integer); } + @Test + void testTransformationWithPredicate() throws Exception{ + // Config for Cast SMT to cast "myField" to int32 + Map config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, + "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put("predicates", "TopicMatch"); + config.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); + config.put("predicates.TopicMatch.pattern", "test-topic"); + config.put("transforms", "Cast"); + config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value"); + config.put("transforms.Cast.spec", "myField:int32"); + config.put("transforms.Cast.predicate", "TopicMatch"); + + // Instantiate the source and initialize transformations + kafkaConnectSource = new KafkaConnectSource(); + kafkaConnectSource.open(config, context); + + // Test source record + Map value = new HashMap<>(); + value.put("myField", "42"); + SourceRecord record = new SourceRecord( + null, + null, + "test-topic", + null, + null, // key schema + null, // key + null, // value schema + value // value + ); + + // Apply transforms + SourceRecord transformed = kafkaConnectSource.applyTransforms(record); + + // Assert "myField" is now an Integer with value 42 + @SuppressWarnings("unchecked") + Map transformedValue = (Map) transformed.value(); + assertNotNull(transformedValue); + assertEquals(42, transformedValue.get("myField")); + assertTrue(transformedValue.get("myField") instanceof Integer); + } + + @Test + void testTransformationWithNegatedPredicate() throws Exception{ + // Config for Cast SMT to cast "myField" to int32 + Map config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, + "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put("predicates", "TopicMatch"); + config.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); + config.put("predicates.TopicMatch.pattern", "test-topic"); + config.put("transforms", "Cast"); + config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value"); + config.put("transforms.Cast.spec", "myField:int32"); + config.put("transforms.Cast.predicate", "TopicMatch"); + config.put("transforms.Cast.negate", "true"); + + // Instantiate the source and initialize transformations + kafkaConnectSource = new KafkaConnectSource(); + kafkaConnectSource.open(config, context); + + // Test source record + Map value = new HashMap<>(); + value.put("myField", "42"); + SourceRecord record = new SourceRecord( + null, + null, + "test-topic", + null, + null, // key schema + null, // key + null, // value schema + value // value + ); + + // Apply transforms + SourceRecord transformed = kafkaConnectSource.applyTransforms(record); + + // Assert "myField" is now an Integer with value 42 + @SuppressWarnings("unchecked") + Map transformedValue = (Map) transformed.value(); + assertNotNull(transformedValue); + assertEquals("42", transformedValue.get("myField")); + assertTrue(transformedValue.get("myField") instanceof String); + } + private Map getConfig() { Map config = new HashMap<>(); From 5da247b583e3c83ab8296ce391b608f799c76f8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Enrique=20Fern=C3=A1ndez?= Date: Mon, 28 Apr 2025 19:55:05 +0200 Subject: [PATCH 07/12] support cases where negated value is a string at runtime Co-authored-by: Lari Hotari --- .../org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index a8071d52cc288..746c94d2c782f 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -140,7 +140,7 @@ private void initTransforms(Map config) { )); log.info("transform config: {}", transformConfig); String predicateName = (String) transformConfig.get("predicate"); - boolean negated = (boolean) transformConfig.getOrDefault("negated", false); + boolean negated = Boolean.parseBoolean(String.valueOf(transformConfig.getOrDefault("negated", "false"))); Predicate predicate = null; if (predicateName != null) { predicate = predicates.get(predicateName); From 01b90e280c3146eafb482da825c51eb8f231db00 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Mon, 28 Apr 2025 20:11:22 +0200 Subject: [PATCH 08/12] address style errors --- .../org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index 746c94d2c782f..f0c496007f6b0 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -140,7 +140,8 @@ private void initTransforms(Map config) { )); log.info("transform config: {}", transformConfig); String predicateName = (String) transformConfig.get("predicate"); - boolean negated = Boolean.parseBoolean(String.valueOf(transformConfig.getOrDefault("negated", "false"))); + boolean negated = Boolean.parseBoolean( + String.valueOf(transformConfig.getOrDefault("negated", "false"))); Predicate predicate = null; if (predicateName != null) { predicate = predicates.get(predicateName); From 50f5a0c22f9a9943bcd109bf6c67de08a173179e Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Tue, 29 Apr 2025 08:50:53 +0200 Subject: [PATCH 09/12] reduce code duplication in tests --- .../kafka/connect/KafkaConnectSourceTest.java | 138 +++++------------- 1 file changed, 40 insertions(+), 98 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java index 558ccb73550d8..f00749ba7df78 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java @@ -105,129 +105,71 @@ public void testOpenAndReadTaskDirect() throws Exception { } @Test - void testCastTransformation() throws Exception{ - // Config for Cast SMT to cast "myField" to int32 - Map config = getConfig(); - config.put(TaskConfig.TASK_CLASS_CONFIG, - "org.apache.kafka.connect.file.FileStreamSourceTask"); - config.put("transforms", "Cast"); - config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value"); - config.put("transforms.Cast.spec", "myField:int32"); - - // Instantiate the source and initialize transformations - kafkaConnectSource = new KafkaConnectSource(); - kafkaConnectSource.open(config, context); - - // Test source record - Map value = new HashMap<>(); - value.put("myField", "42"); - SourceRecord record = new SourceRecord( - null, - null, - "test-topic", - null, - null, // key schema - null, // key - null, // value schema - value // value - ); - - // Apply transforms - SourceRecord transformed = kafkaConnectSource.applyTransforms(record); + void testTransformation() throws Exception { + Map config = setupTransformConfig(false, false); + runTransformTest(config, true); + } - // Assert "myField" is now an Integer with value 42 - @SuppressWarnings("unchecked") - Map transformedValue = (Map) transformed.value(); - assertNotNull(transformedValue); - assertEquals(42, transformedValue.get("myField")); - assertTrue(transformedValue.get("myField") instanceof Integer); + @Test + void testTransformationWithPredicate() throws Exception { + Map config = setupTransformConfig(true, false); + runTransformTest(config, true); } @Test - void testTransformationWithPredicate() throws Exception{ - // Config for Cast SMT to cast "myField" to int32 + void testTransformationWithNegatedPredicate() throws Exception { + Map config = setupTransformConfig(true, true); + runTransformTest(config, false); + } + + private Map setupTransformConfig(boolean withPredicate, boolean negated) { Map config = getConfig(); - config.put(TaskConfig.TASK_CLASS_CONFIG, - "org.apache.kafka.connect.file.FileStreamSourceTask"); - config.put("predicates", "TopicMatch"); - config.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); - config.put("predicates.TopicMatch.pattern", "test-topic"); + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + + if (withPredicate) { + config.put("predicates", "TopicMatch"); + config.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); + config.put("predicates.TopicMatch.pattern", "test-topic"); + } + config.put("transforms", "Cast"); config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value"); config.put("transforms.Cast.spec", "myField:int32"); - config.put("transforms.Cast.predicate", "TopicMatch"); - - // Instantiate the source and initialize transformations - kafkaConnectSource = new KafkaConnectSource(); - kafkaConnectSource.open(config, context); - - // Test source record - Map value = new HashMap<>(); - value.put("myField", "42"); - SourceRecord record = new SourceRecord( - null, - null, - "test-topic", - null, - null, // key schema - null, // key - null, // value schema - value // value - ); - // Apply transforms - SourceRecord transformed = kafkaConnectSource.applyTransforms(record); + if (withPredicate) { + config.put("transforms.Cast.predicate", "TopicMatch"); + if (negated) { + config.put("transforms.Cast.negate", "true"); + } + } - // Assert "myField" is now an Integer with value 42 - @SuppressWarnings("unchecked") - Map transformedValue = (Map) transformed.value(); - assertNotNull(transformedValue); - assertEquals(42, transformedValue.get("myField")); - assertTrue(transformedValue.get("myField") instanceof Integer); + return config; } - @Test - void testTransformationWithNegatedPredicate() throws Exception{ - // Config for Cast SMT to cast "myField" to int32 - Map config = getConfig(); - config.put(TaskConfig.TASK_CLASS_CONFIG, - "org.apache.kafka.connect.file.FileStreamSourceTask"); - config.put("predicates", "TopicMatch"); - config.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); - config.put("predicates.TopicMatch.pattern", "test-topic"); - config.put("transforms", "Cast"); - config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value"); - config.put("transforms.Cast.spec", "myField:int32"); - config.put("transforms.Cast.predicate", "TopicMatch"); - config.put("transforms.Cast.negate", "true"); - - // Instantiate the source and initialize transformations + private void runTransformTest(Map config, boolean expectTransformed) throws Exception { kafkaConnectSource = new KafkaConnectSource(); kafkaConnectSource.open(config, context); - // Test source record Map value = new HashMap<>(); value.put("myField", "42"); SourceRecord record = new SourceRecord( - null, - null, - "test-topic", - null, - null, // key schema - null, // key - null, // value schema - value // value + null, null, "test-topic", null, + null, null, null, value ); - // Apply transforms SourceRecord transformed = kafkaConnectSource.applyTransforms(record); - // Assert "myField" is now an Integer with value 42 @SuppressWarnings("unchecked") Map transformedValue = (Map) transformed.value(); assertNotNull(transformedValue); - assertEquals("42", transformedValue.get("myField")); - assertTrue(transformedValue.get("myField") instanceof String); + + if (expectTransformed) { + assertEquals(42, ((Number)transformedValue.get("myField")).intValue()); + assertTrue(transformedValue.get("myField") instanceof Number); + } else { + assertEquals("42", transformedValue.get("myField")); + assertTrue(transformedValue.get("myField") instanceof String); + } } private Map getConfig() { From 5f66409c3316a0c643c937ccd266635fb7302889 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Tue, 29 Apr 2025 09:26:16 +0200 Subject: [PATCH 10/12] fix predicate evaluation logic --- .../org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index f0c496007f6b0..1f05d69fa5933 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -178,7 +178,7 @@ public SourceRecord applyTransforms(SourceRecord record) { break; } - if (pt.predicate != null && (pt.negated != pt.predicate.test(current))) { + if (pt.predicate != null && !(pt.negated != pt.predicate.test(current))) { continue; } From 37d596e7f821bdf9c9fb521a8ba572a4cabcb5e5 Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Tue, 29 Apr 2025 10:02:32 +0200 Subject: [PATCH 11/12] fix typo in negated predicate setting --- .../org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index 1f05d69fa5933..8a4deff0fc9f6 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -141,7 +141,7 @@ private void initTransforms(Map config) { log.info("transform config: {}", transformConfig); String predicateName = (String) transformConfig.get("predicate"); boolean negated = Boolean.parseBoolean( - String.valueOf(transformConfig.getOrDefault("negated", "false"))); + String.valueOf(transformConfig.getOrDefault("negate", "false"))); Predicate predicate = null; if (predicateName != null) { predicate = predicates.get(predicateName); From 9b7134a9da3aef7bb158801286e4a5a4bd2cefae Mon Sep 17 00:00:00 2001 From: Enrique Fernandez Date: Tue, 29 Apr 2025 14:59:42 +0200 Subject: [PATCH 12/12] commit offsets of filtered records --- .../apache/pulsar/io/kafka/connect/KafkaConnectSource.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index 8a4deff0fc9f6..3d5d76d4230f6 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -163,11 +163,13 @@ private void initTransforms(Map config) { public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord srcRecord) { SourceRecord transformedRecord = applyTransforms(srcRecord); + + offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset()); if (transformedRecord == null) { return null; } + KafkaSourceRecord record = new KafkaSourceRecord(transformedRecord); - offsetWriter.offset(transformedRecord.sourcePartition(), transformedRecord.sourceOffset()); return record; }