From 8f31597950bd897cc18f5ee893972e064b7dc000 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Mon, 22 Apr 2019 12:20:36 +0530 Subject: [PATCH 1/2] adding dynamo read write throughput --- .../kinesis/span/collector/config/ProjectConfiguration.scala | 2 ++ .../config/entities/KinesisConsumerConfiguration.scala | 2 ++ .../kinesis/span/collector/kinesis/client/KinesisConsumer.scala | 2 ++ .../kinesis/span/collector/unit/tests/RecordProcessorSpec.scala | 2 +- 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/ProjectConfiguration.scala b/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/ProjectConfiguration.scala index fbce69d..a27a5ff 100644 --- a/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/ProjectConfiguration.scala +++ b/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/ProjectConfiguration.scala @@ -52,6 +52,8 @@ object ProjectConfiguration { kinesisEndpoint = if (kinesis.hasPath("endpoint")) Some(kinesis.getString("endpoint")) else None, dynamoEndpoint = if (kinesis.hasPath("dynamodb.endpoint")) Some(kinesis.getString("dynamodb.endpoint")) else None, dynamoTableName = if(kinesis.hasPath("dynamodb.table")) Some(kinesis.getString("dynamodb.table")) else None, + dynamoReadThroughput = if (kinesis.hasPath("dynamodb.readThroughput")) Some(kinesis.getInt("dynamodb.readThroughput")) else None, + dynamoWriteThroughput = if (kinesis.hasPath("dynamodb.writeThroughput")) Some(kinesis.getInt("dynamodb.writeThroughput")) else None, maxRecordsToRead = kinesis.getInt("max.records.read"), idleTimeBetweenReads = kinesis.getDuration("idle.time.between.reads.ms", TimeUnit.MILLISECONDS).millis, shardSyncInterval = kinesis.getDuration("shard.sync.interval.ms", TimeUnit.MILLISECONDS).millis, diff --git a/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/entities/KinesisConsumerConfiguration.scala b/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/entities/KinesisConsumerConfiguration.scala index 97fdc03..3fe0eb3 100644 --- a/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/entities/KinesisConsumerConfiguration.scala +++ b/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/config/entities/KinesisConsumerConfiguration.scala @@ -33,6 +33,8 @@ case class KinesisConsumerConfiguration(awsRegion: String, kinesisEndpoint: Option[String], dynamoEndpoint: Option[String], dynamoTableName: Option[String], + dynamoReadThroughput: Option[Int], + dynamoWriteThroughput: Option[Int], maxRecordsToRead: Int, idleTimeBetweenReads: FiniteDuration, shardSyncInterval: FiniteDuration, diff --git a/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/kinesis/client/KinesisConsumer.scala b/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/kinesis/client/KinesisConsumer.scala index 36222aa..5dabaac 100644 --- a/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/kinesis/client/KinesisConsumer.scala +++ b/kinesis/src/main/scala/com/expedia/www/haystack/kinesis/span/collector/kinesis/client/KinesisConsumer.scala @@ -96,6 +96,8 @@ class KinesisConsumer(config: KinesisConsumerConfiguration, config.dynamoEndpoint.map(kinesisClientConfig.withDynamoDBEndpoint) config.kinesisEndpoint.map(kinesisClientConfig.withKinesisEndpoint) + config.dynamoReadThroughput.map(kinesisClientConfig.withInitialLeaseTableReadCapacity) + config.dynamoWriteThroughput.map(kinesisClientConfig.withInitialLeaseTableWriteCapacity) new Worker.Builder() .config(kinesisClientConfig) diff --git a/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/unit/tests/RecordProcessorSpec.scala b/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/unit/tests/RecordProcessorSpec.scala index 5f82745..5319de9 100644 --- a/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/unit/tests/RecordProcessorSpec.scala +++ b/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/unit/tests/RecordProcessorSpec.scala @@ -47,7 +47,7 @@ class RecordProcessorSpec extends FunSpec with Matchers with EasyMockSugar with describe("Record Processor") { val kinesisConfig = KinesisConsumerConfiguration("us-west-2", None, - "app-group", "stream-1", InitialPositionInStream.LATEST, 10.seconds, 10, 10.seconds, None, None, None, + "app-group", "stream-1", InitialPositionInStream.LATEST, 10.seconds, 10, 10.seconds, None, None, None, None, None, 10000, 500.millis, 10000.millis, MetricsLevel.NONE, 10000.millis, 200.millis) it("should process the record, sends to sink and perform checkpointing") { From af3264c95b651550dcc091c6d80d00f247f75ae1 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Mon, 22 Apr 2019 15:34:09 +0530 Subject: [PATCH 2/2] adding dynamo config in terraform to make it overridable --- deployment/terraform/kinesis-span-collector/main.tf | 2 ++ .../templates/kinesis-span-collector_conf.tpl | 5 +++++ deployment/terraform/kinesis-span-collector/variables.tf | 2 ++ deployment/terraform/main.tf | 2 ++ kinesis/src/main/resources/config/base.conf | 5 +++++ 5 files changed, 16 insertions(+) diff --git a/deployment/terraform/kinesis-span-collector/main.tf b/deployment/terraform/kinesis-span-collector/main.tf index 29edee4..371a0c3 100644 --- a/deployment/terraform/kinesis-span-collector/main.tf +++ b/deployment/terraform/kinesis-span-collector/main.tf @@ -27,6 +27,8 @@ data "template_file" "config_data" { kafka_endpoint = "${var.kafka_endpoint}" sts_role_arn = "${var.sts_role_arn}" app_group_name = "${var.haystack_cluster_name}-${var.app_name}" + dynamodb_read_thoroughput = "${var.dynamodb_read_throughput}" + dynamodb_write_throughput = "${var.dynamodb_write_throughput}" } } diff --git a/deployment/terraform/kinesis-span-collector/templates/kinesis-span-collector_conf.tpl b/deployment/terraform/kinesis-span-collector/templates/kinesis-span-collector_conf.tpl index e2ef6dc..13f87ae 100644 --- a/deployment/terraform/kinesis-span-collector/templates/kinesis-span-collector_conf.tpl +++ b/deployment/terraform/kinesis-span-collector/templates/kinesis-span-collector_conf.tpl @@ -27,6 +27,11 @@ kinesis { retry.interval.ms = 250 } + dynamodb { + readThroughput = ${dynamodb_read_throughput} + writeThroughput = ${dynamodb_write_throughput} + } + task.backoff.ms = 200 max.records.read = 2000 idle.time.between.reads.ms = 500 diff --git a/deployment/terraform/kinesis-span-collector/variables.tf b/deployment/terraform/kinesis-span-collector/variables.tf index 204ebcd..0eab661 100644 --- a/deployment/terraform/kinesis-span-collector/variables.tf +++ b/deployment/terraform/kinesis-span-collector/variables.tf @@ -18,6 +18,8 @@ variable "cpu_request"{} variable "cpu_limit"{} variable "app_name"{ default = "kinesis-span-collector" } variable "env_vars" {} +variable "dynamodb_read_throughput" {} +variable "dynamodb_write_throughput" {} variable "termination_grace_period" { default = 30 diff --git a/deployment/terraform/main.tf b/deployment/terraform/main.tf index f4bb3ec..407937d 100644 --- a/deployment/terraform/main.tf +++ b/deployment/terraform/main.tf @@ -22,6 +22,8 @@ module "kinesis-span-collector" { memory_limit = "${var.collector["kinesis_span_collector_memory_limit"]}" jvm_memory_limit = "${var.collector["kinesis_span_collector_jvm_memory_limit"]}" app_name = "${var.collector["kinesis_span_collector_app_name"]}" + dynamodb_read_throughput = "${var.collector["kinesis_span_collector_dynamodb_read_thoroughput"]}" + dynamodb_write_throughput = "${var.collector["kinesis_span_collector_dynamodb_write_thoroughput"]}" } module "http-span-collector" { diff --git a/kinesis/src/main/resources/config/base.conf b/kinesis/src/main/resources/config/base.conf index 63fcd8f..20f4645 100644 --- a/kinesis/src/main/resources/config/base.conf +++ b/kinesis/src/main/resources/config/base.conf @@ -33,6 +33,11 @@ kinesis { retry.interval.ms = 250 } + dynamodb { + readThroughput = 10 + writeThroughput = 10 + } + task.backoff.ms = 200 max.records.read = 2000 idle.time.between.reads.ms = 500