diff --git a/observation-service/README.md b/observation-service/README.md new file mode 100644 index 000000000..7701eb7a4 --- /dev/null +++ b/observation-service/README.md @@ -0,0 +1,28 @@ +## Observation Service + +### Run on Your Host Machine +The following instructions require that the Kafka and Database containers are running at a minimum. Also note that `application-local.yaml` is included in `.gitignore` so keep this file stored somewhere for reuse. + +1. Create the properties file `src/main/resources/application-local.yaml` and populate with the following content: +```yaml +spring: + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092} + group-id: ${KAFKA_CONSUMER_APP:observation-reporting-consumer-app} + datasource: + password: ${DB_PASSWORD:PizzaIsGood33!} + username: ${DB_USERNAME:sa} + url: ${DB_HOST:jdbc:sqlserver://localhost:3433;databaseName=NBS_ODSE;encrypt=true;trustServerCertificate=true;} +``` +2. Ensure the `kafka` and `rtr-mssql` containers running. You likely want all your RTR containers running for complete testing! +```shell +docker ps -a -f "name=kafka$" -f "name=rtr-mssql$" +``` +3. In the root of this repository execute the following command so the service uses `application-local.yaml`. +```shell +./gradlew :observation-service:bootRun --args='--spring.profiles.active=local' +``` +4. (Optional) Run the service in debug mode. +```shell +./gradlew :observation-service:bootRun --args='--spring.profiles.active=local' --debug-jvm +``` \ No newline at end of file diff --git a/post-processing-service/README.md b/post-processing-service/README.md new file mode 100644 index 000000000..a585c7802 --- /dev/null +++ b/post-processing-service/README.md @@ -0,0 +1,24 @@ +## Post Processing Service + +### Run on Your Host Machine +The following instructions require that the Kafka and Database containers are running at a minimum. Also note that `application-local.yaml` is included in `.gitignore` so keep this file stored somewhere for reuse. + +1. Create the properties file `src/main/resources/application-local.yaml` and populate with the following content: +```yaml +spring: + kafka: + group-id: ${KAFKA_CONSUMER_APP:post-processing-reporting-consumer-app} + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092} + datasource: + password: ${DB_PASSWORD:PizzaIsGood33!} + username: ${DB_USERNAME:sa} + url: ${DB_HOST:jdbc:sqlserver://localhost:3433;databaseName=RDB_MODERN;encrypt=true;trustServerCertificate=true;} +``` +2. Ensure the `kafka` and `rtr-mssql` containers running. You likely want all your RTR containers running for complete testing! +```shell +docker ps -a -f "name=kafka$" -f "name=rtr-mssql$" +``` +3. In the root of this repository execute the following command (this service is configured to run in debug mode on port 17070 in `build.gradle` by previous dev team). +```shell +./gradlew :post-processing-service:bootRun --args='--spring.profiles.active=local' +``` \ No newline at end of file diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java index dc71545fc..c1f8feb62 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java @@ -281,6 +281,24 @@ private void extractValFromMessage(Long uid, String topic, String payload) { "Order_rslt")) { obsCache.computeIfAbsent(LAB_REPORT, k -> new ConcurrentLinkedQueue<>()).add(uid); } + + /* + Extract the result observation uid from the Order observation payload since Debezium does not detect + changes for Result lab records on the Observation table. When processObservation is called, this uid + will be in the 'labIds' list and passed into the target sp_d_lab_test_postprocessing stored procedure. + + This is specifically for ELRs so we need to make sure this is associated with an ELR. + */ + JsonNode resUidNode = payloadNode.path("result_observation_uid"); + String electronicInd = payloadNode.path("electronic_ind").asText(); // should always be there + if (resUidNode != null && !Objects.equals(resUidNode.asText(), "") + && domainCd.equalsIgnoreCase("Order") && electronicInd.equalsIgnoreCase("Y")) { + List resUidList = List.of(resUidNode.asText().split(",")); + resUidList.forEach(resUid -> { + Long rUid = Long.valueOf(resUid); + obsCache.computeIfAbsent(LAB_REPORT, k -> new ConcurrentLinkedQueue<>()).add(rUid); + }); + } } } catch (Exception ex) { logger.warn("Error processing ID values for the {} message: {}", topic, ex.getMessage()); diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceEntityTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceEntityTest.java index 18c74c311..6f19bd6ff 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceEntityTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceEntityTest.java @@ -496,6 +496,48 @@ void testPostProcessObservationLab(String payload) { assertTrue(logs.get(9).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } + + void testPostProcessObservationLabOrderWithResult() { + String topic = "dummy_observation"; + String key = "{\"payload\":{\"observation_uid\":12344}}"; + String payload = """ + { + "payload": { + "observation_uid": 12344, + "obs_domain_cd_st_1": "Order", + "ctrl_cd_display_form": "LabReport", + "result_observation_uid": 12345, + "electronic_ind": "Y" + } + } + """; + + postProcessingServiceMock.processNrtMessage(topic, key, payload); + assertEquals(12344L, postProcessingServiceMock.idCache.get(topic).element()); + assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); + + assertTrue(postProcessingServiceMock.obsCache.containsKey(LAB_REPORT)); + assertTrue(postProcessingServiceMock.obsCache.get(LAB_REPORT).contains(12344L)); + + // The result uid should only be added to the obsCache + assertTrue(postProcessingServiceMock.obsCache.containsKey(LAB_REPORT)); + assertTrue(postProcessingServiceMock.obsCache.get(LAB_REPORT).contains(12345L)); + assertFalse(postProcessingServiceMock.idCache.get(topic).contains(12345L)); + + postProcessingServiceMock.processCachedIds(); + + String expectedObsIdsString = "12344,12345"; + verify(postProcRepositoryMock).executeStoredProcForLabTest(expectedObsIdsString); + verify(postProcRepositoryMock).executeStoredProcForLabTestResult(expectedObsIdsString); + List logs = listAppender.list; + assertEquals(10, logs.size()); + assertTrue(logs.get(2).getFormattedMessage().contains("sp_d_lab_test_postprocessing")); + assertTrue(logs.get(4).getFormattedMessage().contains("sp_d_labtest_result_postprocessing")); + assertTrue(logs.get(6).getFormattedMessage().contains("sp_lab100_datamart_postprocessing")); + assertTrue(logs.get(8).getFormattedMessage().contains("sp_lab101_datamart_postprocessing")); + assertTrue(logs.get(9).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + } + @ParameterizedTest @CsvSource({ "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"Result\",\"ctrl_cd_display_form\": \"MorbReport\"}}'",