|
Note
|
This lab assumes that the From Edge to Streams Processing has been completed. If you haven’t, please ask your instructor to set your cluster state for you. |
In this workshop you will use Streams Replication Manager (SRM) to replicate Kafka topics across clusters.
The labs in this workshop will require two clusters so that we can configure replication between them. If you were assigned two clusters by your instructor, you can perform all the configuration yourself. Otherwise, please pair up with another workshop attended and work together to configure replication between your individual clusters.
We will refer to the two clusters as Cluster A and Cluster B, throughout the labs and will use the aliases cluster_a and cluster_b, respectively, throughout the exercises. These aliases can be changed to suit your needs/preferences (e.g. you could use nyc and paris, as long as you maintain consistency across all exercises).
While one SRM cluster can replicate both ways, we will implement the best practice for this type of replication, which consists of Remote Reads and Local Writes. Hence, SRM in Cluster A will replicate messages from Cluster B to Cluster A and vice-versa.
Some labs will have to be performed on both clusters while others only apply to one specific cluster. At the beginning of each lab we will specify to which cluster(s) they apply.
Throughout this series of labs we will install and configure the Streams Replication Manager (SRM) service to replicate data and configuration between two Kafka clusters.
SRM is comprised of 2 service roles:
-
Driver role: This role is responsible for connecting to the specified clusters and performing replication between them. The Driver role can be installed on one or more hosts but since the workshop cluster have a single node we will limit this role to that node only.
-
Service role: This role consists of a REST API and a Kafka Streams application to aggregate and expose cluster, topic and consumer group metrics. The Service role can be installed on one host only.
We will also see how to configure the Streams Messaging Manager (SMM) service to monitor the replication that is configured between the two clusters.
Our goal is to have implemented the following bidirectional replication architecture by the end of the workshop:
-
Lab 1 - Install the Streams Replication Manager (SRM) service
-
Lab 2 - Tuning the SRM service
-
Lab 3 - Configure replication monitoring
-
Lab 4 - Enable Kafka Replication with SRM
-
Lab 5 - Failing over consumers
|
Note
|
Run on both clusters |
-
On the Cloudera Manager console, click on the Cloudera logo at the top-left corner to ensure you are at the home page.
-
Click on the "three-dots" menu to the right of the OneNodeCluster name and select Add Service
-
Select Streams Replication Manager and click Continue
-
On the Select Dependencies page, select the row that contains HDFS, Kafka and ZooKeeper and then click Continue
-
On the Assign Roles page, leave the selected defaults as is and click Continue
-
On the Review Changes page set the following properties:
NoteReplace the CLUSTER_A_FQDNandCLUSTER_B_FQDNplaceholders in the values below with the fully-qualified domain name for clusters A and B, respectively.-
On Cluster A only:
Property value Streams Replication Manager Cluster alias
cluster_a, cluster_bStreams Replication Manager’s Replication Configs
(click the “+” button to separately add each property on the right)
cluster_a.bootstrap.servers=<CLUSTER_A_FQDN>:9092cluster_b.bootstrap.servers=<CLUSTER_B_FQDN>:9092cluster_b->cluster_a.enabled=truereplication.factor=1heartbeats.topic.replication.factor=1checkpoints.topic.replication.factor=1offset-syncs.topic.replication.factor=1offset.storage.replication.factor=1config.storage.replication.factor=1status.storage.replication.factor=1metrics.topic.replication.factor=1Streams Replication Manager Driver Target Cluster
cluster_a, cluster_bStreams Replication Manager Service Target Cluster
cluster_a -
On Cluster B only:
Property Value Streams Replication Manager Cluster alias
cluster_a, cluster_bStreams Replication Manager’s Replication Configs
(click the “+” button to separately add each property on the right)
cluster_a.bootstrap.servers=<CLUSTER_A_FQDN>:9092cluster_b.bootstrap.servers=<CLUSTER_B_FQDN>:9092cluster_a->cluster_b.enabled=truereplication.factor=1heartbeats.topic.replication.factor=1checkpoints.topic.replication.factor=1offset-syncs.topic.replication.factor=1offset.storage.replication.factor=1config.storage.replication.factor=1status.storage.replication.factor=1metrics.topic.replication.factor=1Streams Replication Manager Driver Target Cluster
cluster_a, cluster_bStreams Replication Manager Service Target Cluster
cluster_b
-
-
Click Continue once all the properties are set correctly
-
Wait for the First Run Command to finish and click Continue
-
Click Finish
You now have a working Streams Replication Manager service!
|
Note
|
Run on both clusters |
The SRM service comes configured with some default refresh intervals that are usually appropriate for production environments. For our labs, though, we want the refresh intervals to be much shorter so that we can run tests and see the results quickly. Let’s reconfigure those intervals before we continue.
-
On the Cloudera Manager console go to Clusters > Streams Replication Manager > Configuration.
-
On the search box, type "interval" to filter the configuration properties
-
Set the following properties:
Property Value Refresh Topics Interval Seconds
30 secondsRefresh Groups Interval Seconds
30 secondsSync Topic Configs Interval Seconds
30 seconds -
Click on Save Changes
-
Click on Actions > Deploy Client Configuration and wait for the client configuration deployment to finish.
-
Click on Actions > Restart and wait for the service restart to finish.
|
Note
|
Run on both clusters |
In this lab we will configure Streams Messaging Manager (SMM) to monitor the Kafka replication between both clusters.
-
On the Cloudera Manager console go to Clusters > SMM > Configuration.
-
On the search box, type "replication" to filter the configuration properties
-
Set the following properties for the service:
Property value Configure Streams Replication Manager
CheckedStreams Replication Manager Rest Protocol
httpStreams Replication Manager Rest Host
<FQDN_of_SRM_Service_host>Streams Replication Manager Rest Port
6670 -
Click on Save Changes
-
Click on Actions > Restart and wait for the service restart to finish.
-
Go to the SMM Web UI (Clusters > SMM > Streams Messaging Manager Web UI), and click on the Cluster Replications icon (
). You should be able to see the monitoring page for the replication on both clusters:On cluster A:
On cluster B:
Notce that, so far, only the heartbeats topic is being replicated. In the next lab we will add more topics to the replication.
In this lab, we will enable Active-Active replication where messages produced in Cluster A are replicated to Cluster B, and messages produced in Cluster B are replicated to Cluster A.
SRM has a whitelist and a blacklist for topics. Only topics that are in the whitelist but not in the blacklist are replicated. The administrator can selectively control which topics to replicate but managing those lists. The same applies to consumer groups offset replication.
-
To prepare for the activities in this lab, let’s first create a new Kafka topic using SMM. On the SMM Web UI, click on the Topics icon (
)` on the left-hand side menu, then Add New button, and add the following properties:Topic Name: global_iot Partitions: 5 Availability: 1 Cleanup Policy: delete -
Click Save to create the topic
Now, follow the below steps to enable message replication from Cluster A to B. These steps should be executed in Cluster B only.
-
To initiate the replication of topics we must whitelist them in SRM. SRM supports Regular Expressions for whitelisting/blacklisting topics with a particular pattern. In our case, we would like to replicate only topics that start with the keyword
global. To do so, SSH into the Cluster B host and run the following command:export security_protocol=PLAINTEXT sudo -E srm-control topics \ --source cluster_a \ --target cluster_b \ --add "global.*"Run the following command to confirm that the whitelist was correctly modified:
sudo -E srm-control topics \ --source cluster_a \ --target cluster_b \ --listYou should see the following output:
Current whitelist: global.* Current blacklist: Done. -
Go to the SMM Web UI on Cluster B and check the Cluster Replications page. You should now see that all the topics that match the whitelist appear in the replication:
-
Click on the Topics icon (
) and search for all topics containing the string iot. You should see a new topic calledcluster_a.global_iot. Since we haven’t produced any data to the source topic yet, the replicated topic is also empty. -
To check that the replication is working, we need to start producing data to the
global_iotKafka topic in Cluster A. The easiest way to do it is to make a small change to our existing NiFi flow:-
Go to the NiFi canvas on Cluster A
-
Enter the Process Sensor Data process group
-
Select the PublishKafkaRecord processor and copy & paste it. This will create a new processor on the canvas.
-
Double-click the new processor to open the configuration
-
On the SETTINGS tab, change the Name property to "Publish to Kafka topic: global_iot"
-
On the PROPERTIES tab, change the Topic Name property to
global_iot -
Click Apply
-
Connect the "Set Schema Name" processor to the new Kafka processor.
-
Connect the new Kafka processor to the same "failure" funnel that the original processor is connected to.
-
Start the new processor.
-
You will now have dual ingest of events to both
iotandglobal_iottopics. Your flow now should look like the following:
-
-
Go to SMM Web UI on Cluster B and check the content of the
cluster_a.global_iottopic. You should see events being replicated from the Cluster A. After some time, you will see the replicated topic’s metrics increasing. -
Click on the Cluster Replications icon (
) and check the throughput and latency metrics to make sure that everything is working as expected. You should expect a throughput greater than zero and a latency in the order of milliseconds. -
Now that replication is working in the A → B direction, repeat the same steps in reverse to implement replication in the B → A direction.
One of the great features of SRM is its ability to translate Consumer Group offsets from one cluster to the other so that consumers can be switched over to the remote cluster without losing or duplicating messages. SRM continuously replicates the consumer groups offsets to the remote cluster so that it can perform the translation even when the source cluster is offline.
We can manage the consumer groups for which SRM replicates offset using a whitelist/blacklist mechanism, similar to what is done for topics.
In this lab we will configure offset replication for all consumer groups and perform a consumer failover to the remote cluster. To make it more interesting, we will failover two consumers, one using the correct approach for offset translation and the other without caring about that so that we can see the difference.
-
To simplify things for the purpose of this lab, let’s whitelist the replication of all consumer groups from A → B, by adding the regexp
.*to the whitelist. To do so, SSH into the Cluster B host and run the following command:export security_protocol=PLAINTEXT sudo -E srm-control groups \ --source cluster_a \ --target cluster_b \ --add ".*"Run the following command to confirm that the whitelist was correctly modified:
sudo -E srm-control groups \ --source cluster_a \ --target cluster_b \ --listYou should see the following output:
Current whitelist: .* Current blacklist: Done. -
Open a SSH session to any of the hosts and run the following consumer to start consuming data from the
global_iottopic on Cluster A. This consumer uses a consumer group calledgood.failover:CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN> kafka-console-consumer \ --bootstrap-server $CLUSTER_A_HOST:9092 \ --topic "global_iot" \ --group good.failover | tee good.failover.before -
Let the consumer read some data from the topic and press CTRL+C after you have a few lines of data shown on your screen. The command above saves the retrieved messages in the
good.failover.beforefile. -
Run this other consumer to also consume some data from the
global_iottopic on Cluster A. This consumer uses a different consumer group from the first one, calledbad.failover:CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN> kafka-console-consumer \ --bootstrap-server $CLUSTER_A_HOST:9092 \ --topic "global_iot" \ --group bad.failover | tee bad.failover.before -
Again, let the consumer read some data from the topic and press CTRL+C after you have a few lines of data shown on your screen. This command above saves the retrieved messages in the
bad.failover.beforefile. -
Open the SMM Web UI, and click on the Cluster Replications icon (
]). Note that the offsets of the two consumer groups we used are now being replicated by SRM: -
Let’s now first try to fail over a consumer without following the recommended steps for offsets translation. On the SSH session, run the
bad.failoverconsumer. This time, though, we will connect to the replicated topiccluster_a.global_ioton Cluster B.CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN> kafka-console-consumer \ --bootstrap-server $CLUSTER_B_HOST:9092 \ --topic "cluster_a.global_iot" \ --group bad.failover | tee bad.failover.after -
As you have done before, let the consumer read some data from the topic and press CTRL+C after you have a few lines of data shown on your screen. This command above saves the retrieved messages in the
bad.failover.afterfile. -
Each message saved in the
bad.failover.beforeandbad.failover.afterfiles above have the timestamp of when they were generated. Since we have approximately 1 message being generated per second, we would like to ensure that no gap between two consecutive messages is much larger than 1 second.To check if the failover occurred correctly, we want to calculate the gap between the largest timestamp read before the failover and the smallest timestamp read after the failover. If no messages were lost, we should see a gap not much larger than 1 second between those. + You can either verify this manually or run the commands below, which will calculate that gap for you:
last_msg_before_failover=$(grep -o "[0-9]\{10,\}" bad.failover.before | sort | tail -1) first_msg_after_failover=$(grep -o "[0-9]\{10,\}" bad.failover.after | sort | head -1) echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)" -
You should see an output like the one below, showing a large gap between the messages before and after the failover. The length of the gap will depend on how long you took between the two executions of the
bad.failoverconsumer.Gap = 1743 second(s) -
Now that we have seen what a incorrect failover looks like, let’s failover the other consumer correctly. Connect to the Cluster B host and execute the following command to export the translated offsets of the
good.failoverconsumer group. Note that you can execute this command even if Cluster A is unavailable.export security_protocol=PLAINTEXT sudo -E srm-control offsets \ --source cluster_a \ --target cluster_b \ --group good.failover \ --export > good.failover.offsets cat good.failover.offsetsThe
good.failover.offsetswill contain all the translated offsets for all the partitions that thegood.failoverconsumer group touched on the source cluster. -
To complete the offset translation, still on the Cluster B host, run the command below to import the translated offsets into Kafka:
CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN> kafka-consumer-groups \ --bootstrap-server $CLUSTER_B_HOST:9092 \ --reset-offsets \ --group good.failover \ --from-file good.failover.offsets \ --executeYou should see an output like the one below:
GROUP TOPIC PARTITION NEW-OFFSET good.failover cluster_a.global_iot 3 11100 good.failover cluster_a.global_iot 4 11099 good.failover cluster_a.global_iot 0 11099 good.failover cluster_a.global_iot 1 11099 good.failover cluster_a.global_iot 2 11098 -
We are now ready to fail over the
good.failoverconsumer group. On the SSH session, run thegood.failoverconsumer, connecting to the replicated topiccluster_a.global_ioton Cluster B.CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN> kafka-console-consumer \ --bootstrap-server $CLUSTER_B_HOST:9092 \ --topic "cluster_a.global_iot" \ --group good.failover | tee good.failover.after -
This time you will notice a lot of message read at once when you start the consumer. This happens because the offset where the consumer stopped previously was translated to the new cluster and loaded into Kafka. So, the consumer started reading all the messages from where it had stopped and had accumulated since that happened.
-
Press CTRL+C to stop the consumer. The command above saves the retrieved messages in the
good.failover.beforefile. -
Let’s check the gap between messages during the correct failover. Again, you can do it manually or run the commands below:
last_msg_before_failover=$(grep -o "[0-9]\{10,\}" good.failover.before | sort | tail -1) first_msg_after_failover=$(grep -o "[0-9]\{10,\}" good.failover.after | sort | head -1) echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)" -
You should see that the gap is now 1 second, which means that no messages were skipped or lost during the failover:
Gap = 1 second(s)










