[improve][io][kca] support fully-qualified topic names in source records #10

Closed
efcasado wants to merge 2 commits into master from kca-fq-destination-topic-names

efcasado commented May 5, 2025

Motivation

The current implementation of the Kafka Connect adaptor in Pulsar IO does not support fully-qualified Pulsar topic names in source records. Instead, it unconditionally prepends the default persistent://<tenant>/<namespace>/ prefix to every destination topic. This is a problem in multi-tenant environments that need dynamic topic routing.

For example, consider a setup with:

  • A shared PostgreSQL instance where tenants are isolated in separate schemas (e.g. tenant1, tenant2)
  • A shared Pulsar cluster where each tenant has its own Pulsar tenant (e.g. tenant1, tenant2)
  • A single Debezium PostgreSQL source connector deployed in the global (shared) tenant

Using Kafka Connect transformations, users may want to route records to tenant-specific topics based on the schema:

transforms.reroute.topic.regex: "mydatabaseserver.(.*).orders"
transforms.reroute.topic.replacement: "persistent://$1/procurement/orders"

With this configuration, changes to tenant1.orders should go to persistent://tenant1/procurement/orders, and tenant2.orders to persistent://tenant2/procurement/orders.
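
To make the intended mapping concrete, here is a minimal sketch of what the regex and replacement above produce. It assumes the transform applies the pattern with standard java.util.regex full-match semantics; the class and method names (RerouteSketch, reroute) are hypothetical and not part of the connector or of this change.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RerouteSketch {
    // Values copied from the transform configuration above.
    static final Pattern TOPIC_REGEX = Pattern.compile("mydatabaseserver.(.*).orders");
    static final String TOPIC_REPLACEMENT = "persistent://$1/procurement/orders";

    /** Returns the rerouted topic, or the original name when the pattern does not match. */
    static String reroute(String sourceTopic) {
        Matcher matcher = TOPIC_REGEX.matcher(sourceTopic);
        // Assumption: the whole topic name must match before the replacement is applied.
        return matcher.matches() ? matcher.replaceFirst(TOPIC_REPLACEMENT) : sourceTopic;
    }

    public static void main(String[] args) {
        System.out.println(reroute("mydatabaseserver.tenant1.orders"));
        System.out.println(reroute("mydatabaseserver.tenant2.orders"));
        System.out.println(reroute("mydatabaseserver.tenant1.invoices")); // no match, unchanged
    }
}
```

The first two calls yield the tenant-specific topics named above; the third is left untouched. Note that the unescaped "." in the regex matches any character, so escaping it as "\\." would make the pattern stricter.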

However, the current implementation prepends persistent://global/procurement/ to the already fully-qualified topic, resulting in invalid topic names like persistent://global/procurement/persistent://tenant1/procurement/orders. This causes runtime exceptions and connector failure loops.

By supporting fully-qualified topic names, this change enables more flexible and tenant-aware architectures without requiring additional processing layers.

Modifications

This change enhances AbstractKafkaConnectSource to correctly handle fully-qualified topic names by using the org.apache.pulsar.common.naming.TopicName utility. If the topic is valid and fully-qualified, it is respected as-is. Otherwise, the adaptor falls back to the existing behavior, ensuring backward compatibility.
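
The decision described above can be sketched roughly as follows. This is not the code from the change: the exact TopicName calls are not shown here, so the sketch substitutes a simplified regex check for "fully-qualified" so that it runs without Pulsar on the classpath. The names TopicResolutionSketch, resolve, and defaultPrefix are hypothetical.

```java
import java.util.regex.Pattern;

class TopicResolutionSketch {
    // Simplified stand-in for the TopicName-based check (an assumption):
    // a fully-qualified name looks like <domain>://<tenant>/<namespace>/<topic>.
    static final Pattern FULLY_QUALIFIED =
            Pattern.compile("^(persistent|non-persistent)://[^/]+/[^/]+/.+$");

    /**
     * Returns the destination topic for a source record.
     * Fully-qualified names are kept as-is; anything else gets the default
     * prefix, which mirrors the pre-existing behavior.
     */
    static String resolve(String defaultPrefix, String recordTopic) {
        if (FULLY_QUALIFIED.matcher(recordTopic).matches()) {
            return recordTopic;
        }
        return defaultPrefix + recordTopic;
    }

    public static void main(String[] args) {
        String prefix = "persistent://global/procurement/";
        // Already fully-qualified: no prefix is added.
        System.out.println(resolve(prefix, "persistent://tenant1/procurement/orders"));
        // Short name: falls back to the default prefix.
        System.out.println(resolve(prefix, "mydatabaseserver.tenant1.orders"));
    }
}
```

Keeping the fallback branch identical to the old prefixing logic is what preserves backward compatibility: connectors that emit short topic names see no change in behavior.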

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Extended the existing KafkaConnectSourceTest test to verify that transforms can be initialized and applied

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

efcasado force-pushed the kca-fq-destination-topic-names branch 2 times, most recently from 5b35aaa to 6afa94c (May 5, 2025 19:07)
efcasado force-pushed the kca-fq-destination-topic-names branch from 6afa94c to fa9a291 (May 5, 2025 19:43)
efcasado changed the title from "[improve][io] support fully-qualified topic names in kca source records" to "[improve][io][kca] support fully-qualified topic names in source records" (May 5, 2025)
efcasado closed this on May 6, 2025