From 3ccabfa157212310c1168ee20a5ef09f0329c40b Mon Sep 17 00:00:00 2001 From: "anand.loganathan@cdwg.com" Date: Wed, 17 Sep 2025 09:28:35 -0700 Subject: [PATCH 1/7] Add the max.poll.records as a configurable parameter to the kafka consumer micro-services --- .../investigation/config/KafkaConsumerConfig.java | 4 ++++ investigation-service/src/main/resources/application.yaml | 1 + .../etldatapipeline/ldfdata/config/KafkaConsumerConfig.java | 4 ++++ ldfdata-service/src/main/resources/application.yaml | 1 + .../observation/config/KafkaConsumerConfig.java | 4 ++++ observation-service/src/main/resources/application.yaml | 1 + .../organization/config/KafkaConsumerConfig.java | 4 ++++ organization-service/src/main/resources/application.yaml | 1 + .../etldatapipeline/person/config/KafkaConsumerConfig.java | 4 ++++ person-service/src/main/resources/application.yaml | 1 + .../postprocessingservice/config/KafkaConsumerConfig.java | 4 ++++ 11 files changed, 29 insertions(+) diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java index 9fee85b58..a25a2b6bc 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); @@ -37,6 +40,7 @@ public ConsumerFactory consumerFactory() { config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); return new DefaultKafkaConsumerFactory<>(config); } diff --git a/investigation-service/src/main/resources/application.yaml b/investigation-service/src/main/resources/application.yaml index edc640a00..5b32cd393 100644 --- a/investigation-service/src/main/resources/application.yaml +++ b/investigation-service/src/main/resources/application.yaml @@ -34,6 +34,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 300000 + maxPollRecs: 200 enable-auto-commit: false admin: auto-create: true diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java index df838347c..cdf79ebf0 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); @@ -37,6 +40,7 @@ public ConsumerFactory consumerFactory() { config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); return new DefaultKafkaConsumerFactory<>(config); } diff --git a/ldfdata-service/src/main/resources/application.yaml b/ldfdata-service/src/main/resources/application.yaml index abbdfabef..65b615b7d 100644 --- a/ldfdata-service/src/main/resources/application.yaml +++ b/ldfdata-service/src/main/resources/application.yaml @@ -13,6 +13,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 + maxPollRecs: 200 admin: auto-create: true application: diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java index d15e5e274..911a6a13c 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); @@ -37,6 +40,7 @@ public ConsumerFactory consumerFactory() { config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); return new DefaultKafkaConsumerFactory<>(config); } diff --git a/observation-service/src/main/resources/application.yaml b/observation-service/src/main/resources/application.yaml index 62b9ff37a..11129dba6 100644 --- a/observation-service/src/main/resources/application.yaml +++ b/observation-service/src/main/resources/application.yaml @@ -21,6 +21,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 + maxPollRecs: 200 admin: auto-create: true application: diff --git a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java index b8d6daceb..51b01e311 100644 --- a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java +++ b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); @@ -37,6 +40,7 @@ public ConsumerFactory consumerFactory() { config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); return new DefaultKafkaConsumerFactory<>(config); } diff --git a/organization-service/src/main/resources/application.yaml b/organization-service/src/main/resources/application.yaml index f4e239392..9e53166ee 100644 --- a/organization-service/src/main/resources/application.yaml +++ b/organization-service/src/main/resources/application.yaml @@ -24,6 +24,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 + maxPollRecs: 200 key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer admin: diff --git a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java index 240f0c26f..89e63b009 100644 --- a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java +++ b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); @@ -37,6 +40,7 @@ public ConsumerFactory consumerFactory() { config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); return new DefaultKafkaConsumerFactory<>(config); } diff --git a/person-service/src/main/resources/application.yaml b/person-service/src/main/resources/application.yaml index f1228ab20..496d9fafd 100644 --- a/person-service/src/main/resources/application.yaml +++ b/person-service/src/main/resources/application.yaml @@ -26,6 +26,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 + maxPollRecs: 200 key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer admin: diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java index 1d371208a..53890ed39 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java @@ -30,6 +30,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); @@ -38,6 +41,7 @@ public ConsumerFactory consumerFactory() { config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); return new DefaultKafkaConsumerFactory<>(config); } From bb8487fcaef35d6a5b366fefeeaec3f90b092c93 Mon Sep 17 00:00:00 2001 From: "anand.loganathan@cdwg.com" Date: Wed, 17 Sep 2025 09:40:34 -0700 Subject: [PATCH 2/7] Add the max.poll.records as a configurable parameter to the kafka consumer micro-services --- investigation-service/src/main/resources/application.yaml | 2 +- ldfdata-service/src/main/resources/application.yaml | 2 +- observation-service/src/main/resources/application.yaml | 2 +- organization-service/src/main/resources/application.yaml | 2 +- person-service/src/main/resources/application.yaml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/investigation-service/src/main/resources/application.yaml b/investigation-service/src/main/resources/application.yaml index 5b32cd393..e76b72881 100644 --- a/investigation-service/src/main/resources/application.yaml +++ b/investigation-service/src/main/resources/application.yaml @@ -34,7 +34,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 300000 - maxPollRecs: 200 + maxPollRecs: ${KAFKA_CONSUMER_MAX_POLL_RECS:200} enable-auto-commit: false admin: auto-create: true diff --git a/ldfdata-service/src/main/resources/application.yaml b/ldfdata-service/src/main/resources/application.yaml index 65b615b7d..19104d6ff 100644 --- a/ldfdata-service/src/main/resources/application.yaml +++ b/ldfdata-service/src/main/resources/application.yaml @@ -13,7 +13,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 - maxPollRecs: 200 + maxPollRecs: ${KAFKA_CONSUMER_MAX_POLL_RECS:200} admin: auto-create: true application: diff --git a/observation-service/src/main/resources/application.yaml b/observation-service/src/main/resources/application.yaml index 11129dba6..4cc703d7c 100644 --- a/observation-service/src/main/resources/application.yaml +++ b/observation-service/src/main/resources/application.yaml @@ -21,7 +21,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 - maxPollRecs: 200 + maxPollRecs: ${KAFKA_CONSUMER_MAX_POLL_RECS:200} admin: auto-create: true application: diff --git a/organization-service/src/main/resources/application.yaml b/organization-service/src/main/resources/application.yaml index 9e53166ee..f9273addd 100644 --- a/organization-service/src/main/resources/application.yaml +++ b/organization-service/src/main/resources/application.yaml @@ -24,7 +24,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 - maxPollRecs: 200 + maxPollRecs: ${KAFKA_CONSUMER_MAX_POLL_RECS:200} key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer admin: diff --git a/person-service/src/main/resources/application.yaml b/person-service/src/main/resources/application.yaml index 496d9fafd..62fd57bd2 100644 --- a/person-service/src/main/resources/application.yaml +++ b/person-service/src/main/resources/application.yaml @@ -26,7 +26,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 - maxPollRecs: 200 + maxPollRecs: ${KAFKA_CONSUMER_MAX_POLL_RECS:200} key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer admin: From 595f1eb83348bcaee2857c04b6456bfaaebb2b8a Mon Sep 17 00:00:00 2001 From: "anand.loganathan@cdwg.com" Date: Wed, 17 Sep 2025 11:51:46 -0700 Subject: [PATCH 3/7] Add the max.poll.records as a configurable parameter to the kafka consumer micro-services --- post-processing-service/src/main/resources/application.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/post-processing-service/src/main/resources/application.yaml b/post-processing-service/src/main/resources/application.yaml index cf92d794b..0016d7867 100644 --- a/post-processing-service/src/main/resources/application.yaml +++ b/post-processing-service/src/main/resources/application.yaml @@ -48,6 +48,7 @@ spring: consumer: max-retry: 3 maxPollIntervalMs: 30000 + maxPollRecs: ${KAFKA_CONSUMER_MAX_POLL_RECS:200} maxConcurrency: 5 admin: auto-create: true From de802e4f798502ebe2dac3a8055d8c4d13261e02 Mon Sep 17 00:00:00 2001 From: "anand.loganathan@cdwg.com" Date: Wed, 17 Sep 2025 11:59:50 -0700 Subject: [PATCH 4/7] Add the max.poll.records as a configurable parameter to the kafka consumer micro-services --- .../investigation/config/KafkaConsumerConfig.java | 2 +- .../cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java | 2 +- .../etldatapipeline/observation/config/KafkaConsumerConfig.java | 2 +- .../organization/config/KafkaConsumerConfig.java | 2 +- .../cdc/etldatapipeline/person/config/KafkaConsumerConfig.java | 2 +- .../postprocessingservice/config/KafkaConsumerConfig.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java index a25a2b6bc..56ad3c1db 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java @@ -30,7 +30,7 @@ public class KafkaConsumerConfig { private String maxPollInterval = ""; @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; + private String maxPollRecords; @Bean public ConsumerFactory consumerFactory() { diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java index cdf79ebf0..cd2e785e7 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java @@ -30,7 +30,7 @@ public class KafkaConsumerConfig { private String maxPollInterval = ""; @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; + private String maxPollRecords; @Bean public ConsumerFactory consumerFactory() { diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java index 911a6a13c..87ff459e2 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java @@ -30,7 +30,7 @@ public class KafkaConsumerConfig { private String maxPollInterval = ""; @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; + private String maxPollRecords; @Bean public ConsumerFactory consumerFactory() { diff --git a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java index 51b01e311..f2c5829a8 100644 --- a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java +++ b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java @@ -30,7 +30,7 @@ public class KafkaConsumerConfig { private String maxPollInterval = ""; @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; + private String maxPollRecords; @Bean public ConsumerFactory consumerFactory() { diff --git a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java index 89e63b009..13b56d261 100644 --- a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java +++ b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java @@ -30,7 +30,7 @@ public class KafkaConsumerConfig { private String maxPollInterval = ""; @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; + private String maxPollRecords; @Bean public ConsumerFactory consumerFactory() { diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java index 53890ed39..f065f8f60 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java @@ -31,7 +31,7 @@ public class KafkaConsumerConfig { private String maxPollInterval = ""; @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; + private String maxPollRecords; @Bean public ConsumerFactory consumerFactory() { From 50dfc42e7938d80f912dce9fd5850031b893589f Mon Sep 17 00:00:00 2001 From: "anand.loganathan@cdwg.com" Date: Wed, 17 Sep 2025 12:02:49 -0700 Subject: [PATCH 5/7] update the KafkaConsumerConfig --- .../investigation/config/KafkaConsumerConfig.java | 6 +++--- .../investigation/config/KafkaProducerConfig.java | 2 +- .../ldfdata/config/KafkaConsumerConfig.java | 6 +++--- .../ldfdata/config/KafkaProducerConfig.java | 2 +- .../observation/config/KafkaConsumerConfig.java | 6 +++--- .../observation/config/KafkaProducerConfig.java | 2 +- .../organization/config/KafkaConsumerConfig.java | 6 +++--- .../organization/config/KafkaProducerConfig.java | 2 +- .../person/config/KafkaConsumerConfig.java | 6 +++--- .../person/config/KafkaProducerConfig.java | 2 +- .../postprocessingservice/config/KafkaConsumerConfig.java | 8 ++++---- .../postprocessingservice/config/KafkaProducerConfig.java | 2 +- 12 files changed, 25 insertions(+), 25 deletions(-) diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java index 56ad3c1db..30226ef71 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java @@ -19,15 +19,15 @@ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.group-id}") - private String groupId = ""; + private String groupId; @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; // Higher value for more intensive operation, also increase latency // default is 300000, equivalent to 5 min @Value("${spring.kafka.consumer.maxPollIntervalMs}") - private String maxPollInterval = ""; + private String maxPollInterval; @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaProducerConfig.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaProducerConfig.java index f4fe2091e..e841c9ef5 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaProducerConfig.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaProducerConfig.java @@ -15,7 +15,7 @@ @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; @Bean public ProducerFactory producerFactory() { diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java index cd2e785e7..3bce9a1c0 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java @@ -19,15 +19,15 @@ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.group-id}") - private String groupId = ""; + private String groupId; @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; // Higher value for more intensive operation, also increase latency // default is 30000, equivalent to 5 min @Value("${spring.kafka.consumer.maxPollIntervalMs}") - private String maxPollInterval = ""; + private String maxPollInterval; @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaProducerConfig.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaProducerConfig.java index 33c08e3c0..b9836eed0 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaProducerConfig.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaProducerConfig.java @@ -15,7 +15,7 @@ @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; @Bean public ProducerFactory producerFactory() { diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java index 87ff459e2..880d43983 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java @@ -19,15 +19,15 @@ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.group-id}") - private String groupId = ""; + private String groupId; @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; // Higher value for more intensive operation, also increase latency // default is 30000, equivalent to 5 min @Value("${spring.kafka.consumer.maxPollIntervalMs}") - private String maxPollInterval = ""; + private String maxPollInterval; @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaProducerConfig.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaProducerConfig.java index 89626d599..2511e41ca 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaProducerConfig.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaProducerConfig.java @@ -15,7 +15,7 @@ @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; @Bean public ProducerFactory producerFactory() { diff --git a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java index f2c5829a8..231bf5a20 100644 --- a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java +++ b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java @@ -19,15 +19,15 @@ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.group-id}") - private String groupId = ""; + private String groupId; @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; // Higher value for more intensive operation, also increase latency // default is 30000, equivalent to 5 min @Value("${spring.kafka.consumer.maxPollIntervalMs}") - private String maxPollInterval = ""; + private String maxPollInterval; @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; diff --git a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaProducerConfig.java b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaProducerConfig.java index 9eb97481b..8e5d2110d 100644 --- a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaProducerConfig.java +++ b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaProducerConfig.java @@ -15,7 +15,7 @@ @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; @Bean public ProducerFactory producerFactory() { diff --git a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java index 13b56d261..c69d3cd61 100644 --- a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java +++ b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java @@ -19,15 +19,15 @@ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.group-id}") - private String groupId = ""; + private String groupId; @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; // Higher value for more intensive operation, also increase latency // default is 30000, equivalent to 5 min @Value("${spring.kafka.consumer.maxPollIntervalMs}") - private String maxPollInterval = ""; + private String maxPollInterval; @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; diff --git a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaProducerConfig.java b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaProducerConfig.java index 80c09d7b2..56e55aca4 100644 --- a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaProducerConfig.java +++ b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaProducerConfig.java @@ -15,7 +15,7 @@ @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; @Bean public ProducerFactory producerFactory() { diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java index f065f8f60..28f6844e4 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java @@ -19,16 +19,16 @@ @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.group-id}") - private String groupId = ""; + private String groupId; @Value("${spring.kafka.dlt-group-id}") - private String groupIdDlt = ""; + private String groupIdDlt; @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; @Value("${spring.kafka.consumer.maxPollIntervalMs}") - private String maxPollInterval = ""; + private String maxPollInterval; @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaProducerConfig.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaProducerConfig.java index f73cf7a5f..6255f0e1b 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaProducerConfig.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaProducerConfig.java @@ -15,7 +15,7 @@ @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers = ""; + private String bootstrapServers; @Bean public ProducerFactory producerFactory() { From 63c107ee7f63bb4a989d63759fdea299b26cea08 Mon Sep 17 00:00:00 2001 From: "anand.loganathan@cdwg.com" Date: Wed, 17 Sep 2025 12:13:37 -0700 Subject: [PATCH 6/7] update the KafkaConsumerConfig --- .../investigation/config/KafkaConsumerConfig.java | 3 --- .../etldatapipeline/ldfdata/config/KafkaConsumerConfig.java | 3 --- .../observation/config/KafkaConsumerConfig.java | 3 --- .../organization/config/KafkaConsumerConfig.java | 3 --- .../cdc/etldatapipeline/person/config/KafkaConsumerConfig.java | 3 --- .../postprocessingservice/config/KafkaConsumerConfig.java | 3 --- 6 files changed, 18 deletions(-) diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java index f535c76a5..30226ef71 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java @@ -32,9 +32,6 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; - @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; - @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java index 0de494ec5..3bce9a1c0 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java @@ -32,9 +32,6 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; - @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; - @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java index f987a6c0a..880d43983 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java @@ -32,9 +32,6 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; - @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; - @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java index 08b94c749..231bf5a20 100644 --- a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java +++ b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java @@ -32,9 +32,6 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; - @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; - @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java index 06714334d..c69d3cd61 100644 --- a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java +++ b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java @@ -32,9 +32,6 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; - @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; - @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java index 556c55990..28f6844e4 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java @@ -33,9 +33,6 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollRecs}") private String maxPollRecords; - @Value("${spring.kafka.consumer.maxPollRecs}") - private String maxPollRecords = ""; - @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); From 60c841c2fdbf3729e1778c619e54016313875077 Mon Sep 17 00:00:00 2001 From: "anand.loganathan@cdwg.com" Date: Thu, 18 Sep 2025 09:57:18 -0700 Subject: [PATCH 7/7] update the KafkaConsumerConfig --- .../investigation/config/KafkaConsumerConfig.java | 3 +++ .../etldatapipeline/ldfdata/config/KafkaConsumerConfig.java | 3 +++ .../observation/config/KafkaConsumerConfig.java | 3 +++ .../organization/config/KafkaConsumerConfig.java | 3 +++ .../cdc/etldatapipeline/person/config/KafkaConsumerConfig.java | 3 +++ .../postprocessingservice/config/KafkaConsumerConfig.java | 3 +++ 6 files changed, 18 insertions(+) diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java index b7915b6ad..a25a2b6bc 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java index 069aa0d12..cdf79ebf0 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java index caff4dd0d..911a6a13c 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java index 4ab61a2fc..51b01e311 100644 --- a/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java +++ b/organization-service/src/main/java/gov/cdc/etldatapipeline/organization/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java index 7c5f7955e..89e63b009 100644 --- a/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java +++ b/person-service/src/main/java/gov/cdc/etldatapipeline/person/config/KafkaConsumerConfig.java @@ -29,6 +29,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java index 2d54d1118..53890ed39 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/config/KafkaConsumerConfig.java @@ -30,6 +30,9 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + @Value("${spring.kafka.consumer.maxPollRecs}") + private String maxPollRecords = ""; + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>();