diff --git a/data-ingestion-service/build.gradle b/data-ingestion-service/build.gradle index 977f53fdc..6bec6212a 100644 --- a/data-ingestion-service/build.gradle +++ b/data-ingestion-service/build.gradle @@ -225,7 +225,7 @@ dependencies { implementation 'io.prometheus:simpleclient_hotspot:0.16.0' implementation 'io.prometheus:simpleclient_httpserver:0.16.0' implementation 'io.prometheus:simpleclient_pushgateway:0.16.0' - implementation 'io.micrometer:micrometer-registry-prometheus:1.13.4' + implementation 'io.micrometer:micrometer-registry-prometheus:1.15.0-M2' // Apache camel implementation 'org.apache.camel.springboot:camel-spring-boot-starter:4.8.0' diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java index 671220789..b582850cd 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java @@ -1,7 +1,7 @@ package gov.cdc.dataingestion.camel.routes; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import org.apache.camel.Handler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,10 +19,10 @@ public class HL7FileProcessComponent { private static Logger logger = LoggerFactory.getLogger(HL7FileProcessComponent.class); String msgType = "HL7"; - private RawELRService rawELRService; + private RawElrService rawELRService; @Autowired - public HL7FileProcessComponent(RawELRService rawELRService){ + public HL7FileProcessComponent(RawElrService rawELRService){ this.rawELRService=rawELRService; } @Handler @@ -34,11 +34,12 @@ public String process(String body) { String hl7Str = body; logger.debug("HL7 Message:{}", hl7Str); if (hl7Str != null && !hl7Str.trim().isEmpty()) { - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(msgType); - rawERLDto.setValidationActive(true); - rawERLDto.setPayload(hl7Str); - elrId = rawELRService.submission(rawERLDto,version); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(msgType); + rawElrDto.setValidationActive(true); + rawElrDto.setPayload(hl7Str); + rawElrDto.setVersion(version); + elrId = rawELRService.submission(rawElrDto); } } catch (Exception e) { logger.error(e.getMessage()); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java index 75da2287b..b8f63ab69 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java @@ -3,6 +3,7 @@ import gov.cdc.dataingestion.deadletter.model.ElrDeadLetterDto; import gov.cdc.dataingestion.deadletter.service.ElrDeadLetterService; import gov.cdc.dataingestion.exception.DeadLetterTopicException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.enums.ParameterIn; @@ -87,7 +88,7 @@ public ResponseEntity getErrorMessage(@PathVariable("dlt-id") schema = @Schema(type = "string"))} ) @PostMapping(consumes = MediaType.TEXT_PLAIN_VALUE, path = "/api/elrs/{dlt-id}") - public ResponseEntity messageReInject(@PathVariable("dlt-id") String dltId, @RequestBody final String payload) throws DeadLetterTopicException { + public ResponseEntity messageReInject(@PathVariable("dlt-id") String dltId, @RequestBody final String payload) throws DeadLetterTopicException, KafkaProducerException { return ResponseEntity.ok(elrDeadLetterService.updateAndReprocessingMessage(dltId, payload)); } } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java index fdb709f0d..e29417f0f 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java @@ -1,8 +1,11 @@ package gov.cdc.dataingestion.deadletter.repository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; +import jakarta.transaction.Transactional; import org.springframework.data.domain.Sort; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import java.util.List; @@ -11,4 +14,22 @@ @Repository public interface IElrDeadLetterRepository extends JpaRepository { Optional> findAllDltRecordByDltStatus (String dltStatus, Sort sort); -} + + @Modifying + @Transactional + @Query(value = "INSERT INTO elr_dlt (error_message_id, error_message_source, error_stack_trace, error_stack_trace_short, dlt_status, dlt_occurrence, message, created_by, updated_by) VALUES(:id, :topicName, :errorStatus, :errorStatus , 'KAFKA_ERROR_' + :type, :dltOccurrence, :payload, 'elr_raw_service', 'elr_raw_service')", nativeQuery = true) + void addErrorStatusForRawId(String id, String topicName, String type, String payload, String errorStatus, int dltOccurrence); + + @Modifying + @Transactional + @Query(value = "UPDATE elr_dlt SET dlt_occurrence = :dltOccurrence, dlt_status = :errorStatus WHERE error_message_id =:id", nativeQuery = true) + void updateDltOccurrenceForRawId(String id, int dltOccurrence, String errorStatus); + + @Modifying + @Transactional + @Query(value = "UPDATE elr_dlt SET dlt_status = :errorStatus WHERE error_message_id = :id", nativeQuery = true) + void updateErrorStatusForRawId(String id, String errorStatus); + + @Query(value = "SELECT * FROM elr_dlt WHERE dlt_status LIKE '%KAFKA%' AND dlt_occurrence <= 2", nativeQuery = true) + List getAllErrorDltRecordForKafkaError(); +} \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java index bdc7a53b7..f825986b7 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java @@ -7,9 +7,12 @@ import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; import gov.cdc.dataingestion.exception.DeadLetterTopicException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import lombok.extern.slf4j.Slf4j; @@ -17,10 +20,7 @@ import org.springframework.data.domain.Sort; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.UUID; +import java.util.*; import static gov.cdc.dataingestion.share.helper.TimeStampHelper.getCurrentTimeStamp; @@ -35,9 +35,10 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) public class ElrDeadLetterService { private final IElrDeadLetterRepository dltRepository; - private final IRawELRRepository rawELRRepository; + private final IRawElrRepository rawELRRepository; private final IValidatedELRRepository validatedELRRepository; private final KafkaProducerService kafkaProducerService; + private final RawElrService rawELRService; @Value("${service.timezone}") private String tz = "UTC"; @@ -63,13 +64,14 @@ public class ElrDeadLetterService { public ElrDeadLetterService( IElrDeadLetterRepository dltRepository, - IRawELRRepository rawELRRepository, + IRawElrRepository rawELRRepository, IValidatedELRRepository validatedELRRepository, - KafkaProducerService kafkaProducerService) { + KafkaProducerService kafkaProducerService, RawElrService rawELRService) { this.dltRepository = dltRepository; this.rawELRRepository = rawELRRepository; this.validatedELRRepository = validatedELRRepository; this.kafkaProducerService = kafkaProducerService; + this.rawELRService = rawELRService; } public List getAllErrorDltRecord() { @@ -101,7 +103,7 @@ public ElrDeadLetterDto getDltRecordById(String id) throws DeadLetterTopicExcept - public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) throws DeadLetterTopicException { + public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) throws DeadLetterTopicException, KafkaProducerException { var existingRecord = getDltRecordById(id); if(!existingRecord.getDltStatus().equalsIgnoreCase(EnumElrDltStatus.ERROR.name())) { throw new DeadLetterTopicException("Selected record is in REINJECTED state. Please either wait for the ERROR state to occur or select a different record."); @@ -114,7 +116,7 @@ public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) thr if (!rawRecord.isPresent()) { throw new DeadLetterTopicException(DEAD_LETTER_NULL_EXCEPTION); } - RawERLModel rawModel = rawRecord.get(); + RawElrModel rawModel = rawRecord.get(); rawModel.setPayload(body); rawModel.setUpdatedOn(getCurrentTimeStamp(tz)); @@ -207,4 +209,33 @@ private boolean isValidUUID(String uuidString) { return false; } } + + public void processFailedMessagesFromKafka() throws KafkaProducerException { + List dltMessagesList = dltRepository.getAllErrorDltRecordForKafkaError(); + if(!dltMessagesList.isEmpty()) { + Iterator iterator = dltMessagesList.iterator(); + while (iterator.hasNext()) { + ElrDeadLetterModel message = iterator.next(); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setId(message.getErrorMessageId()); + rawElrDto.setType(getElrMessageType(message.getDltStatus())); + rawElrDto.setPayload(message.getMessage()); + rawElrDto.setValidationActive(true); + dltRepository.updateErrorStatusForRawId(message.getErrorMessageId(), "PROCESSED"); + rawELRService.updateRawMessageAfterRetry(rawElrDto, 2); + iterator.remove(); + } + } + } + + String getElrMessageType(String dltStatus) { + String delimiter = "KAFKA_ERROR"; + int delimiterIndex = dltStatus.indexOf(delimiter); + + String msgType = ""; + if (delimiterIndex != -1) { + msgType = dltStatus.substring(delimiterIndex + 1 + delimiter.length()); + } + return msgType; + } } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java new file mode 100644 index 000000000..4a6ea7253 --- /dev/null +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java @@ -0,0 +1,28 @@ +package gov.cdc.dataingestion.deadletter.service; + +import gov.cdc.dataingestion.exception.KafkaProducerException; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +@Configuration +@EnableScheduling +public class SchedulerConfig { + + @Value("${dlt.scheduler.enabled}") + private boolean isSchedulerEnabled; + + private final ElrDeadLetterService elrDeadLetterService; + + public SchedulerConfig(ElrDeadLetterService elrDeadLetterService) { + this.elrDeadLetterService = elrDeadLetterService; + } + + @Scheduled(cron = "${dlt.scheduler.cron}") + public void scheduleTask() throws KafkaProducerException { + if (isSchedulerEnabled) { + elrDeadLetterService.processFailedMessagesFromKafka(); + } + } +} diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/exception/KafkaProducerException.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/exception/KafkaProducerException.java new file mode 100644 index 000000000..1940ca78c --- /dev/null +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/exception/KafkaProducerException.java @@ -0,0 +1,14 @@ +package gov.cdc.dataingestion.exception; + +/** + 1118 - require constructor complaint + 125 - comment complaint + 6126 - String block complaint + 1135 - todos complaint + * */ +@SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) +public class KafkaProducerException extends Exception{ + public KafkaProducerException(String message) { + super(message); + } +} diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java index 4bca8ca11..e7f47b033 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java @@ -19,8 +19,8 @@ import gov.cdc.dataingestion.nbs.converters.Hl7ToRhapsodysXmlConverter; import gov.cdc.dataingestion.nbs.repository.model.NbsInterfaceModel; import gov.cdc.dataingestion.nbs.services.NbsRepositoryServiceProvider; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData; import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator; @@ -86,7 +86,7 @@ public class KafkaConsumerService { private String prepFhirTopic = "fhir_prep"; private final KafkaProducerService kafkaProducerService; private final IHL7v2Validator iHl7v2Validator; - private final IRawELRRepository iRawELRRepository; + private final IRawElrRepository iRawELRRepository; private final IValidatedELRRepository iValidatedELRRepository; private final IHL7DuplicateValidator iHL7DuplicateValidator; private final NbsRepositoryServiceProvider nbsRepositoryServiceProvider; @@ -105,7 +105,7 @@ public class KafkaConsumerService { //region CONSTRUCTOR public KafkaConsumerService( IValidatedELRRepository iValidatedELRRepository, - IRawELRRepository iRawELRRepository, + IRawElrRepository iRawELRRepository, KafkaProducerService kafkaProducerService, IHL7v2Validator iHl7v2Validator, IHL7DuplicateValidator iHL7DuplicateValidator, @@ -182,7 +182,7 @@ public void handleMessageForRawElr(String message, } try { validationHandler(message, hl7ValidationActivated, dataProcessingEnable); - } catch (DuplicateHL7FileFoundException | DiHL7Exception e) { + } catch (DuplicateHL7FileFoundException | DiHL7Exception | KafkaProducerException e) { throw new RuntimeException(e); //NOSONAR } }); @@ -250,12 +250,20 @@ public void handleMessageForElrXml(String message, iReportStatusRepository.save(reportStatusIdData); if (dataProcessingApplied) { - kafkaProducerService.sendMessageAfterConvertedToXml( - String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0); + try { + kafkaProducerService.sendMessageAfterConvertedToXml( + String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0); + } catch (KafkaProducerException e) { + throw new RuntimeException(e); //NOSONAR + } } else { - kafkaProducerService.sendMessageAfterConvertedToXml( - nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); + try { + kafkaProducerService.sendMessageAfterConvertedToXml( + nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); + } catch (KafkaProducerException e) { + throw new RuntimeException(e); //NOSONAR + } } }); @@ -318,7 +326,11 @@ public void handleMessageForXmlConversionElr(String message, @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) { timeMetricsBuilder.recordXmlPrepTime(() -> { log.debug(topicDebugLog, message, topic); - xmlConversionHandler(message, operation, dataProcessingEnable); + try { + xmlConversionHandler(message, operation, dataProcessingEnable); + } catch (KafkaProducerException e) { + throw new RuntimeException(e); //NOSONAR + } }); } @@ -455,7 +467,7 @@ private String getDltErrorSource(String incomingTopic) { return erroredSource; } - private void preparationForConversionHandler(String message, String dataProcessingEnable) throws ConversionPrepareException { + private void preparationForConversionHandler(String message, String dataProcessingEnable) throws ConversionPrepareException, KafkaProducerException { Optional validatedElrResponse = this.iValidatedELRRepository.findById(message); if(validatedElrResponse.isPresent()) { kafkaProducerService.sendMessagePreparationTopic(validatedElrResponse.get(), prepXmlTopic, TopicPreparationType.XML, 0, dataProcessingEnable); @@ -469,7 +481,7 @@ private void preparationForConversionHandler(String message, String dataProcessi * make this public so we can add unit test for now. * we need to implementation interface pattern for NBS convert and transformation classes. it better for unit testing * */ - public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) { + public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) throws KafkaProducerException { String hl7Msg = ""; try { Optional validatedELRModel = iValidatedELRRepository.findById(message); @@ -548,13 +560,13 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str ); } } - private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) { + private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) throws KafkaProducerException { log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml"); xmlConversionHandlerProcessing(message, operation, dataProcessingEnable); } - private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception { - Optional rawElrResponse = this.iRawELRRepository.findById(message); - RawERLModel elrModel; + private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception, KafkaProducerException { + Optional rawElrResponse = this.iRawELRRepository.findById(message); + RawElrModel elrModel; if (!rawElrResponse.isEmpty()) { elrModel = rawElrResponse.get(); } else { diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java index 10f90b7ec..b7ff6bf5e 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java @@ -5,6 +5,7 @@ import gov.cdc.dataingestion.constant.TopicPreparationType; import gov.cdc.dataingestion.constant.enums.EnumKafkaOperation; import gov.cdc.dataingestion.exception.ConversionPrepareException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; @@ -13,6 +14,9 @@ import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Service /** @@ -31,7 +35,8 @@ public class KafkaProducerService { private final KafkaTemplate kafkaTemplate; - public KafkaProducerService( KafkaTemplate kafkaTemplate) { + + public KafkaProducerService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @@ -40,7 +45,7 @@ public void sendMessageFromController(String msg, String msgType, Integer dltOccurrence, Boolean validationActive, - String version) { + String version) throws KafkaProducerException { String uniqueID = msgType + "_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, msg); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msgType.getBytes()); @@ -58,7 +63,7 @@ public void sendElrXmlMessageFromController(String msgId, String topic, String msgType, Integer dltOccurrence, - String payload, String version) { + String payload, String version) throws KafkaProducerException { String uniqueID = msgType + "_" + msgId; var prodRecord = new ProducerRecord<>(topic, uniqueID, payload); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msgType.getBytes()); @@ -72,7 +77,7 @@ public void sendElrXmlMessageFromController(String msgId, } public void sendMessageFromDltController( - String msg, String topic, String msgType, Integer dltOccurrence) { + String msg, String topic, String msgType, Integer dltOccurrence) throws KafkaProducerException { String uniqueID = msgType + "_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, msg); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msgType.getBytes()); @@ -84,9 +89,12 @@ public void sendMessageFromDltController( } + /** + * @deprecated This method is deprecated and will be removed in a future release. + */ @Deprecated @SuppressWarnings("java:S1133") - public void sendMessageFromCSVController(List> msg, String topic, String msgType) { + public void sendMessageFromCSVController(List> msg, String topic, String msgType) throws KafkaProducerException { String uniqueID = msgType + "_" + UUID.randomUUID(); Gson gson = new Gson(); String json = gson.toJson(msg); @@ -96,7 +104,7 @@ public void sendMessageFromCSVController(List> msg, String topic, S sendMessage(prodRecord); } - public void sendMessageAfterValidatingMessage(ValidatedELRModel msg, String topic, Integer dltOccurrence, String dataProcessingEnable) { + public void sendMessageAfterValidatingMessage(ValidatedELRModel msg, String topic, Integer dltOccurrence, String dataProcessingEnable) throws KafkaProducerException { String uniqueID = PREFIX_MSG_VALID + msg.getMessageType() + "_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, msg.getId()); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msg.getMessageType().getBytes()); @@ -108,7 +116,7 @@ public void sendMessageAfterValidatingMessage(ValidatedELRModel msg, String topi sendMessage(prodRecord); } @SuppressWarnings({"java:S6880"}) - public void sendMessagePreparationTopic(ValidatedELRModel msg, String topic, TopicPreparationType topicType, Integer dltOccurrence, String dataProcessingEnable) throws ConversionPrepareException { + public void sendMessagePreparationTopic(ValidatedELRModel msg, String topic, TopicPreparationType topicType, Integer dltOccurrence, String dataProcessingEnable) throws ConversionPrepareException, KafkaProducerException { String uniqueId; if (topicType == TopicPreparationType.XML) { @@ -127,7 +135,7 @@ else if (topicType == TopicPreparationType.FHIR) { private void sendMessageHelper(String topic, Integer dltOccurrence, String uniqueId, String messageOriginId, String messageType, String messageVersion, - String dataProcessingEnable) { + String dataProcessingEnable) throws KafkaProducerException { var prodRecord = new ProducerRecord<>(topic, uniqueId, messageOriginId); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, messageType.getBytes()); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_VERSION, messageVersion.getBytes()); @@ -139,7 +147,7 @@ private void sendMessageHelper(String topic, Integer dltOccurrence, String uniqu } public void sendMessageDlt(String msgShort, String msg, String topic, Integer dltOccurrence, - String stackTrace, String originalTopic) { + String stackTrace, String originalTopic) throws KafkaProducerException { String uniqueID = "DLT_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, msg); prodRecord.headers().add(KafkaHeaderValue.DLT_OCCURRENCE, dltOccurrence.toString().getBytes()); @@ -151,7 +159,7 @@ public void sendMessageDlt(String msgShort, String msg, String topic, Integer dl - public void sendMessageAfterConvertedToXml(String xmlMsg, String topic, Integer dltOccurrence) { + public void sendMessageAfterConvertedToXml(String xmlMsg, String topic, Integer dltOccurrence) throws KafkaProducerException { String uniqueID = PREFIX_MSG_XML + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, xmlMsg); prodRecord.headers().add(KafkaHeaderValue.DLT_OCCURRENCE, dltOccurrence.toString().getBytes()); @@ -160,7 +168,7 @@ public void sendMessageAfterConvertedToXml(String xmlMsg, String topic, Integer sendMessage(prodRecord); } - public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String validatedElrDuplicateTopic, Integer dltOccurrence) { + public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String validatedElrDuplicateTopic, Integer dltOccurrence) throws KafkaProducerException { String uniqueID = PREFIX_MSG_HL7 + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(validatedElrDuplicateTopic, uniqueID, msg.getRawId()); prodRecord.headers().add(KafkaHeaderValue.DLT_OCCURRENCE, dltOccurrence.toString().getBytes()); @@ -168,12 +176,17 @@ public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String v sendMessage(prodRecord); } - - - - private void sendMessage(ProducerRecord prodRecord) { - kafkaTemplate.send(prodRecord); + private void sendMessage(ProducerRecord prodRecord) throws KafkaProducerException { + try { + kafkaTemplate.send(prodRecord).get(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new KafkaProducerException("Thread was interrupted while sending Kafka message to topic: " + + prodRecord.topic() + " with UUID: " + prodRecord.value()); + } + catch (TimeoutException | ExecutionException e) { + throw new KafkaProducerException("Failed publishing message to kafka topic: " + prodRecord.topic() + " with UUID: " + prodRecord.value()); + } } - } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java index 90ae3ca82..1afe7f0c4 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java @@ -1,9 +1,10 @@ package gov.cdc.dataingestion.rawmessage.controller; import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import gov.cdc.dataingestion.validation.services.interfaces.IHL7Service; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -38,12 +39,12 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) public class ElrReportsController { - private final RawELRService rawELRService; + private final RawElrService rawELRService; private final CustomMetricsBuilder customMetricsBuilder; private IHL7Service hl7Service; @Autowired - public ElrReportsController(RawELRService rawELRService, + public ElrReportsController(RawElrService rawELRService, CustomMetricsBuilder customMetricsBuilder, IHL7Service hl7Service) { this.rawELRService = rawELRService; @@ -70,27 +71,29 @@ public ElrReportsController(RawELRService rawELRService, ) @PostMapping(consumes = MediaType.TEXT_PLAIN_VALUE, path = "/api/elrs") public ResponseEntity save(@RequestBody final String payload, @RequestHeader("msgType") String type, - @RequestHeader(name = "version", defaultValue = "1") String version) { + @RequestHeader(name = "version", defaultValue = "1") String version) throws KafkaProducerException { if (type.isEmpty()) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Required headers should not be null"); } else { type = type.toUpperCase(); } - RawERLDto rawERLDto = new RawERLDto(); + RawElrDto rawElrDto = new RawElrDto(); customMetricsBuilder.incrementMessagesProcessed(); if (type.equalsIgnoreCase(HL7_ELR)) { - rawERLDto.setType(type); - rawERLDto.setPayload(payload); - rawERLDto.setValidationActive(true); - return ResponseEntity.ok(rawELRService.submission(rawERLDto, version)); + rawElrDto.setType(type); + rawElrDto.setPayload(payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion(version); + return ResponseEntity.ok(rawELRService.submission(rawElrDto)); } else if (type.equalsIgnoreCase(XML_ELR)) { - rawERLDto.setType(type); - rawERLDto.setPayload(payload); - rawERLDto.setValidationActive(true); - return ResponseEntity.ok(rawELRService.submission(rawERLDto, version)); + rawElrDto.setType(type); + rawElrDto.setPayload(payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion(version); + return ResponseEntity.ok(rawELRService.submission(rawElrDto)); } else { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Please provide valid value for msgType header"); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawERLDto.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawElrDto.java similarity index 88% rename from data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawERLDto.java rename to data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawElrDto.java index 1e525ed40..dde60c6d4 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawERLDto.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawElrDto.java @@ -10,11 +10,12 @@ 1135 - todos complaint * */ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) -public class RawERLDto { +public class RawElrDto { private String id; private String type; private String payload; + private String version; private Boolean validationActive = false; } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java deleted file mode 100644 index 6859787a1..000000000 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java +++ /dev/null @@ -1,83 +0,0 @@ -package gov.cdc.dataingestion.rawmessage.service; - -import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import static gov.cdc.dataingestion.constant.MessageType.HL7_ELR; -import static gov.cdc.dataingestion.constant.MessageType.XML_ELR; -import static gov.cdc.dataingestion.share.helper.TimeStampHelper.getCurrentTimeStamp; - -@Service -@RequiredArgsConstructor -@Slf4j -public class RawELRService { - - private static final String CREATED_BY = "admin"; - @Value("${kafka.raw.topic}") - String topicName; - - @Value("${kafka.raw.xml-topic}") - String rawXmlTopicName; - - @Value("${service.timezone}") - private String tz = "UTC"; - - private final IRawELRRepository rawELRRepository; - private final KafkaProducerService kafkaProducerService; - - - public String submission(RawERLDto rawERLDto, String version) { - RawERLModel created = rawELRRepository.save(convert(rawERLDto)); - if(rawERLDto.getType().equalsIgnoreCase(HL7_ELR)) { - kafkaProducerService.sendMessageFromController( - created.getId(), - topicName, - rawERLDto.getType(), - 0, - rawERLDto.getValidationActive(), - version); - } - if(rawERLDto.getType().equalsIgnoreCase(XML_ELR)) { - kafkaProducerService.sendElrXmlMessageFromController( - created.getId(), - rawXmlTopicName, - rawERLDto.getType(), - 0, - rawERLDto.getPayload(), - version); - } - return created.getId(); - } - - public RawERLDto getById(String id) { - RawERLModel rawERLModel = rawELRRepository.getById(id); - return convert(rawERLModel); - } - - private RawERLModel convert(RawERLDto rawERLDto) { - - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setType(rawERLDto.getType()); - rawERLModel.setPayload(rawERLDto.getPayload()); - rawERLModel.setCreatedOn(getCurrentTimeStamp(tz)); - rawERLModel.setUpdatedOn(getCurrentTimeStamp(tz)); - rawERLModel.setCreatedBy(CREATED_BY); - rawERLModel.setUpdatedBy(CREATED_BY); - return rawERLModel; - } - - private RawERLDto convert(RawERLModel rawERLModel) { - - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setId(rawERLModel.getId()); - rawERLDto.setType(rawERLModel.getType()); - rawERLDto.setPayload(rawERLModel.getPayload()); - return rawERLDto; - } -} diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java new file mode 100644 index 000000000..070b3880d --- /dev/null +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java @@ -0,0 +1,117 @@ +package gov.cdc.dataingestion.rawmessage.service; + +import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; +import gov.cdc.dataingestion.exception.KafkaProducerException; +import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import static gov.cdc.dataingestion.constant.MessageType.HL7_ELR; +import static gov.cdc.dataingestion.constant.MessageType.XML_ELR; +import static gov.cdc.dataingestion.share.helper.TimeStampHelper.getCurrentTimeStamp; + +@Service +@RequiredArgsConstructor +@Slf4j +public class RawElrService { + + private static final String CREATED_BY = "admin"; + @Value("${kafka.raw.topic}") + String topicName; + + @Value("${kafka.raw.xml-topic}") + String rawXmlTopicName; + + @Value("${service.timezone}") + private String tz = "UTC"; + + private final IRawElrRepository rawElrRepository; + private final KafkaProducerService kafkaProducerService; + private final IElrDeadLetterRepository iElrDeadLetterRepository; + + public String submission(RawElrDto rawElrDto) throws KafkaProducerException { + RawElrModel created = rawElrRepository.save(convert(rawElrDto)); + int dltOccurrence = 0; + try { + if(rawElrDto.getType().equalsIgnoreCase(HL7_ELR)) { + kafkaProducerService.sendMessageFromController( + created.getId(), + topicName, + rawElrDto.getType(), + dltOccurrence, + rawElrDto.getValidationActive(), + rawElrDto.getVersion()); + } + if(rawElrDto.getType().equalsIgnoreCase(XML_ELR)) { + kafkaProducerService.sendElrXmlMessageFromController( + created.getId(), + rawXmlTopicName, + rawElrDto.getType(), + dltOccurrence, + rawElrDto.getPayload(), + rawElrDto.getVersion()); + } + } catch (KafkaProducerException e) { + String errorStatus = "Sending event to elr_raw kafka topic failed"; + iElrDeadLetterRepository.addErrorStatusForRawId(created.getId(), topicName, created.getType(), created.getPayload(), errorStatus, dltOccurrence + 1); + throw new KafkaProducerException("Failed publishing message to kafka topic: " + topicName + " with UUID: " + created.getId()); + } + return created.getId(); + } + + public void updateRawMessageAfterRetry(RawElrDto rawElrDto, int dltOccurrence) throws KafkaProducerException { + try { + if(rawElrDto.getType().equalsIgnoreCase(HL7_ELR)) { + kafkaProducerService.sendMessageFromController( + rawElrDto.getId(), + topicName, + rawElrDto.getType(), + dltOccurrence + 1, + rawElrDto.getValidationActive(), + rawElrDto.getVersion()); + } + if(rawElrDto.getType().equalsIgnoreCase(XML_ELR)) { + kafkaProducerService.sendElrXmlMessageFromController( + rawElrDto.getId(), + rawXmlTopicName, + rawElrDto.getType(), + dltOccurrence + 1, + rawElrDto.getPayload(), + rawElrDto.getVersion()); + } + } catch (KafkaProducerException e) { + iElrDeadLetterRepository.updateDltOccurrenceForRawId(rawElrDto.getId(), dltOccurrence + 1, "ERROR"); + throw new KafkaProducerException("Failed publishing message again to kafka topic: " + topicName + " with UUID: " + rawElrDto.getId()); + } + } + + public RawElrDto getById(String id) { + RawElrModel rawElrModel = rawElrRepository.getReferenceById(id); + return convert(rawElrModel); + } + + private RawElrModel convert(RawElrDto rawElrDto) { + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setType(rawElrDto.getType()); + rawElrModel.setPayload(rawElrDto.getPayload()); + rawElrModel.setVersion(rawElrDto.getVersion()); + rawElrModel.setCreatedOn(getCurrentTimeStamp(tz)); + rawElrModel.setUpdatedOn(getCurrentTimeStamp(tz)); + rawElrModel.setCreatedBy(CREATED_BY); + rawElrModel.setUpdatedBy(CREATED_BY); + return rawElrModel; + } + + private RawElrDto convert(RawElrModel rawElrModel) { + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setId(rawElrModel.getId()); + rawElrDto.setType(rawElrModel.getType()); + rawElrDto.setPayload(rawElrModel.getPayload()); + return rawElrDto; + } +} diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawElrRepository.java similarity index 60% rename from data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java rename to data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawElrRepository.java index ea25748e3..78adc53df 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawElrRepository.java @@ -1,9 +1,9 @@ package gov.cdc.dataingestion.report.repository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @Repository -public interface IRawELRRepository extends JpaRepository { +public interface IRawElrRepository extends JpaRepository { } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawERLModel.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawElrModel.java similarity index 89% rename from data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawERLModel.java rename to data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawElrModel.java index a0dec6a66..e06c83cc3 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawERLModel.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawElrModel.java @@ -14,7 +14,7 @@ 1135 - todos complaint * */ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) -public class RawERLModel { +public class RawElrModel { @Id @GenericGenerator(name = "generator", strategy = "guid", parameters = {}) @@ -26,6 +26,9 @@ public class RawERLModel { private String type; private String payload; + @Column(name = "version") + private String version; + @Column(name = "created_on") private Timestamp createdOn; @@ -93,4 +96,12 @@ public String getUpdatedBy() { public void setUpdatedBy(String updatedBy) { this.updatedBy = updatedBy; } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java index 801071999..e258fb0b4 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java @@ -7,8 +7,8 @@ import gov.cdc.dataingestion.odse.repository.IEdxActivityParentLogRepository; import gov.cdc.dataingestion.odse.repository.model.EdxActivityDetailLog; import gov.cdc.dataingestion.odse.repository.model.EdxActivityLog; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.DltMessageStatus; import gov.cdc.dataingestion.reportstatus.model.EdxActivityLogStatus; import gov.cdc.dataingestion.reportstatus.model.MessageStatus; @@ -39,7 +39,7 @@ public class ReportStatusService { private final IEdxActivityParentLogRepository iEdxActivityParentLogRepository; private final NbsInterfaceRepository nbsInterfaceRepository; - private final IRawELRRepository iRawELRRepository; + private final IRawElrRepository iRawELRRepository; private final IValidatedELRRepository iValidatedELRRepository; private final IElrDeadLetterRepository iElrDeadLetterRepository; private final IEdxActivityLogRepository iEdxActivityLogRepository; @@ -51,7 +51,7 @@ public class ReportStatusService { public ReportStatusService(IReportStatusRepository iReportStatusRepository, IEdxActivityParentLogRepository iEdxActivityParentLogRepository, NbsInterfaceRepository nbsInterfaceRepository, - IRawELRRepository iRawELRRepository, + IRawElrRepository iRawELRRepository, IValidatedELRRepository iValidatedELRRepository, IElrDeadLetterRepository iElrDeadLetterRepository, IEdxActivityLogRepository iEdxActivityLogRepository) { @@ -67,7 +67,7 @@ public ReportStatusService(IReportStatusRepository iReportStatusRepository, @SuppressWarnings("java:S3776") public MessageStatus getMessageStatus(String rawMessageID) { MessageStatus msgStatus = new MessageStatus(); - Optional rawMessageData = iRawELRRepository.findById(rawMessageID); + Optional rawMessageData = iRawELRRepository.findById(rawMessageID); if (!rawMessageData.isEmpty()) { msgStatus.getRawInfo().setRawMessageId(rawMessageData.get().getId()); //msgStatus.getRawInfo().setRawPayload(Base64.getEncoder().encodeToString(rawMessageData.get().getPayload().getBytes())); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java index 8d40494c0..8c3bec485 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java @@ -2,6 +2,7 @@ import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; import gov.cdc.dataingestion.exception.DuplicateHL7FileFoundException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; @@ -39,7 +40,7 @@ public HL7DuplicateValidator(IValidatedELRRepository iValidatedELRRepository, Ka } @Override - public void validateHL7Document(ValidatedELRModel hl7ValidatedModel) throws DuplicateHL7FileFoundException { + public void validateHL7Document(ValidatedELRModel hl7ValidatedModel) throws DuplicateHL7FileFoundException, KafkaProducerException { String hashedString = null; try { MessageDigest digestString = MessageDigest.getInstance("SHA-256"); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java index 05681cc64..d20888ddd 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java @@ -3,7 +3,7 @@ import gov.cdc.dataingestion.constant.enums.EnumMessageType; import gov.cdc.dataingestion.hl7.helper.HL7Helper; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7v2Validator; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; @@ -32,8 +32,8 @@ public String processFhsMessage (String message) { return this.hl7Helper.processFhsMessage(message); } - public ValidatedELRModel messageValidation(String id, RawERLModel rawERLModel, String topicName, boolean validationActive) throws DiHL7Exception { - String replaceSpecialCharacters = messageStringValidation(rawERLModel.getPayload()); + public ValidatedELRModel messageValidation(String id, RawElrModel rawElrModel, String topicName, boolean validationActive) throws DiHL7Exception { + String replaceSpecialCharacters = messageStringValidation(rawElrModel.getPayload()); // validationActive check is obsoleted if (validationActive) { diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java index ba8049b58..3938a7c38 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java @@ -1,8 +1,9 @@ package gov.cdc.dataingestion.validation.integration.validator.interfaces; import gov.cdc.dataingestion.exception.DuplicateHL7FileFoundException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; public interface IHL7DuplicateValidator { - void validateHL7Document(ValidatedELRModel hl7ValidatedModel) throws DuplicateHL7FileFoundException; + void validateHL7Document(ValidatedELRModel hl7ValidatedModel) throws DuplicateHL7FileFoundException, KafkaProducerException; } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java index 610ba1ebb..0aed61cc8 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java @@ -1,10 +1,10 @@ package gov.cdc.dataingestion.validation.integration.validator.interfaces; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; public interface IHL7v2Validator { - ValidatedELRModel messageValidation(String message, RawERLModel rawERLModel, String topicName, boolean validationActive) throws DiHL7Exception; + ValidatedELRModel messageValidation(String message, RawElrModel rawElrModel, String topicName, boolean validationActive) throws DiHL7Exception; String hl7MessageValidation(String message) throws DiHL7Exception; String messageStringValidation(String message) throws DiHL7Exception; String processFhsMessage (String message); diff --git a/data-ingestion-service/src/main/resources/application.yaml b/data-ingestion-service/src/main/resources/application.yaml index 8a36bd5e0..098003e7c 100644 --- a/data-ingestion-service/src/main/resources/application.yaml +++ b/data-ingestion-service/src/main/resources/application.yaml @@ -34,6 +34,12 @@ ecr: processing: batchSize: 100 # How many records are processed at a time interval: 3000 # How often (in milliseconds) the database is queried for records to process + +dlt: + scheduler: + enabled: ${SCHEDULER_ENABLED:false} + cron: ${SCHEDULER_CRON:0/30 * * * * *} + --- spring: config: diff --git a/data-ingestion-service/src/main/resources/db/di-service-003.sql b/data-ingestion-service/src/main/resources/db/di-service-003.sql index f5cb1a266..0cf826653 100644 --- a/data-ingestion-service/src/main/resources/db/di-service-003.sql +++ b/data-ingestion-service/src/main/resources/db/di-service-003.sql @@ -1,3 +1,3 @@ ALTER TABLE NBS_MSGOUTE.dbo.NBS_interface ADD original_payload_RR TEXT, - original_doc_type_cd_RR varchar(100); \ No newline at end of file + original_doc_type_cd_RR varchar(100); \ No newline at end of file diff --git a/data-ingestion-service/src/main/resources/db/di-service-004.sql b/data-ingestion-service/src/main/resources/db/di-service-004.sql new file mode 100644 index 000000000..f318d0401 --- /dev/null +++ b/data-ingestion-service/src/main/resources/db/di-service-004.sql @@ -0,0 +1,5 @@ +ALTER TABLE NBS_DataIngest.dbo.elr_raw + ADD version VARCHAR(1); + +ALTER TABLE NBS_DataIngest.dbo.elr_dlt +ALTER COLUMN dlt_status NVARCHAR(30); \ No newline at end of file diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java index 6f2aedf70..e58aa911d 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java @@ -1,7 +1,8 @@ package gov.cdc.dataingestion.camel.routes; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.exception.KafkaProducerException; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -19,7 +20,7 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) class HL7FileProcessComponentTest { @Mock - private RawELRService rawELRService; + private RawElrService rawELRService; @InjectMocks private HL7FileProcessComponent hL7FileProcessComponent; @@ -30,16 +31,17 @@ public void setUp() { } @Test - void testSaveHL7Message() { + void testSaveHL7Message() throws KafkaProducerException { String hl7Payload = "testmessage"; String messageType = "HL7"; - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(hl7Payload); - rawERLDto.setValidationActive(true); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(hl7Payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion("1"); - when(rawELRService.submission(rawERLDto,"1")).thenReturn("OK"); + when(rawELRService.submission(rawElrDto)).thenReturn("OK"); String status = hL7FileProcessComponent.process(hl7Payload); Assertions.assertEquals("OK",status); } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java index e8a55cdbb..5402b000d 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java @@ -5,15 +5,21 @@ import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; import gov.cdc.dataingestion.exception.DeadLetterTopicException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -27,7 +33,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; + /** 1118 - require constructor complaint 125 - comment complaint @@ -41,7 +48,7 @@ class ElrDeadLetterServiceTest { private IElrDeadLetterRepository dltRepository; @Mock - private IRawELRRepository rawELRRepository; + private IRawElrRepository rawELRRepository; @Mock private IValidatedELRRepository validatedELRRepository; @@ -52,12 +59,18 @@ class ElrDeadLetterServiceTest { @InjectMocks private ElrDeadLetterService elrDeadLetterService; + @Mock + private RawElrService rawElrService; + + @Mock + private RawElrDto rawElrDto; + private String guidForTesting = "8DC5E410-4A2E-4018-8C28-A4F6AB99E802"; @BeforeEach public void setUpEach() { MockitoAnnotations.openMocks(this); - elrDeadLetterService = new ElrDeadLetterService(dltRepository, rawELRRepository, validatedELRRepository, kafkaProducerService); + elrDeadLetterService = new ElrDeadLetterService(dltRepository, rawELRRepository, validatedELRRepository, kafkaProducerService, rawElrService); } @@ -107,7 +120,7 @@ void testGetAllErrorDltRecord_Success() { List listData = new ArrayList<>(); listData.add(model); - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setPayload("HL7 message"); @@ -124,7 +137,7 @@ void testGetAllErrorDltRecord_NoDataFound() { } @Test - void testUpdateAndReprocessingMessage_RawElr_Success() throws DeadLetterTopicException { + void testUpdateAndReprocessingMessage_RawElr_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -133,31 +146,26 @@ void testUpdateAndReprocessingMessage_RawElr_Success() throws DeadLetterTopicExc elrDltModel.setErrorMessageSource("elr_raw"); - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setPayload("HL7 message"); - rawERLModel.setId(elrDltModel.getErrorMessageId()); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setPayload("HL7 message"); + rawElrModel.setId(elrDltModel.getErrorMessageId()); elrDltModel.setDltStatus(EnumElrDltStatus.ERROR.name()); when(dltRepository.findById(elrDltModel.getErrorMessageId())) .thenReturn(Optional.of(elrDltModel)); when(rawELRRepository.findById(elrDltModel.getErrorMessageId())) - .thenReturn(Optional.of(rawERLModel)); - when(rawELRRepository.save(rawERLModel)).thenReturn(rawERLModel); + .thenReturn(Optional.of(rawElrModel)); + when(rawELRRepository.save(rawElrModel)).thenReturn(rawElrModel); when(dltRepository.save(any(ElrDeadLetterModel.class))).thenReturn(elrDltModel); var result = elrDeadLetterService.updateAndReprocessingMessage(primaryIdForTesting, "HL7 message"); assertEquals("HL7 message", result.getMessage()); assertEquals(1, result.getDltOccurrence()); - - - - - } @Test - void testUpdateAndReprocessingMessage_ValidatedElr_Success() throws DeadLetterTopicException { + void testUpdateAndReprocessingMessage_ValidatedElr_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -186,7 +194,7 @@ void testUpdateAndReprocessingMessage_ValidatedElr_Success() throws DeadLetterTo } @Test - void testUpdateAndReprocessingMessage_FhirPrep_Success() throws DeadLetterTopicException { + void testUpdateAndReprocessingMessage_FhirPrep_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -209,7 +217,7 @@ void testUpdateAndReprocessingMessage_FhirPrep_Success() throws DeadLetterTopicE } @Test - void testUpdateAndReprocessingMessage_XmlPrep_Success() throws DeadLetterTopicException { + void testUpdateAndReprocessingMessage_XmlPrep_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -272,4 +280,55 @@ void testSaveDltRecord() { assertEquals(savedDto.getUpdatedBy(), dto.getUpdatedBy()); } + @Test + void testProcessFailedMessagesFromKafka_WithMessages() throws KafkaProducerException { + List dltMessagesList = getElrDeadLetterModels(); + + when(dltRepository.getAllErrorDltRecordForKafkaError()).thenReturn(dltMessagesList); + + elrDeadLetterService.processFailedMessagesFromKafka(); + + verify(dltRepository, times(2)).updateErrorStatusForRawId(anyString(), anyString()); + verify(rawElrService, times(2)).updateRawMessageAfterRetry(any(RawElrDto.class), anyInt()); //This line is correct. + + verify(dltRepository).updateErrorStatusForRawId("test1", "PROCESSED"); + verify(dltRepository).updateErrorStatusForRawId("test2", "PROCESSED"); + } + + private static @NotNull List getElrDeadLetterModels() { + List dltMessagesList = new ArrayList<>(); + ElrDeadLetterModel message1 = new ElrDeadLetterModel(); + message1.setErrorMessageId("test1"); + message1.setDltStatus("KAFKA_ERROR_HL7"); + message1.setMessage("test_hl7_payload"); + dltMessagesList.add(message1); + + ElrDeadLetterModel message2 = new ElrDeadLetterModel(); + message2.setErrorMessageId("test2"); + message2.setDltStatus("KAFKA_ERROR_HL7_XML"); + message2.setMessage("test_hl7_xml_payload"); + dltMessagesList.add(message2); + return dltMessagesList; + } + + @Test + void testProcessFailedMessagesFromKafka_NoMessages() throws KafkaProducerException { + when(dltRepository.getAllErrorDltRecordForKafkaError()).thenReturn(new ArrayList<>()); + + elrDeadLetterService.processFailedMessagesFromKafka(); + + verify(dltRepository, times(0)).updateErrorStatusForRawId(anyString(), anyString()); + verify(rawElrService, times(0)).updateRawMessageAfterRetry(any(RawElrDto.class), anyInt()); + } + + @ParameterizedTest + @CsvSource({ + "KAFKA_ERROR_HL7, HL7", + "KAFKA_ERROR_HL7_XML, HL7_XML", + "SOMEPREFIXKAFKAERROR, ''" + }) + void testGetElrMessageType(String dltStatus, String expectedResult) { + String result = elrDeadLetterService.getElrMessageType(dltStatus); + assertEquals(expectedResult, result); + } } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfigTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfigTest.java new file mode 100644 index 000000000..f24501889 --- /dev/null +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfigTest.java @@ -0,0 +1,53 @@ +package gov.cdc.dataingestion.deadletter.service; + +import gov.cdc.dataingestion.exception.KafkaProducerException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.test.util.ReflectionTestUtils; + +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@EnableScheduling +class SchedulerConfigTest { + @Test + void testScheduleTask_SchedulerEnabled() throws KafkaProducerException { + ElrDeadLetterService mockService = mock(ElrDeadLetterService.class); + SchedulerConfig schedulerConfig = new SchedulerConfig(mockService); + ReflectionTestUtils.setField(schedulerConfig, "isSchedulerEnabled", true); + + schedulerConfig.scheduleTask(); + + verify(mockService, times(1)).processFailedMessagesFromKafka(); + } + + @Test + void testScheduleTask_SchedulerDisabled() throws KafkaProducerException { + ElrDeadLetterService mockService = mock(ElrDeadLetterService.class); + SchedulerConfig schedulerConfig = new SchedulerConfig(mockService); + ReflectionTestUtils.setField(schedulerConfig, "isSchedulerEnabled", false); + + schedulerConfig.scheduleTask(); + + verify(mockService, never()).processFailedMessagesFromKafka(); + } + + @Test + void testScheduleTask_ServiceThrowsException() throws KafkaProducerException { + ElrDeadLetterService mockService = mock(ElrDeadLetterService.class); + doThrow(new KafkaProducerException("Test exception")) + .when(mockService).processFailedMessagesFromKafka(); + SchedulerConfig schedulerConfig = new SchedulerConfig(mockService); + ReflectionTestUtils.setField(schedulerConfig, "isSchedulerEnabled", true); + + try { + schedulerConfig.scheduleTask(); + } catch (KafkaProducerException e) { + verify(mockService, times(1)).processFailedMessagesFromKafka(); + return; + } + throw new AssertionError("Expected KafkaProducerException was not thrown."); + } +} \ No newline at end of file diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java index 04395fd03..4708e770c 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java @@ -7,6 +7,7 @@ import gov.cdc.dataingestion.deadletter.model.ElrDeadLetterDto; import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.exception.XmlConversionException; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; import gov.cdc.dataingestion.kafka.integration.service.KafkaConsumerService; @@ -15,8 +16,8 @@ import gov.cdc.dataingestion.nbs.repository.model.NbsInterfaceModel; import gov.cdc.dataingestion.nbs.services.NbsRepositoryServiceProvider; import gov.cdc.dataingestion.nbs.services.EcrMsgQueryService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData; import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator; @@ -73,7 +74,7 @@ class KafkaConsumerServiceTest { @Mock private IHL7v2Validator iHl7v2Validator; @Mock - private IRawELRRepository iRawELRRepository; + private IRawElrRepository iRawELRRepository; @Mock private IValidatedELRRepository iValidatedELRRepository; @Mock @@ -188,7 +189,7 @@ void rawConsumerTest() throws DiHL7Exception { ConsumerRecord firstRecord = records.iterator().next(); String value = firstRecord.value(); - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); @@ -227,7 +228,7 @@ void rawConsumerTestRawRecordNotFound() { ConsumerRecord firstRecord = records.iterator().next(); String value = firstRecord.value(); - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); @@ -422,7 +423,7 @@ void xmlPreparationConsumerTest() { @Test - void xmlPreparationConsumerTestNewFlow() throws XmlConversionException { + void xmlPreparationConsumerTestNewFlow() throws XmlConversionException, KafkaProducerException { // Produce a test message to the topic @@ -454,7 +455,7 @@ void xmlPreparationConsumerTestNewFlow() throws XmlConversionException { } @Test - void xmlPreparationConsumerTestNewFlow_Exception() { + void xmlPreparationConsumerTestNewFlow_Exception() throws KafkaProducerException { when(iValidatedELRRepository.findById(any())).thenReturn(Optional.empty()); kafkaConsumerService.xmlConversionHandlerProcessing("123", EnumKafkaOperation.INJECTION.name(), "true"); @@ -463,7 +464,7 @@ void xmlPreparationConsumerTestNewFlow_Exception() { } @Test - void xmlPreparationConsumerTestNewFlow_KafkaExistResult() { + void xmlPreparationConsumerTestNewFlow_KafkaExistResult() throws KafkaProducerException { // Produce a test message to the topic @@ -651,7 +652,7 @@ void dltHandlerLogicPipelineCustom() { initialDataInsertionAndSelection(rawTopic); String message = guidForTesting; - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); rawModel.setPayload(testHL7Message); @@ -669,7 +670,7 @@ void dltHandlerLogicForRawPipeline() { initialDataInsertionAndSelection(rawTopic); String message = guidForTesting; - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); rawModel.setPayload(testHL7Message); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java index ab3aee50c..0219e521e 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java @@ -2,10 +2,12 @@ import gov.cdc.dataingestion.constant.TopicPreparationType; import gov.cdc.dataingestion.exception.ConversionPrepareException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -13,14 +15,20 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import org.testcontainers.junit.jupiter.Testcontainers; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; + @ExtendWith(MockitoExtension.class) @Testcontainers /** @@ -46,34 +54,49 @@ public static void tearDown() { } @Test - void testSendMessageFromController() { + void testSendMessageFromController() throws KafkaProducerException { String msg = "test message"; String topic = "test-topic"; String msgType = "test-type"; Integer dltOccurrence = 1; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromController(msg, topic, msgType, dltOccurrence, false, "false"); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageAfterValidatingMessage() { + void testSendMessageAfterValidatingMessage() throws KafkaProducerException { String topic = "test-topic"; ValidatedELRModel model = new ValidatedELRModel(); model.setMessageType("test"); model.setId("test"); model.setMessageVersion("1"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageAfterValidatingMessage(model, topic, 1, "false"); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessagePreparationTopicXML() throws ConversionPrepareException { + void testSendMessagePreparationTopicXML() throws ConversionPrepareException, KafkaProducerException { var topicType = TopicPreparationType.XML; String topic = "test-topic"; ValidatedELRModel model = new ValidatedELRModel(); model.setMessageType("test"); model.setId("test"); model.setMessageVersion("1"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessagePreparationTopic(model, topic, topicType, 1, "false"); @@ -81,13 +104,18 @@ void testSendMessagePreparationTopicXML() throws ConversionPrepareException { } @Test - void testSendMessagePreparationTopicFHIR() throws ConversionPrepareException { + void testSendMessagePreparationTopicFHIR() throws ConversionPrepareException, KafkaProducerException { var topicType = TopicPreparationType.FHIR; String topic = "test-topic"; ValidatedELRModel model = new ValidatedELRModel(); model.setMessageType("test"); model.setId("test"); model.setMessageVersion("1"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessagePreparationTopic(model, topic, topicType, 1, "false"); @@ -95,8 +123,28 @@ void testSendMessagePreparationTopicFHIR() throws ConversionPrepareException { } @Test - void sendMessageDlt() { + void testSendElrXmlMessageFromController() throws KafkaProducerException { + String msg = "test message"; + String topic = "test-topic"; + String msgType = "test-type"; + Integer dltOccurrence = 1; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + + kafkaProducerService.sendElrXmlMessageFromController(msg, topic, msgType, dltOccurrence, "payload", "false"); + verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); + } + + @Test + void sendMessageDlt() throws KafkaProducerException { String topic = "test-topic"; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageDlt("test","test", topic, 1, "error", topic); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); @@ -104,54 +152,101 @@ void sendMessageDlt() { @Test - void testSendMessageAfterConvertedToXml() { + void testSendMessageAfterConvertedToXml() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageAfterConvertedToXml( msg, topic, 1 ); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageAfterCheckingDuplicateHL7() { + void testSendMessageAfterCheckingDuplicateHL7() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; ValidatedELRModel model = new ValidatedELRModel(); model.setRawId("test"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageAfterCheckingDuplicateHL7(model, topic, 1 ); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageFromDltController() { + void testSendMessageFromDltController() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; String msgType = "HL7"; Integer occurrence = 0; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromController(msg, topic,msgType, occurrence, false, "false"); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageFromCSVController() { + void testSendMessageFromCSVController() throws KafkaProducerException { String topic = "test-topic"; List msgNested = new ArrayList<>(); msgNested.add("test"); List> msg = new ArrayList<>(); msg.add(msgNested); String msgType = "HL7"; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromCSVController(msg, topic,msgType); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageFromDltController_Success() { + void testSendMessageFromDltController_Success() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; String msgType = "HL7"; Integer occurrence = 0; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromDltController(msg, topic,msgType, occurrence); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } + + @Test + void testSendMessageException() throws Exception { + ProducerRecord prodRecord = new ProducerRecord<>("test-topic", "test-key", "test-value"); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new TimeoutException("Timeout")); + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + + Method sendMessageMethod = KafkaProducerService.class.getDeclaredMethod("sendMessage", ProducerRecord.class); + sendMessageMethod.setAccessible(true); + + KafkaProducerException exception = assertThrows(KafkaProducerException.class, () -> { + try { + sendMessageMethod.invoke(kafkaProducerService, prodRecord); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + }); + + Assertions.assertEquals("Failed publishing message to kafka topic: test-topic with UUID: test-value", exception.getMessage()); + } } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawELRServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawELRServiceTest.java deleted file mode 100644 index f8e39c727..000000000 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawELRServiceTest.java +++ /dev/null @@ -1,103 +0,0 @@ -package gov.cdc.dataingestion.rawmessage; - -import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -import java.sql.Timestamp; -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - -class RawELRServiceTest { - @Mock - private IRawELRRepository rawELRRepository; - - @Mock - private KafkaProducerService kafkaProducerService; - - @InjectMocks - private RawELRService target; - - private String guidForTesting = UUID.randomUUID().toString(); - - - @BeforeEach - public void setUpEach() { - MockitoAnnotations.openMocks(this); - target = new RawELRService(rawELRRepository, kafkaProducerService); - } - - - - @Test - void testSaveHL7_Success() { - RawERLDto modelDto = new RawERLDto(); - modelDto.setPayload("test"); - modelDto.setType("HL7"); - RawERLModel model = new RawERLModel(); - model.setId("test"); - model.setCreatedOn(new Timestamp(System.currentTimeMillis())); - model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); - model.setCreatedBy("test"); - model.setUpdatedBy("test"); - when(rawELRRepository.save(any())).thenReturn(model); - Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); - var result = target.submission(modelDto, "1"); - - Assertions.assertNotNull(result); - Assertions.assertEquals("test",result); - - } - - @Test - void testSaveElrXml_Success() { - RawERLDto modelDto = new RawERLDto(); - modelDto.setPayload("test"); - modelDto.setType("HL7-XML"); - RawERLModel model = new RawERLModel(); - model.setId("test"); - model.setCreatedOn(new Timestamp(System.currentTimeMillis())); - model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); - model.setCreatedBy("test"); - model.setUpdatedBy("test"); - when(rawELRRepository.save(any())).thenReturn(model); - Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); - var result = target.submission(modelDto, "1"); - - Assertions.assertNotNull(result); - Assertions.assertEquals("test",result); - - } - - @Test - void getById_Success() { - RawERLDto modelDto = new RawERLDto(); - modelDto.setPayload("test"); - modelDto.setType("HL7"); - modelDto.setId(guidForTesting); - - RawERLModel model = new RawERLModel(); - model.setId(guidForTesting); - model.setCreatedOn(new Timestamp(System.currentTimeMillis())); - model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); - model.setCreatedBy("test"); - model.setUpdatedBy("test"); - - when(rawELRRepository.getById(any())).thenReturn(model); - - var result = target.getById(modelDto.getId()); - - Assertions.assertNotNull(result); - } -} diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java new file mode 100644 index 000000000..e6cd8a2f5 --- /dev/null +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java @@ -0,0 +1,140 @@ +package gov.cdc.dataingestion.rawmessage; + +import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; +import gov.cdc.dataingestion.exception.KafkaProducerException; +import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.sql.Timestamp; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +class RawElrServiceTest { + @Mock + private IRawElrRepository rawELRRepository; + + @Mock + private KafkaProducerService kafkaProducerService; + + @Mock + private IElrDeadLetterRepository iElrDeadLetterRepository; + + @InjectMocks + private RawElrService target; + + private String guidForTesting = UUID.randomUUID().toString(); + + + @BeforeEach + void setUpEach() { + MockitoAnnotations.openMocks(this); + target = new RawElrService(rawELRRepository, kafkaProducerService, iElrDeadLetterRepository); + } + + @Test + void testSaveHL7_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setPayload("test"); + modelDto.setType("HL7"); + RawElrModel model = new RawElrModel(); + model.setId("test"); + model.setVersion("1"); + model.setCreatedOn(new Timestamp(System.currentTimeMillis())); + model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); + model.setCreatedBy("test"); + model.setUpdatedBy("test"); + when(rawELRRepository.save(any())).thenReturn(model); + Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + var result = target.submission(modelDto); + + Assertions.assertNotNull(result); + Assertions.assertEquals("test",result); + } + + @Test + void testSaveElrXml_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setPayload("test"); + modelDto.setType("HL7-XML"); + RawElrModel model = new RawElrModel(); + model.setId("test"); + model.setVersion("1"); + model.setCreatedOn(new Timestamp(System.currentTimeMillis())); + model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); + model.setCreatedBy("test"); + model.setUpdatedBy("test"); + when(rawELRRepository.save(any())).thenReturn(model); + Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + var result = target.submission(modelDto); + + Assertions.assertNotNull(result); + Assertions.assertEquals("test",result); + } + + @Test + void getById_Success() { + RawElrDto modelDto = new RawElrDto(); + modelDto.setPayload("test"); + modelDto.setType("HL7"); + modelDto.setId(guidForTesting); + + RawElrModel model = new RawElrModel(); + model.setId(guidForTesting); + model.setCreatedOn(new Timestamp(System.currentTimeMillis())); + model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); + model.setCreatedBy("test"); + model.setUpdatedBy("test"); + + when(rawELRRepository.getReferenceById(any())).thenReturn(model); + + var result = target.getById(modelDto.getId()); + + Assertions.assertNotNull(result); + } + + @Test + void testUpdateRawMessageAfterRetry_HL7_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setId("test-retry"); + modelDto.setType("HL7"); + modelDto.setValidationActive(true); + modelDto.setVersion("1"); + Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + target.updateRawMessageAfterRetry(modelDto, 1); + Mockito.verify(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(2), Mockito.any(), Mockito.any()); + } + + @Test + void testUpdateRawMessageAfterRetry_XML_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setId("test-retry-xml"); + modelDto.setType("HL7-XML"); + modelDto.setPayload("xml payload"); + modelDto.setVersion("1"); + Mockito.doNothing().when(kafkaProducerService).sendElrXmlMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + target.updateRawMessageAfterRetry(modelDto, 1); + Mockito.verify(kafkaProducerService).sendElrXmlMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(2), Mockito.any(), Mockito.any()); + } + + @Test + void testUpdateRawMessageAfterRetry_KafkaProducerException() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setId("retry-fail"); + modelDto.setType("HL7"); + Mockito.doThrow(new KafkaProducerException("Retry failed")).when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Assertions.assertThrows(KafkaProducerException.class, () -> target.updateRawMessageAfterRetry(modelDto, 1)); + Mockito.verify(iElrDeadLetterRepository).updateDltOccurrenceForRawId(Mockito.any(), Mockito.eq(2), Mockito.eq("ERROR")); + } +} diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java index 6100d1e75..76e403d6d 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java @@ -1,9 +1,10 @@ package gov.cdc.dataingestion.rawmessage.controller; import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import gov.cdc.dataingestion.validation.services.interfaces.IHL7Service; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,7 +31,7 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) class ElrReportsControllerMockTest { @Mock - private RawELRService rawELRService; + private RawElrService rawELRService; @Mock private CustomMetricsBuilder customMetricsBuilder; @@ -48,13 +49,13 @@ void setUp() { } @Test - void testSave_HL7_ELR_Type() { + void testSave_HL7_ELR_Type() throws KafkaProducerException { String payload = "HL7 message"; String type = HL7_ELR; String version = "1"; String expectedResponse = "Submission successful"; - when(rawELRService.submission(any(RawERLDto.class), eq(version))).thenReturn(expectedResponse); + when(rawELRService.submission(any(RawElrDto.class))).thenReturn(expectedResponse); ResponseEntity response = elrReportsController.save(payload, type, version); @@ -64,13 +65,13 @@ void testSave_HL7_ELR_Type() { } @Test - void testSave_XML_ELR_Type() { + void testSave_XML_ELR_Type() throws KafkaProducerException { String payload = "XML message"; String type = XML_ELR; String version = "1"; String expectedResponse = "Submission successful"; - when(rawELRService.submission(any(RawERLDto.class), eq(version))).thenReturn(expectedResponse); + when(rawELRService.submission(any(RawElrDto.class))).thenReturn(expectedResponse); ResponseEntity response = elrReportsController.save(payload, type, version); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java index 6ad3c164e..784de53da 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java @@ -1,8 +1,8 @@ package gov.cdc.dataingestion.rawmessage.controller; import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import gov.cdc.dataingestion.validation.services.interfaces.IHL7Service; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -31,7 +31,7 @@ class ElrReportsControllerTest { @Autowired private MockMvc mockMvc; @MockBean - private RawELRService rawELRService; + private RawElrService rawELRService; @MockBean private CustomMetricsBuilder customMetricsBuilder; @MockBean @@ -47,12 +47,13 @@ void testSaveHL7Message() throws Exception { .with(SecurityMockMvcRequestPostProcessors.jwt())) .andExpect(MockMvcResultMatchers.status().isOk()); - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(hl7Payload); - rawERLDto.setValidationActive(true); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(hl7Payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion("1"); - verify(rawELRService).submission(rawERLDto, "1"); + verify(rawELRService).submission(rawElrDto); } @@ -67,23 +68,25 @@ void testSaveElrXmlMessage() throws Exception { .with(SecurityMockMvcRequestPostProcessors.jwt())) .andExpect(MockMvcResultMatchers.status().isOk()); - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(xmlPayload); - rawERLDto.setValidationActive(true); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(xmlPayload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion("1"); - verify(rawELRService).submission(rawERLDto, "1"); + verify(rawELRService).submission(rawElrDto); } @Test void testSaveHL7Message_no_ValidationActivate() throws Exception { String payload = "Test payload"; String messageType = "HL7"; - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(payload); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(payload); + rawElrDto.setVersion("1"); - when(rawELRService.submission(rawERLDto, "1")).thenReturn("OK"); + when(rawELRService.submission(rawElrDto)).thenReturn("OK"); mockMvc.perform(MockMvcRequestBuilders.post("/api/elrs") .param("id", "1").with(SecurityMockMvcRequestPostProcessors.jwt()) .header("msgType", messageType) diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java index 006cec5e2..7d7334291 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java @@ -8,8 +8,8 @@ import gov.cdc.dataingestion.odse.repository.IEdxActivityParentLogRepository; import gov.cdc.dataingestion.odse.repository.model.EdxActivityDetailLog; import gov.cdc.dataingestion.odse.repository.model.EdxActivityLog; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData; import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; @@ -45,7 +45,7 @@ class ReportStatusServiceTest { @Mock private NbsInterfaceRepository nbsInterfaceRepositoryMock; @Mock - private IRawELRRepository iRawELRRepository; + private IRawElrRepository iRawELRRepository; @Mock private IValidatedELRRepository iValidatedELRRepository; @Mock @@ -79,12 +79,12 @@ void tearDown() { void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist() { String id = "test"; Integer nbsId = 123456; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); @@ -99,7 +99,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist() { NbsInterfaceModel nbsModel = new NbsInterfaceModel(); nbsModel.setRecordStatusCd("Success"); nbsModel.setPayload("payload"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional.of(reportStatusIdModel)); when(nbsInterfaceRepositoryMock.findByNbsInterfaceUid(nbsId)).thenReturn(Optional.of(nbsModel)); @@ -130,12 +130,12 @@ void testGetMessageDetailStatus_RawNotExist() { @Test void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ElrDeadLetterModel dltModel = new ElrDeadLetterModel(); dltModel.setErrorMessageId(id); dltModel.setDltStatus("ERROR"); @@ -143,7 +143,7 @@ void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltExist() { dltModel.setErrorMessageSource("origin"); dltModel.setErrorStackTraceShort("short"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional. empty()); @@ -163,14 +163,14 @@ void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltExist() { @Test void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltNotExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); - - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); + + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional. empty()); @@ -189,12 +189,12 @@ void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltNotExist() { void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsNotExist() { String id = "test"; Integer nbsId = 123456; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); @@ -209,7 +209,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsNotExist() NbsInterfaceModel nbsModel = new NbsInterfaceModel(); nbsModel.setRecordStatusCd("Success"); nbsModel.setPayload("payload"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional.of(reportStatusIdModel)); when(nbsInterfaceRepositoryMock.findByNbsInterfaceUid(nbsId)).thenReturn(Optional.empty()); @@ -223,12 +223,12 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsNotExist() @Test void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); validatedELRModel.setRawMessage("payload"); @@ -241,7 +241,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltExist() dltModel.setErrorMessageSource("origin"); dltModel.setErrorStackTraceShort("short"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional. empty()); @@ -259,12 +259,12 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltExist() @Test void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltNotExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); validatedELRModel.setRawMessage("payload"); @@ -272,7 +272,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltNotExis - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional. empty()); @@ -349,12 +349,12 @@ void testDummyGetStatusForReportSuccessModelCoverage() { void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist_OdseExist() { String id = "123"; Integer nbsId = 123456; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); @@ -380,7 +380,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist_Odse nbsModel.setRecordStatusCd("Failure"); nbsModel.setPayload("payload"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional.of(reportStatusIdModel)); when(nbsInterfaceRepositoryMock.findByNbsInterfaceUid(nbsId)).thenReturn(Optional.of(nbsModel)); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java index fdd8843a5..1263e2193 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java @@ -2,6 +2,7 @@ import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; import gov.cdc.dataingestion.exception.DuplicateHL7FileFoundException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; @@ -44,7 +45,7 @@ void tearDown() { } @Test - void testValidateHL7DocumentSuccess() throws DuplicateHL7FileFoundException { + void testValidateHL7DocumentSuccess() throws DuplicateHL7FileFoundException, KafkaProducerException { ValidatedELRModel validatedELRModel = getValidatedELRModel(); Mockito.when(iValidatedELRRepositoryMock.findByHashedHL7String(any())) @@ -56,7 +57,7 @@ void testValidateHL7DocumentSuccess() throws DuplicateHL7FileFoundException { } @Test - void testValidateHL7DocumentThrowsException() { + void testValidateHL7DocumentThrowsException() throws KafkaProducerException { String hashedString = "843588fcbfbdca29f9807f81455bbd3ae6935dae6152bcac6851e9568c885c66"; ValidatedELRModel validatedELRModel = getValidatedELRModel(); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java index 91c30352f..90f813b29 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java @@ -3,7 +3,7 @@ import gov.cdc.dataingestion.constant.enums.EnumMessageType; import gov.cdc.dataingestion.hl7.helper.HL7Helper; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7v2Validator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -41,7 +41,7 @@ void MessageValidation_Success_ValidMessage_NotContainNewLine() throws DiHL7Exce String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); model.setCreatedOn(null); @@ -70,7 +70,7 @@ void MessageValidation_Success_ValidMessage_NotContainNewLine_231() throws DiHL7 String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); model.setCreatedOn(null); @@ -106,7 +106,7 @@ void MessageValidation_Success_ValidMessage_ContainNewLine() throws DiHL7Excepti String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); @@ -140,7 +140,7 @@ void MessageValidation_Exception_NotSupportedVersion() { String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); @@ -162,7 +162,7 @@ void MessageValidation_InvalidMessage_ThrowException() { String data = "Invalid Message"; String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id);