diff --git a/124-stretch-cluster.md b/124-stretch-cluster.md
new file mode 100644
index 00000000..bcb9683b
--- /dev/null
+++ b/124-stretch-cluster.md
@@ -0,0 +1,1719 @@
+# Stretch Kafka Cluster
+
+The Strimzi Kafka Operator currently manages Kafka clusters within a single Kubernetes cluster.
+This proposal extends its support to stretch Kafka clusters, where broker and controller Kafka pods are distributed across multiple Kubernetes clusters.
+
+## Current situation
+
+At present, the availability of Strimzi-managed Kafka clusters is constrained by the availability of the underlying Kubernetes cluster.
+If a Kubernetes cluster experiences an outage, the entire Kafka cluster becomes unavailable, disrupting all connected Kafka clients.
+
+While it is possible to enhance availability by running Kubernetes across multiple availability zones and configuring Strimzi with affinity rules, tolerations, or topology spread constraints, such configurations are still limited to a single Kubernetes control plane.
+Any failure or disruption to this control plane, or a broader infrastructure issue affecting the cluster, can impact cluster operations and affect connected clients.
+In contrast, a stretch cluster distributes Kafka nodes across independent Kubernetes clusters, each with its own control plane and fault domain.
+This enhances fault tolerance by enabling Kafka to continue operating even if one entire Kubernetes cluster becomes unavailable.
+
+The intent is not to imply that Kubernetes control planes are unreliable; they are mature and widely used in HA configurations.
+Rather, the stretch approach supports scenarios where users operate multiple isolated Kubernetes clusters for organizational, operational, or regulatory reasons, and want Kafka to span these boundaries while maintaining quorum and availability.
+
+## Motivation
+
+A stretch Kafka cluster allows Kafka nodes to be distributed across multiple Kubernetes clusters.
+This approach facilitates several valuable use cases:
+
+- **High Availability**: In a stretch cluster deployment, Kafka broker and controller pods are distributed across multiple, fully independent Kubernetes clusters.
+Each cluster operates its own control plane and infrastructure.
+This separation improves resilience because the failure of one Kubernetes cluster, whether due to control plane issues, networking failures, or infrastructure outages, does not affect the availability of Kafka brokers running in the other clusters.
+Clients can continue producing and consuming data without interruption as long as a quorum of brokers and controllers remains operational.
+Unlike MirrorMaker 2 (MM2), a stretch cluster provides strong data durability through synchronous replication and enables fast disaster recovery with automated client failover.
+
+- **Migration Flexibility**: A stretch Kafka cluster enables migration flexibility, whether it's moving the entire cluster across Kubernetes environments or cloud providers without downtime, or relocating individual Kafka nodes as needed.
+This flexibility helps with maintenance, scaling, and workload transitions between environments.
+
+## Proposal
+
+This proposal enhances the Strimzi Kafka operator to support stretch Kafka clusters, where broker, controller, and combined-role Kafka pods are distributed across multiple Kubernetes clusters.
+The goal is to ensure high availability of Kafka client operations, such as producing and consuming messages, even in the event of a single cluster failure, including failure of the central cluster.
+The proposal outlines the high-level topology, design concepts, and detailed implementation considerations for such deployments.
+
+### Limitations and Considerations
+
+While a stretch Kafka cluster offers several advantages, it also introduces some challenges:
+
+- **Increased Network Complexity and Costs**: Communication between brokers and controllers across clusters relies on network connectivity, which can be less reliable and more costly than intra-cluster communication.
+
+- **Latency Requirements**: Stretch Kafka clusters are best suited for environments with low-latency, high-bandwidth network connections between Kubernetes clusters.
+High latency can adversely affect performance and synchronization of Kafka nodes.
+
+- **Optimal Use Case**: Stretch clusters are optimized for same-datacenter, multi-availability-zone deployments where inter-cluster latency is < 5ms.
+Testing has validated excellent performance (50,000 msg/sec throughput with full availability) in this configuration.
+
+  **Important:** Latency recommendations below are based on conservative worst-case testing.
+  Real-world, same-datacenter multi-AZ deployments will perform significantly better than these thresholds suggest.
+  See the "Performance Characteristics and Operational Guidelines" section for detailed validation results and testing methodology.
+
+  For cross-region disaster recovery with higher latency (> 50ms), MirrorMaker 2 asynchronous replication is more appropriate.
+
+### Prerequisites
+
+- **Multiple Kubernetes clusters**: Stretch Kafka clusters require multiple Kubernetes clusters.
+The recommended minimum number of clusters is 3 to simplify achieving quorum for Kafka controllers and enhance High Availability (HA).
+However, the Cluster Operator does not enforce this as a hard requirement.
+Stretch clusters can be deployed with fewer than 3 clusters to support migration flexibility, resource optimization scenarios, or test environments.
+
+- **Low Latency and High Bandwidth**: Kafka clusters require low-latency, high-bandwidth network connections between clusters:
+  - **Recommended:** < 5ms inter-cluster latency (same datacenter, multi-AZ deployment)
+  - **Maximum viable:** < 50ms (severe performance degradation expected)
+  - **Bandwidth:** >= 10 Gbps recommended
+  - **Packet loss:** < 0.1%
+  - **Jitter:** < 2ms
+
+  **Note:** These thresholds are based on a conservative worst-case testing methodology.
+  Real-world, same-datacenter deployments, where only cross-cluster traffic experiences latency, will perform significantly better.
+  Extensive validation with 27 brokers across 3 clusters achieved 50,000 msg/sec throughput with < 1ms inter-cluster latency.
+  See the "Performance Characteristics and Operational Guidelines" section for complete testing results.
+
+  Users MUST validate network characteristics before deployment using latency tests, bandwidth tests, and optional Chaos Mesh validation.
+
+- **Cross-cluster networking**: Stretch clusters require network connectivity between Kubernetes clusters.
+This may be provided through different mechanisms such as MCS-based networking solutions, LoadBalancer-based access, or NodePort-based access.
+This proposal defines a Service Provider Interface (SPI) that allows multiple networking implementations to be plugged in based on the chosen connectivity model.
+
+
+### Networking Architecture: Plugin-Based Design
+
+#### Overview
+
+The core challenge in stretch Kafka clusters is enabling cross-cluster pod-to-pod communication for Kafka brokers and controllers.
+Different deployment environments may require different networking approaches:
+- Multi-Cluster Services (MCS) API for DNS-based service discovery
+- NodePort services with fixed node IPs
+- LoadBalancer services with external IPs
+- Custom cloud-specific solutions
+
+Rather than implementing these mechanisms directly in the operator, this proposal introduces a **Service Provider Interface (SPI)** that allows networking implementations to be loaded as external plugins.
+
+#### Stretch Networking Provider SPI
+
+The `StretchNetworkingProvider` interface defines the contract that all networking plugins must implement:
+
+```java
+public interface StretchNetworkingProvider {
+    // Initialize provider with configuration and cluster access
+    Future<Void> init(Map<String, String> config,
+                      ResourceOperatorSupplier centralSupplier,
+                      RemoteResourceOperatorSupplier remoteSupplier);
+
+    // Create networking resources (Services, ServiceExports, etc.)
+    Future<List<HasMetadata>> createNetworkingResources(
+        Reconciliation reconciliation,
+        String namespace,
+        String podName,
+        String clusterId,
+        Map<String, Integer> ports);
+
+    // Discover the actual endpoint for a pod (critical for configuration)
+    Future<String> discoverPodEndpoint(
+        Reconciliation reconciliation,
+        String namespace,
+        String serviceName,
+        String clusterId,
+        String portName);
+
+    // Generate advertised.listeners configuration
+    Future<String> generateAdvertisedListeners(
+        Reconciliation reconciliation,
+        String namespace,
+        String podName,
+        String clusterId,
+        Map<String, Integer> listeners);
+
+    // Generate controller.quorum.voters configuration
+    Future<String> generateQuorumVoters(
+        Reconciliation reconciliation,
+        String namespace,
+        List<String> controllerPods,
+        String replicationPortName);
+
+    // Generate DNS names for services and pods
+    String generateServiceDnsName(String namespace, String serviceName, String clusterId);
+    String generatePodDnsName(String namespace, String serviceName, String podName, String clusterId);
+
+    // Clean up networking resources
+    Future<Void> deleteNetworkingResources(
+        Reconciliation reconciliation,
+        String namespace,
+        String podName,
+        String clusterId);
+
+    String getProviderName();
+}
+```
+
+#### Plugin Loading Mechanism
+
+Networking providers are loaded dynamically at operator startup using Java's ServiceLoader mechanism.
+The central cluster operator configuration specifies which plugin to use: + +```yaml +env: +- name: STRIMZI_STRETCH_PLUGIN_CLASS_NAME + value: io.strimzi.plugin.stretch.NodePortNetworkingProvider +- name: STRIMZI_STRETCH_PLUGIN_CLASS_PATH + value: /opt/strimzi/plugins/* +``` + +The `StretchNetworkingProviderFactory` handles plugin discovery and initialization: +1. Creates a custom ClassLoader with the specified plugin classpath +2. Loads the provider class using reflection +3. Calls the `init()` method to initialize the provider +4. Returns the initialized provider for use by the operator + +This approach provides: +- **Separation of concerns**: Core operator logic is decoupled from networking implementation details +- **Extensibility**: Users can implement custom providers without modifying operator code +- **Testing**: Providers can be tested independently and swapped for different environments + +#### Plugin JAR Deployment + +Users must provide the networking provider plugin JAR file to the cluster operator. +This is accomplished by creating a ConfigMap containing the plugin JAR and mounting it as a volume in the operator deployment. 
+ +**Step 1: Create ConfigMap with Plugin JAR** + +```bash +# Download or build the plugin JAR +# For example, using the NodePort provider +wget https://github.com/strimzi/strimzi-stretch-nodeport-plugin/releases/download/v0.1.0/nodeport-plugin.jar + +# Create ConfigMap from the JAR file +kubectl create configmap stretch-plugin-nodeport \ + --from-file=nodeport-plugin.jar=nodeport-plugin.jar \ + --namespace kafka +``` + +**Step 2: Mount ConfigMap in Operator Deployment** + +Modify the cluster operator deployment to mount the ConfigMap as a volume: + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: strimzi-cluster-operator + namespace: kafka +spec: + template: + spec: + containers: + - name: strimzi-cluster-operator + image: quay.io/strimzi/operator:latest + env: + - name: STRIMZI_STRETCH_PLUGIN_CLASS_NAME + value: io.strimzi.plugin.stretch.NodePortNetworkingProvider + - name: STRIMZI_STRETCH_PLUGIN_CLASS_PATH + value: /opt/strimzi/plugins/* + volumeMounts: + - name: plugin-volume + mountPath: /opt/strimzi/plugins + readOnly: true + volumes: + - name: plugin-volume + configMap: + name: stretch-plugin-nodeport +``` + +The ConfigMap must be created in the same namespace where the cluster operator is deployed. +The `mountPath` specified in `volumeMounts` must match the directory referenced in `STRIMZI_STRETCH_PLUGIN_CLASS_PATH`. +The ConfigMap must be created before the operator pod starts, otherwise the operator will fail to load the plugin. 
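Once the JAR is mounted, the loading flow described under "Plugin Loading Mechanism" (custom ClassLoader plus reflective instantiation) can be sketched as follows. This is an illustrative, self-contained sketch; the real `StretchNetworkingProviderFactory` lives in the operator, and the directory-scanning helper shown here is hypothetical:

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative sketch of the plugin-loading flow: build a URLClassLoader
// over the JARs found on STRIMZI_STRETCH_PLUGIN_CLASS_PATH, then
// reflectively instantiate the class named in STRIMZI_STRETCH_PLUGIN_CLASS_NAME.
public class PluginLoaderSketch {
    static Object loadProvider(String pluginDir, String className) throws Exception {
        File[] jars = new File(pluginDir).listFiles((dir, name) -> name.endsWith(".jar"));
        URL[] urls = new URL[jars == null ? 0 : jars.length];
        for (int i = 0; i < urls.length; i++) {
            urls[i] = jars[i].toURI().toURL();
        }
        // The operator's own class loader is the parent, so the
        // StretchNetworkingProvider interface is shared with the plugin.
        ClassLoader loader = new URLClassLoader(urls, PluginLoaderSketch.class.getClassLoader());
        Class<?> clazz = Class.forName(className, true, loader);
        return clazz.getDeclaredConstructor().newInstance();
    }
}
```

The real factory would additionally cast the instance to `StretchNetworkingProvider` and call `init()` before handing it to the operator.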
+ +#### Reference Implementations + +The Strimzi project provides reference implementations in separate repositories: + +**NodePort Provider** (`strimzi-stretch-nodeport-plugin`) +- Creates NodePort services for each Kafka pod +- Caches one stable node IP per cluster (NodePort is accessible from any node) +- Returns endpoints in format: `nodeIP:nodePort` (e.g., `10.21.37.21:31001`) +- Best for: On-premises deployments, development environments + +**LoadBalancer Provider** (`strimzi-stretch-loadbalancer-plugin`) +- Creates LoadBalancer services for each Kafka pod +- Waits for LoadBalancer IP assignment with exponential backoff retry +- Returns endpoints in format: `loadBalancerIP:port` (e.g., `10.21.50.10:9091`) +- Best for: Cloud environments with LoadBalancer support (AWS ELB, GCP, Azure) + +**MCS Provider** (Third-party reference) +- Creates Service + ServiceExport resources per the Multi-Cluster Services API +- Returns DNS-based endpoints: `pod.clusterId.svc.clusterset.local:port` +- Best for: Environments with MCS implementation (Submariner, Cilium Cluster Mesh) + +Each provider handles the specific requirements of its networking approach, including resource creation, endpoint discovery, and cleanup. 
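The endpoint formats produced by the three reference providers can be summarized with a small sketch (these helper methods are illustrative and are not part of the SPI):

```java
// Illustrative helpers showing the endpoint string formats produced by the
// three reference providers. Not part of the StretchNetworkingProvider SPI.
public class EndpointFormats {
    // NodePort provider: one stable node IP per cluster + allocated NodePort
    static String nodePort(String nodeIp, int nodePort) {
        return nodeIp + ":" + nodePort;
    }

    // LoadBalancer provider: external IP of the per-pod Service + listener port
    static String loadBalancer(String lbIp, int port) {
        return lbIp + ":" + port;
    }

    // MCS provider: DNS name in the clusterset domain, per the MCS API
    static String mcs(String podName, String clusterId, int port) {
        return podName + "." + clusterId + ".svc.clusterset.local:" + port;
    }
}
```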
+ + +### High-level Architecture + +#### Topology of a Stretch Cluster + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ STRETCH KAFKA CLUSTER │ +│ (3 Kubernetes Clusters) │ +└─────────────────────────────────────────────────────────────────────────────┘ + +┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐ +│ Central Cluster │ │ Remote Cluster 1 │ │ Remote Cluster 2 │ +│ (cluster-central) │ │ (cluster-east) │ │ (cluster-west) │ +├──────────────────────┤ ├──────────────────────┤ ├──────────────────────┤ +│ │ │ │ │ │ +│ ┌──────────────────┐ │ │ ┌──────────────────┐ │ │ ┌──────────────────┐ │ +│ │ Strimzi │ │ │ │ Strimzi │ │ │ │ Strimzi │ │ +│ │ Cluster │ │ │ │ Cluster │ │ │ │ Cluster │ │ +│ │ Operator │ │ │ │ Operator │ │ │ │ Operator │ │ +│ │ │ │ │ │ │ │ │ │ │ │ +│ │ ┌────────────┐ │ │ │ │ │ │ │ │ │ │ +│ │ │ Plugin JAR │ │ │ │ │ (No plugin) │ │ │ │ (No plugin) │ │ +│ │ │ (NodePort, │ │ │ │ │ │ │ │ │ │ │ +│ │ │ LoadBal, │ │ │ │ │ │ │ │ │ │ │ +│ │ │ or MCS) │ │ │ │ │ │ │ │ │ │ │ +│ │ └────────────┘ │ │ │ │ │ │ │ │ │ │ +│ │ │ │ │ │ │ │ │ │ │ │ +│ │ Reconciles: │ │ │ │ Reconciles: │ │ │ │ Reconciles: │ │ +│ │ • Kafka CR │ │ │ │ • StrimziPodSet │ │ │ │ • StrimziPodSet │ │ +│ │ • KafkaNodePool │ │ │ │ (pod creation)│ │ │ │ (pod creation)│ │ +│ │ • All resources │ │ │ │ │ │ │ │ │ │ +│ └──────────────────┘ │ │ └──────────────────┘ │ │ └──────────────────┘ │ +│ │ │ │ │ │ +│ ┌──────────────────┐ │ │ │ │ │ +│ │ Custom Resources │ │ │ │ │ │ +│ ├──────────────────┤ │ │ │ │ │ +│ │ • Kafka CR │ │ │ (No CRs here) │ │ (No CRs here) │ +│ │ • KafkaNodePool │ │ │ │ │ │ +│ │ • KafkaConnect │ │ │ │ │ │ +│ │ • KafkaTopic │ │ │ │ │ │ +│ └──────────────────┘ │ │ │ │ │ +│ │ │ │ │ │ │ +│ │ │ │ │ │ │ +│ ▼ │ │ │ │ │ +│ ┌──────────────────┐ │ │ ┌──────────────────┐ │ │ ┌──────────────────┐ │ +│ │ Kafka Resources │ │ │ │ Kafka Resources │ │ │ │ Kafka Resources │ │ +│ ├──────────────────┤ │ │ ├──────────────────┤ │ │ 
├──────────────────┤ │ +│ │ │ │ │ │ │ │ │ │ │ │ +│ │ StrimziPodSet: │ │ │ │ GC ConfigMap │ │ │ │ GC ConfigMap │ │ +│ │ pool-central │ │ │ │ (Standalone) │ │ │ │ (Standalone) │ │ +│ │ │ │ │ │ │ │ │ │ │ │ │ │ +│ │ ┌──────┐ ┌─────┐ │ │ │ │ ▼ owns │ │ │ │ ▼ owns │ │ +│ │ │kafka │ │kafka│ │ │ │ │ StrimziPodSet: │ │ │ │ StrimziPodSet: │ │ +│ │ │ -0 │ │ -1 │ │ │ │ │ pool-east │ │ │ │ pool-west │ │ +│ │ │(ctrl)│ │(ctrl│ │ │ │ │ │ │ │ │ │ │ │ │ +│ │ │(brkr)│ │brkr)│ │ │ │ │ ▼ owns │ │ │ │ ▼ owns │ │ +│ │ └──────┘ └─────┘ │ │ │ │ ┌──────┐ ┌─────┐ │ │ │ │ ┌──────┐ │ │ +│ │ │ │ │ │ │kafka │ │kafka│ │ │ │ │ │kafka │ │ │ +│ │ ConfigMaps │ │ │ │ │ -2 │ │ -3 │ │ │ │ │ │ -4 │ │ │ +│ │ Secrets (CA) │ │ │ │ │(ctrl)│ │(brkr│ │ │ │ │ │(ctrl)│ │ │ +│ │ Services │ │ │ │ │(brkr)│ │only)│ │ │ │ │ │(brkr)│ │ │ +│ │ PVCs │ │ │ │ └──────┘ └─────┘ │ │ │ │ └──────┘ │ │ +│ └──────────────────┘ │ │ │ │ │ │ │ │ │ +│ │ │ │ ConfigMaps │ │ │ │ ConfigMaps │ │ +│ │ │ │ Secrets (CA) │ │ │ │ Secrets (CA) │ │ +│ │ │ │ Services │ │ │ │ Services │ │ +│ │ │ │ PVCs │ │ │ │ PVCs │ │ +│ │ │ │ (All owned by │ │ │ │ (All owned by │ │ +│ │ │ │ GC ConfigMap) │ │ │ │ GC ConfigMap) │ │ +│ │ │ └──────────────────┘ │ │ └──────────────────┘ │ +└──────────────────────┘ └──────────────────────┘ └──────────────────────┘ + │ │ │ + └─────────────────────────┴─────────────────────────┘ + │ + ┌────────────────┴────────────────┐ + │ Cross-Cluster Networking │ + │ (Provided by Plugin) │ + │ │ + │ All implementations are │ + │ external plugins: │ + │ • NodePort Plugin │ + │ • LoadBalancer Plugin │ + │ • MCS Plugin │ + │ • Custom Plugin │ + └─────────────────────────────────┘ + +Legend: + • Central operator manages ALL resources across all clusters + • Remote operators only reconcile StrimziPodSet (create/update pods) + • Plugin JAR (NodePort/LoadBalancer/MCS) mounted only in central operator + • GC ConfigMap is standalone (no owner) and owns ALL remote resources +``` + +The diagram illustrates a topology comprising three 
Kubernetes clusters. + +One cluster is designated as the "central cluster", while additional clusters are considered "remote". +The central cluster acts as the control plane where users create all custom resources for the Kafka cluster: Kafka, KafkaNodePool, KafkaUser, KafkaTopic, etc. + +A KafkaNodePool definition specifies a target Kubernetes cluster (central or remote) as the deployment target. +The operator on the central cluster creates all necessary resources for the node pool on the target cluster. + +#### Resource Distribution + +The following table shows which resources are created in each cluster: + +| Resource Type | Central Cluster | Remote Clusters | +|--------------|----------------|----------------| +| **Custom Resources (Kafka, KafkaNodePool)** | Created by user | Never created | +| **StrimziPodSet** | Created for central pools | Created by central operator | +| **Pods** | Managed by local operator | Managed by local operator | +| **Services** | Created for central pods | Created by central operator | +| **ConfigMaps** | Created for central pods | Created by central operator | +| **Secrets (CA, certs)** | Created once | Replicated by central operator | +| **PersistentVolumeClaims** | Created for central pools | Created by central operator | +| **ServiceExport/Routes/Ingress** | Per networking provider | Per networking provider | + +The central operator has write access to remote clusters via kubeconfig and creates resources directly. +Remote operators are responsible only for reconciling StrimziPodSet resources (creating/updating pods). + +### Configuration Design Rationale + +#### Why Stretch Configuration is at Operator Level + +The stretch cluster configuration (`STRIMZI_CENTRAL_CLUSTER_ID`, `STRIMZI_REMOTE_KUBE_CONFIG`, plugin settings) is specified at the cluster operator level rather than at the Kafka CR or KafkaNodePool level. +This design decision addresses several important architectural and operational concerns. 
+
+##### Infrastructure-Level Concern
+
+Stretch cluster functionality requires infrastructure-level connectivity and authentication to remote Kubernetes clusters.
+The operator needs kubeconfig credentials and API server endpoints to create resources in remote clusters.
+These credentials grant cluster-wide access to Kubernetes APIs and are managed by platform administrators, not application developers.
+
+Placing these credentials at the operator level:
+
+- Aligns with the Kubernetes security model, where operators run with ServiceAccounts that have cluster-level permissions
+- Avoids embedding sensitive kubeconfig credentials in application-level CRs (Kafka, KafkaNodePool)
+- Simplifies credential rotation by updating the operator deployment once rather than every Kafka CR
+- Prevents credential sprawl across multiple Kafka instances
+
+##### Single Networking Provider Per Operator
+
+The networking plugin (NodePort, LoadBalancer, MCS) is loaded once at operator startup via Java ServiceLoader.
+The plugin JAR is mounted as a volume in the operator deployment.
+The plugin's initialization code runs once when the operator starts.
+
+This means all Kafka clusters managed by a single operator instance must use the same networking provider.
+Different networking providers cannot coexist in a single operator instance.
+
+**Example scenario:** If the networking provider were configured per Kafka CR, the operator would need to:
+
+- Load and initialize multiple plugin JARs dynamically.
+- Maintain separate ClassLoaders for each plugin to avoid conflicts.
+- Handle plugin initialization failures on a per-cluster basis.
+- Manage plugin lifecycle (loading/unloading) during reconciliation.
+
+This adds significant complexity and creates potential ClassLoader isolation issues.
+
+##### Consistent Cluster Topology
+
+The `STRIMZI_CENTRAL_CLUSTER_ID` and `STRIMZI_REMOTE_KUBE_CONFIG` define the operator's view of the multi-cluster topology.
+All Kafka clusters managed by this operator share the same cluster topology.
+Allowing different cluster topologies per Kafka CR would create ambiguity:
+- Which cluster is "central" for KafkaConnect or KafkaTopic operators?
+- How would the operator reconcile resources if cluster IDs conflict between different Kafka CRs?
+- What happens if two Kafka CRs define different URLs for the same cluster ID?
+
+Operator-level configuration ensures a single, consistent cluster topology for all managed resources.
+
+##### Operational Simplicity
+
+Having stretch configuration at the operator level simplifies operations:
+
+**Deployment:**
+
+- Deploy the operator once with stretch configuration.
+- All subsequent Kafka CRs automatically inherit the multi-cluster capability.
+- No need to repeat configuration in every Kafka CR.
+
+**Troubleshooting:**
+
+- Single source of truth for cluster connectivity issues.
+- Operator logs show connectivity status to all clusters on startup.
+- No need to check individual Kafka CRs to diagnose multi-cluster issues.
+
+##### Support for Non-Stretch Kafka Instances
+
+The design allows the same operator to manage both stretch and non-stretch Kafka clusters:
+
+- An operator configured with stretch settings can still manage regular (non-stretch) Kafka instances.
+- Kafka CRs without the `strimzi.io/enable-stretch-cluster: "true"` annotation are deployed normally in the Kubernetes cluster where the CR is applied.
+- KafkaNodePools without the `strimzi.io/stretch-cluster-alias` annotation are deployed to the cluster where the CR is applied.
+
+##### Scope for KafkaConnect and KafkaBridge
+
+These ecosystem components are Kafka clients, not part of the Kafka cluster itself.
+They must be deployed only in the central cluster where the Kafka CR resides.
+
+The central operator creates and manages all custom resources (Kafka, KafkaNodePool, KafkaConnect, KafkaBridge, KafkaMirrorMaker2) in the central cluster.
+Remote cluster operators reconcile only StrimziPodSet resources to create Kafka pods. +The operator does not create Connect, Bridge, or MirrorMaker2 CRs in remote clusters. + +These components connect to Kafka as standard clients using bootstrap servers. +They can connect to any broker in the stretch cluster regardless of which Kubernetes cluster hosts them. +Deploying them in the central cluster provides a single point of management for all Kafka-related resources. + + +Note on MirrorMaker2: +KafkaMirrorMaker2 is used only for replication to external (non-stretch) Kafka clusters. +It is not used for stretch cluster replication, which is handled by Kafka's built-in replication mechanism. + +### Configuration and Setup + +#### Step 1: Deploy Cluster Operators + +The cluster operator must be deployed to all Kubernetes clusters (central and remote). +The operator in the central cluster requires additional configuration to access remote clusters: + +```yaml +- name: STRIMZI_CENTRAL_CLUSTER_ID + value: "cluster-central" + +- name: STRIMZI_REMOTE_KUBE_CONFIG + value: | + cluster-east.url=https://api.cluster-east.example.com:6443 + cluster-east.secret=kubeconfig-cluster-east + cluster-west.url=https://api.cluster-west.example.com:6443 + cluster-west.secret=kubeconfig-cluster-west + +- name: STRIMZI_STRETCH_PLUGIN_CLASS_NAME + value: io.strimzi.plugin.stretch.NodePortNetworkingProvider + +- name: STRIMZI_STRETCH_PLUGIN_CLASS_PATH + value: /opt/strimzi/plugins/* +``` + +**Environment Variables:** + +- `STRIMZI_CENTRAL_CLUSTER_ID` (Required): Unique identifier for the central cluster +- `STRIMZI_REMOTE_KUBE_CONFIG` (Required): Maps cluster IDs to API endpoints and credential secrets +- `STRIMZI_STRETCH_PLUGIN_CLASS_NAME` (Required): Fully qualified class name of the networking provider +- `STRIMZI_STRETCH_PLUGIN_CLASS_PATH` (Required): Classpath for loading the provider JAR + +**Creating Kubeconfig Secrets for Remote Clusters:** + +It is the user's responsibility to create 
Kubernetes secrets in the central cluster to enable connectivity to remote clusters.
+These secrets are essential because all custom resources (Kafka, KafkaNodePool, KafkaTopic, KafkaUser) are created in the central cluster,
+but the operator needs to create Kafka pods and related resources in the remote clusters.
+
+**Step-by-Step Process:**
+
+1. **Create kubeconfig files** for each remote cluster containing the necessary authentication credentials and API server endpoints.
+
+   For example, create the following files:
+   - `kubeconfig-cluster-east` (kubeconfig for Kubernetes cluster `cluster-east`)
+   - `kubeconfig-cluster-west` (kubeconfig for Kubernetes cluster `cluster-west`)
+
+2. **Create secrets in the central cluster** using the kubeconfig files:
+
+   ```bash
+   # Create secret for cluster-east
+   kubectl create secret generic kubeconfig-cluster-east \
+     --from-file=kubeconfig=kubeconfig-cluster-east \
+     --namespace kafka
+
+   # Create secret for cluster-west
+   kubectl create secret generic kubeconfig-cluster-west \
+     --from-file=kubeconfig=kubeconfig-cluster-west \
+     --namespace kafka
+   ```
+
+   **Expected output:**
+   ```
+   secret/kubeconfig-cluster-east created
+   secret/kubeconfig-cluster-west created
+   ```
+
+3. 
**Verify the secrets** were created successfully: + + ```bash + kubectl get secrets -n kafka | grep kubeconfig + ``` + +**Secret Format:** + +Each secret must contain a `kubeconfig` key with the base64-encoded kubeconfig data: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: kubeconfig-cluster-east + namespace: kafka # Must be created in the CENTRAL cluster +type: Opaque +data: + kubeconfig: # Credentials to access cluster-east API server +``` + +**Important Notes:** + +- Secrets must be created in the **central cluster**, not in the remote clusters +- The secret names must match those referenced in the `STRIMZI_REMOTE_KUBE_CONFIG` environment variable +- The kubeconfig must have sufficient permissions to create and manage resources in the remote clusters (StrimziPodSets, ConfigMaps, Secrets, Services, PVCs etc.) +- The central operator validates kubeconfig expiry and reports errors if credentials are about to expire +- Ensure kubeconfig credentials are properly secured and rotated according to your organization's security policies + +#### Step 2: Create Kafka and KafkaNodePool Resources + +**Kafka CR:** + +Enable stretch mode with an annotation: + +```yaml +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster + annotations: + strimzi.io/enable-stretch-cluster: "true" +spec: + kafka: + version: 3.8.0 + listeners: + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true + config: + offsets.topic.replication.factor: 3 + transaction.state.log.replication.factor: 3 + transaction.state.log.min.isr: 2 +``` + +**KafkaNodePool CRs:** + +Each node pool specifies its target cluster via annotation: + +```yaml +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaNodePool +metadata: + name: pool-central + labels: + strimzi.io/cluster: my-cluster + annotations: + strimzi.io/stretch-cluster-alias: "cluster-central" +spec: + replicas: 3 + roles: + - controller + - broker + storage: + type: 
jbod + volumes: + - id: 0 + type: persistent-claim + size: 100Gi +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaNodePool +metadata: + name: pool-east + labels: + strimzi.io/cluster: my-cluster + annotations: + strimzi.io/stretch-cluster-alias: "cluster-east" +spec: + replicas: 3 + roles: + - controller + - broker + storage: + type: jbod + volumes: + - id: 0 + type: persistent-claim + size: 100Gi +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaNodePool +metadata: + name: pool-west + labels: + strimzi.io/cluster: my-cluster + annotations: + strimzi.io/stretch-cluster-alias: "cluster-west" +spec: + replicas: 4 + roles: + - controller + - broker + storage: + type: jbod + volumes: + - id: 0 + type: persistent-claim + size: 100Gi +``` + +This creates a 10-node Kafka cluster (3+3+4) with controllers and brokers distributed across 3 Kubernetes clusters. + +#### Validation Rules + +The operator enforces comprehensive validation through `StretchClusterValidator`: + +##### Configuration Validation Matrix + +| Operator Configured | Kafka CR Annotation | KNP Annotations | Result | Error Code | +|---------------------|---------------------|-----------------|--------|------------| +| ❌ No | ❌ No | ❌ None | ✅ Valid: Standard deployment | - | +| ❌ No | ❌ No | ✅ Some/All have alias | ❌ **INVALID** | `OperatorNotConfigured` | +| ❌ No | ✅ Yes | ❌ None | ❌ **INVALID** | `OperatorNotConfigured` | +| ❌ No | ✅ Yes | ✅ Some/All have alias | ❌ **INVALID** | `OperatorNotConfigured` | +| ✅ Yes | ❌ No | ❌ None | ✅ Valid: Stretch configured but not used | - | +| ✅ Yes | ❌ No | ✅ Some/All have alias | ❌ **INVALID** | `MissingStretchAnnotation` | +| ✅ Yes | ✅ Yes | ❌ None (all missing) | ❌ **INVALID** | `MissingTargetCluster` | +| ✅ Yes | ✅ Yes | ⚠️ Some have alias | ❌ **INVALID** | `MissingTargetCluster` | +| ✅ Yes | ✅ Yes | ✅ All have valid aliases | ✅ Valid: Stretch cluster | - | +| ✅ Yes | ✅ Yes | ✅ All have aliases (some invalid) | ❌ **INVALID** | `InvalidTargetCluster` | + 
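The matrix above can be read as a decision function. The following standalone sketch mirrors that logic (the real checks live in `StretchClusterValidator`; this version is illustrative only):

```java
import java.util.List;
import java.util.Set;

// Illustrative decision function mirroring the validation matrix.
// A null entry in poolAliases represents a pool without the
// strimzi.io/stretch-cluster-alias annotation.
public class StretchValidationSketch {
    static String validate(boolean operatorConfigured,
                           boolean kafkaCrAnnotated,
                           List<String> poolAliases,
                           Set<String> knownClusterIds) {
        boolean anyAlias = poolAliases.stream().anyMatch(a -> a != null);
        boolean allAliases = poolAliases.stream().allMatch(a -> a != null);

        if (!operatorConfigured) {
            // Any stretch usage without operator configuration is invalid
            return (kafkaCrAnnotated || anyAlias) ? "OperatorNotConfigured" : "Valid";
        }
        if (!kafkaCrAnnotated) {
            // Pools may not use stretch aliases without the Kafka CR opt-in
            return anyAlias ? "MissingStretchAnnotation" : "Valid";
        }
        if (!allAliases) {
            return "MissingTargetCluster";
        }
        if (!knownClusterIds.containsAll(poolAliases)) {
            return "InvalidTargetCluster";
        }
        return "Valid";
    }
}
```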
+##### Validation Checks Performed + +**1. Operator Configuration Check:** +- Validates `STRIMZI_REMOTE_KUBE_CONFIG` is set +- Validates `STRIMZI_CENTRAL_CLUSTER_ID` is set +- Validates `STRIMZI_STRETCH_PLUGIN_CLASS_NAME` is set +- Validates `STRIMZI_STRETCH_PLUGIN_CLASS_PATH` is set + +**2. Kafka CR Annotation Check:** +- Looks for `strimzi.io/enable-stretch-cluster: "true"` annotation +- Must be present when any KafkaNodePool has `strimzi.io/stretch-cluster-alias` + +**3. KafkaNodePool Annotation Check:** +- Each pool must have `strimzi.io/stretch-cluster-alias` annotation when stretch mode is enabled +- Alias value must match either `STRIMZI_CENTRAL_CLUSTER_ID` or a cluster ID in `STRIMZI_REMOTE_KUBE_CONFIG` +- ALL pools must have the annotation (no mixing of stretch and non-stretch pools) + +**4. Runtime Connectivity Check:** +- Validates API server reachability for each remote cluster +- Validates `strimzipodsets.core.strimzi.io` CRD exists in each cluster +- Validates kubeconfig credentials are valid + +**Error Messages:** + +``` +OperatorNotConfigured: "Stretch mode is enabled for Kafka cluster 'my-cluster', +but required environment variables are not properly configured. Required: +STRIMZI_REMOTE_KUBE_CONFIG, STRIMZI_CENTRAL_CLUSTER_ID, +STRIMZI_STRETCH_PLUGIN_CLASS_NAME, STRIMZI_STRETCH_PLUGIN_CLASS_PATH" + +MissingStretchAnnotation: "Kafka CR 'my-cluster' is missing required annotation +'strimzi.io/enable-stretch-cluster: true' but the following node pools have +stretch cluster annotations: pool-east, pool-west. Either add the annotation +to the Kafka CR or remove stretch annotations from all node pools." + +MissingTargetCluster: "The following node pools are missing required annotation +'strimzi.io/stretch-cluster-alias': pool-central. All node pools must specify +a target cluster when stretch mode is enabled." + +InvalidTargetCluster: "The following target cluster IDs are not configured: +cluster-unknown. 
Valid cluster IDs are: cluster-central, cluster-east, cluster-west. +Please check STRIMZI_CENTRAL_CLUSTER_ID and STRIMZI_REMOTE_KUBE_CONFIG." + +ConnectivityError: "Cannot connect to cluster 'cluster-east': Connection refused. +Please check kubeconfig and network connectivity." + +MissingCRD: "Required CRD 'strimzipodsets.core.strimzi.io' not found in cluster +'cluster-east'. Please ensure Strimzi CRDs are installed in all target clusters." +``` + +Reconciliation fails immediately with descriptive errors for invalid configurations. + + + +### Low-Level Design and Implementation + +#### Garbage Collection for Remote Resources + +One of the key challenges in stretch clusters is ensuring proper cleanup of resources created in remote clusters. +Kubernetes garbage collection relies on `OwnerReferences`, but these do not work across cluster boundaries. + +**Problem:** +- Central operator creates StrimziPodSets, ConfigMaps, Secrets, Services in remote clusters +- These resources cannot have OwnerReferences pointing to Kafka CR in central cluster +- Standard Kubernetes garbage collection cannot cascade delete these resources + +**Solution: Garbage Collector ConfigMap** + +The operator creates a special "garbage collector" ConfigMap in each remote cluster. + +This ConfigMap has **NO ownerReferences** - it is a standalone resource that the operator explicitly manages: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: my-cluster-kafka-gc + namespace: kafka + labels: + app.kubernetes.io/name: kafka + app.kubernetes.io/instance: my-cluster + strimzi.io/cluster: my-cluster + strimzi.io/kind: Kafka + strimzi.io/component-type: garbage-collector + # NO ownerReferences - this is intentional! 
+  # Cross-cluster owner references don't work
+data:
+  cluster-id: "cluster-east"
+  kafka-cluster: "my-cluster"
+  namespace: "kafka"
+  purpose: "Garbage collection anchor for remote cluster resources"
+  managed-resources: "StrimziPodSet,ServiceAccount,Secret,ConfigMap,Service,ServiceExport,PersistentVolumeClaim"
+```
+
+The GC ConfigMap must be a standalone resource that the operator explicitly creates and deletes.
+
+All remote cluster resources (StrimziPodSets, ConfigMaps, Services, etc.) set this GC ConfigMap as their owner:
+
+```java
+private List<HasMetadata> addGarbageCollectorOwnerReference(
+        String targetClusterId, List<HasMetadata> resources) {
+    String gcConfigMapName = KafkaResources.kafkaComponentName(reconciliation.name()) + "-gc";
+    String gcUid = gcConfigMapUids.get(targetClusterId);
+
+    OwnerReference gcOwnerRef = new OwnerReferenceBuilder()
+        .withApiVersion("v1")
+        .withKind("ConfigMap")
+        .withName(gcConfigMapName)
+        .withUid(gcUid)
+        .withController(false)
+        .withBlockOwnerDeletion(false)
+        .build();
+
+    return resources.stream()
+        .map(resource -> {
+            resource.getMetadata().setOwnerReferences(List.of(gcOwnerRef));
+            return resource;
+        })
+        .toList();
+}
+```
+
+**Deletion Flow:**
+
+1. User deletes Kafka CR from central cluster
+2. Kubernetes deletes the Kafka CR immediately
+3. Operator's watch detects the deletion and triggers reconciliation
+4. Since the CR no longer exists, the operator's `delete()` method is invoked
+5. The `delete()` method explicitly deletes the GC ConfigMap from each remote cluster:
+   ```java
+   private Future<Void> deleteGarbageCollectorConfigMaps(Reconciliation reconciliation) {
+       // Illustrative: future composition and error handling omitted
+       for (String clusterId : remoteClusterIds) {
+           ConfigMapOperator configMapOp = getConfigMapOperatorForCluster(clusterId);
+           configMapOp.reconcile(reconciliation, namespace, gcConfigMapName, null);
+       }
+       return Future.succeededFuture();
+   }
+   ```
+6. Kubernetes garbage collector in each remote cluster cascades deletion to all resources owned by the GC ConfigMap
+7. 
All StrimziPodSets, ConfigMaps, Services, PVCs in remote clusters are automatically deleted + +**Key Design Points:** + +The GC ConfigMap itself has **no ownerReferences** because cross-cluster ownership doesn't work. +Remote resources (Services, ConfigMaps, Secrets) have the GC ConfigMap as their owner (same-cluster ownership). +The operator explicitly manages the GC ConfigMap lifecycle (create on reconcile, delete on Kafka CR deletion). +This two-level ownership pattern enables proper garbage collection across cluster boundaries. + +#### Kafka Configuration Generation + +The networking provider is responsible for generating Kafka configuration that enables cross cluster communication. + +**advertised.listeners Configuration:** + +For each broker, the operator calls the provider's `generateAdvertisedListeners()` method: + +```java +// Example: NodePort provider returns +"REPLICATION-9091://10.21.37.21:31001,PLAIN-9092://10.21.37.21:31002,TLS-9093://10.21.37.21:31003" + +// Example: MCS provider returns +"REPLICATION-9091://my-cluster-pool-central-0.cluster-east.my-cluster-kafka-brokers.kafka.svc.clusterset.local:9091,..." +``` + +The operator writes this directly to the broker's configuration file. 
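As a concrete illustration, a NodePort-style `advertised.listeners` string like the one above can be assembled per broker from the discovered endpoint and a listener-to-port mapping. This is a minimal sketch; the `AdvertisedListeners` class and its parameters are hypothetical, not the provider SPI:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class AdvertisedListeners {
    // endpoint is the externally reachable address discovered for this broker
    public static String generate(String endpoint, Map<String, Integer> listenerPorts) {
        return listenerPorts.entrySet().stream()
                .map(e -> e.getKey() + "://" + endpoint + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, Integer> ports = new LinkedHashMap<>();
        ports.put("REPLICATION-9091", 31001);
        ports.put("PLAIN-9092", 31002);
        ports.put("TLS-9093", 31003);
        // prints REPLICATION-9091://10.21.37.21:31001,PLAIN-9092://10.21.37.21:31002,TLS-9093://10.21.37.21:31003
        System.out.println(generate("10.21.37.21", ports));
    }
}
```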
+
+**controller.quorum.voters Configuration:**
+
+The operator collects all controller pods from the Kafka model and calls the networking provider's `generateQuorumVoters()` method:
+
+```java
+// Operator builds list of controller pods across all clusters
+List<ControllerPodInfo> controllerInfos = new ArrayList<>();
+for (NodeRef node : kafka.controllerNodes()) {
+    controllerInfos.add(new ControllerPodInfo(
+        node.nodeId(),              // e.g., 0, 1, 2
+        node.podName(),             // e.g., "my-cluster-pool-central-0"
+        getClusterIdForNode(node)   // e.g., "cluster-central"
+    ));
+}
+
+// Provider generates quorum voters string
+String quorumVoters = networkingProvider.generateQuorumVoters(
+    reconciliation, namespace, controllerInfos, "replication"
+);
+
+// Example: NodePort provider returns
+"0@10.21.37.21:31093,1@10.21.37.22:31093,2@10.21.50.10:31093,..."
+
+// Example: MCS provider returns
+"0@my-cluster-pool-central-0.cluster-central.my-cluster-kafka-brokers.kafka.svc.clusterset.local:9091,2@my-cluster-pool-east-2.cluster-east.my-cluster-kafka-brokers.kafka.svc.clusterset.local:9091,...."
+```
+
+This configuration is written to the KRaft configuration file for all brokers and controllers.
+
+**Certificate SANs for TLS:**
+
+When TLS is enabled, broker and controller certificates include Subject Alternative Names for cross-cluster DNS resolution.
+
+For stretch mode, additional SANs are generated based on the provider's `generatePodDnsName()` and `generateServiceDnsName()` methods:
+
+```bash
+# Regular SANs (all modes)
+DNS:my-cluster-pool-central-0.my-cluster-kafka-brokers.kafka.svc.cluster.local
+DNS:my-cluster-kafka-brokers.kafka.svc
+
+# Additional SANs in stretch mode (example from MCS provider)
+DNS:my-cluster-pool-central-0.cluster-east.my-cluster-kafka-brokers.kafka.svc.clusterset.local
+```
+
+This ensures TLS hostname verification succeeds when brokers/controllers connect across clusters.
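The stretch-mode SAN list above can be sketched as a small helper that combines the regular in-cluster names with the provider's clusterset DNS name. This is illustrative only; in the actual design the cross-cluster names come from the provider's `generatePodDnsName()`/`generateServiceDnsName()` methods:

```java
import java.util.List;

public class StretchSans {
    // Builds the DNS SANs for one broker/controller pod in stretch mode (MCS-style example)
    public static List<String> forPod(String podName, String headlessService,
                                      String namespace, String clusterId) {
        return List.of(
            // Regular in-cluster SANs (all modes)
            podName + "." + headlessService + "." + namespace + ".svc.cluster.local",
            headlessService + "." + namespace + ".svc",
            // Additional stretch-mode SAN (MCS clusterset DNS, per the example above)
            podName + "." + clusterId + "." + headlessService + "." + namespace + ".svc.clusterset.local"
        );
    }
}
```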
+ +#### Remote Resource Creation + +The central operator creates resources in remote clusters using the `RemoteResourceOperatorSupplier`: + +```java +// Get operator supplier for target cluster +ResourceOperatorSupplier supplier = + remoteResourceOperatorSupplier.getResourceOperatorSupplier(targetClusterId); + +// Create ConfigMap in remote cluster +ConfigMapOperator configMapOp = supplier.configMapOperations; +configMapOp.reconcile(reconciliation, namespace, configMapName, configMap) + .onComplete(result -> { + if (result.succeeded()) { + LOGGER.info("Created ConfigMap {} in cluster {}", configMapName, targetClusterId); + } + }); +``` + +The `RemoteResourceOperatorSupplier` maintains a map of cluster IDs to `ResourceOperatorSupplier` instances, each configured with the appropriate kubeconfig for that cluster. + +#### Status Reporting + +The Kafka CR status is enhanced to show comprehensive information about the stretch cluster deployment: + +**Example Kafka CR Status (using MCS provider):** + +```yaml +status: + observedGeneration: 1 + kafkaVersion: 3.8.0 + kafkaMetadataVersion: 3.8-IV0 + kafkaMetadataState: KRaft + clusterId: fPb3DwyWQpabxMy0SzaxUg + operatorLastSuccessfulVersion: 0.48.0 + + # Lists all KafkaNodePools in the cluster + kafkaNodePools: + - name: pool-central + - name: pool-east + - name: pool-west + + # Stretch cluster indicator condition + conditions: + - type: StretchCluster + status: "True" + reason: StretchClusterActive + message: "Stretch cluster active: provider=mcs, central=cluster-central, clusters=cluster-central,cluster-east,cluster-west" + lastTransitionTime: "2025-11-29T10:00:00Z" + - type: Ready + status: "True" + lastTransitionTime: "2025-11-29T10:00:15Z" + + # Listener addresses from ALL clusters + listeners: + # Internal plain listener - shows DNS names from all 3 clusters + - name: plain + addresses: + - host: my-cluster-kafka-bootstrap-plain.cluster-central.kafka.svc.clusterset.local + port: 9092 + - host: 
my-cluster-kafka-bootstrap-plain.cluster-east.kafka.svc.clusterset.local + port: 9092 + - host: my-cluster-kafka-bootstrap-plain.cluster-west.kafka.svc.clusterset.local + port: 9092 + bootstrapServers: >- + my-cluster-kafka-bootstrap-plain.cluster-central.kafka.svc.clusterset.local:9092, + my-cluster-kafka-bootstrap-plain.cluster-east.kafka.svc.clusterset.local:9092, + my-cluster-kafka-bootstrap-plain.cluster-west.kafka.svc.clusterset.local:9092 + + # Internal TLS listener with certificates + - name: tls + addresses: + - host: my-cluster-kafka-bootstrap-tls.cluster-central.kafka.svc.clusterset.local + port: 9093 + - host: my-cluster-kafka-bootstrap-tls.cluster-east.kafka.svc.clusterset.local + port: 9093 + - host: my-cluster-kafka-bootstrap-tls.cluster-west.kafka.svc.clusterset.local + port: 9093 + bootstrapServers: >- + my-cluster-kafka-bootstrap-tls.cluster-central.kafka.svc.clusterset.local:9093, + my-cluster-kafka-bootstrap-tls.cluster-east.kafka.svc.clusterset.local:9093, + my-cluster-kafka-bootstrap-tls.cluster-west.kafka.svc.clusterset.local:9093 + certificates: + - | + -----BEGIN CERTIFICATE----- + MIIFLTCCAxWgAwIBAgIU... [REDACTED] ... + -----END CERTIFICATE----- + + # External Route listener (OpenShift) - shows actual route hostnames + - name: external + type: route + addresses: + - host: my-cluster-kafka-external-bootstrap.apps.cluster-central.example.com + port: 443 + - host: my-cluster-kafka-external-bootstrap.apps.cluster-east.example.com + port: 443 + - host: my-cluster-kafka-external-bootstrap.apps.cluster-west.example.com + port: 443 + bootstrapServers: >- + my-cluster-kafka-external-bootstrap.apps.cluster-central.example.com:443, + my-cluster-kafka-external-bootstrap.apps.cluster-east.example.com:443, + my-cluster-kafka-external-bootstrap.apps.cluster-west.example.com:443 + certificates: + - | + -----BEGIN CERTIFICATE----- + MIIFLTCCAxWgAwIBAgIU... [REDACTED] ... + -----END CERTIFICATE----- +``` + +**Key Status Features:** + +1. 
**`kafkaNodePools` field**: Lists all node pools participating in the stretch cluster +2. **`StretchCluster` condition**: Indicates stretch mode is active, shows provider name and cluster list +3. **Multi-cluster listener addresses**: Each listener shows bootstrap addresses from all clusters +4. **Provider-specific DNS**: Format depends on networking provider: + - MCS: `service.clusterID.namespace.svc.clusterset.local` + - NodePort: `nodeIP:nodePort` + - LoadBalancer: `loadBalancerIP:port` + +**KafkaNodePool Status:** + +KafkaNodePool CR status remains unchanged from standard Strimzi deployments. The status fields are identical to single cluster mode: + +```yaml +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaNodePool +metadata: + name: pool-east + annotations: + strimzi.io/stretch-cluster-alias: "cluster-east" +status: + observedGeneration: 1 + replicas: 2 + nodeIds: + - 1 + - 3 + roles: + - controller + - broker + conditions: + - type: Ready + status: "True" + lastTransitionTime: "2025-11-29T10:00:00Z" +``` + +The target cluster information is only present in the metadata annotation (`strimzi.io/stretch-cluster-alias`), not in the status. + +**Degraded State Handling:** + +If a remote cluster becomes unavailable: +- The KafkaNodePool for that cluster transitions to `Ready: False` with standard error conditions +- The Kafka CR status shows degraded state but Kafka cluster continues operating with remaining nodes +- Client applications can still connect using bootstrap addresses from available clusters + +#### Rolling Updates + +Kafka pods follow the standard rolling update process, with the central operator determining which pods need restart based on: +- Revision hash changes (configuration updates) +- Certificate updates +- CA renewals +- Storage resizing + +The operator uses the `strimzi.io/stretch-cluster-alias` annotation on each pod to determine the target cluster and performs the rolling restart via the appropriate `ResourceOperatorSupplier`. 
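A minimal sketch of the target-cluster lookup described above; the `StretchRoller` helper is hypothetical (only the annotation key comes from this proposal), and treating an unannotated pod as local to the central cluster is an assumption:

```java
import java.util.Map;

public class StretchRoller {
    static final String ALIAS_ANNOTATION = "strimzi.io/stretch-cluster-alias";

    // Resolves which cluster a pod's rolling restart must be executed against
    public static String targetCluster(Map<String, String> podAnnotations, String centralClusterId) {
        // Assumption: pods without the annotation are treated as local to the central cluster
        return podAnnotations.getOrDefault(ALIAS_ANNOTATION, centralClusterId);
    }
}
```

The resolved cluster ID would then select the matching `ResourceOperatorSupplier` for the restart.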
+ +#### Reconciliation Flow + +The reconciliation flow for stretch clusters extends the standard Strimzi reconciliation. The key stretch specific steps are: + +1. **Validation**: Validate stretch configuration using `StretchClusterValidator` (cluster IDs, kubeconfig validity, annotation consistency) +2. **Garbage Collector ConfigMap**: Create GC ConfigMaps in remote clusters for owner reference tracking +3. **Listeners Reconciliation**: Reconcile Services, Routes, Ingresses for each cluster with cluster specific endpoints +4. **Networking Resources**: Call provider's `createNetworkingResources()` for cross cluster pod communication (ServiceExports, LoadBalancers, etc.) +5. **CA and Certificate Secrets**: Create/update CA and broker/controller certificate secrets in all clusters with cross cluster DNS SANs +6. **Configuration Generation**: Call provider's `generateAdvertisedListeners()` and `generateQuorumVoters()` to build Kafka broker configuration +7. **ConfigMaps**: Generate and distribute broker/controller ConfigMaps to all clusters with stretch specific configuration +8. **StrimziPodSets**: Create StrimziPodSets in target clusters with GC ConfigMap owner reference for garbage collection +9. **Rolling Updates**: Perform rolling updates if configuration or pod specs changed +10. **Pods Ready**: Wait for all pods across all clusters to become ready +11. **Listener Status**: Aggregate listener addresses from all clusters (Routes, Ingresses, LoadBalancers, internal DNS) +12. **Kafka Status**: Update Kafka CR status with stretch cluster condition, listener addresses, and node pool statuses + +The actual implementation includes additional steps for PVCs, service accounts, network policies, quotas, and cleanup operations. +Each step handles both central and remote clusters with proper error handling and continues reconciliation even if individual clusters are temporarily unavailable. + + + +##### Overall Architecture diagram + +##### 1. 
Plugin Loading and Reconciliation Flow + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ OPERATOR STARTUP - PLUGIN LOADING │ +└─────────────────────────────────────────────────────────────────────────────┘ + +1. Read Environment Variables + ┌──────────────────────────────────────┐ + │ STRIMZI_CENTRAL_CLUSTER_ID │ + │ STRIMZI_REMOTE_KUBE_CONFIG │ + │ STRIMZI_STRETCH_PLUGIN_CLASS_NAME │ + │ STRIMZI_STRETCH_PLUGIN_CLASS_PATH │ + └──────────────────┬───────────────────┘ + │ + ▼ +2. Initialize Remote Cluster Connections + ┌──────────────────────────────────────┐ + │ RemoteResourceOperatorSupplier │ + │ • Load kubeconfig secrets │ + │ • Create K8s client for each cluster │ + │ • Validate connectivity │ + └──────────────────┬───────────────────┘ + │ + ▼ +3. Load Networking Plugin + ┌──────────────────────────────────────┐ + │ StretchNetworkingProviderFactory │ + │ │ + │ All providers are plugins! │ + │ (NodePort, LoadBalancer, MCS, etc.) │ + └──────────────────┬───────────────────┘ + │ + ▼ + ┌─────────────────────────────────────┐ + │ Plugin Loading Steps: │ + │ 1. Parse PLUGIN_CLASS_PATH │ + │ 2. Find JAR files in path │ + │ 3. Create URLClassLoader │ + │ 4. Load class by PLUGIN_CLASS_NAME │ + │ 5. Verify implements │ + │ StretchNetworkingProvider │ + │ 6. 
Instantiate new Provider() │ + └──────────────────┬───────────────────┘ + │ + ▼ + ┌──────────────────────────────────────┐ + │ provider.init(config, suppliers) │ + │ • Initialize with configuration │ + │ • Store cluster suppliers │ + │ • Validate plugin requirements │ + └──────────────────┬───────────────────┘ + │ + ▼ + ┌──────────────────────────────────────┐ + │ Plugin Ready - Operator Starts │ + └──────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────────────────────┐ +│ KAFKA CR RECONCILIATION FLOW │ +└─────────────────────────────────────────────────────────────────────────────┘ + +User applies Kafka CR with annotation: +strimzi.io/enable-stretch-cluster: "true" + │ + ▼ +┌──────────────────────────────────────┐ +│ 1. Validation Phase │ +│ StretchClusterValidator │ +│ • Check operator configuration │ +│ • Verify Kafka CR annotation │ +│ • Validate KafkaNodePool annotations │ +│ • Check cluster connectivity │ +│ • Validate kubeconfig credentials │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 2. Build Kafka Cluster Model │ +│ KafkaCluster.fromCrd(...) │ +│ • Parse Kafka CR │ +│ • Parse all KafkaNodePools │ +│ • Determine node distribution │ +│ - pool-central → cluster-central │ +│ - pool-east → cluster-east │ +│ - pool-west → cluster-west │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 3. Create Garbage Collector │ +│ ConfigMaps │ +│ • Create in each remote cluster │ +│ • NO ownerReferences (standalone) │ +│ • Will be owner of ALL remote │ +│ resources (StrimziPodSet, │ +│ ConfigMap, Service, Secret, PVC) │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 4. 
Reconcile CA and Certificates │ +│ • Generate cluster CA │ +│ • Generate broker certificates │ +│ • Add cross-cluster DNS SANs │ +│ (via provider.generatePodDnsName) │ +│ • Replicate secrets to all clusters │ +│ • Set GC ConfigMap as owner │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 5. Create Networking Resources │ +│ FOR EACH pod in each cluster: │ +│ provider.createNetworkingResources │ +│ • NodePort Plugin: NodePort Svc │ +│ • LoadBalancer Plugin: LB Svc │ +│ • MCS Plugin: Svc + ServiceExport │ +│ • Set GC ConfigMap as owner │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 6. Generate Kafka Configuration │ +│ FOR EACH broker: │ +│ • advertised.listeners: │ +│ provider.generateAdvertised │ +│ Listeners(...) │ +│ • controller.quorum.voters: │ +│ provider.generateQuorumVoters │ +│ (...) │ +│ • broker.rack: cluster-id │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 7. Create ConfigMaps │ +│ • server.properties with Kafka cfg │ +│ • Distribute to all clusters │ +│ • Set GC ConfigMap as owner │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 8. Reconcile StrimziPodSets │ +│ FOR EACH KafkaNodePool: │ +│ • Get target cluster ID │ +│ • Get ResourceOperatorSupplier │ +│ for that cluster │ +│ • Create/Update StrimziPodSet │ +│ • Set GC ConfigMap as owner │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 9. Wait for Pods Ready │ +│ • Watch pods in ALL clusters │ +│ • Wait for readiness probes │ +│ • Verify Kafka cluster formation │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 10. 
Reconcile Services/Routes │ +│ • Create bootstrap services │ +│ • Create per-broker services │ +│ • Provider-specific resources │ +│ • Aggregate listener addresses │ +│ • Set GC ConfigMap as owner │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 11. Update Kafka CR Status │ +│ • kafkaNodePools: [list] │ +│ • conditions: │ +│ - StretchCluster: True │ +│ - Ready: True │ +│ • listeners: [addresses from ALL │ +│ clusters] │ +└──────────────────┬───────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ Reconciliation Complete │ +│ • Kafka cluster operational │ +│ • Clients can connect via any │ +│ cluster's bootstrap address │ +└──────────────────────────────────────┘ +``` + +##### 2. Provider SPI Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ NETWORKING PROVIDER ABSTRACTION │ +└─────────────────────────────────────────────────────────────────────────────┘ + + ┌────────────────────────────────┐ + │ <> │ + │ StretchNetworkingProvider │ + ├────────────────────────────────┤ + │ + init(...) 
│ + │ + createNetworkingResources() │ + │ + discoverPodEndpoint() │ + │ + generateAdvertisedListeners()│ + │ + generateQuorumVoters() │ + │ + generatePodDnsName() │ + │ + generateServiceDnsName() │ + │ + generateCertificateSans() │ + │ + deleteNetworkingResources() │ + │ + getProviderName() │ + └────────────────┬───────────────┘ + │ implements + │ + ┌────────────────┴────────────────┐ + │ │ + │ ALL are External Plugins │ + │ (Loaded from JAR files) │ + │ │ + └────────────────┬────────────────┘ + │ + ┌────────────────────────────┼────────────────────────────┐ + │ │ │ + ┌────▼─────┐ ┌───────────▼──────┐ ┌──────────▼─────┐ + │ NodePort │ │ LoadBalancer │ │ MCS │ + │ Plugin │ │ Plugin │ │ Plugin │ + │(Strimzi) │ │ (Strimzi) │ │ (Community) │ + └────┬─────┘ └────────┬─────────┘ └────────┬───────┘ + │ │ │ + ▼ ▼ ▼ +┌───────────────┐ ┌────────────────────┐ ┌──────────────────┐ +│Creates: │ │Creates: │ │Creates: │ +│• NodePort │ │• LoadBalancer │ │• Service │ +│ Service │ │ Service │ │• ServiceExport │ +│ │ │ │ │ │ +│Returns: │ │Returns: │ │Returns: │ +│10.21.37.21: │ │10.21.50.10:9091 │ │pod.cluster2.svc. │ +│31001 │ │ │ │clusterset.local: │ +│ │ │ │ │9091 │ +└───────────────┘ └────────────────────┘ └──────────────────┘ + + ┌────────────────────────────────────────────────────┐ + │ Custom Plugin Example │ + │ (User-provided, any networking solution) │ + │ │ + │ ┌────────────────────────────────────┐ │ + │ │ com.example.MyCustomNetworking │ │ + │ │ Plugin │ │ + │ │ │ │ + │ │ Returns: custom format endpoints │ │ + │ └────────────────────────────────────┘ │ + └────────────────────────────────────────────────────┘ + +Note: ALL networking providers are external plugins loaded from JARs. +The operator has NO built-in networking implementations. +Strimzi provides reference implementations (NodePort, LoadBalancer) as plugins. 
+``` + +### Testing Strategy + +#### Unit and Integration Tests + +The stretch cluster implementation includes comprehensive test coverage: + +**Plugin Loading Tests** (`StretchNetworkingProviderPluginLoadingTest`) +- Plugin discovery via ServiceLoader +- ClassLoader isolation +- Provider initialization with valid/invalid configurations +- Error handling for missing plugins or initialization failures + +**Integration Tests** (`StretchNetworkingProviderIntegrationTest`) +- Full reconciliation flow with test provider +- Endpoint discovery and configuration generation +- Resource creation and deletion +- Multi cluster scenarios + +**Reconciliation Flow Tests** (`StretchReconciliationFlowTest`) +- Kafka CR reconciliation with stretch mode enabled +- StrimziPodSet creation in remote clusters +- ConfigMap generation with custom configuration +- Garbage collection flow + +**Test Provider** (`TestNetworkingProvider`) +- Fully functional reference implementation +- In-memory endpoint storage +- Configurable behavior for testing error scenarios +- Validates provider contract compliance + + + +#### System Testing Approach + +The feature design enables comprehensive system testing: +- Multi-cluster Kafka deployment validation +- Pod creation and lifecycle across all clusters +- Produce/consume functionality testing +- Rolling update procedures +- Cluster failure scenarios +- Garbage collection verification + +#### Test Coverage + +The stretch cluster feature includes comprehensive automated test coverage: + +**Unit and Integration Tests:** +- **Plugin Loading Tests**: Validate plugin discovery, ClassLoader isolation, initialization, and error handling +- **SPI Contract Tests**: Full provider lifecycle testing using MockKube3 (3-cluster simulation) +- **Validation Tests**: Operator configuration, Kafka CR validation, error codes and messages +- **Reconciliation Tests**: Stretch mode annotations, scaling operations, configuration propagation +- **Test Provider**: Reference 
implementation demonstrating complete SPI contract
+
+**System Testing:**
+
+The plugin architecture enables flexible system testing approaches:
+
+- End-to-end validation can be performed with any networking provider (NodePort, LoadBalancer, MCS)
+- Multi-cluster test environments can use lightweight solutions (kind, minikube)
+- Community members can validate the feature with their preferred networking solutions
+
+Standard Strimzi system test infrastructure will validate basic stretch cluster functionality with the NodePort provider.
+
+#### Performance Testing Methodology
+
+**Tools:**
+- **OpenMessaging Benchmark (OMB):** Throughput and latency measurement
+- **Chaos Mesh:** Network latency injection for sensitivity testing
+- **Automation:** Python scripts for automated test execution and result collection
+
+**Test Scenarios:**
+1. **Baseline:** Same-datacenter deployment (validates full throughput)
+2. **Latency Sensitivity:** Progressive latency injection (0ms, 10ms, 50ms, 100ms)
+3. **Breaking Point:** Binary search to find Kafka stability threshold
+
+**Conservative Test Methodology:**
+Latency sensitivity tests use a Chaos Mesh NetworkChaos CR to inject artificial delay between **all pods in all clusters**, including intra-cluster communication.
+This represents a **worst-case scenario** more pessimistic than real-world deployments, where only cross-cluster communication experiences added latency.
+Real production performance in same-datacenter multi-AZ deployments will be significantly better than these conservative test results.
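For reference, the injected delay described above corresponds to a Chaos Mesh `NetworkChaos` CR of roughly this shape (values are illustrative; the selector assumes the Kafka pods carry the standard `strimzi.io/cluster` label):

```yaml
apiVersion: chaos-mesh.org/v1alpha1
kind: NetworkChaos
metadata:
  name: kafka-latency-50ms
  namespace: kafka
spec:
  action: delay
  mode: all            # affect every matching pod, including intra-cluster peers
  selector:
    namespaces:
      - kafka
    labelSelectors:
      strimzi.io/cluster: my-cluster
  delay:
    latency: "50ms"
    jitter: "5ms"
```

Applying the same CR in each participating cluster reproduces the all-pods, worst-case latency condition used in these tests.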
+
+**Validation Criteria:**
+- Same-datacenter deployment maintains >= 90% of single-cluster throughput
+- P99 latency remains < 500ms for same-site deployments
+- Zero pod crashes or restarts under normal conditions
+- Graceful degradation under network latency (no crashes until 750ms threshold)
+
+**Validation Results:**
+
+Performance testing using OMB and Chaos Mesh validates the performance characteristics described in the "Performance Characteristics and Operational Guidelines" section.
+The testing methodology is documented so that users can replicate it to validate their own deployments.
+
+#### Validation Approach
+
+**Operator and Kafka Upgrades:**
+
+Standard Strimzi upgrade testing applies to stretch clusters.
+The implementation includes validation that all operators run compatible versions, and standard rolling update mechanisms work across multiple clusters.
+
+**Plugin Testing:**
+
+Reference implementations (NodePort, LoadBalancer) maintained by Strimzi include comprehensive test coverage.
+Third-party plugins (e.g., MCS) are maintained independently with their own testing approaches.
+The SPI design allows plugin authors to validate their implementations without requiring changes to core Strimzi tests.
+ +### Additional Considerations + +#### External Access + +External listener types (Route, Ingress, LoadBalancer, NodePort) work in stretch mode: +- Each cluster exposes its own external endpoints +- Kafka CR status lists all bootstrap addresses from all clusters +- Clients can connect to any cluster's bootstrap address +- Kafka handles client redirection to appropriate brokers + + +#### Rack Awareness + +Rack awareness works across clusters by using topology labels: +- Each cluster can have different zone labels +- The `strimzi.io/stretch-cluster-alias` annotation serves as the primary topology key +- Additional zone labels within each cluster provide finer-grained replica distribution + +Example topology configuration: + +```yaml +spec: + kafka: + rack: + topologyKey: topology.kubernetes.io/zone +``` + +Kafka distributes replicas considering both cluster boundaries and zone labels within clusters. + +#### Disaster Recovery + +If the central cluster fails, Kafka brokers and controllers continue operating. +Administrative control can be restored using one of these approaches: + +**Manual Recovery:** + +1. Deploy a new Kubernetes cluster or select an existing remote cluster +2. Deploy the cluster operator +3. Apply the Kafka and KafkaNodePool CRs +4. Update the `STRIMZI_CENTRAL_CLUSTER_ID` to the new cluster's ID +5. Operator resumes management of the stretch cluster + +**GitOps Recovery:** +1. Store all CRs in a Git repository managed by ArgoCD or Flux +2. GitOps tool automatically applies CRs to the new central cluster +3. Operator resumes management + +The key insight is that the Kafka cluster continues serving client requests during central cluster failure. +Only administrative operations (scaling, configuration changes) are temporarily unavailable. 
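For the manual recovery path, step 4 amounts to editing the cluster operator Deployment on the promoted cluster. An illustrative fragment (the container name and values are examples, and the `STRIMZI_REMOTE_KUBE_CONFIG` value is left elided because its format is cluster-specific):

```yaml
# Cluster operator Deployment on the newly promoted central cluster (fragment)
spec:
  template:
    spec:
      containers:
        - name: strimzi-cluster-operator
          env:
            # Point the operator at the promoted cluster's own ID
            - name: STRIMZI_CENTRAL_CLUSTER_ID
              value: "cluster-east"
            # Kubeconfig entries for the remaining remote clusters
            - name: STRIMZI_REMOTE_KUBE_CONFIG
              value: "..."
```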
+
+### Performance Characteristics and Operational Guidelines
+
+Extensive testing validates that stretch Kafka clusters deliver excellent performance for **same-datacenter, multi-availability-zone deployments**, the primary and recommended use case for this feature.
+
+#### Validated Performance - Same-Datacenter Deployment
+
+Testing was performed across three OpenShift clusters deployed within the same data center, representing the optimal deployment scenario for stretch clusters.
+
+**Test Configuration:**
+- **Infrastructure:** 3 OpenShift clusters, same site
+- **Kafka Topology:** 5 controllers (2+2+1), 27 brokers (9+9+9)
+- **Networking:** Cilium with Multi-Cluster Services (MCS) API
+- **Testing Tool:** OpenMessaging Benchmark (OMB) with Kafka driver
+- **Workload:** 1 topic, 16 partitions, 1KB messages
+- **Latency Testing:** Chaos Mesh NetworkChaos CR for controlled latency injection
+
+**Baseline Results (0ms Added Latency):**
+
+| Metric | Same-Site Stretch Cluster | Expected Single Cluster | Overhead |
+|--------|--------------------------|-------------------------|----------|
+| **Throughput** | **50,000 msg/sec** | ~50,000 msg/sec | **0%** ✅ |
+| Avg Latency | 80ms | 45ms | +78% |
+| P99 Latency | 494ms | 175ms | +182% |
+| Error Rate | 0% | 0% | 0% |
+
+**Key Finding:** Stretch clusters deployed in same-datacenter configurations **maintain full throughput** while providing high availability across independent Kubernetes clusters and availability zones.
+The latency increase is acceptable for most production workloads and provides significant operational benefits through fault domain isolation.
+
+**Testing Methodology Note:** Additional latency sensitivity tests (10ms, 50ms, 100ms) used Chaos Mesh to inject artificial latency between **all pods**, including those within the same cluster.
+This represents a **worst-case scenario**: real-world same-datacenter deployments only experience added latency on cross-cluster communication, resulting in significantly better performance than these conservative test results suggest.
+
+#### Use Case Suitability
+
+Stretch Kafka clusters are **optimized for specific deployment scenarios**.
+The following guidance helps users determine when stretch clusters provide value:
+
+##### Scenarios Not Recommended for Stretch Clusters
+
+**Scenarios to Avoid:**
+| Deployment | Latency | Reason |
+|------------|---------|--------|
+| Regional (> 200km) | > 100ms | 96%+ throughput loss, constant publish timeouts |
+| Cross-region | > 150ms | System effectively unusable |
+| Intercontinental | > 250ms | Constant failures |
+
+**Alternative:** Use **MirrorMaker 2** for cross-region disaster recovery. MM2 provides asynchronous replication optimized for high-latency scenarios and is the correct tool for geographic distribution.
+
+#### Authentication and Security
+
+The operator is agnostic to the authentication method used for remote cluster access.
+Common approaches include:
+
+- **ServiceAccount tokens**: Long-lived tokens stored in Secrets
+- **mTLS certificates**: Client certificate authentication
+- **OIDC**: Integration with an identity provider
+- **Cloud IAM**: AWS IAM, GCP IAM, Azure AD
+
+Users must ensure kubeconfig credentials are properly secured and rotated.
+The operator validates kubeconfig expiry and reports errors before credentials expire.
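The expiry validation mentioned above could look roughly like this (an assumed sketch, not the operator's actual code): extract the client certificate's `notAfter` from the kubeconfig credentials and flag it against a rotation threshold:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.time.Instant;

public class KubeconfigExpiryCheck {
    // True when the credential should be rotated before it expires
    public static boolean needsRotation(Instant notAfter, int thresholdDays) {
        return Duration.between(Instant.now(), notAfter).toDays() < thresholdDays;
    }

    // Extracts the expiry of a PEM-encoded client certificate taken from a kubeconfig
    public static Instant certificateExpiry(String clientCertPem) throws Exception {
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        X509Certificate cert = (X509Certificate) cf.generateCertificate(
                new ByteArrayInputStream(clientCertPem.getBytes(StandardCharsets.US_ASCII)));
        return cert.getNotAfter().toInstant();
    }
}
```

Surfacing the result as a warning condition well before the threshold gives operators time to rotate credentials without losing remote-cluster access.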


#### Entity Operator

**Deployment:**
- Deployed only in the central cluster
- Manages KafkaTopic and KafkaUser resources
- No changes required for stretch cluster support

**Functionality:**
- Standard Strimzi Topic Operator for topic management
- Standard Strimzi User Operator for ACL and SCRAM user management
- Connects to Kafka using standard bootstrap addresses

#### Drain Cleaner

**Deployment:**
- Deployed to all clusters
- Functions independently in each cluster
- No changes required

**Behavior:**
- Evicts pods during node drain operations
- Each cluster's Drain Cleaner only affects pods in that cluster
- No cross-cluster coordination needed

## Affected/Not Affected Projects

This proposal impacts only the `strimzi-kafka-operator` project.

Changes include:
- New `StretchNetworkingProvider` SPI in the `cluster-operator` module
- New `StretchNetworkingProviderFactory` for plugin loading
- New `RemoteResourceOperatorSupplier` for multi-cluster access
- Enhanced `KafkaReconciler` with stretch cluster logic
- New `StretchClusterConfig` and `StretchClusterValidator` for configuration management
- Garbage collection ConfigMap implementation
- Enhanced status reporting

No changes to:
- `strimzi-topic-operator`
- `strimzi-user-operator`
- `kafka-agent`
- `strimzi-drain-cleaner`
- CRD definitions (annotations only)

## Rejected Alternatives

### Built-in Networking Providers

**Rejected:** Including NodePort, LoadBalancer, or MCS providers in the core operator.

**Reasons:**
- Increases operator complexity and binary size
- Creates a maintenance burden for networking code
- Limits extensibility for cloud-specific solutions
- Makes testing more difficult

**Alternative:** All providers are external plugins, including Strimzi-maintained reference implementations.

### Automated Central Cluster Failover

**Rejected:** Automatic detection of central cluster failure and promotion of a remote cluster.


**Reasons:**
- Requires a distributed consensus mechanism (Raft, etcd, ZooKeeper)
- Complex state replication across clusters
- Risk of split-brain scenarios
- Significant implementation complexity

**Alternative:** Manual or GitOps-based recovery, which is simpler and more predictable.

### CR Replication with Standby Mode

**Rejected:** Replicating Kafka and KafkaNodePool CRs to remote clusters with a `strimzi.io/standby=true` annotation.

**Reasons:**
- Adds complexity in keeping replicated CRs in sync
- Risk of configuration drift between copies
- Requires additional logic to activate standby CRs during failover

**Alternative:** GitOps tools naturally provide CR storage and replication to new clusters.

## Compatibility

### Operator Version Compatibility

**Requirement:** All operators in a stretch deployment MUST run the same Strimzi version.

**Reason:** Stretch clusters require coordinated reconciliation across clusters. Version mismatches can cause:
- Incompatible StrimziPodSet reconciliation logic
- Status reporting discrepancies
- Resource format incompatibilities

**Validation:** The central operator validates remote operator versions during startup by querying the Kubernetes API server version and checking for Strimzi-specific CRDs.
If a version mismatch is detected, the operator:
1. Logs a critical error with version details
2. Skips stretch cluster reconciliation for safety
3. Continues normal reconciliation for non-stretch Kafka clusters

**Upgrade Process:**

1. **Plan an upgrade window:** Expect brief disruption during operator restarts
2. **Upgrade the central operator:** Update the Deployment in the central cluster
3. **Upgrade the remote operators:** Update the Deployments in the remote clusters (recommended: use GitOps for consistency)
4. **Validation:** The central operator automatically validates version compatibility on startup
5.
**Resume operations:** Stretch cluster reconciliation resumes if validation passes

**Rollback:** If issues occur, downgrade all operators to the previous version together.

### Kafka Version Upgrades

Kafka version upgrades follow standard Strimzi rolling update procedures with no special stretch-specific handling:

**Process:**
1. The central operator orchestrates the rolling update
2. Updates happen pod-by-pod to maintain quorum
3. Each cluster's brokers/controllers are updated independently
4. No client downtime if quorum is maintained

**Example:**
```bash
# Standard Kafka version upgrade (no stretch-specific changes)
kubectl patch kafka my-cluster --type merge -p \
  '{"spec":{"kafka":{"version":"4.0.0"}}}'
```

The operator handles rolling updates across all clusters while ensuring Kafka quorum is maintained.

### Plugin Compatibility

**SPI Stability:** The `StretchNetworkingProvider` interface follows semantic versioning:
- **Minor version changes:** Backward-compatible additions (new optional methods)
- **Major version changes:** Breaking changes (method signature changes, removals)

**Plugin Validation:** The operator validates plugin compatibility at startup:
1. Loads the plugin via ServiceLoader
2. Checks the plugin's declared operator version requirements
3. Fails fast with a clear error if incompatible

**Deprecation Policy:** Breaking SPI changes trigger a major version bump. Old plugins continue working, with deprecation warnings, for two releases before removal.

**Kafka CR configurable networking providers:** If the networking provider were configurable per Kafka CR, the operator would need to:
- Load and initialize multiple plugin JARs dynamically
- Maintain separate ClassLoaders for each plugin to avoid conflicts
- Handle plugin initialization failures on a per-cluster basis
- Manage plugin lifecycle (loading/unloading) during reconciliation

This adds significant complexity and creates potential ClassLoader isolation issues.
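The compatibility rule described above — matching major versions, with the operator at or above the plugin's declared minimum — can be sketched as a simple version comparison. This is an illustrative Python sketch of the comparison logic only; the actual check would live in the operator's Java plugin loader, and the function and parameter names here are hypothetical.

```python
def parse_semver(version):
    """Parse a 'MAJOR.MINOR.PATCH' string into a tuple of ints."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch

def plugin_compatible(operator_version, plugin_requires):
    """
    A plugin is compatible when major versions match and the operator's
    (minor, patch) is at least the plugin's declared minimum: minor releases
    only add backward-compatible methods, while major releases may break
    the SPI.
    """
    op = parse_semver(operator_version)
    req = parse_semver(plugin_requires)
    return op[0] == req[0] and op[1:] >= req[1:]

# A plugin built against SPI 0.45.0 works on operator 0.45.1 or 0.46.0...
assert plugin_compatible("0.45.1", "0.45.0")
assert plugin_compatible("0.46.0", "0.45.0")
# ...but not across a major version bump.
assert not plugin_compatible("1.0.0", "0.45.0")
```

If the check fails, the operator would log the operator and plugin versions and refuse to start stretch reconciliation, matching the fail-fast behavior described above.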


### Backward Compatibility

- **Existing Clusters:** Unaffected by the stretch cluster feature unless it is explicitly enabled.
- **No CRD Changes:** All stretch configuration uses annotations, so CRD schemas are unchanged.
- **No Behavior Changes:** Operators without stretch configuration behave identically to previous versions.
- **Upgrade Path:** Users can upgrade to operators with stretch support without any changes to existing deployments.


## Summary

This proposal introduces stretch Kafka cluster support to Strimzi through:

1. **Plugin Architecture**: A Service Provider Interface (SPI) for networking implementations
2. **Multi-Cluster Management**: The central operator creates resources across clusters
3. **Garbage Collection**: ConfigMap-based owner references for proper cleanup
4. **Flexible Networking**: Support for MCS, NodePort, LoadBalancer, and custom providers
5. **High Availability**: Kafka continues operating even if the central cluster fails
6. **Clean Separation**: Networking logic lives in plugins; the core operator focuses on orchestration

The implementation maintains backward compatibility, requires no CRD changes, and provides a clear upgrade path for users.