105. Kinesis Client Library (KCL)
(1) Very High Level View
The Amazon Kinesis Client Library for Java (Amazon KCL) enables Java developers to easily consume and process data from Amazon Kinesis. KCL provides an easy-to-use programming model for processing data using Amazon Kinesis. KCL also helps with scale-out and fault-tolerant processing.
At a very high level, a KCL application can run on multiple nodes, and each node is considered a Worker. (You can optionally run multiple instances of the same KCL application on the same node, in which case each instance is considered a Worker.) Each Worker manages multiple Record Processors, and each Record Processor handles data from one particular shard.
                                | KCL | --> Record Processor 0 --> Shard 0 --|
              |--> Worker 0 --> |     |                                      |
              |                 |     | --> Record Processor 1 --> Shard 1 --|
              |                                                              |
KCL           |                 | KCL | --> Record Processor 2 --> Shard 2 --|
Application --|--> Worker 1 --> |     |                                      | --> Stream
              |                 |     | --> Record Processor 3 --> Shard 3 --|
              |                                                              |
              |                 | KCL | --> Record Processor 4 --> Shard 4 --|
              |--> Worker 2 --> |     |                                      |
                                |     | --> Record Processor 5 --> Shard 5 --|
In order to do so, the KCL application needs to do the following:
- Create a class that implements the IRecordProcessor interface. This is the basic building block of the KCL application. KCL fetches records from a specific shard and hands them to this class, which must implement the initialize(), processRecords(), and shutdown() methods.
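package net.qyjohn.KinesisTutorials;

import java.util.*;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.*;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.*;
import com.amazonaws.services.kinesis.model.Record;

public class KCLRecordProcessor implements IRecordProcessor
{
    // Called once before any records are delivered for this shard.
    @Override
    public void initialize(String shardId)
    {
    }

    // Called repeatedly with batches of records fetched from the shard.
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
    {
        for (Record r : records)
        {
            // Do your record processing here.
        }
        try
        {
            // Record progress so that a restarted worker resumes after this batch.
            checkpointer.checkpoint();
        } catch (Exception e)
        {
            // Do not swallow checkpoint failures silently.
            e.printStackTrace();
        }
    }

    // Called when the shard ends or when this processor loses its lease.
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
    {
        // Checkpoint only when the shard has ended (TERMINATE). With ZOMBIE the
        // lease has been lost and this processor should not checkpoint.
        if (reason != ShutdownReason.TERMINATE)
        {
            return;
        }
        try
        {
            checkpointer.checkpoint();
        } catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}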
- Create a class that implements the IRecordProcessorFactory interface. When your consumer instantiates the worker, it passes a reference to this factory.
package net.qyjohn.KinesisTutorials;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.*;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.*;

public class KCLExample implements IRecordProcessorFactory
{
    // The worker calls this to create a record processor for a shard it manages.
    @Override
    public IRecordProcessor createProcessor()
    {
        return new KCLRecordProcessor();
    }

    public static void main(String[] args)
    {
        String streamName = "demo";
        // The application name is also used as the name of the DynamoDB lease table.
        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "KCL_Application_Name",
                streamName,
                new DefaultAWSCredentialsProviderChain(),
                "KCL_Worker_ID")
            .withRegionName("ap-southeast-2")
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        KCLExample consumer = new KCLExample();
        // run() blocks and drives the worker loop until the worker is shut down.
        new Worker.Builder()
            .recordProcessorFactory(consumer)
            .config(config)
            .build()
            .run();
    }
}
Compile and run this KCL application. Given valid AWS credentials and an existing stream named demo in the configured region, it runs without further setup. A basic KCL application needs very little code.
(2) Look Closer
When you run the sample KCL application, it creates an instance of the KCL Worker. The worker connects to the stream and enumerates the shards. Then it does the following:
- (a) Coordinates shard associations with other workers (if any)
- (b) Instantiates a record processor for every shard it manages (via the IRecordProcessorFactory)
- (c) Pulls data records from the stream and pushes the records to the corresponding record processor
- (d) Processes records and checkpoints processed records (via the IRecordProcessor)
- (e) Rebalances shard-worker associations when the worker count changes or when shards are split or merged
The code sample above uses a fixed string as the worker ID. In a large-scale deployment, the KCL application runs on many nodes. If all nodes use the same worker ID, the workers cannot be told apart, which makes (a), (d), and (e) difficult to achieve and results in unbalanced resource utilization across worker nodes.
The following demo code uses the name of the local computer plus a UUID as the worker ID. This allows different worker nodes to identify each other. If you want to run multiple instances of the same KCL application on the same node as different workers, they can also identify each other using the UUID.
// Requires java.net.InetAddress and java.util.UUID; getLocalHost() can throw UnknownHostException.
String streamName = "demo";
String workerName = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
        "KCL_Application_Name",
        streamName,
        new DefaultAWSCredentialsProviderChain(),
        workerName)
    .withRegionName("ap-southeast-2")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
Create a new Kinesis stream with two shards. Do not put any records into the stream. Run one instance of the KCL application with wire logging to observe the behaviour of the KCL application. You will see the following happening in sequence:
2018-06-15 01:07:40,515 [main] INFO com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator - With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
2018-06-15 01:07:40,516 [main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 1
2018-06-15 01:07:40,516 [main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
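The intervals in the first log line can be reproduced by hand. The figures are consistent with a renew interval of one third of the failover time minus epsilon, and a take interval of twice the sum of failover time and epsilon. The sketch below only checks that arithmetic; the class and method names are hypothetical and not part of the KCL API, and the two relations are inferred from the logged numbers.

```java
// Hypothetical helper (not part of KCL) that reproduces the interval
// arithmetic implied by the LeaseCoordinator log line above.
final class LeaseTimings {
    // Assumed relation: one third of the failover time, minus the epsilon margin.
    static long renewIntervalMillis(long failoverMillis, long epsilonMillis) {
        return failoverMillis / 3 - epsilonMillis;
    }

    // Assumed relation: twice the failover time plus the epsilon margin.
    static long takeIntervalMillis(long failoverMillis, long epsilonMillis) {
        return (failoverMillis + epsilonMillis) * 2;
    }

    public static void main(String[] args) {
        System.out.println(renewIntervalMillis(10000, 25)); // 3308, as in the log
        System.out.println(takeIntervalMillis(10000, 25));  // 20050, as in the log
    }
}
```

Renewing roughly three times per failover window means a single slow or failed renewal does not immediately cost the worker its lease.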
The following API calls are made to initialize the DynamoDB table:
- [Main] DynamoDB.DescribeTable -> 400 ResourceNotFoundException
- [Main] DynamoDB.CreateTable
- [Main] Kinesis.DescribeStream
- [Main] DynamoDB.Scan
- [Main] DynamoDB.PutItem (Shard 0)
- [Main] DynamoDB.PutItem (Shard 1)
- [Main] DynamoDB.Scan
The following API calls are made to obtain leases:
- [LeaseCoordinator-1] DynamoDB.Scan
2018-06-15 01:07:51,077 [LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker ip-172-31-0-20.ap-southeast-2.compute.internal:b7f9ed0c-42b7-47d2-8406-7d2b53aba88f saw 2 total leases, 2 available leases, 1 workers. Target is 2 leases, I have 0 leases, I will take 2 leases
- [LeaseCoordinator-1] DynamoDB.UpdateItem w/ Expected (Shard 0)
- [LeaseCoordinator-1] DynamoDB.UpdateItem w/ Expected (Shard 1)
2018-06-15 01:07:51,094 [LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker ip-172-31-0-20.ap-southeast-2.compute.internal:b7f9ed0c-42b7-47d2-8406-7d2b53aba88f successfully took 2 leases: shardId-000000000001, shardId-000000000000
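The LeaseTaker message above ("Target is 2 leases, I have 0 leases, I will take 2 leases") follows from an even split of leases across workers. The following is a simplified sketch of that decision, assuming the target is the total lease count divided by the worker count, rounded up. The class and method names are hypothetical, and the real LeaseTaker applies further rules that are not modelled here.

```java
// Hypothetical, simplified model of the lease-taking decision. The real
// LeaseTaker also deals with expired leases, stealing, and per-worker limits.
final class LeaseTarget {
    // Even share of leases per worker, rounded up. "workers" counts the
    // calling worker too, so it is always at least 1.
    static int target(int totalLeases, int workers) {
        if (workers >= totalLeases) {
            return 1;
        }
        return (totalLeases + workers - 1) / workers;
    }

    // Number of leases to take now: the shortfall against the target,
    // limited by how many leases are actually available.
    static int leasesToTake(int totalLeases, int availableLeases, int workers, int held) {
        int shortfall = Math.max(0, target(totalLeases, workers) - held);
        return Math.min(shortfall, availableLeases);
    }

    public static void main(String[] args) {
        // Values from the log line: 2 total, 2 available, 1 worker, 0 held.
        System.out.println(target(2, 1));             // 2
        System.out.println(leasesToTake(2, 2, 1, 0)); // 2
    }
}
```

With two workers and the same two shards, this model gives a target of 1 while no lease is available to take, which is the situation in which a new worker has to steal a lease from an existing one.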
The following API calls are made to renew leases:
- [LeaseRenewer-1] DynamoDB.UpdateItem w/ Expected (Shard 0)
- [LeaseRenewer-2] DynamoDB.UpdateItem w/ Expected (Shard 1)
- [LeaseRenewer-3] DynamoDB.UpdateItem w/ Expected (Shard 0)
- [LeaseRenewer-4] DynamoDB.UpdateItem w/ Expected (Shard 1)
- [LeaseRenewer-5] DynamoDB.UpdateItem w/ Expected (Shard 0)
- [LeaseRenewer-6] DynamoDB.UpdateItem w/ Expected (Shard 1)
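The "UpdateItem w/ Expected" calls are conditional writes: the update is applied only if the item still looks the way the caller last saw it. The sketch below simulates that idea in memory, assuming each lease row carries an owner and a counter that is incremented on every successful write. It does not call DynamoDB, and the class, field, and method names are hypothetical.

```java
// In-memory simulation of a conditional lease update. Not a DynamoDB or KCL
// API; the owner/counter layout of the lease row is an assumption.
final class LeaseRow {
    private String owner;
    private long counter;

    LeaseRow(String owner, long counter) {
        this.owner = owner;
        this.counter = counter;
    }

    // Applies the write only if the stored counter still matches the value
    // the caller last read; otherwise the write is rejected.
    synchronized boolean conditionalUpdate(long expectedCounter, String newOwner) {
        if (counter != expectedCounter) {
            return false;
        }
        owner = newOwner;
        counter = expectedCounter + 1;
        return true;
    }

    synchronized String owner() { return owner; }

    synchronized long counter() { return counter; }

    public static void main(String[] args) {
        LeaseRow shard0 = new LeaseRow("worker-A", 7);
        System.out.println(shard0.conditionalUpdate(7, "worker-A")); // true: renewal succeeds
        System.out.println(shard0.conditionalUpdate(7, "worker-B")); // false: counter is stale
        System.out.println(shard0.owner());                          // worker-A
    }
}
```

Because every successful write changes the counter, two workers that read the same row cannot both succeed in updating it, so at most one of them ends up holding the lease.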
2018-06-15 01:08:01,067 [main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization complete. Starting worker loop.
2018-06-15 01:08:01,075 [main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Created new shardConsumer for : ShardInfo [shardId=shardId-000000000001, concurrencyToken=efab1988-a110-4154-8e34-681864c08a90, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]
2018-06-15 01:08:01,076 [main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=09074a88-91fe-44b4-89d2-fc1388008284, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]
2018-06-15 01:08:01,077 [RecordProcessor-0000] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask - No need to block on parents [] of shard shardId-000000000001
2018-06-15 01:08:01,077 [RecordProcessor-0001] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask - No need to block on parents [] of shard shardId-000000000000
At this point the Record Processor for each shard is launched.
- [cw-metrics-publisher] CloudWatch.PutMetricData
- [RecordProcessor-0000] DynamoDB.GetItem (Shard 0, get initial position)
- [RecordProcessor-0001] DynamoDB.GetItem (Shard 1, get initial position)
- [RecordProcessor-0000] Kinesis.GetShardIterator (Shard 0)
- [RecordProcessor-0001] Kinesis.GetShardIterator (Shard 1)
- [RecordProcessor-0000] Kinesis.GetRecords (Shard 0)
- [RecordProcessor-0001] Kinesis.GetRecords (Shard 1)
- [LeaseRenewer-7] DynamoDB.UpdateItem w/ Expected (Shard 0)
- [LeaseRenewer-8] DynamoDB.UpdateItem w/ Expected (Shard 1)
At this point, you should have a clear picture of how the KCL application initializes itself and starts processing. Use the AWS CLI to send a batch of records to the stream to see how checkpointing works. Look into the corresponding DynamoDB table to see the result of checkpointing.
Launch a second instance of the KCL application (a new worker), then observe and analyse its initialization process. In particular, pay attention to how it steals a lease from the existing worker.
Kill the second instance of the KCL application and start it again. What happens?
Use the KPL application in 104 to send a large amount of data to the stream. Use the KCL application to process the incoming data. On the KPL side, you are trying to produce as fast as possible. On the KCL side, you are trying to consume as fast as possible. Look into the various CloudWatch metrics (Kinesis, KPL, and KCL) to understand what happens in the whole system. Are you hitting any bottlenecks? If so, what are they, and how can you mitigate them?
(3) Exception Handling
Send a record to the stream that intentionally breaks the record processing logic, causing an exception within the **for (Record r : records)** loop. Observe what happens to your KCL application. Explain your observation using various CloudWatch metrics.
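Once you have seen how the application reacts, one possible way to keep a single malformed record from affecting the rest of a batch is to handle failures per record. The sketch below shows that pattern on plain strings, with a bounded number of attempts per record. It is not KCL code: the class name, the processBatch helper, and the maxAttempts parameter are all hypothetical, and in a real record processor the loop would sit inside processRecords().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical pattern, independent of KCL: isolate failures per record.
final class PoisonRecordDemo {
    // Processes each record independently. A record that still fails after
    // maxAttempts is set aside and returned instead of aborting the batch.
    static <T> List<T> processBatch(List<T> records, Consumer<? super T> handler, int maxAttempts) {
        List<T> failed = new ArrayList<>();
        for (T r : records) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    handler.accept(r);
                    done = true;
                } catch (RuntimeException e) {
                    System.err.println("attempt " + attempt + " failed for " + r + ": " + e);
                }
            }
            if (!done) {
                failed.add(r);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // "oops" plays the role of the record that breaks the processing logic.
        List<String> batch = Arrays.asList("1", "2", "oops", "4");
        List<String> failed = processBatch(batch, s -> Integer.parseInt(s), 2);
        System.out.println(failed); // [oops]
    }
}
```

What to do with the records that are set aside (log them, store them elsewhere, or drop them) is a design decision that depends on your application.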
How do you intentionally crash the Worker thread? What happens when the Worker thread crashes?
(4) KCL References
Read through the source code of the following KCL sample application. Try to understand every step in the source code.
(5) Exercise
You are running a large-scale web application that serves more than 10,000 requests per second. The web application is deployed on a fleet of EC2 instances in an Auto Scaling group behind an ELB. On the EC2 instances you are using the Kinesis Agent to push Apache logs to a Kinesis stream.
You are developing a KCL application to consume from the Kinesis stream. Your boss has asked you to display the top 10 IP addresses accessing your web site, ordered by the number of HTTP requests, during the past 5 minutes.
- Design and implement an application to produce simulated input to the Kinesis stream.
- Design and implement an application to achieve your boss's requirement.
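As a starting point for the second task, the counting part can be sketched separately from Kinesis. The following is a minimal in-memory sliding-window counter, assuming requests arrive in non-decreasing timestamp order. The class and method names are hypothetical, and parsing Apache log lines is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sliding-window counter; not tied to KCL or to any log format.
final class TopIpWindow {
    private static final class Hit {
        final long timeMillis;
        final String ip;

        Hit(long timeMillis, String ip) {
            this.timeMillis = timeMillis;
            this.ip = ip;
        }
    }

    private final long windowMillis;
    private final Deque<Hit> hits = new ArrayDeque<>();
    private final Map<String, Integer> counts = new HashMap<>();

    TopIpWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Records one request. Assumes timestamps arrive in non-decreasing order.
    void record(String ip, long timeMillis) {
        hits.addLast(new Hit(timeMillis, ip));
        counts.merge(ip, 1, Integer::sum);
        evict(timeMillis);
    }

    // Drops requests that have fallen out of the window and lowers their counts.
    private void evict(long nowMillis) {
        while (!hits.isEmpty() && hits.peekFirst().timeMillis <= nowMillis - windowMillis) {
            Hit old = hits.removeFirst();
            counts.computeIfPresent(old.ip, (k, v) -> v == 1 ? null : v - 1);
        }
    }

    // Returns up to n addresses, most requests first; ties are broken by address.
    List<String> top(int n, long nowMillis) {
        evict(nowMillis);
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = b.getValue().compareTo(a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }
}
```

Calling new TopIpWindow(5 * 60 * 1000L) gives a five-minute window. Keep in mind that each Record Processor only sees the records of its own shard, so counts kept inside individual processors would still have to be combined to produce a site-wide top 10.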