diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index ad2e3d8001b82..270decd8ab533 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -46,6 +46,7 @@ import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.SourceContext; @@ -243,7 +244,17 @@ public abstract class AbstractKafkaSourceRecord implements Record { KafkaSchemaWrappedSchema valueSchema; AbstractKafkaSourceRecord(SourceRecord srcRecord) { - this.destinationTopic = Optional.of("persistent://" + topicNamespace + "/" + srcRecord.topic()); + String topic = srcRecord.topic(); + if (topic.contains("://")) { + try { + TopicName.get(topic); + this.destinationTopic = Optional.of(topic); + } catch (IllegalArgumentException e) { + this.destinationTopic = Optional.of("persistent://" + topicNamespace + "/" + topic); + } + } else { + this.destinationTopic = Optional.of("persistent://" + topicNamespace + "/" + topic); + } this.partitionIndex = Optional.ofNullable(srcRecord.kafkaPartition()); } diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java index f00749ba7df78..7347d07ee0e06 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java @@ -30,6 +30,7 @@ import java.nio.file.Files; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.file.FileStreamSourceConnector; import org.apache.kafka.connect.runtime.TaskConfig; @@ -39,6 +40,7 @@ import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.kafka.connect.KafkaConnectSource.KafkaSourceRecord; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -122,6 +124,47 @@ void testTransformationWithNegatedPredicate() throws Exception { runTransformTest(config, false); } + @Test + void testShortTopicNames() throws Exception { + Map config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, "default-tenant/default-ns"); + + runTopicNameTest(config, "a-topic", "persistent://default-tenant/default-ns/a-topic"); + } + + @Test + void testFullyQualifiedTopicNames() throws Exception { + Map config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, "default-tenant/default-ns"); + + runTopicNameTest(config, "persistent://a-tenant/a-ns/a-topic", "persistent://a-tenant/a-ns/a-topic"); + } + + private void runTopicNameTest(Map config, String topicName, String expectedDestinationTopicName) throws Exception { + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, "default-tenant/default-ns"); + + kafkaConnectSource = new KafkaConnectSource(); + kafkaConnectSource.open(config, context); + + Map sourcePartition = new HashMap<>(); + Map sourceOffset = new HashMap<>(); + Map value = new HashMap<>(); + sourcePartition.put("test", "test"); + sourceOffset.put("test", 0); + value.put("myField", "42"); + SourceRecord srcRecord = new SourceRecord( + sourcePartition, sourceOffset, topicName, null, + null, null, null, value + ); + + KafkaSourceRecord record = kafkaConnectSource.processSourceRecord(srcRecord); + + assertEquals(Optional.of(expectedDestinationTopicName), record.destinationTopic); + } + private Map setupTransformConfig(boolean withPredicate, boolean negated) { Map config = getConfig(); config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");