103. Kinesis with Java

Qingye Jiang (John) edited this page Jun 14, 2018 · 26 revisions

(1) Writing to Kinesis

To write to your Kinesis data stream, all you need to do is create an instance of the AmazonKinesisClient, then use the putRecord() method to send records to the stream. The putRecord() method takes three parameters: the stream name (String), the data (ByteBuffer), and the partition key (String). You can optionally supply a fourth parameter, a sequence number (String), for ordering.

Below is an example of writing to a Kinesis data stream with putRecord(). In this example, we write a large number of random strings (generated by UUID) to the destination stream, with the goal of saturating the stream (hitting some of the limits on the stream side).

package net.qyjohn.KinesisTutorials;

import java.io.*;
import java.util.*;
import java.nio.charset.Charset;
import java.nio.ByteBuffer;
import com.amazonaws.*;
import com.amazonaws.auth.*;
import com.amazonaws.auth.profile.*;
import com.amazonaws.regions.*;
import com.amazonaws.services.kinesis.*;
import com.amazonaws.services.kinesis.model.*;


public class KinesisWriteExample
{
	AmazonKinesisClient client;
	String streamName;

	public KinesisWriteExample(String streamName)
	{
		client = new AmazonKinesisClient();
		client.configureRegion(Regions.AP_SOUTHEAST_2);
		this.streamName = streamName;
	}

	public void start()
	{
		while (true)
		{
			try
			{
				String uuid = UUID.randomUUID().toString();
				ByteBuffer data = ByteBuffer.wrap(uuid.getBytes());
				// Use UUID for both the data and partition key
				client.putRecord(streamName, data, uuid);
			} catch (Exception e) 
			{
				System.out.println(e.getMessage());
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args)
	{
		String streamName = args[0];
		KinesisWriteExample example = new KinesisWriteExample(streamName);
		example.start();
	}
}

Exercises:

  • What is the expected performance (number of records per second) of this code? How do you arrive at your estimates? Look into the CloudWatch metrics to see how accurate your estimates are.

  • How do you improve the performance of this code?

  • What are the limiting factors? How do you know which limit is reached? How do you mitigate these limits?

  • Modify the demo code to send data in CSV format, with the following information in a record:

    time stamp, sender ip, a random integer, another random integer, a random uuid

  • Review the various activities and resource consumption on the OS level (top, htop, pstree, iostat, free, netstat) and the JVM level (jps, jstack).

  • What happens when the Kinesis service gives you an elevated level of 5xx errors?

  • What happens when the Kinesis service stops working for an extended period of time, say, 30 minutes?
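The CSV record described in the exercise above can be sketched as plain string formatting. This is a minimal sketch, not part of the demo code; the class and method names here are our own:

```java
import java.util.Random;
import java.util.UUID;

// A sketch of building the CSV record from the exercise:
// time stamp, sender ip, a random integer, another random integer, a random uuid
public class CsvRecordBuilder
{
	public static String buildRecord(long timestamp, String senderIp, int a, int b, String uuid)
	{
		return timestamp + "," + senderIp + "," + a + "," + b + "," + uuid;
	}

	public static void main(String[] args)
	{
		Random rand = new Random();
		String record = buildRecord(System.currentTimeMillis(), "10.0.0.1",
				rand.nextInt(1000), rand.nextInt(1000), UUID.randomUUID().toString());
		System.out.println(record);
		// The record would then be wrapped with ByteBuffer.wrap(record.getBytes())
		// and sent with client.putRecord(streamName, data, partitionKey),
		// as in the demo code above.
	}
}
```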

(2) Reading from Kinesis

In order to read from your Kinesis stream, you will need to do the following:

(1) Make a DescribeStream API call to obtain a list of shards.

(2) For each shard, make a GetShardIterator API call to obtain the shard iterator.

(3) For each shard, start a thread to read from the shard using the GetRecords API call. Each GetRecords API call starts with the current shard iterator, and returns a new shard iterator for the next GetRecords API call.

Below is the demo code to dump the data in your Kinesis stream in real time. Run this code against your Kinesis stream to see what happens.

package net.qyjohn.KinesisTutorials;

import java.io.*;
import java.util.*;
import java.nio.charset.Charset;
import com.amazonaws.*;
import com.amazonaws.auth.*;
import com.amazonaws.auth.profile.*;
import com.amazonaws.regions.*;
import com.amazonaws.services.kinesis.*;
import com.amazonaws.services.kinesis.model.*;

class ShardReader extends Thread
{
	AmazonKinesisClient client;
	String streamName, shardId;

	public ShardReader(String streamName, String shardId)
	{
		client = new AmazonKinesisClient();
		client.configureRegion(Regions.AP_SOUTHEAST_2);
		this.streamName = streamName;
		this.shardId = shardId;
	}

	public void run()
	{
		try
		{
			GetShardIteratorResult result1 = client.getShardIterator(streamName, shardId, "TRIM_HORIZON");
			String shardIterator = result1.getShardIterator();

			
			boolean hasData = true;
			while (hasData)
			{
				GetRecordsResult result2 = client.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));	
				shardIterator = result2.getNextShardIterator();
				if (shardIterator == null)
				{
					hasData = false; // Shard closed.
				}

				List<Record> records = result2.getRecords();
				if (records.isEmpty())
				{
					sleep(2000);	// No records
				}
				else
				{
					for (Record record : records)
					{
						String data = Charset.forName("UTF-8").decode(record.getData()).toString();
						System.out.println(shardId + "\t" + data);
					}
				}
			}
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}
}

public class KinesisReadExample
{
	public AmazonKinesisClient client;
	public String streamName;

	public KinesisReadExample()
	{
		client = new AmazonKinesisClient();
		client.configureRegion(Regions.AP_SOUTHEAST_2);
		try
		{
			Properties prop = new Properties();
			InputStream input = new FileInputStream("kinesis.properties");
			prop.load(input);
			streamName = prop.getProperty("streamName");
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}

	public void run()
	{
		try
		{
			DescribeStreamResult result = client.describeStream(streamName);
			StreamDescription description = result.getStreamDescription();
			List<Shard> shards = description.getShards();
			for (Shard shard : shards)
			{
				String shardId = shard.getShardId();
				new ShardReader(streamName, shardId).start();
			}
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}

	public static void main(String[] args)
	{
		try
		{
			KinesisReadExample demo = new KinesisReadExample();
			demo.run();
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}
}

Exercises:

  • Modify the demo code to decode the data in CSV format, and output one or more data files with the following format:

    time stamp, sender ip, integer A, integer B, sum of A and B, random uuid
  • What is the expected level of processing performance (processed records per second) of the application? How do you arrive at your estimates? Look into the CloudWatch metrics to see how accurate your estimates are. Does the performance depend on the amount of backlog (and why)?

  • Review the various activities and resource consumption on the OS level (top, htop, pstree, iostat, free, netstat) and JVM level (jps, jstack).

  • Is your consuming rate faster than your producing rate? How do you simulate CPU-intensive or disk-I/O-intensive processing that requires a significant amount of computing resources to process each record? What CloudWatch metric do you use to determine whether you are producing faster or consuming faster?

  • Assuming that you are currently producing at a constant rate of 1000 records per second, and your consumer is consuming at a slower rate, what should you do to improve the performance of your application? How do you evaluate the effectiveness of your improvement efforts?

  • What if your program crashes and you want to restart from where it crashed?

  • Use the AWS CLI to send a record with a different format and see how your application reacts.
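The consumer-side CSV exercise above can be sketched as a small transformation on each decoded record. This is a minimal sketch assuming the producer-side CSV layout (time stamp, sender ip, integer A, integer B, uuid); the class and method names are our own:

```java
// A sketch of decoding a producer CSV record and emitting the exercise's
// output line, which inserts the sum of the two integers before the uuid.
public class CsvRecordProcessor
{
	public static String process(String csvLine)
	{
		String[] fields = csvLine.split(",");
		long timestamp = Long.parseLong(fields[0]);
		String senderIp = fields[1];
		int a = Integer.parseInt(fields[2]);
		int b = Integer.parseInt(fields[3]);
		String uuid = fields[4];
		// Output: time stamp, sender ip, integer A, integer B, sum of A and B, random uuid
		return timestamp + "," + senderIp + "," + a + "," + b + "," + (a + b) + "," + uuid;
	}

	public static void main(String[] args)
	{
		// In the demo code, csvLine would come from decoding record.getData().
		System.out.println(process("1528934400000,10.0.0.1,7,11,abc-def"));
	}
}
```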

(3) Programming Project

The obvious problems with the approach in section (2) include:

  • If the number of shards changes after the code starts executing, the code will not be aware of the new shards in the stream.

  • If there is something wrong during the execution of the program (for example, out-of-memory error), the program will need to start from the beginning of the stream again.

  • When you have many shards in the stream, a single node is not sufficient to process the data.

As such, you need to design a solution that can be run in a distributed manner, which takes care of the above-mentioned issues. The distributed solution needs to consider the following:

  • Lease management - which worker node can work on which shard.

  • Checkpointing - which record has been worked on? If a worker node dies, where to resume the processing?

  • What do you use to perform lease management and checkpointing?
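One simple way to sketch checkpointing is a shared file that maps each shard to the last processed sequence number; on restart, a reader with a saved checkpoint would request an AFTER_SEQUENCE_NUMBER shard iterator instead of TRIM_HORIZON. This is a minimal single-node sketch under those assumptions (a distributed solution would need shared storage instead of a local file), and the class name is our own:

```java
import java.io.*;
import java.util.Properties;

// A sketch of file-based checkpointing. After each batch of records, a shard
// reader saves the last processed sequence number; on restart, it loads the
// checkpoint to decide where to resume.
public class Checkpointer
{
	Properties checkpoints = new Properties();
	File file;

	public Checkpointer(String filename) throws IOException
	{
		file = new File(filename);
		if (file.exists())
		{
			try (InputStream in = new FileInputStream(file)) { checkpoints.load(in); }
		}
	}

	// Persist the last processed sequence number for a shard.
	public synchronized void save(String shardId, String sequenceNumber) throws IOException
	{
		checkpoints.setProperty(shardId, sequenceNumber);
		try (OutputStream out = new FileOutputStream(file)) { checkpoints.store(out, null); }
	}

	// Returns null when the shard has never been checkpointed
	// (in that case, fall back to TRIM_HORIZON).
	public synchronized String load(String shardId)
	{
		return checkpoints.getProperty(shardId);
	}

	public static void main(String[] args) throws IOException
	{
		Checkpointer cp = new Checkpointer("checkpoints.properties");
		cp.save("shardId-000000000000", "12345");
		System.out.println(cp.load("shardId-000000000000"));
	}
}
```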

Design and implement your own distributed system to consume from your Kinesis stream. Be aware of the various API rate limits that you might encounter, in particular:

  • DescribeStream can provide up to 10 transactions per second.
  • GetShardIterator can provide up to 5 transactions per second per open shard.
  • ListShards has a limit of 100 transactions per second per data stream.
  • Each shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second.
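When a limit is reached, the SDK throws exceptions such as ProvisionedThroughputExceededException, and the usual response is to retry with exponential backoff. Below is a minimal sketch of a backoff schedule; the specific base delay and cap are our own choices, not prescribed values:

```java
// A sketch of exponential backoff for throttled Kinesis API calls: double the
// wait after each failed attempt, capped at 30 seconds.
public class Backoff
{
	// Delay in milliseconds before retry attempt n (0-based).
	public static long delayMillis(int attempt)
	{
		long delay = 200L << Math.min(attempt, 20);   // 200 ms, 400 ms, 800 ms, ...
		return Math.min(delay, 30000L);               // cap at 30 s
	}

	public static void main(String[] args)
	{
		// A caller would catch ProvisionedThroughputExceededException around
		// getRecords() or putRecord(), then Thread.sleep(delayMillis(attempt))
		// before retrying.
		for (int i = 0; i < 10; i++)
		{
			System.out.println("attempt " + i + ": wait " + delayMillis(i) + " ms");
		}
	}
}
```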

Please note that in this project you are expected to build your own KCL (don't use the Amazon KCL).

(4) DynamoDB Streams

A DynamoDB stream is an ordered flow of information about changes to items in an Amazon DynamoDB table. When you enable a stream on a table, DynamoDB captures information about every modification to data items in the table. Whenever an application creates, updates, or deletes items in the table, DynamoDB Streams writes a stream record with the primary key attribute(s) of the items that were modified. DynamoDB Streams works in much the same way as Kinesis Streams. As such, you can change the above-mentioned demo code to print out the changes in your DynamoDB table.

Modify the sample Java code in sections (2) and (3) to read from a DynamoDB stream. Note that DynamoDB Streams has its own client class (AmazonDynamoDBStreamsClient) and identifies streams by their ARN rather than by a stream name.