diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 5e192be95ecf4..12ebda087eb3d 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -39,6 +39,13 @@
provided
+
+ ${project.groupId}
+ pulsar-common
+ ${project.version}
+ compile
+
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 06f66f60380d9..475724cc4e545 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -54,6 +54,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
@@ -91,6 +92,9 @@ public class KafkaConnectSink implements Sink {
protected String topicName;
private boolean sanitizeTopicName = false;
+ // This is a workaround for https://github.com/apache/pulsar/issues/19922
+ private boolean collapsePartitionedTopics = false;
+
private final Cache sanitizedTopicCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
@@ -159,6 +163,7 @@ public void open(Map config, SinkContext ctx) throws Exception {
topicName = kafkaSinkConfig.getTopic();
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
+ collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -417,8 +422,19 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record sourceRecord) {
- final int partition = sourceRecord.getPartitionIndex().orElse(0);
- final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
+ final int partition;
+ final String topic;
+
+ if (collapsePartitionedTopics
+ && sourceRecord.getTopicName().isPresent()
+ && TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
+ TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
+ partition = tn.getPartitionIndex();
+ topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
+ } else {
+ partition = sourceRecord.getPartitionIndex().orElse(0);
+ topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
+ }
final Object key;
final Object value;
final Schema keySchema;
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index 19dd784578915..2525081a41ebb 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -94,6 +94,11 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
+ "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
private boolean sanitizeTopicName = false;
+ @FieldDoc(
+ defaultValue = "false",
+ help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
+ private boolean collapsePartitionedTopics = false;
+
public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index e9d454ed2fd5a..6ccfa3b71a2f7 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -1571,6 +1571,94 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
assertEquals(ref.getBatchIdx(), batchIdx);
}
+ @Test
+ public void collapsePartitionedTopicEnabledTest() throws Exception {
+ testCollapsePartitionedTopic(true,
+ "persistent://a/b/fake-topic-partition-0",
+ "persistent://a/b/fake-topic",
+ 0);
+
+ testCollapsePartitionedTopic(true,
+ "persistent://a/b/fake-topic-partition-1",
+ "persistent://a/b/fake-topic",
+ 1);
+
+ testCollapsePartitionedTopic(true,
+ "persistent://a/b/fake-topic",
+ "persistent://a/b/fake-topic",
+ 0);
+
+ testCollapsePartitionedTopic(true,
+ "fake-topic-partition-5",
+ "persistent://public/default/fake-topic",
+ 5);
+ }
+
+ @Test
+ public void collapsePartitionedTopicDisabledTest() throws Exception {
+ testCollapsePartitionedTopic(false,
+ "persistent://a/b/fake-topic-partition-0",
+ "persistent://a/b/fake-topic-partition-0",
+ 0);
+
+ testCollapsePartitionedTopic(false,
+ "persistent://a/b/fake-topic-partition-1",
+ "persistent://a/b/fake-topic-partition-1",
+ 0);
+
+ testCollapsePartitionedTopic(false,
+ "persistent://a/b/fake-topic",
+ "persistent://a/b/fake-topic",
+ 0);
+
+ testCollapsePartitionedTopic(false,
+ "fake-topic-partition-5",
+ "fake-topic-partition-5",
+ 0);
+ }
+
+ private void testCollapsePartitionedTopic(boolean isEnabled,
+ String pulsarTopic,
+ String expectedKafkaTopic,
+ int expectedPartition) throws Exception {
+ props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+ props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));
+
+ KafkaConnectSink sink = new KafkaConnectSink();
+ sink.open(props, context);
+
+ AvroSchema pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+ final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+ obj.put("field1", (byte) 10);
+ obj.put("field2", "test");
+ obj.put("field3", (short) 100);
+
+ final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
+ Message msg = mock(MessageImpl.class);
+ when(msg.getValue()).thenReturn(rec);
+ when(msg.getKey()).thenReturn("key");
+ when(msg.hasKey()).thenReturn(true);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+ final AtomicInteger status = new AtomicInteger(0);
+ Record record = PulsarRecord.builder()
+ .topicName(pulsarTopic)
+ .message(msg)
+ .schema(pulsarAvroSchema)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .build();
+
+ SinkRecord sinkRecord = sink.toSinkRecord(record);
+
+ Assert.assertEquals(sinkRecord.topic(), expectedKafkaTopic);
+ Assert.assertEquals(sinkRecord.kafkaPartition(), expectedPartition);
+
+ sink.close();
+ }
+
@SneakyThrows
private java.util.Date getDateFromString(String dateInString) {
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");