diff --git a/build.gradle b/build.gradle index d83d38a2a..65798f74f 100644 --- a/build.gradle +++ b/build.gradle @@ -61,6 +61,8 @@ subprojects { } tasks.named('test') { + // Show all logs during gradle test execution + testLogging.showStandardStreams = true finalizedBy tasks.named('jacocoTestReport', JacocoReport) } } diff --git a/containers/debezium/README.md b/containers/debezium/README.md deleted file mode 100644 index 4e8a64d9c..000000000 --- a/containers/debezium/README.md +++ /dev/null @@ -1,9 +0,0 @@ -## Initialization - -To initialize the connectors, a POST request must be sent to the debezium container for each conector. - -```sh -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8085/connectors/ -d @containers/debezium/odse_connector.json -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8085/connectors/ -d @containers/debezium/odse_meta_connector.json -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8085/connectors/ -d @containers/debezium/srte_connector.json -``` diff --git a/containers/debezium/initialize/healthcheck.sh b/containers/debezium/initialize/healthcheck.sh new file mode 100755 index 000000000..fbd34b13a --- /dev/null +++ b/containers/debezium/initialize/healthcheck.sh @@ -0,0 +1,36 @@ +#!/bin/bash +set -e + +# Get the list of all connectors from Debezium REST API +connectors=$(curl -s --fail --connect-timeout 2 "http://127.0.0.1:8083/connectors") + +# Clean the array string and load into array +cleaned_connector_names=$(sed 's/[][]//g; s/"//g' <<< "$connectors") +IFS=',' read -r -a my_array <<< "$cleaned_connector_names" + +# Check if ODSE connector is available +if [[ " ${my_array[@]} " =~ " odse-connector " ]]; then + echo "odse-connector is enabled"; +else + echo "odse-connector not found. 
Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/odse_connector.json" http://localhost:8083/connectors/ +fi + +# Check if ODSE meta connector is available +if [[ " ${my_array[@]} " =~ " odse-meta-connector " ]]; then + echo "odse-meta-connector is enabled"; +else + echo "odse-meta-connector not found. Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/odse_meta_connector.json" http://localhost:8083/connectors/ +fi + +# Check if SRTE connector is available +if [[ " ${my_array[@]} " =~ " srte-connector " ]]; then + echo "srte-connector is enabled"; +else + echo "srte-connector not found. Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/srte_connector.json" http://localhost:8083/connectors/ +fi + + +exit 0 diff --git a/containers/debezium/odse_connector.json b/containers/debezium/initialize/odse_connector.json similarity index 100% rename from containers/debezium/odse_connector.json rename to containers/debezium/initialize/odse_connector.json diff --git a/containers/debezium/odse_meta_connector.json b/containers/debezium/initialize/odse_meta_connector.json similarity index 100% rename from containers/debezium/odse_meta_connector.json rename to containers/debezium/initialize/odse_meta_connector.json diff --git a/containers/debezium/srte_connector.json b/containers/debezium/initialize/srte_connector.json similarity index 100% rename from containers/debezium/srte_connector.json rename to containers/debezium/initialize/srte_connector.json diff --git a/containers/kafka-connect/initialize/healthcheck.sh b/containers/kafka-connect/initialize/healthcheck.sh new file mode 100755 index 000000000..0d1a94b61 --- /dev/null +++ b/containers/kafka-connect/initialize/healthcheck.sh @@ -0,0 +1,19 
@@ +#!/bin/bash +set -e + +# Get the list of all connectors from Debezium REST API +connectors=$(curl -s --fail "http://127.0.0.1:8083/connectors") + +# Clean the array string and load into array +cleaned_connector_names=$(sed 's/[][]//g; s/"//g' <<< "$connectors") +IFS=',' read -r -a my_array <<< "$cleaned_connector_names" + +# Check if MSSQL connector is available +if [[ " ${my_array[@]} " =~ " Kafka-Connect-SqlServer-Sink " ]]; then + echo "Kafka-Connect-SqlServer-Sink is enabled"; +else + echo "Kafka-Connect-SqlServer-Sink not found. Initializing..."; + curl -s --fail -X POST --header "Accept:application/json" --header "Content-Type:application/json" --data "@/kafka/healthcheck/mssql-connector.json" http://localhost:8083/connectors/ +fi + +exit 0 diff --git a/containers/kafka-connect/mssql-connector.json b/containers/kafka-connect/initialize/mssql-connector.json similarity index 100% rename from containers/kafka-connect/mssql-connector.json rename to containers/kafka-connect/initialize/mssql-connector.json diff --git a/docker-compose.yaml b/docker-compose.yaml index 83a6f6cc4..eddb3cbc8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,8 +1,7 @@ services: - mssql: + nbs-mssql: image: ghcr.io/cdcent/nedssdb:latest platform: linux/amd64 - container_name: nbs-mssql environment: - DATABASE_VERSION=6.0.18.1 - DEPLOY_ADMIN_PASSWORD=${DATABASE_PASSWORD:-PizzaIsGood33!} @@ -13,12 +12,22 @@ services: ports: - 3433:1433 + liquibase: + build: + dockerfile: ./liquibase-service/Dockerfile.local + environment: + - DB_USERNAME=db_deploy_admin + - DB_PASSWORD=${LIQUIBASE_USER_PASSWORD:-db_deploy_admin} + - DB_HOST=nbs-mssql + depends_on: + nbs-mssql: + condition: service_healthy + wildfly: image: ghcr.io/cdcent/nedssdev:6.0.18.1 - platform: linux/amd64 - container_name: rtr-wildfly depends_on: - - mssql + nbs-mssql: + condition: service_healthy ports: - "9991:9990" - "7003:7001" @@ -26,20 +35,24 @@ services: zookeeper: image: confluentinc/cp-zookeeper:7.3.0 - 
hostname: rtr-zookeeper - container_name: rtr-zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 + KAFKA_OPTS: -Dzookeeper.4lw.commands.whitelist=ruok + healthcheck: + test: echo "ruok" | nc localhost 2181 | grep imok + interval: 20s + timeout: 5s + retries: 10 + start_period: 30s kafka: image: confluentinc/cp-kafka:7.3.0 - hostname: kafka - container_name: kafka depends_on: - - zookeeper + zookeeper: + condition: service_healthy ports: - "9092:9092" environment: @@ -53,16 +66,26 @@ services: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + healthcheck: + test: kafka-topics --bootstrap-server kafka:29092 --list + interval: 15s + timeout: 10s + retries: 5 kafka-connect: - container_name: kafka-connect build: dockerfile: ./containers/kafka-connect/Dockerfile depends_on: - - kafka - - mssql + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully ports: - 8083:8083 + volumes: + - ./containers/kafka-connect/initialize/:/kafka/healthcheck/ environment: CONNECT_BOOTSTRAP_SERVERS: "kafka:29092" CONNECT_GROUP_ID: "cp-kafka-connect.groupId" @@ -75,16 +98,25 @@ services: CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" + healthcheck: + test: ["CMD", "/kafka/healthcheck/healthcheck.sh"] + interval: 20s + timeout: 15s + retries: 5 debezium: image: debezium/connect:2.4 - hostname: debezium - container_name: rtr-debezium depends_on: - - kafka - - zookeeper + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully ports: - "8085:8083" + volumes: + - ./containers/debezium/initialize/:/kafka/healthcheck/ environment: BOOTSTRAP_SERVERS: kafka:29092 GROUP_ID: 
connect-cluster @@ -95,24 +127,17 @@ services: OFFSET_STORAGE_FILE_FILENAME: /tmp/connect.offsets OFFSET_FLUSH_INTERVAL_MS: 10000 PLUGIN_PATH: /kafka/connect - CONFIG_STORAGE_TOPIC: connect-configs OFFSET_STORAGE_TOPIC: connect-offsets STATUS_STORAGE_TOPIC: connect-status CONFIG_STORAGE_REPLICATION_FACTOR: 1 OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 - - liquibase: - build: - dockerfile: ./liquibase-service/Dockerfile.local - environment: - - DB_USERNAME=db_deploy_admin - - DB_PASSWORD=${LIQUIBASE_USER_PASSWORD:-db_deploy_admin} - - DB_HOST=nbs-mssql - depends_on: - mssql: - condition: service_healthy + healthcheck: + test: ["CMD", "/kafka/healthcheck/healthcheck.sh"] + interval: 10s + timeout: 5s + retries: 5 investigation-service: build: @@ -123,7 +148,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully ldfdata-service: build: @@ -134,7 +164,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully observation-service: build: @@ -145,7 +180,12 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully organization-service: build: @@ -156,7 +196,12 @@ services: - 
DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully person-service: build: @@ -166,8 +211,14 @@ services: - DB_PASSWORD=person_service - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 + - spring.kafka.consumer.maxPollIntervalMs=3000 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully post-processing-service: build: @@ -178,4 +229,9 @@ services: - DB_HOST=jdbc:sqlserver://nbs-mssql:1433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; - KAFKA_BOOTSTRAP_SERVER=kafka:29092 depends_on: - - kafka + kafka: + condition: service_healthy + nbs-mssql: + condition: service_healthy + liquibase: + condition: service_completed_successfully diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java new file mode 100644 index 000000000..efab0d449 --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/IntegrationTest.java @@ -0,0 +1,94 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import gov.cdc.etldatapipeline.postprocessingservice.integration.kafkasink.KafkaSinkClient; +import gov.cdc.etldatapipeline.postprocessingservice.integration.patient.PatientCreator; +import gov.cdc.etldatapipeline.postprocessingservice.integration.rdb.DPatientFinder; +import 
gov.cdc.etldatapipeline.postprocessingservice.integration.util.Await; +import java.io.File; +import java.time.Duration; +import java.util.Optional; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.testcontainers.containers.ComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +@SpringBootTest +@ActiveProfiles("test") +class IntegrationTest { + + @Autowired private PatientCreator patientCreator; + + @Autowired private DPatientFinder dPatientFinder; + + private static final Logger logger = LoggerFactory.getLogger(IntegrationTest.class); + private static final Slf4jLogConsumer consumer = new Slf4jLogConsumer(logger); + + @SuppressWarnings("resource") + private static final ComposeContainer environment = + new ComposeContainer( + DockerImageName.parse("docker:25.0.5"), new File("../docker-compose.yaml")) + // List specific services to prevent launching wildfly container + .withServices( + "nbs-mssql", + "liquibase", + "zookeeper", + "kafka", + "debezium", + "kafka-connect", + "person-service") + // Add specific waits to ensure connectors are ready before test execution + .waitingFor("debezium", Wait.forLogMessage(".*Starting streaming.*", 1)) + .waitingFor( + "kafka-connect", Wait.forLogMessage(".*Sink task finished initialization.*", 1)) + // Pull logs from the containers for better debugging + .withLogConsumer("nbs-mssql", consumer) + .withLogConsumer("liquibase", consumer) + .withLogConsumer("zookeeper", consumer) + .withLogConsumer("kafka", consumer) + .withLogConsumer("debezium", consumer) + 
.withLogConsumer("kafka-connect", consumer) + .withLogConsumer("person-service", consumer) + // Set the maximum startup timeout all the waits set are bounded to + .withStartupTimeout(Duration.ofMinutes(10)); + + @BeforeAll + static void setUp() { + // Start up necessary containers + environment.start(); + } + + @AfterAll + static void tearDown() { + // Stop all containers + environment.stop(); + } + + @Test + void patientDataIsSuccessfullyProcessed() throws InterruptedException { + // Insert a patient into NBS_ODSE + long createdPatient = patientCreator.create(); + assertThat(createdPatient).isNotZero(); + + // Wait for topics to be created then restart kafka sink connector so it picks + // up newly created nrt_ topics (TEMP WORKAROUND) + Thread.sleep(Duration.ofSeconds(20)); + KafkaSinkClient.restartSinkConnector(); + + // Validate patient data arrives in D_PATIENT with retry + Optional dPatientKey = + Await.waitFor(dPatientFinder::findDPatientKeyWithRetry, createdPatient); + + assertThat(dPatientKey).isPresent(); + assertThat(dPatientKey.get()).isNotZero(); + } +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java new file mode 100644 index 000000000..3a6a89f9b --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/config/DataSourceConfig.java @@ -0,0 +1,52 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.config; + +import javax.sql.DataSource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; 
+import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Profile; +import org.springframework.jdbc.core.simple.JdbcClient; + +@Configuration +@Profile("test") +public class DataSourceConfig { + // Standard RTR datasource with appropriate permissions + @Bean + @Primary + @ConfigurationProperties("spring.datasource.primary") + public DataSourceProperties dataSourceProperties() { + return new DataSourceProperties(); + } + + @Bean + @Primary + public DataSource dataSource(DataSourceProperties properties) { + return properties.initializeDataSourceBuilder().build(); + } + + @Bean + @Primary + public JdbcClient jdbcClient(DataSource dataSource) { + return JdbcClient.create(dataSource); + } + + // Testing specific datasource with db ownership + @Bean("testProperties") + @ConfigurationProperties("spring.datasource.test") + public DataSourceProperties testDataSourceProperties() { + return new DataSourceProperties(); + } + + @Bean("testDataSource") + public DataSource testDataSource(@Qualifier("testProperties") DataSourceProperties properties) { + return properties.initializeDataSourceBuilder().build(); + } + + @Bean("testClient") + public JdbcClient testJdbcClient(@Qualifier("testDataSource") DataSource dataSource) { + return JdbcClient.create(dataSource); + } +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java new file mode 100644 index 000000000..6c20588c9 --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/id/IdGenerator.java @@ -0,0 +1,124 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.id; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.simple.JdbcClient; +import 
org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** Responsible for managing database IDs in the Local_UID_generator table */ +@Service +public class IdGenerator { + + private JdbcClient client; + + public IdGenerator(@Qualifier("testClient") final JdbcClient client) { + this.client = client; + } + + private static final String SELECT = + """ + SELECT TOP 1 + UID_prefix_cd, + seed_value_nbr, + UID_suffix_cd + FROM + NBS_ODSE.dbo.local_uid_generator + WHERE + class_name_cd = :type + OR type_cd = :type + + """; + + private static final String INCREMENT = + """ + UPDATE + NBS_ODSE.dbo.local_uid_generator + SET + seed_value_nbr = seed_value_nbr + 1 + WHERE + class_name_cd = :type + OR type_cd = :type + """; + + /** + * Gets the next valid Id for the provided Type and increments the value. Will throw an exception + * if the provided type is not found. + * + * @param type {@link EntityType} + * @return {@link GeneratedId} + */ + @Transactional + public GeneratedId next(EntityType type) { + // Retrieve next valid Id + GeneratedId identifier = + client + .sql(SELECT) + .param("type", type.toString()) + .query( + (rs, rn) -> + new GeneratedId( + rs.getString("UID_prefix_cd"), + rs.getLong("seed_value_nbr"), + rs.getString("UID_suffix_cd"))) + .single(); + + // Increment table + client.sql(INCREMENT).param("type", type.toString()).update(); + + return identifier; + } + + public record GeneratedId(String prefix, Long id, String suffix) { + public String toLocalId() { + return prefix + id.toString() + suffix; + } + } + + /** + * Matches the class_name_cd column of the Local_UID_generator table, other than the NBS entry. 
+ * Which references the type_cd column as the class_name_cd for type NBS is dynamic based on the + * jurisdiction + */ + public enum EntityType { + NBS, + CLINICAL_DOCUMENT, + COINFECTION_GROUP, + CS_REPORT, + CT_CONTACT, + DEDUPLICATION_LOG, + EPILINK, + GEOCODING, + GEOCODING_LOG, + GROUP, + INTERVENTION, + INTERVIEW, + MATERIAL, + NBS_DOCUMENT, + NBS_QUESTION_ID_LDF, + NBS_QUESTION_LDF, + NBS_UIMETEDATA_LDF, + NND_METADATA, + NON_LIVING_SUBJECT, + NOTIFICATION, + OBSERVATION, + ORGANIZATION, + PAGE, + PATIENT_ENCOUNTER, + PERSON, + PERSON_GROUP, + PLACE, + PUBLIC_HEALTH_CASE, + RDB_METADATA, + REFERRAL, + REPORT, + REPORTDATASOURCE, + REPORTDATASOURCECOLUMN, + REPORTDISPLAYCOLUMN, + REPORTFILTER, + REPORTFILTERCODE, + REPORTFILTERVALUE, + SECURITY_LOG, + TREATMENT, + WORKUP + } +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java new file mode 100644 index 000000000..1203645af --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/kafkasink/KafkaSinkClient.java @@ -0,0 +1,22 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.kafkasink; + +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +@Component +public class KafkaSinkClient { + + private static final RestTemplate restTemplate = new RestTemplate(); + private static final String URL = + "http://localhost:8083/connectors/Kafka-Connect-SqlServer-Sink/restart?includeTasks=true"; + + public static void restartSinkConnector() { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + 
restTemplate.postForObject(URL, new HttpEntity<>(null, headers), String.class); + } +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java new file mode 100644 index 000000000..1e3e70bab --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/patient/PatientCreator.java @@ -0,0 +1,77 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.patient; + +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator; +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.EntityType; +import gov.cdc.etldatapipeline.postprocessingservice.integration.id.IdGenerator.GeneratedId; +import java.time.LocalDateTime; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.simple.JdbcClient; +import org.springframework.stereotype.Component; + +/** Responsible for creating and inserting patient data into the NBS_ODSE for integration testing */ +@Component +public class PatientCreator { + + private IdGenerator idGenerator; + private JdbcClient client; + + public PatientCreator( + final IdGenerator idGenerator, @Qualifier("testClient") final JdbcClient client) { + this.idGenerator = idGenerator; + this.client = client; + } + + private static final String CREATE_QUERY = + """ + insert into NBS_ODSE.dbo.Entity(entity_uid, class_cd) values (:id, 'PSN'); + + insert into NBS_ODSE.dbo.Person( + person_uid, + person_parent_uid, + local_id, + version_ctrl_nbr, + cd, + electronic_ind, + edx_ind, + add_time, + add_user_id, + last_chg_time, + last_chg_user_id, + record_status_cd, + record_status_time, + status_cd, + status_time + ) values ( + :id, + :id, + :local, + 1, + 'PAT', + 'N', + 'Y', + :addedOn, + :addedBy, + :addedOn, 
+ :addedBy, + 'ACTIVE', + :addedOn, + 'A', + :addedOn + ); + """; + + public long create() { + + GeneratedId identifier = idGenerator.next(EntityType.PERSON); + + this.client + .sql(CREATE_QUERY) + .param("id", identifier.id()) + .param("local", identifier.toLocalId()) + .param("addedOn", LocalDateTime.now()) + .param("addedBy", "9999") + .update(); + + return identifier.id(); + } +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java new file mode 100644 index 000000000..cb8f80956 --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/rdb/DPatientFinder.java @@ -0,0 +1,31 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.rdb; + +import java.util.Optional; +import org.springframework.context.annotation.Profile; +import org.springframework.jdbc.core.simple.JdbcClient; +import org.springframework.stereotype.Component; + +@Component +@Profile("test") +public class DPatientFinder { + + private final JdbcClient client; + + public DPatientFinder(JdbcClient client) { + this.client = client; + } + + private static final String SELECT_KEY_BY_ID = + """ + SELECT TOP 1 + patient_key + FROM + D_PATIENT + WHERE + patient_mpr_uid = :id + """; + + public Optional findDPatientKeyWithRetry(long id) { + return client.sql(SELECT_KEY_BY_ID).param("id", id).query(Long.class).optional(); + } +} diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java new file mode 100644 index 000000000..ce69428ce --- /dev/null +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/integration/util/Await.java @@ 
-0,0 +1,47 @@ +package gov.cdc.etldatapipeline.postprocessingservice.integration.util; + +import java.time.Duration; +import java.util.Optional; +import java.util.function.Function; + +public class Await { + + private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(6); + private static final int DEFAULT_MAX_RETRY = 10; + + /** + * Calls the provided function until it returns a non-empty Optional or the retry limit is + * reached. + * + * @param <I> Generic Input + * @param <O> Generic Output + * @param function Function that accepts a single parameter of type I and returns an + * Optional{@literal <O>} + * @param parameter The single parameter of type I to be passed to the function + * @return an Optional containing the first non-empty result, or an empty Optional if the retry limit is reached + */ + public static Optional waitFor(Function> function, I parameter) { + return waitFor(function, parameter, DEFAULT_MAX_RETRY, DEFAULT_RETRY_DELAY); + } + + @SuppressWarnings("java:S2925") // this code sleeps while waiting. + public static Optional waitFor( + Function> function, A parameter, int maxRetry, Duration retryDelay) { + int retryCount = 0; + Optional result = Optional.empty(); + + while (retryCount < maxRetry && result.isEmpty()) { + retryCount += 1; + result = function.apply(parameter); + + if (result.isEmpty()) { + try { + Thread.sleep(retryDelay); + } catch (InterruptedException e) { + return result; + } + } + } + return result; + } +} diff --git a/post-processing-service/src/test/resources/application-test.yaml b/post-processing-service/src/test/resources/application-test.yaml new file mode 100644 index 000000000..6211e3258 --- /dev/null +++ b/post-processing-service/src/test/resources/application-test.yaml @@ -0,0 +1,25 @@ +spring: + datasource: + primary: + username: post_processing_service_rdb + password: post_processing_service + url: jdbc:sqlserver://localhost:3433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true; + test: + username: sa + password: PizzaIsGood33! 
+ url: jdbc:sqlserver://localhost:3433;databaseName=NBS_ODSE;encrypt=true;trustServerCertificate=true; + kafka: + consumer: + maxPollIntervalMs: 6000 + max-retry: 10 +service: + fixed-delay: + cached-ids: 6000 + datamart: 6000 + backfill: 6000 + max-retries: 10 + +logging: + level: + org.springframework.jdbc.core: TRACE + org.testcontainers: DEBUG