From 2bb3b41d1229158a5c79e5d26aca16ff97d7089d Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Fri, 6 Mar 2026 13:13:42 -0500 Subject: [PATCH 01/11] WIP - Working container start up --- docker-compose.yaml | 103 ++++++++++++------ .../integration/IntegrationTest.java | 54 +++++++++ 2 files changed, 124 insertions(+), 33 deletions(-) create mode 100644 post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java diff --git a/docker-compose.yaml b/docker-compose.yaml index 83a6f6cc4..1f5beff80 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,8 +1,7 @@ services: - mssql: + nbs-mssql: image: ghcr.io/cdcent/nedssdb:latest platform: linux/amd64 - container_name: nbs-mssql environment: - DATABASE_VERSION=6.0.18.1 - DEPLOY_ADMIN_PASSWORD=${DATABASE_PASSWORD:-PizzaIsGood33!} @@ -13,12 +12,23 @@ services: ports: - 3433:1433 + liquibase: + build: + dockerfile: ./liquibase-service/Dockerfile.local + environment: + - DB_USERNAME=db_deploy_admin + - DB_PASSWORD=${LIQUIBASE_USER_PASSWORD:-db_deploy_admin} + - DB_HOST=nbs-mssql + depends_on: + nbs-mssql: + condition: service_healthy + wildfly: image: ghcr.io/cdcent/nedssdev:6.0.18.1 platform: linux/amd64 - container_name: rtr-wildfly depends_on: - - mssql + nbs-mssql: + condition: service_healthy ports: - "9991:9990" - "7003:7001" @@ -26,20 +36,22 @@ services: zookeeper: image: confluentinc/cp-zookeeper:7.3.0 - hostname: rtr-zookeeper - container_name: rtr-zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 + healthcheck: + test: nc -z localhost 2181 || exit -1 + interval: 10s + timeout: 5s + retries: 3 kafka: image: confluentinc/cp-kafka:7.3.0 - hostname: kafka - container_name: kafka depends_on: - - zookeeper + zookeeper: + condition: service_healthy ports: - "9092:9092" environment: @@ -53,14 +65,20 @@ services: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + healthcheck: + test: kafka-topics --bootstrap-server kafka:29092 --list + interval: 30s + timeout: 10s + retries: 3 kafka-connect: - container_name: kafka-connect build: dockerfile: ./containers/kafka-connect/Dockerfile depends_on: - - kafka - - mssql + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy ports: - 8083:8083 environment: @@ -78,11 +96,11 @@ services: debezium: image: debezium/connect:2.4 - hostname: debezium - container_name: rtr-debezium depends_on: - - kafka - - zookeeper + kafka: + condition: service_healthy + zookeeper: + condition: service_healthy ports: - "8085:8083" environment: @@ -103,17 +121,6 @@ services: OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 - liquibase: - build: - dockerfile: ./liquibase-service/Dockerfile.local - environment: - - DB_USERNAME=db_deploy_admin - - DB_PASSWORD=${LIQUIBASE_USER_PASSWORD:-db_deploy_admin} - - DB_HOST=nbs-mssql - depends_on: - mssql: - condition: service_healthy - investigation-service: build: dockerfile: ./investigation-service/Dockerfile @@ -123,7 +130,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully ldfdata-service: build: @@ -134,7 +146,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully observation-service: build: @@ -145,7 +162,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - 
KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully organization-service: build: @@ -156,7 +178,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully person-service: build: @@ -167,7 +194,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully post-processing-service: build: @@ -178,4 +210,9 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java new file mode 100644 index 000000000..009f79aa6 --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -0,0 +1,54 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration; + +import java.io.File; +import java.time.Duration; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import 
org.testcontainers.containers.ComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +class IntegrationTest { + + @SuppressWarnings("resource") + private static final ComposeContainer environment = new ComposeContainer( + DockerImageName.parse("docker:25.0.5"), + new File("../docker-compose.yaml")) + .withServices( + "nbs-mssql", + "liquibase", + "zookeeper", + "kafka", + "debezium", + "kafka-connect", + "person-service", + "post-processing-service") + .waitingFor("liquibase", + Wait.forLogMessage("Migrations complete.*", 1).withStartupTimeout(Duration.ofMinutes(3))) + .withStartupTimeout(Duration.ofMinutes(10)); + + @BeforeAll + static void setUp() { + // Start up necessary containers + environment.start(); + } + + @AfterAll + static void tearDown() { + // Stop all containers + environment.stop(); + } + + @Test + void patientDataIsSuccessfullyProcessed() throws InterruptedException { + System.out.println("Starting test..."); + Thread.sleep(Duration.ofSeconds(10)); // Testing that container comes up + System.out.println("Test complete..."); + // Insert a patient into NBS_ODSE + + // Validate patient data arrives in D_PATIENT + } + +} From f5a08856fe1a1e48d797c289ae16df0e7827be2d Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Mon, 9 Mar 2026 17:10:59 -0400 Subject: [PATCH 02/11] Add separate test datasource for NBS_ODSE user. 
Successful patient creation --- .../integration/IntegrationTest.java | 30 ++++- .../integration/config/DataSourceConfig.java | 57 ++++++++ .../integration/id/IdGenerator.java | 125 ++++++++++++++++++ .../integration/patient/PatientCreator.java | 82 ++++++++++++ .../src/test/resources/application-test.yaml | 14 ++ 5 files changed, 302 insertions(+), 6 deletions(-) create mode 100644 post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java create mode 100644 post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java create mode 100644 post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java create mode 100644 post-processing-service/src/test/resources/application-test.yaml diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index 009f79aa6..eee46d8b9 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -1,21 +1,34 @@ package gov.cdc.etldatapipeline.postprocessingservice.integration; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; import org.testcontainers.containers.ComposeContainer; import 
org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; +import gov.cdc.etldatapipeline.postprocessingservice.integration.patient.PatientCreator; + +@SpringBootTest +@ActiveProfiles("test") class IntegrationTest { + @Autowired + private PatientCreator patientCreator; + @SuppressWarnings("resource") private static final ComposeContainer environment = new ComposeContainer( DockerImageName.parse("docker:25.0.5"), new File("../docker-compose.yaml")) + // List specific services to prevent launching wildfly container .withServices( "nbs-mssql", "liquibase", @@ -23,16 +36,21 @@ class IntegrationTest { "kafka", "debezium", "kafka-connect", - "person-service", - "post-processing-service") + "person-service") + // Add liquibase specific log check and increase default timeout .waitingFor("liquibase", Wait.forLogMessage("Migrations complete.*", 1).withStartupTimeout(Duration.ofMinutes(3))) + // Set a global startup timeout for ComposeContainer .withStartupTimeout(Duration.ofMinutes(10)); @BeforeAll static void setUp() { // Start up necessary containers environment.start(); + + // TODO Initialize debezium connectors + + // TODO Initialize kafka-sync connector } @AfterAll @@ -42,13 +60,13 @@ static void tearDown() { } @Test - void patientDataIsSuccessfullyProcessed() throws InterruptedException { - System.out.println("Starting test..."); - Thread.sleep(Duration.ofSeconds(10)); // Testing that container comes up - System.out.println("Test complete..."); + void patientDataIsSuccessfullyProcessed() { // Insert a patient into NBS_ODSE + long createdPatient = patientCreator.create(); + assertThat(createdPatient).isNotZero(); // Validate patient data arrives in D_PATIENT + } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java new 
file mode 100644 index 000000000..543343b30 --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java @@ -0,0 +1,57 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.config; + +import javax.sql.DataSource; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Profile; +import org.springframework.jdbc.core.simple.JdbcClient; + +@Configuration +@Profile("test") +public class DataSourceConfig { + // Standard RTR datasource with appropriate permissions + @Bean + @Primary + @ConfigurationProperties("spring.datasource.primary") + public DataSourceProperties dataSourceProperties() { + return new DataSourceProperties(); + } + + @Bean + @Primary + public DataSource dataSource(DataSourceProperties properties) { + return properties + .initializeDataSourceBuilder() + .build(); + } + + @Bean + @Primary + public JdbcClient jdbcClient(DataSource dataSource) { + return JdbcClient.create(dataSource); + } + + // Testing specific datasource with db ownership + @Bean("testProperties") + @ConfigurationProperties("spring.datasource.test") + public DataSourceProperties testDataSourceProperties() { + return new DataSourceProperties(); + } + + @Bean("testDataSource") + public DataSource testDataSource(@Qualifier("testProperties") DataSourceProperties properties) { + return properties + .initializeDataSourceBuilder() + .build(); + } + + @Bean("testClient") + public JdbcClient testJdbcClient(@Qualifier("testDataSource") DataSource dataSource) { + return JdbcClient.create(dataSource); + } +} diff --git 
a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java new file mode 100644 index 000000000..7aadb06ca --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java @@ -0,0 +1,125 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.id; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.simple.JdbcClient; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** + * Responsible for managing database IDs in the Local_UID_generator table + */ +@Service +public class IdGenerator { + + private JdbcClient client; + + public IdGenerator(@Qualifier("testClient") final JdbcClient client) { + this.client = client; + } + + private static final String SELECT = """ + SELECT TOP 1 + UID_prefix_cd, + seed_value_nbr, + UID_suffix_cd + FROM + NBS_ODSE.dbo.local_uid_generator + WHERE + class_name_cd = :type + OR type_cd = :type + + """; + + private static final String INCREMENT = """ + UPDATE + NBS_ODSE.dbo.local_uid_generator + SET + seed_value_nbr = seed_value_nbr + 1 + WHERE + class_name_cd = :type + OR type_cd = :type + """; + + /** + * + * Gets the next valid Id for the provided Type and increments the value. Will + * throw an exception if the provided type is not found. 
+ * + * @param type {@link EntityType} + * @return {@link GeneratedId} + */ + @Transactional + public GeneratedId next(EntityType type) { + // Retrieve next valid Id + GeneratedId identifier = client.sql(SELECT) + .param("type", type.toString()) + .query((rs, rn) -> new GeneratedId( + rs.getString("UID_prefix_cd"), + rs.getLong("seed_value_nbr"), + rs.getString("UID_suffix_cd"))) + .single(); + + // Increment table + client.sql(INCREMENT) + .param("type", type.toString()) + .update(); + + return identifier; + } + + public record GeneratedId(String prefix, Long id, String suffix) { + public String toLocalId() { + return prefix + id.toString() + suffix; + } + } + + /** + * Matches the class_name_cd column of the Local_UID_generator table, other than + * the NBS entry. Which references the + * type_cd column as the class_name_cd for type NBS is dynamic based on the + * jurisdiction + */ + public enum EntityType { + NBS, + CLINICAL_DOCUMENT, + COINFECTION_GROUP, + CS_REPORT, + CT_CONTACT, + DEDUPLICATION_LOG, + EPILINK, + GEOCODING, + GEOCODING_LOG, + GROUP, + INTERVENTION, + INTERVIEW, + MATERIAL, + NBS_DOCUMENT, + NBS_QUESTION_ID_LDF, + NBS_QUESTION_LDF, + NBS_UIMETEDATA_LDF, + NND_METADATA, + NON_LIVING_SUBJECT, + NOTIFICATION, + OBSERVATION, + ORGANIZATION, + PAGE, + PATIENT_ENCOUNTER, + PERSON, + PERSON_GROUP, + PLACE, + PUBLIC_HEALTH_CASE, + RDB_METADATA, + REFERRAL, + REPORT, + REPORTDATASOURCE, + REPORTDATASOURCECOLUMN, + REPORTDISPLAYCOLUMN, + REPORTFILTER, + REPORTFILTERCODE, + REPORTFILTERVALUE, + SECURITY_LOG, + TREATMENT, + WORKUP + } + +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java new file mode 100644 index 000000000..44559220d --- /dev/null +++ 
b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java @@ -0,0 +1,82 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.patient; + +import java.time.LocalDateTime; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.simple.JdbcClient; +import org.springframework.stereotype.Component; + +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator; +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.EntityType; +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.GeneratedId; + +/** + * Responsible for creating and inserting patient data into the NBS_ODSE for + * integration testing + */ +@Component +public class PatientCreator { + + private IdGenerator idGenerator; + private JdbcClient client; + + public PatientCreator( + final IdGenerator idGenerator, + @Qualifier("testClient") final JdbcClient client) { + this.idGenerator = idGenerator; + this.client = client; + } + + private static final String CREATE_QUERY = """ + insert into NBS_ODSE.dbo.Entity(entity_uid, class_cd) values (:id, 'PSN'); + + insert into NBS_ODSE.dbo.Person( + person_uid, + person_parent_uid, + local_id, + version_ctrl_nbr, + cd, + electronic_ind, + edx_ind, + add_time, + add_user_id, + last_chg_time, + last_chg_user_id, + record_status_cd, + record_status_time, + status_cd, + status_time + ) values ( + :id, + :id, + :local, + 1, + 'PAT', + 'N', + 'Y', + :addedOn, + :addedBy, + :addedOn, + :addedBy, + 'ACTIVE', + :addedOn, + 'A', + :addedOn + ); + """; + + public long create() { + + GeneratedId identifier = idGenerator.next(EntityType.PERSON); + + this.client.sql(CREATE_QUERY) + .param("id", identifier.id()) + .param("local", identifier.toLocalId()) + .param("addedOn", LocalDateTime.now()) + .param("addedBy", "9999") + .update(); + + return identifier.id(); + } + +} diff --git 
a/post-processing-service/src/test/resources/application-test.yaml b/post-processing-service/src/test/resources/application-test.yaml new file mode 100644 index 000000000..9fd35c89a --- /dev/null +++ b/post-processing-service/src/test/resources/application-test.yaml @@ -0,0 +1,14 @@ +spring: + datasource: + primary: + username: post_processing_service_rdb + password: post_processing_service + url: jdbc:sqlserver://localhost:3433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; + test: + username: sa + password: PizzaIsGood33! + url: jdbc:sqlserver://localhost:3433;databaseName=NBS_ODSE;encrypt=true;trustServerCertificate=true; + +logging: + level: + org.springframework.jdbc.core: TRACE From 7f813c31dca5c5020f72021e11404b9eba4f850a Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Tue, 10 Mar 2026 16:22:41 -0400 Subject: [PATCH 03/11] Add initialization for debezium and kafka-sync. Finish test --- containers/debezium/README.md | 9 ---- containers/debezium/initialize/healthcheck.sh | 36 ++++++++++++++++ .../{ => initialize}/odse_connector.json | 0 .../{ => initialize}/odse_meta_connector.json | 0 .../{ => initialize}/srte_connector.json | 0 .../kafka-connect/initialize/healthcheck.sh | 19 +++++++++ .../{ => initialize}/mssql-connector.json | 0 docker-compose.yaml | 37 +++++++++++++--- .../integration/IntegrationTest.java | 22 ++++++---- .../integration/rdb/DPatientFinder.java | 35 ++++++++++++++++ .../integration/util/Await.java | 42 +++++++++++++++++++ 11 files changed, 179 insertions(+), 21 deletions(-) delete mode 100644 containers/debezium/README.md create mode 100755 containers/debezium/initialize/healthcheck.sh rename containers/debezium/{ => initialize}/odse_connector.json (100%) rename containers/debezium/{ => initialize}/odse_meta_connector.json (100%) rename containers/debezium/{ => initialize}/srte_connector.json (100%) create mode 100755 containers/kafka-connect/initialize/healthcheck.sh rename containers/kafka-connect/{ => 
initialize}/mssql-connector.json (100%) create mode 100644 post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java create mode 100644 post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java diff --git a/containers/debezium/README.md b/containers/debezium/README.md deleted file mode 100644 index 4e8a64d9c..000000000 --- a/containers/debezium/README.md +++ /dev/null @@ -1,9 +0,0 @@ -## Initialization - -To initialize the connectors, a POST request must be sent to the debezium container for each conector. - -```sh -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8085/connectors/ -d @containers/debezium/odse_connector.json -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8085/connectors/ -d @containers/debezium/odse_meta_connector.json -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8085/connectors/ -d @containers/debezium/srte_connector.json -``` diff --git a/containers/debezium/initialize/healthcheck.sh b/containers/debezium/initialize/healthcheck.sh new file mode 100755 index 000000000..7c12f98b9 --- /dev/null +++ b/containers/debezium/initialize/healthcheck.sh @@ -0,0 +1,36 @@ +#!/bin/bash +set -e + +# Get the list of all connectors from Debezium REST API +connectors=$(curl -s --fail "http://127.0.0.1:8083/connectors") + +# Clean the array string and load into array +cleaned_connector_names=$(sed 's/[][]//g; s/"//g' <<< "$connectors") +IFS=',' read -r -a my_array <<< "$cleaned_connector_names" + +# Check if ODSE connector is available +if [[ " ${my_array[@]} " =~ " odse-connector " ]]; then + echo "odse-connector is enabled"; +else + echo "odse-connector not found. 
Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/odse_connector.json" http://localhost:8083/connectors/ +fi + +# Check if ODSE meta connector is available +if [[ " ${my_array[@]} " =~ " odse-meta-connector " ]]; then + echo "odse-meta-connector is enabled"; +else + echo "odse-meta-connector not found. Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/odse_meta_connector.json" http://localhost:8083/connectors/ +fi + +# Check if SRTE connector is available +if [[ " ${my_array[@]} " =~ " srte-connector " ]]; then + echo "srte-connector is enabled"; +else + echo "srte-connector not found. Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/srte_connector.json" http://localhost:8083/connectors/ +fi + + +exit 0 \ No newline at end of file diff --git a/containers/debezium/odse_connector.json b/containers/debezium/initialize/odse_connector.json similarity index 100% rename from containers/debezium/odse_connector.json rename to containers/debezium/initialize/odse_connector.json diff --git a/containers/debezium/odse_meta_connector.json b/containers/debezium/initialize/odse_meta_connector.json similarity index 100% rename from containers/debezium/odse_meta_connector.json rename to containers/debezium/initialize/odse_meta_connector.json diff --git a/containers/debezium/srte_connector.json b/containers/debezium/initialize/srte_connector.json similarity index 100% rename from containers/debezium/srte_connector.json rename to containers/debezium/initialize/srte_connector.json diff --git a/containers/kafka-connect/initialize/healthcheck.sh b/containers/kafka-connect/initialize/healthcheck.sh new file mode 100755 index 000000000..bc3e1e83f --- /dev/null +++ 
b/containers/kafka-connect/initialize/healthcheck.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +# Get the list of all connectors from Debezium REST API +connectors=$(curl -s --fail "http://127.0.0.1:8083/connectors") + +# Clean the array string and load into array +cleaned_connector_names=$(sed 's/[][]//g; s/"//g' <<< "$connectors") +IFS=',' read -r -a my_array <<< "$cleaned_connector_names" + +# Check if MSQL connector is available +if [[ " ${my_array[@]} " =~ " Kafka-Connect-SqlServer-Sink " ]]; then + echo "Kafka-Connect-SqlServer-Sink is enabled"; +else + echo "Kafka-Connect-SqlServer-Sink not found. Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/mssql-connector.json" http://localhost:8083/connectors/ +fi + +exit 0 \ No newline at end of file diff --git a/containers/kafka-connect/mssql-connector.json b/containers/kafka-connect/initialize/mssql-connector.json similarity index 100% rename from containers/kafka-connect/mssql-connector.json rename to containers/kafka-connect/initialize/mssql-connector.json diff --git a/docker-compose.yaml b/docker-compose.yaml index 1f5beff80..8a9a5b054 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -25,7 +25,6 @@ services: wildfly: image: ghcr.io/cdcent/nedssdev:6.0.18.1 - platform: linux/amd64 depends_on: nbs-mssql: condition: service_healthy @@ -41,11 +40,14 @@ services: environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 + KAFKA_OPTS: -Dzookeeper.4lw.commands.whitelist=ruok healthcheck: - test: nc -z localhost 2181 || exit -1 + test: echo "ruok" | nc localhost 2181 | grep imok interval: 10s timeout: 5s retries: 3 + start_period: 10s + start_interval: 10s kafka: image: confluentinc/cp-kafka:7.3.0 @@ -67,9 +69,11 @@ services: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" healthcheck: test: kafka-topics --bootstrap-server kafka:29092 --list - interval: 30s + interval: 15s timeout: 10s - retries: 3 + retries: 
5 + start_period: 20s + start_interval: 10s kafka-connect: build: @@ -79,8 +83,12 @@ services: condition: service_healthy nbs-mssql: condition: service_healthy + liquibase: + condition: service_completed_successfully ports: - 8083:8083 + volumes: + - ./containers/kafka-connect/initialize/:/kafka/healthcheck/ environment: CONNECT_BOOTSTRAP_SERVERS: "kafka:29092" CONNECT_GROUP_ID: "cp-kafka-connect.groupId" @@ -93,6 +101,13 @@ services: CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" + healthcheck: + test: ["CMD", "/kafka/healthcheck/healthcheck.sh"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 20s + start_interval: 10s debezium: image: debezium/connect:2.4 @@ -101,8 +116,14 @@ services: condition: service_healthy zookeeper: condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully ports: - "8085:8083" + volumes: + - ./containers/debezium/initialize/:/kafka/healthcheck/ environment: BOOTSTRAP_SERVERS: kafka:29092 GROUP_ID: connect-cluster @@ -113,13 +134,19 @@ services: OFFSET_STORAGE_FILE_FILENAME: /tmp/connect.offsets OFFSET_FLUSH_INTERVAL_MS: 10000 PLUGIN_PATH: /kafka/connect - CONFIG_STORAGE_TOPIC: connect-configs OFFSET_STORAGE_TOPIC: connect-offsets STATUS_STORAGE_TOPIC: connect-status CONFIG_STORAGE_REPLICATION_FACTOR: 1 OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 + healthcheck: + test: ["CMD", "/kafka/healthcheck/healthcheck.sh"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 20s + start_interval: 10s investigation-service: build: diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index 
eee46d8b9..501a80926 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -4,6 +4,7 @@ import java.io.File; import java.time.Duration; +import java.util.Optional; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -16,6 +17,8 @@ import org.testcontainers.utility.DockerImageName; import gov.cdc.etldatapipeline.postprocessingservice.integration.patient.PatientCreator; +import gov.cdc.etldatapipeline.postprocessingservice.integration.rdb.DPatientFinder; +import gov.cdc.etldatapipeline.postprocessingservice.integration.util.Await; @SpringBootTest @ActiveProfiles("test") @@ -24,6 +27,9 @@ class IntegrationTest { @Autowired private PatientCreator patientCreator; + @Autowired + private DPatientFinder dPatientFinder; + @SuppressWarnings("resource") private static final ComposeContainer environment = new ComposeContainer( DockerImageName.parse("docker:25.0.5"), @@ -34,12 +40,15 @@ class IntegrationTest { "liquibase", "zookeeper", "kafka", + "person-service", "debezium", - "kafka-connect", - "person-service") + "kafka-connect") // Add liquibase specific log check and increase default timeout .waitingFor("liquibase", Wait.forLogMessage("Migrations complete.*", 1).withStartupTimeout(Duration.ofMinutes(3))) + // Add debezium specific wait to ensure connector is ready before test execution + .waitingFor("debezium", + Wait.forLogMessage("Finished creating connector.*", 3).withStartupTimeout(Duration.ofMinutes(3))) // Set a global startup timeout for ComposeContainer .withStartupTimeout(Duration.ofMinutes(10)); @@ -47,10 +56,6 @@ class IntegrationTest { static void setUp() { // Start up necessary containers environment.start(); - - // TODO Initialize debezium connectors - - // TODO Initialize kafka-sync connector } @AfterAll @@ -65,8 +70,11 @@ void 
patientDataIsSuccessfullyProcessed() { long createdPatient = patientCreator.create(); assertThat(createdPatient).isNotZero(); - // Validate patient data arrives in D_PATIENT + // Validate patient data arrives in D_PATIENT with retry + Optional dPatientKey = Await.waitFor(dPatientFinder::findDPatientKeyWithRetry, createdPatient); + assertThat(dPatientKey).isPresent(); + assertThat(dPatientKey.get()).isNotZero(); } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java new file mode 100644 index 000000000..ac1f942a4 --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java @@ -0,0 +1,35 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.rdb; + +import java.util.Optional; + +import org.springframework.context.annotation.Profile; +import org.springframework.jdbc.core.simple.JdbcClient; +import org.springframework.stereotype.Component; + +@Component +@Profile("test") +public class DPatientFinder { + + private final JdbcClient client; + + public DPatientFinder(JdbcClient client) { + this.client = client; + } + + private static final String SELECT_KEY_BY_ID = """ + SELECT TOP 1 + patient_key + FROM + D_PATIENT + WHERE + patient_mpr_uid = :id + """; + + public Optional findDPatientKeyWithRetry(long id) { + return client.sql(SELECT_KEY_BY_ID) + .param("id", id) + .query(Long.class) + .optional(); + } + +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java new file mode 100644 index 000000000..fc689a07d --- /dev/null +++ 
b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java @@ -0,0 +1,42 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.util; + +import java.time.Duration; +import java.util.Optional; +import java.util.function.Function; + +public class Await { + + private static final int DEFAULT_MAX_RETRY = 6; + private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(10); + + public static Optional waitFor( + Function> function, + A parameter) { + return waitFor(function, parameter, DEFAULT_MAX_RETRY, DEFAULT_RETRY_DELAY); + } + + @SuppressWarnings("java:S2925") // this code sleeps while waiting. + public static Optional waitFor( + Function> function, + A parameter, + int maxRetry, + Duration retryDelay) { + int retryCount = 0; + Optional result = Optional.empty(); + + while (retryCount < maxRetry && result.isEmpty()) { + retryCount += 1; + result = function.apply(parameter); + + if (result.isEmpty()) { + try { + Thread.sleep(retryDelay); + } catch (InterruptedException e) { + return result; + } + } + } + return result; + } + +} From fdf889f78590779f80d7ea703ba0a3decb707133 Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Tue, 10 Mar 2026 16:35:12 -0400 Subject: [PATCH 04/11] Remove unneeded startup timeout --- .../integration/IntegrationTest.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index 501a80926..fe2da118c 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -43,14 +43,11 @@ class IntegrationTest { 
"person-service", "debezium", "kafka-connect") - // Add liquibase specific log check and increase default timeout - .waitingFor("liquibase", - Wait.forLogMessage("Migrations complete.*", 1).withStartupTimeout(Duration.ofMinutes(3))) // Add debezium specific wait to ensure connector is ready before test execution .waitingFor("debezium", - Wait.forLogMessage("Finished creating connector.*", 3).withStartupTimeout(Duration.ofMinutes(3))) - // Set a global startup timeout for ComposeContainer - .withStartupTimeout(Duration.ofMinutes(10)); + Wait.forLogMessage(".*Finished creating connector.*", 3)) + // Set the maximum startup timeout all the waits set are bounded to + .withStartupTimeout(Duration.ofMinutes(5)); @BeforeAll static void setUp() { From 966436a1fff3c2e556fcccacf5175db59db31a2d Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Tue, 10 Mar 2026 16:40:42 -0400 Subject: [PATCH 05/11] kafka-connect log wait --- .../postprocessingservice/integration/IntegrationTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index fe2da118c..da85f87f0 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -43,9 +43,11 @@ class IntegrationTest { "person-service", "debezium", "kafka-connect") - // Add debezium specific wait to ensure connector is ready before test execution + // Add specific waits to ensure connectors are ready before test execution .waitingFor("debezium", Wait.forLogMessage(".*Finished creating connector.*", 3)) + .waitingFor("kafka-connect", + Wait.forLogMessage(".*Finished creating connector.*", 1)) // 
Set the maximum startup timeout all the waits set are bounded to .withStartupTimeout(Duration.ofMinutes(5)); From a03c69ceb20321460ef8c9aaebe4de64c09c3fc5 Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Wed, 11 Mar 2026 12:02:42 -0400 Subject: [PATCH 06/11] Update log monitor regex. Reduce polling time for test. Add kafka sink connector restart --- .../kafka-connect/initialize/healthcheck.sh | 2 +- docker-compose.yaml | 10 -------- .../integration/IntegrationTest.java | 14 +++++++---- .../kafkasink/KafkaSinkClient.java | 25 +++++++++++++++++++ .../integration/util/Await.java | 23 ++++++++++++----- .../src/test/resources/application-test.yaml | 8 ++++++ 6 files changed, 60 insertions(+), 22 deletions(-) create mode 100644 post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java diff --git a/containers/kafka-connect/initialize/healthcheck.sh b/containers/kafka-connect/initialize/healthcheck.sh index bc3e1e83f..5175b9075 100755 --- a/containers/kafka-connect/initialize/healthcheck.sh +++ b/containers/kafka-connect/initialize/healthcheck.sh @@ -12,7 +12,7 @@ IFS=',' read -r -a my_array <<< "$cleaned_connector_names" if [[ " ${my_array[@]} " =~ " Kafka-Connect-SqlServer-Sink " ]]; then echo "Kafka-Connect-SqlServer-Sink is enabled"; else - echo "Kafka-Connect-SqlServer-Sink not found. Initializing..."; + echo "Kafka-Connect-SqlServer-Sink not found. 
Initializing..."; curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/mssql-connector.json" http://localhost:8083/connectors/ fi diff --git a/docker-compose.yaml b/docker-compose.yaml index 8a9a5b054..eef0953be 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -46,8 +46,6 @@ services: interval: 10s timeout: 5s retries: 3 - start_period: 10s - start_interval: 10s kafka: image: confluentinc/cp-kafka:7.3.0 @@ -72,8 +70,6 @@ services: interval: 15s timeout: 10s retries: 5 - start_period: 20s - start_interval: 10s kafka-connect: build: @@ -106,16 +102,12 @@ services: interval: 10s timeout: 5s retries: 5 - start_period: 20s - start_interval: 10s debezium: image: debezium/connect:2.4 depends_on: kafka: condition: service_healthy - zookeeper: - condition: service_healthy nbs-mssql: condition: service_healthy liquibase: @@ -145,8 +137,6 @@ services: interval: 10s timeout: 5s retries: 5 - start_period: 20s - start_interval: 10s investigation-service: build: diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index da85f87f0..80e4f554d 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -16,6 +16,7 @@ import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; +import gov.cdc.etldatapipeline.postprocessingservice.integration.kafkasink.KafkaSinkClient; import gov.cdc.etldatapipeline.postprocessingservice.integration.patient.PatientCreator; import gov.cdc.etldatapipeline.postprocessingservice.integration.rdb.DPatientFinder; import 
gov.cdc.etldatapipeline.postprocessingservice.integration.util.Await; @@ -40,16 +41,16 @@ class IntegrationTest { "liquibase", "zookeeper", "kafka", - "person-service", "debezium", - "kafka-connect") + "kafka-connect", + "person-service") // Add specific waits to ensure connectors are ready before test execution .waitingFor("debezium", - Wait.forLogMessage(".*Finished creating connector.*", 3)) + Wait.forLogMessage(".*Starting streaming.*", 1)) .waitingFor("kafka-connect", - Wait.forLogMessage(".*Finished creating connector.*", 1)) + Wait.forLogMessage(".*Sink task finished initialization.*", 1)) // Set the maximum startup timeout all the waits set are bounded to - .withStartupTimeout(Duration.ofMinutes(5)); + .withStartupTimeout(Duration.ofMinutes(10)); @BeforeAll static void setUp() { @@ -65,6 +66,9 @@ static void tearDown() { @Test void patientDataIsSuccessfullyProcessed() { + // Restart kafka sink connector so it picks up newly created topics + KafkaSinkClient.restartSinkConnector(); + // Insert a patient into NBS_ODSE long createdPatient = patientCreator.create(); assertThat(createdPatient).isNotZero(); diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java new file mode 100644 index 000000000..b3c291cf3 --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java @@ -0,0 +1,25 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.kafkasink; + +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +@Component +public class KafkaSinkClient { + + private static final 
RestTemplate restTemplate = new RestTemplate(); + private static final String URL = "http://localhost:8083/connectors/Kafka-Connect-SqlServer-Sink/restart?includeTasks=true"; + + public static void restartSinkConnector() { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + restTemplate.postForObject( + URL, + new HttpEntity<>(null, headers), + String.class); + } + +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java index fc689a07d..e1e9f7fba 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java @@ -6,12 +6,23 @@ public class Await { - private static final int DEFAULT_MAX_RETRY = 6; - private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(10); - - public static Optional waitFor( - Function> function, - A parameter) { + private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(6); + private static final int DEFAULT_MAX_RETRY = 10; + + /** + * Calls the provided function until it returns a non-empty Optional or the + * retry limit is reached. 
+ * + * @param Generic Input + * @param Generic Output + * @param function Function that accepts a single parameter of type I and + * returns an Optional{@literal } + * @param parameter The single parameter of type I to be passed to the function + * @return + */ + public static Optional waitFor( + Function> function, + I parameter) { return waitFor(function, parameter, DEFAULT_MAX_RETRY, DEFAULT_RETRY_DELAY); } diff --git a/post-processing-service/src/test/resources/application-test.yaml b/post-processing-service/src/test/resources/application-test.yaml index 9fd35c89a..7fb0d3419 100644 --- a/post-processing-service/src/test/resources/application-test.yaml +++ b/post-processing-service/src/test/resources/application-test.yaml @@ -8,6 +8,14 @@ spring: username: sa password: PizzaIsGood33! url: jdbc:sqlserver://localhost:3433;databaseName=NBS_ODSE;encrypt=true;trustServerCertificate=true; + kafka: + consumer: + maxPollIntervalMs: 3000 +service: + fixed-delay: + cached-ids: 5000 + datamart: 5000 + backfill: 5000 logging: level: From 671245195bd33a7f543f49ddb5bd8c9d8cdc44bb Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Wed, 11 Mar 2026 12:34:09 -0400 Subject: [PATCH 07/11] Temp workaround for sink connector --- containers/debezium/initialize/healthcheck.sh | 2 +- containers/kafka-connect/initialize/healthcheck.sh | 2 +- .../integration/IntegrationTest.java | 10 ++++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/containers/debezium/initialize/healthcheck.sh b/containers/debezium/initialize/healthcheck.sh index 7c12f98b9..e6c4dd69e 100755 --- a/containers/debezium/initialize/healthcheck.sh +++ b/containers/debezium/initialize/healthcheck.sh @@ -33,4 +33,4 @@ else fi -exit 0 \ No newline at end of file +exit 0 diff --git a/containers/kafka-connect/initialize/healthcheck.sh b/containers/kafka-connect/initialize/healthcheck.sh index 5175b9075..0d1a94b61 100755 --- a/containers/kafka-connect/initialize/healthcheck.sh +++ 
b/containers/kafka-connect/initialize/healthcheck.sh @@ -16,4 +16,4 @@ else curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/mssql-connector.json" http://localhost:8083/connectors/ fi -exit 0 \ No newline at end of file +exit 0 diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index 80e4f554d..a6f59d882 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -65,14 +65,16 @@ static void tearDown() { } @Test - void patientDataIsSuccessfullyProcessed() { - // Restart kafka sink connector so it picks up newly created topics - KafkaSinkClient.restartSinkConnector(); - + void patientDataIsSuccessfullyProcessed() throws InterruptedException { // Insert a patient into NBS_ODSE long createdPatient = patientCreator.create(); assertThat(createdPatient).isNotZero(); + // Wait for topics to be created then restart kafka sink connector so it picks + // up newly created nrt_ topics (TEMP WORKAROUND) + Thread.sleep(Duration.ofSeconds(10)); + KafkaSinkClient.restartSinkConnector(); + // Validate patient data arrives in D_PATIENT with retry Optional dPatientKey = Await.waitFor(dPatientFinder::findDPatientKeyWithRetry, createdPatient); From eb46d7e82dfdbb79614aea8a1e28802c4c45d484 Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Wed, 11 Mar 2026 12:38:49 -0400 Subject: [PATCH 08/11] Spotless apply --- .../integration/IntegrationTest.java | 105 ++++++----- .../integration/config/DataSourceConfig.java | 77 ++++---- .../integration/id/IdGenerator.java | 165 +++++++++--------- .../kafkasink/KafkaSinkClient.java | 19 
+- .../integration/patient/PatientCreator.java | 53 +++--- .../integration/rdb/DPatientFinder.java | 22 +-- .../integration/util/Await.java | 78 ++++----- 7 files changed, 245 insertions(+), 274 deletions(-) diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index a6f59d882..c94d21fb4 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -2,10 +2,13 @@ import static org.assertj.core.api.Assertions.assertThat; +import gov.cdc.etldatapipeline.postprocessingservice.integration.kafkasink.KafkaSinkClient; +import gov.cdc.etldatapipeline.postprocessingservice.integration.patient.PatientCreator; +import gov.cdc.etldatapipeline.postprocessingservice.integration.rdb.DPatientFinder; +import gov.cdc.etldatapipeline.postprocessingservice.integration.util.Await; import java.io.File; import java.time.Duration; import java.util.Optional; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -16,70 +19,62 @@ import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; -import gov.cdc.etldatapipeline.postprocessingservice.integration.kafkasink.KafkaSinkClient; -import gov.cdc.etldatapipeline.postprocessingservice.integration.patient.PatientCreator; -import gov.cdc.etldatapipeline.postprocessingservice.integration.rdb.DPatientFinder; -import gov.cdc.etldatapipeline.postprocessingservice.integration.util.Await; - @SpringBootTest @ActiveProfiles("test") class IntegrationTest { - @Autowired - private PatientCreator patientCreator; - - @Autowired - private DPatientFinder 
dPatientFinder; + @Autowired private PatientCreator patientCreator; - @SuppressWarnings("resource") - private static final ComposeContainer environment = new ComposeContainer( - DockerImageName.parse("docker:25.0.5"), - new File("../docker-compose.yaml")) - // List specific services to prevent launching wildfly container - .withServices( - "nbs-mssql", - "liquibase", - "zookeeper", - "kafka", - "debezium", - "kafka-connect", - "person-service") - // Add specific waits to ensure connectors are ready before test execution - .waitingFor("debezium", - Wait.forLogMessage(".*Starting streaming.*", 1)) - .waitingFor("kafka-connect", - Wait.forLogMessage(".*Sink task finished initialization.*", 1)) - // Set the maximum startup timeout all the waits set are bounded to - .withStartupTimeout(Duration.ofMinutes(10)); + @Autowired private DPatientFinder dPatientFinder; - @BeforeAll - static void setUp() { - // Start up necessary containers - environment.start(); - } + @SuppressWarnings("resource") + private static final ComposeContainer environment = + new ComposeContainer( + DockerImageName.parse("docker:25.0.5"), new File("../docker-compose.yaml")) + // List specific services to prevent launching wildfly container + .withServices( + "nbs-mssql", + "liquibase", + "zookeeper", + "kafka", + "debezium", + "kafka-connect", + "person-service") + // Add specific waits to ensure connectors are ready before test execution + .waitingFor("debezium", Wait.forLogMessage(".*Starting streaming.*", 1)) + .waitingFor( + "kafka-connect", Wait.forLogMessage(".*Sink task finished initialization.*", 1)) + // Set the maximum startup timeout all the waits set are bounded to + .withStartupTimeout(Duration.ofMinutes(10)); - @AfterAll - static void tearDown() { - // Stop all containers - environment.stop(); - } + @BeforeAll + static void setUp() { + // Start up necessary containers + environment.start(); + } - @Test - void patientDataIsSuccessfullyProcessed() throws InterruptedException { - // Insert 
a patient into NBS_ODSE - long createdPatient = patientCreator.create(); - assertThat(createdPatient).isNotZero(); + @AfterAll + static void tearDown() { + // Stop all containers + environment.stop(); + } - // Wait for topics to be created then restart kafka sink connector so it picks - // up newly created nrt_ topics (TEMP WORKAROUND) - Thread.sleep(Duration.ofSeconds(10)); - KafkaSinkClient.restartSinkConnector(); + @Test + void patientDataIsSuccessfullyProcessed() throws InterruptedException { + // Insert a patient into NBS_ODSE + long createdPatient = patientCreator.create(); + assertThat(createdPatient).isNotZero(); - // Validate patient data arrives in D_PATIENT with retry - Optional dPatientKey = Await.waitFor(dPatientFinder::findDPatientKeyWithRetry, createdPatient); + // Wait for topics to be created then restart kafka sink connector so it picks + // up newly created nrt_ topics (TEMP WORKAROUND) + Thread.sleep(Duration.ofSeconds(10)); + KafkaSinkClient.restartSinkConnector(); - assertThat(dPatientKey).isPresent(); - assertThat(dPatientKey.get()).isNotZero(); - } + // Validate patient data arrives in D_PATIENT with retry + Optional dPatientKey = + Await.waitFor(dPatientFinder::findDPatientKeyWithRetry, createdPatient); + assertThat(dPatientKey).isPresent(); + assertThat(dPatientKey.get()).isNotZero(); + } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java index 543343b30..3a6a89f9b 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java @@ -1,7 +1,6 @@ package gov.cdc.etldatapipeline.postprocessingservice.integration.config; import 
javax.sql.DataSource; - import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -14,44 +13,40 @@ @Configuration @Profile("test") public class DataSourceConfig { - // Standard RTR datasource with appropriate permissions - @Bean - @Primary - @ConfigurationProperties("spring.datasource.primary") - public DataSourceProperties dataSourceProperties() { - return new DataSourceProperties(); - } - - @Bean - @Primary - public DataSource dataSource(DataSourceProperties properties) { - return properties - .initializeDataSourceBuilder() - .build(); - } - - @Bean - @Primary - public JdbcClient jdbcClient(DataSource dataSource) { - return JdbcClient.create(dataSource); - } - - // Testing specific datasource with db ownership - @Bean("testProperties") - @ConfigurationProperties("spring.datasource.test") - public DataSourceProperties testDataSourceProperties() { - return new DataSourceProperties(); - } - - @Bean("testDataSource") - public DataSource testDataSource(@Qualifier("testProperties") DataSourceProperties properties) { - return properties - .initializeDataSourceBuilder() - .build(); - } - - @Bean("testClient") - public JdbcClient testJdbcClient(@Qualifier("testDataSource") DataSource dataSource) { - return JdbcClient.create(dataSource); - } + // Standard RTR datasource with appropriate permissions + @Bean + @Primary + @ConfigurationProperties("spring.datasource.primary") + public DataSourceProperties dataSourceProperties() { + return new DataSourceProperties(); + } + + @Bean + @Primary + public DataSource dataSource(DataSourceProperties properties) { + return properties.initializeDataSourceBuilder().build(); + } + + @Bean + @Primary + public JdbcClient jdbcClient(DataSource dataSource) { + return JdbcClient.create(dataSource); + } + + // Testing specific datasource with db ownership + @Bean("testProperties") + 
@ConfigurationProperties("spring.datasource.test") + public DataSourceProperties testDataSourceProperties() { + return new DataSourceProperties(); + } + + @Bean("testDataSource") + public DataSource testDataSource(@Qualifier("testProperties") DataSourceProperties properties) { + return properties.initializeDataSourceBuilder().build(); + } + + @Bean("testClient") + public JdbcClient testJdbcClient(@Qualifier("testDataSource") DataSource dataSource) { + return JdbcClient.create(dataSource); + } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java index 7aadb06ca..6c20588c9 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java @@ -5,19 +5,18 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -/** - * Responsible for managing database IDs in the Local_UID_generator table - */ +/** Responsible for managing database IDs in the Local_UID_generator table */ @Service public class IdGenerator { - private JdbcClient client; + private JdbcClient client; - public IdGenerator(@Qualifier("testClient") final JdbcClient client) { - this.client = client; - } + public IdGenerator(@Qualifier("testClient") final JdbcClient client) { + this.client = client; + } - private static final String SELECT = """ + private static final String SELECT = + """ SELECT TOP 1 UID_prefix_cd, seed_value_nbr, @@ -30,7 +29,8 @@ public IdGenerator(@Qualifier("testClient") final JdbcClient client) { """; - private static final String INCREMENT = """ + private static final String INCREMENT = + """ UPDATE NBS_ODSE.dbo.local_uid_generator SET @@ -40,86 +40,85 @@ 
public IdGenerator(@Qualifier("testClient") final JdbcClient client) { OR type_cd = :type """; - /** - * - * Gets the next valid Id for the provided Type and increments the value. Will - * throw an exception if the provided type is not found. - * - * @param type {@link EntityType} - * @return {@link GeneratedId} - */ - @Transactional - public GeneratedId next(EntityType type) { - // Retrieve next valid Id - GeneratedId identifier = client.sql(SELECT) - .param("type", type.toString()) - .query((rs, rn) -> new GeneratedId( + /** + * Gets the next valid Id for the provided Type and increments the value. Will throw an exception + * if the provided type is not found. + * + * @param type {@link EntityType} + * @return {@link GeneratedId} + */ + @Transactional + public GeneratedId next(EntityType type) { + // Retrieve next valid Id + GeneratedId identifier = + client + .sql(SELECT) + .param("type", type.toString()) + .query( + (rs, rn) -> + new GeneratedId( rs.getString("UID_prefix_cd"), rs.getLong("seed_value_nbr"), rs.getString("UID_suffix_cd"))) - .single(); - - // Increment table - client.sql(INCREMENT) - .param("type", type.toString()) - .update(); + .single(); - return identifier; - } + // Increment table + client.sql(INCREMENT).param("type", type.toString()).update(); - public record GeneratedId(String prefix, Long id, String suffix) { - public String toLocalId() { - return prefix + id.toString() + suffix; - } - } + return identifier; + } - /** - * Matches the class_name_cd column of the Local_UID_generator table, other than - * the NBS entry. 
Which references the - * type_cd column as the class_name_cd for type NBS is dynamic based on the - * jurisdiction - */ - public enum EntityType { - NBS, - CLINICAL_DOCUMENT, - COINFECTION_GROUP, - CS_REPORT, - CT_CONTACT, - DEDUPLICATION_LOG, - EPILINK, - GEOCODING, - GEOCODING_LOG, - GROUP, - INTERVENTION, - INTERVIEW, - MATERIAL, - NBS_DOCUMENT, - NBS_QUESTION_ID_LDF, - NBS_QUESTION_LDF, - NBS_UIMETEDATA_LDF, - NND_METADATA, - NON_LIVING_SUBJECT, - NOTIFICATION, - OBSERVATION, - ORGANIZATION, - PAGE, - PATIENT_ENCOUNTER, - PERSON, - PERSON_GROUP, - PLACE, - PUBLIC_HEALTH_CASE, - RDB_METADATA, - REFERRAL, - REPORT, - REPORTDATASOURCE, - REPORTDATASOURCECOLUMN, - REPORTDISPLAYCOLUMN, - REPORTFILTER, - REPORTFILTERCODE, - REPORTFILTERVALUE, - SECURITY_LOG, - TREATMENT, - WORKUP + public record GeneratedId(String prefix, Long id, String suffix) { + public String toLocalId() { + return prefix + id.toString() + suffix; } + } + /** + * Matches the class_name_cd column of the Local_UID_generator table, other than the NBS entry. 
+ * Which references the type_cd column as the class_name_cd for type NBS is dynamic based on the + * jurisdiction + */ + public enum EntityType { + NBS, + CLINICAL_DOCUMENT, + COINFECTION_GROUP, + CS_REPORT, + CT_CONTACT, + DEDUPLICATION_LOG, + EPILINK, + GEOCODING, + GEOCODING_LOG, + GROUP, + INTERVENTION, + INTERVIEW, + MATERIAL, + NBS_DOCUMENT, + NBS_QUESTION_ID_LDF, + NBS_QUESTION_LDF, + NBS_UIMETEDATA_LDF, + NND_METADATA, + NON_LIVING_SUBJECT, + NOTIFICATION, + OBSERVATION, + ORGANIZATION, + PAGE, + PATIENT_ENCOUNTER, + PERSON, + PERSON_GROUP, + PLACE, + PUBLIC_HEALTH_CASE, + RDB_METADATA, + REFERRAL, + REPORT, + REPORTDATASOURCE, + REPORTDATASOURCECOLUMN, + REPORTDISPLAYCOLUMN, + REPORTFILTER, + REPORTFILTERCODE, + REPORTFILTERVALUE, + SECURITY_LOG, + TREATMENT, + WORKUP + } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java index b3c291cf3..1203645af 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java @@ -9,17 +9,14 @@ @Component public class KafkaSinkClient { - private static final RestTemplate restTemplate = new RestTemplate(); - private static final String URL = "http://localhost:8083/connectors/Kafka-Connect-SqlServer-Sink/restart?includeTasks=true"; + private static final RestTemplate restTemplate = new RestTemplate(); + private static final String URL = + "http://localhost:8083/connectors/Kafka-Connect-SqlServer-Sink/restart?includeTasks=true"; - public static void restartSinkConnector() { - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - - restTemplate.postForObject( - 
URL, - new HttpEntity<>(null, headers), - String.class); - } + public static void restartSinkConnector() { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + restTemplate.postForObject(URL, new HttpEntity<>(null, headers), String.class); + } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java index 44559220d..1e3e70bab 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java @@ -1,33 +1,28 @@ package gov.cdc.etldatapipeline.postprocessingservice.integration.patient; +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator; +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.EntityType; +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.GeneratedId; import java.time.LocalDateTime; - import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jdbc.core.simple.JdbcClient; import org.springframework.stereotype.Component; -import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator; -import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.EntityType; -import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.GeneratedId; - -/** - * Responsible for creating and inserting patient data into the NBS_ODSE for - * integration testing - */ +/** Responsible for creating and inserting patient data into the NBS_ODSE for integration testing */ @Component public class PatientCreator { - private IdGenerator idGenerator; - private JdbcClient 
client; + private IdGenerator idGenerator; + private JdbcClient client; - public PatientCreator( - final IdGenerator idGenerator, - @Qualifier("testClient") final JdbcClient client) { - this.idGenerator = idGenerator; - this.client = client; - } + public PatientCreator( + final IdGenerator idGenerator, @Qualifier("testClient") final JdbcClient client) { + this.idGenerator = idGenerator; + this.client = client; + } - private static final String CREATE_QUERY = """ + private static final String CREATE_QUERY = + """ insert into NBS_ODSE.dbo.Entity(entity_uid, class_cd) values (:id, 'PSN'); insert into NBS_ODSE.dbo.Person( @@ -65,18 +60,18 @@ public PatientCreator( ); """; - public long create() { - - GeneratedId identifier = idGenerator.next(EntityType.PERSON); + public long create() { - this.client.sql(CREATE_QUERY) - .param("id", identifier.id()) - .param("local", identifier.toLocalId()) - .param("addedOn", LocalDateTime.now()) - .param("addedBy", "9999") - .update(); + GeneratedId identifier = idGenerator.next(EntityType.PERSON); - return identifier.id(); - } + this.client + .sql(CREATE_QUERY) + .param("id", identifier.id()) + .param("local", identifier.toLocalId()) + .param("addedOn", LocalDateTime.now()) + .param("addedBy", "9999") + .update(); + return identifier.id(); + } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java index ac1f942a4..cb8f80956 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java @@ -1,7 +1,6 @@ package gov.cdc.etldatapipeline.postprocessingservice.integration.rdb; import java.util.Optional; - import 
org.springframework.context.annotation.Profile; import org.springframework.jdbc.core.simple.JdbcClient; import org.springframework.stereotype.Component; @@ -10,13 +9,14 @@ @Profile("test") public class DPatientFinder { - private final JdbcClient client; + private final JdbcClient client; - public DPatientFinder(JdbcClient client) { - this.client = client; - } + public DPatientFinder(JdbcClient client) { + this.client = client; + } - private static final String SELECT_KEY_BY_ID = """ + private static final String SELECT_KEY_BY_ID = + """ SELECT TOP 1 patient_key FROM @@ -25,11 +25,7 @@ public DPatientFinder(JdbcClient client) { patient_mpr_uid = :id """; - public Optional<Long> findDPatientKeyWithRetry(long id) { - return client.sql(SELECT_KEY_BY_ID) - .param("id", id) - .query(Long.class) - .optional(); - } - + public Optional<Long> findDPatientKeyWithRetry(long id) { + return client.sql(SELECT_KEY_BY_ID).param("id", id).query(Long.class).optional(); + } } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java index e1e9f7fba..ce69428ce 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java @@ -6,48 +6,42 @@ public class Await { - private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(6); - private static final int DEFAULT_MAX_RETRY = 10; - - /** - * Calls the provided function until it returns a non-empty Optional or the - * retry limit is reached. 
- * - * @param <I> Generic Input - * @param <O> Generic Output - * @param function Function that accepts a single parameter of type I and - * returns an Optional{@literal <O>} - * @param parameter The single parameter of type I to be passed to the function - * @return - */ - public static <I, O> Optional<O> waitFor( - Function<I, Optional<O>> function, - I parameter) { - return waitFor(function, parameter, DEFAULT_MAX_RETRY, DEFAULT_RETRY_DELAY); - } - - @SuppressWarnings("java:S2925") // this code sleeps while waiting. - public static <A, O> Optional<O> waitFor( - Function<A, Optional<O>> function, - A parameter, - int maxRetry, - Duration retryDelay) { - int retryCount = 0; - Optional<O> result = Optional.empty(); - - while (retryCount < maxRetry && result.isEmpty()) { - retryCount += 1; - result = function.apply(parameter); - - if (result.isEmpty()) { - try { - Thread.sleep(retryDelay); - } catch (InterruptedException e) { - return result; - } - } + private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(6); + private static final int DEFAULT_MAX_RETRY = 10; + + /** + * Calls the provided function until it returns a non-empty Optional or the retry limit is + * reached. + * + * @param <I> Generic Input + * @param <O> Generic Output + * @param function Function that accepts a single parameter of type I and returns an + * Optional{@literal <O>} + * @param parameter The single parameter of type I to be passed to the function + * @return + */ + public static <I, O> Optional<O> waitFor(Function<I, Optional<O>> function, I parameter) { + return waitFor(function, parameter, DEFAULT_MAX_RETRY, DEFAULT_RETRY_DELAY); + } + + @SuppressWarnings("java:S2925") // this code sleeps while waiting. 
+ public static <A, O> Optional<O> waitFor( + Function<A, Optional<O>> function, A parameter, int maxRetry, Duration retryDelay) { + int retryCount = 0; + Optional<O> result = Optional.empty(); + + while (retryCount < maxRetry && result.isEmpty()) { + retryCount += 1; + result = function.apply(parameter); + + if (result.isEmpty()) { + try { + Thread.sleep(retryDelay); + } catch (InterruptedException e) { + return result; } - } + } } - + return result; + } } From d15da423db40e7a968e4cc467e624815587d7766 Mon Sep 17 00:00:00 2001 From: Michael Peels <109251240+mpeels@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:50:27 -0400 Subject: [PATCH 09/11] Update containers/debezium/initialize/healthcheck.sh Co-authored-by: Eric Buckley --- containers/debezium/initialize/healthcheck.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/containers/debezium/initialize/healthcheck.sh b/containers/debezium/initialize/healthcheck.sh index e6c4dd69e..fbd34b13a 100755 --- a/containers/debezium/initialize/healthcheck.sh +++ b/containers/debezium/initialize/healthcheck.sh @@ -2,7 +2,7 @@ set -e # Get the list of all connectors from Debezium REST API -connectors=$(curl -s --fail "http://127.0.0.1:8083/connectors") +connectors=$(curl -s --fail --connect-timeout 2 "http://127.0.0.1:8083/connectors") # Clean the array string and load into array cleaned_connector_names=$(sed 's/[][]//g; s/"//g' <<< "$connectors") From 059fb7613c588dc6cf2ecb2cddca6d2714c9bbe3 Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Wed, 11 Mar 2026 18:51:33 -0400 Subject: [PATCH 10/11] Tweak some timings. 
Capture container logs --- build.gradle | 2 ++ docker-compose.yaml | 5 +++-- .../integration/IntegrationTest.java | 16 +++++++++++++++- .../src/test/resources/application-test.yaml | 11 +++++++---- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index d83d38a2a..65798f74f 100644 --- a/build.gradle +++ b/build.gradle @@ -61,6 +61,8 @@ subprojects { } tasks.named('test') { + // Show all logs during gradle test execution + testLogging.showStandardStreams = true finalizedBy tasks.named('jacocoTestReport', JacocoReport) } } diff --git a/docker-compose.yaml b/docker-compose.yaml index eef0953be..d588ab30a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -99,8 +99,8 @@ services: CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" healthcheck: test: ["CMD", "/kafka/healthcheck/healthcheck.sh"] - interval: 10s - timeout: 5s + interval: 20s + timeout: 15s retries: 5 debezium: @@ -210,6 +210,7 @@ services: - DB_PASSWORD=person_service - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 + - spring.kafka.consumer.maxPollIntervalMs=3000 depends_on: kafka: condition: service_healthy diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java index c94d21fb4..efab0d449 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -12,10 +12,13 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import 
org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.testcontainers.containers.ComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; @@ -27,6 +30,9 @@ class IntegrationTest { @Autowired private DPatientFinder dPatientFinder; + private static final Logger logger = LoggerFactory.getLogger(IntegrationTest.class); + private static final Slf4jLogConsumer consumer = new Slf4jLogConsumer(logger); + @SuppressWarnings("resource") private static final ComposeContainer environment = new ComposeContainer( @@ -44,6 +50,14 @@ class IntegrationTest { .waitingFor("debezium", Wait.forLogMessage(".*Starting streaming.*", 1)) .waitingFor( "kafka-connect", Wait.forLogMessage(".*Sink task finished initialization.*", 1)) + // Pull logs from the containers for better debugging + .withLogConsumer("nbs-mssql", consumer) + .withLogConsumer("liquibase", consumer) + .withLogConsumer("zookeeper", consumer) + .withLogConsumer("kafka", consumer) + .withLogConsumer("debezium", consumer) + .withLogConsumer("kafka-connect", consumer) + .withLogConsumer("person-service", consumer) // Set the maximum startup timeout all the waits set are bounded to .withStartupTimeout(Duration.ofMinutes(10)); @@ -67,7 +81,7 @@ void patientDataIsSuccessfullyProcessed() throws InterruptedException { // Wait for topics to be created then restart kafka sink connector so it picks // up newly created nrt_ topics (TEMP WORKAROUND) - Thread.sleep(Duration.ofSeconds(10)); + Thread.sleep(Duration.ofSeconds(20)); KafkaSinkClient.restartSinkConnector(); // Validate patient data arrives in D_PATIENT with retry diff --git a/post-processing-service/src/test/resources/application-test.yaml b/post-processing-service/src/test/resources/application-test.yaml index 
7fb0d3419..6211e3258 100644 --- a/post-processing-service/src/test/resources/application-test.yaml +++ b/post-processing-service/src/test/resources/application-test.yaml @@ -10,13 +10,16 @@ spring: url: jdbc:sqlserver://localhost:3433;databaseName=NBS_ODSE;encrypt=true;trustServerCertificate=true; kafka: consumer: - maxPollIntervalMs: 3000 + maxPollIntervalMs: 6000 + max-retry: 10 service: fixed-delay: - cached-ids: 5000 - datamart: 5000 - backfill: 5000 + cached-ids: 6000 + datamart: 6000 + backfill: 6000 + max-retries: 10 logging: level: org.springframework.jdbc.core: TRACE + org.testcontainers: DEBUG From ae1f15592266bef88e1447bce883e782950b2e62 Mon Sep 17 00:00:00 2001 From: Michael Peels Date: Wed, 11 Mar 2026 20:13:15 -0400 Subject: [PATCH 11/11] Test increase zookeeper timeouts --- docker-compose.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index d588ab30a..eddb3cbc8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -43,9 +43,10 @@ services: KAFKA_OPTS: -Dzookeeper.4lw.commands.whitelist=ruok healthcheck: test: echo "ruok" | nc localhost 2181 | grep imok - interval: 10s + interval: 20s timeout: 5s - retries: 3 + retries: 10 + start_period: 30s kafka: image: confluentinc/cp-kafka:7.3.0