From 0eb52a2e1fb039216f5f35507137d8a552cf5b34 Mon Sep 17 00:00:00 2001 From: Ragul Shanmugam Date: Thu, 6 Mar 2025 15:11:01 -0800 Subject: [PATCH 1/6] Adding automatic retry for kafka errors --- .../controller/ElrDeadLetterController.java | 3 +- .../repository/IElrDeadLetterRepository.java | 18 ++++- .../service/ElrDeadLetterService.java | 46 +++++++++-- .../deadletter/service/SchedulerConfig.java | 28 +++++++ .../exception/KafkaProducerException.java | 14 ++++ .../service/KafkaConsumerService.java | 32 +++++--- .../service/KafkaProducerService.java | 54 +++++++++---- .../rawmessage/service/RawELRService.java | 76 ++++++++++++++----- .../report/repository/IRawELRRepository.java | 8 ++ .../validator/HL7DuplicateValidator.java | 3 +- .../interfaces/IHL7DuplicateValidator.java | 3 +- .../src/main/resources/application.yaml | 7 ++ 12 files changed, 238 insertions(+), 54 deletions(-) create mode 100644 data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java create mode 100644 data-ingestion-service/src/main/java/gov/cdc/dataingestion/exception/KafkaProducerException.java diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java index 75da2287b..b8f63ab69 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/controller/ElrDeadLetterController.java @@ -3,6 +3,7 @@ import gov.cdc.dataingestion.deadletter.model.ElrDeadLetterDto; import gov.cdc.dataingestion.deadletter.service.ElrDeadLetterService; import gov.cdc.dataingestion.exception.DeadLetterTopicException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import io.swagger.v3.oas.annotations.Operation; import 
io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.enums.ParameterIn; @@ -87,7 +88,7 @@ public ResponseEntity getErrorMessage(@PathVariable("dlt-id") schema = @Schema(type = "string"))} ) @PostMapping(consumes = MediaType.TEXT_PLAIN_VALUE, path = "/api/elrs/{dlt-id}") - public ResponseEntity messageReInject(@PathVariable("dlt-id") String dltId, @RequestBody final String payload) throws DeadLetterTopicException { + public ResponseEntity messageReInject(@PathVariable("dlt-id") String dltId, @RequestBody final String payload) throws DeadLetterTopicException, KafkaProducerException { return ResponseEntity.ok(elrDeadLetterService.updateAndReprocessingMessage(dltId, payload)); } } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java index fdb709f0d..e6cf9d790 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java @@ -1,8 +1,11 @@ package gov.cdc.dataingestion.deadletter.repository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; +import jakarta.transaction.Transactional; import org.springframework.data.domain.Sort; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import java.util.List; @@ -11,4 +14,17 @@ @Repository public interface IElrDeadLetterRepository extends JpaRepository { Optional> findAllDltRecordByDltStatus (String dltStatus, Sort sort); -} + + @Modifying + @Transactional + @Query(value = "INSERT INTO elr_dlt (error_message_id, 
error_message_source, error_stack_trace, error_stack_trace_short, dlt_status, dlt_occurrence, message, created_by, updated_by) VALUES(:id, :topicName, :errorStatus, :errorStatus , 'KAFKA_ERROR_' + :type, :dltOccurrence, :payload, 'elr_raw_service', 'elr_raw_service')", nativeQuery = true) + void addErrorStatusForRawId(String id, String topicName, String type, String payload, String errorStatus, int dltOccurrence); + + @Modifying + @Transactional + @Query(value = "UPDATE elr_dlt SET dlt_status = 'PROCESSED' WHERE error_message_id = :id", nativeQuery = true) + void updateErrorStatusForRawId(String id, String errorStatus); + + @Query(value = "SELECT * FROM elr_dlt WHERE dlt_status LIKE '%KAFKA%' AND dlt_occurrence <= 2", nativeQuery = true) + List getAllErrorDltRecordFromKafka(); +} \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java index bdc7a53b7..2dd204264 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java @@ -7,7 +7,10 @@ import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; import gov.cdc.dataingestion.exception.DeadLetterTopicException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; +import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; +import gov.cdc.dataingestion.rawmessage.service.RawELRService; import gov.cdc.dataingestion.report.repository.IRawELRRepository; import gov.cdc.dataingestion.report.repository.model.RawERLModel; import 
gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; @@ -17,10 +20,7 @@ import org.springframework.data.domain.Sort; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.UUID; +import java.util.*; import static gov.cdc.dataingestion.share.helper.TimeStampHelper.getCurrentTimeStamp; @@ -38,6 +38,7 @@ public class ElrDeadLetterService { private final IRawELRRepository rawELRRepository; private final IValidatedELRRepository validatedELRRepository; private final KafkaProducerService kafkaProducerService; + private final RawELRService rawELRService; @Value("${service.timezone}") private String tz = "UTC"; @@ -65,11 +66,12 @@ public ElrDeadLetterService( IElrDeadLetterRepository dltRepository, IRawELRRepository rawELRRepository, IValidatedELRRepository validatedELRRepository, - KafkaProducerService kafkaProducerService) { + KafkaProducerService kafkaProducerService, RawELRService rawELRService) { this.dltRepository = dltRepository; this.rawELRRepository = rawELRRepository; this.validatedELRRepository = validatedELRRepository; this.kafkaProducerService = kafkaProducerService; + this.rawELRService = rawELRService; } public List getAllErrorDltRecord() { @@ -101,7 +103,7 @@ public ElrDeadLetterDto getDltRecordById(String id) throws DeadLetterTopicExcept - public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) throws DeadLetterTopicException { + public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) throws DeadLetterTopicException, KafkaProducerException { var existingRecord = getDltRecordById(id); if(!existingRecord.getDltStatus().equalsIgnoreCase(EnumElrDltStatus.ERROR.name())) { throw new DeadLetterTopicException("Selected record is in REINJECTED state. 
Please either wait for the ERROR state to occur or select a different record."); @@ -207,4 +209,36 @@ private boolean isValidUUID(String uuidString) { return false; } } + + public void processFailedMessagesFromKafka() throws KafkaProducerException { + List dltMessagesList = dltRepository.getAllErrorDltRecordFromKafka(); + System.out.println("Running dlt scheduler...."); + dltMessagesList.forEach(System.err::println); + if(!dltMessagesList.isEmpty()) { + Iterator iterator = dltMessagesList.iterator(); + while (iterator.hasNext()) { + ElrDeadLetterModel message = iterator.next(); + System.err.println(message); + RawERLDto rawELRDto = new RawERLDto(); + rawELRDto.setId(message.getErrorMessageId()); + rawELRDto.setType(getElrMessageType(message.getDltStatus())); + rawELRDto.setPayload(message.getMessage()); + rawELRDto.setValidationActive(true); + dltRepository.updateErrorStatusForRawId(message.getErrorMessageId(), "PROCESSED"); + rawELRService.updateRawMessageAfterRetry(rawELRDto, "2"); + iterator.remove(); + } + } + } + + private String getElrMessageType(String dltStatus) { + String delimiter = "KAFKA_ERROR"; + int delimiterIndex = dltStatus.indexOf(delimiter); + + String msgType = ""; + if (delimiterIndex != -1) { + msgType = dltStatus.substring(delimiterIndex + 1 + delimiter.length()); + } + return msgType; + } } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java new file mode 100644 index 000000000..a217fc4fa --- /dev/null +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java @@ -0,0 +1,28 @@ +package gov.cdc.dataingestion.deadletter.service; + +import gov.cdc.dataingestion.exception.KafkaProducerException; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import 
org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +@Configuration +@EnableScheduling +public class SchedulerConfig { + + @Value("${dlt.scheduler.enabled}") + private boolean isSchedulerEnabled; + + private ElrDeadLetterService elrDeadLetterService; + + public SchedulerConfig(ElrDeadLetterService elrDeadLetterService) { + this.elrDeadLetterService = elrDeadLetterService; + } + + @Scheduled(cron = "${dlt.scheduler.cron.expression}") + public void scheduleTask() throws KafkaProducerException { + if (isSchedulerEnabled) { + elrDeadLetterService.processFailedMessagesFromKafka(); + } + } +} diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/exception/KafkaProducerException.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/exception/KafkaProducerException.java new file mode 100644 index 000000000..1940ca78c --- /dev/null +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/exception/KafkaProducerException.java @@ -0,0 +1,14 @@ +package gov.cdc.dataingestion.exception; + +/** + 1118 - require constructor complaint + 125 - comment complaint + 6126 - String block complaint + 1135 - todos complaint + * */ +@SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) +public class KafkaProducerException extends Exception{ + public KafkaProducerException(String message) { + super(message); + } +} diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java index 4bca8ca11..db5c56e96 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java @@ -182,7 +182,7 @@ public void 
handleMessageForRawElr(String message, } try { validationHandler(message, hl7ValidationActivated, dataProcessingEnable); - } catch (DuplicateHL7FileFoundException | DiHL7Exception e) { + } catch (DuplicateHL7FileFoundException | DiHL7Exception | KafkaProducerException e) { throw new RuntimeException(e); //NOSONAR } }); @@ -250,12 +250,20 @@ public void handleMessageForElrXml(String message, iReportStatusRepository.save(reportStatusIdData); if (dataProcessingApplied) { - kafkaProducerService.sendMessageAfterConvertedToXml( - String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0); + try { + kafkaProducerService.sendMessageAfterConvertedToXml( + String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0); + } catch (KafkaProducerException e) { + throw new RuntimeException(e); + } } else { - kafkaProducerService.sendMessageAfterConvertedToXml( - nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); + try { + kafkaProducerService.sendMessageAfterConvertedToXml( + nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); + } catch (KafkaProducerException e) { + throw new RuntimeException(e); + } } }); @@ -318,7 +326,11 @@ public void handleMessageForXmlConversionElr(String message, @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) { timeMetricsBuilder.recordXmlPrepTime(() -> { log.debug(topicDebugLog, message, topic); - xmlConversionHandler(message, operation, dataProcessingEnable); + try { + xmlConversionHandler(message, operation, dataProcessingEnable); + } catch (KafkaProducerException e) { + throw new RuntimeException(e); + } }); } @@ -455,7 +467,7 @@ private String getDltErrorSource(String incomingTopic) { return erroredSource; } - private void preparationForConversionHandler(String message, String dataProcessingEnable) throws ConversionPrepareException { + private void preparationForConversionHandler(String message, String dataProcessingEnable) throws 
ConversionPrepareException, KafkaProducerException { Optional validatedElrResponse = this.iValidatedELRRepository.findById(message); if(validatedElrResponse.isPresent()) { kafkaProducerService.sendMessagePreparationTopic(validatedElrResponse.get(), prepXmlTopic, TopicPreparationType.XML, 0, dataProcessingEnable); @@ -469,7 +481,7 @@ private void preparationForConversionHandler(String message, String dataProcessi * make this public so we can add unit test for now. * we need to implementation interface pattern for NBS convert and transformation classes. it better for unit testing * */ - public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) { + public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) throws KafkaProducerException { String hl7Msg = ""; try { Optional validatedELRModel = iValidatedELRRepository.findById(message); @@ -548,11 +560,11 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str ); } } - private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) { + private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) throws KafkaProducerException { log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml"); xmlConversionHandlerProcessing(message, operation, dataProcessingEnable); } - private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception { + private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception, KafkaProducerException { Optional rawElrResponse = this.iRawELRRepository.findById(message); RawERLModel elrModel; if (!rawElrResponse.isEmpty()) { diff --git 
a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java index 10f90b7ec..4b84e2c26 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java @@ -5,14 +5,18 @@ import gov.cdc.dataingestion.constant.TopicPreparationType; import gov.cdc.dataingestion.constant.enums.EnumKafkaOperation; import gov.cdc.dataingestion.exception.ConversionPrepareException; +import gov.cdc.dataingestion.exception.KafkaProducerException; +import gov.cdc.dataingestion.report.repository.IRawELRRepository; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.stereotype.Service; +import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.*; @Service /** @@ -31,8 +35,11 @@ public class KafkaProducerService { private final KafkaTemplate kafkaTemplate; - public KafkaProducerService( KafkaTemplate kafkaTemplate) { + private final IRawELRRepository iRawELRRepository; + + public KafkaProducerService(KafkaTemplate kafkaTemplate, IRawELRRepository iRawELRRepository) { this.kafkaTemplate = kafkaTemplate; + this.iRawELRRepository = iRawELRRepository; } public void sendMessageFromController(String msg, @@ -40,7 +47,7 @@ public void sendMessageFromController(String msg, String msgType, Integer dltOccurrence, Boolean validationActive, - String version) { + String version) throws KafkaProducerException { String uniqueID = msgType + "_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, 
msg); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msgType.getBytes()); @@ -58,7 +65,7 @@ public void sendElrXmlMessageFromController(String msgId, String topic, String msgType, Integer dltOccurrence, - String payload, String version) { + String payload, String version) throws KafkaProducerException { String uniqueID = msgType + "_" + msgId; var prodRecord = new ProducerRecord<>(topic, uniqueID, payload); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msgType.getBytes()); @@ -72,7 +79,7 @@ public void sendElrXmlMessageFromController(String msgId, } public void sendMessageFromDltController( - String msg, String topic, String msgType, Integer dltOccurrence) { + String msg, String topic, String msgType, Integer dltOccurrence) throws KafkaProducerException { String uniqueID = msgType + "_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, msg); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msgType.getBytes()); @@ -86,7 +93,7 @@ public void sendMessageFromDltController( @Deprecated @SuppressWarnings("java:S1133") - public void sendMessageFromCSVController(List> msg, String topic, String msgType) { + public void sendMessageFromCSVController(List> msg, String topic, String msgType) throws KafkaProducerException { String uniqueID = msgType + "_" + UUID.randomUUID(); Gson gson = new Gson(); String json = gson.toJson(msg); @@ -96,7 +103,7 @@ public void sendMessageFromCSVController(List> msg, String topic, S sendMessage(prodRecord); } - public void sendMessageAfterValidatingMessage(ValidatedELRModel msg, String topic, Integer dltOccurrence, String dataProcessingEnable) { + public void sendMessageAfterValidatingMessage(ValidatedELRModel msg, String topic, Integer dltOccurrence, String dataProcessingEnable) throws KafkaProducerException { String uniqueID = PREFIX_MSG_VALID + msg.getMessageType() + "_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, msg.getId()); 
prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, msg.getMessageType().getBytes()); @@ -108,7 +115,7 @@ public void sendMessageAfterValidatingMessage(ValidatedELRModel msg, String topi sendMessage(prodRecord); } @SuppressWarnings({"java:S6880"}) - public void sendMessagePreparationTopic(ValidatedELRModel msg, String topic, TopicPreparationType topicType, Integer dltOccurrence, String dataProcessingEnable) throws ConversionPrepareException { + public void sendMessagePreparationTopic(ValidatedELRModel msg, String topic, TopicPreparationType topicType, Integer dltOccurrence, String dataProcessingEnable) throws ConversionPrepareException, KafkaProducerException { String uniqueId; if (topicType == TopicPreparationType.XML) { @@ -127,7 +134,7 @@ else if (topicType == TopicPreparationType.FHIR) { private void sendMessageHelper(String topic, Integer dltOccurrence, String uniqueId, String messageOriginId, String messageType, String messageVersion, - String dataProcessingEnable) { + String dataProcessingEnable) throws KafkaProducerException { var prodRecord = new ProducerRecord<>(topic, uniqueId, messageOriginId); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_TYPE, messageType.getBytes()); prodRecord.headers().add(KafkaHeaderValue.MESSAGE_VERSION, messageVersion.getBytes()); @@ -139,7 +146,7 @@ private void sendMessageHelper(String topic, Integer dltOccurrence, String uniqu } public void sendMessageDlt(String msgShort, String msg, String topic, Integer dltOccurrence, - String stackTrace, String originalTopic) { + String stackTrace, String originalTopic) throws KafkaProducerException { String uniqueID = "DLT_" + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, msg); prodRecord.headers().add(KafkaHeaderValue.DLT_OCCURRENCE, dltOccurrence.toString().getBytes()); @@ -151,7 +158,7 @@ public void sendMessageDlt(String msgShort, String msg, String topic, Integer dl - public void sendMessageAfterConvertedToXml(String xmlMsg, String topic, Integer 
dltOccurrence) { + public void sendMessageAfterConvertedToXml(String xmlMsg, String topic, Integer dltOccurrence) throws KafkaProducerException { String uniqueID = PREFIX_MSG_XML + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(topic, uniqueID, xmlMsg); prodRecord.headers().add(KafkaHeaderValue.DLT_OCCURRENCE, dltOccurrence.toString().getBytes()); @@ -160,7 +167,7 @@ public void sendMessageAfterConvertedToXml(String xmlMsg, String topic, Integer sendMessage(prodRecord); } - public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String validatedElrDuplicateTopic, Integer dltOccurrence) { + public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String validatedElrDuplicateTopic, Integer dltOccurrence) throws KafkaProducerException { String uniqueID = PREFIX_MSG_HL7 + UUID.randomUUID(); var prodRecord = new ProducerRecord<>(validatedElrDuplicateTopic, uniqueID, msg.getRawId()); prodRecord.headers().add(KafkaHeaderValue.DLT_OCCURRENCE, dltOccurrence.toString().getBytes()); @@ -168,11 +175,26 @@ public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String v sendMessage(prodRecord); } - - - - private void sendMessage(ProducerRecord prodRecord) { - kafkaTemplate.send(prodRecord); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(() -> { + Thread.sleep(1000000000); + return "Task completed"; + }); + + + + private void sendMessage(ProducerRecord prodRecord) throws KafkaProducerException { + try { +// kafkaTemplate.send(prodRecord).get(3, TimeUnit.SECONDS); + String result = future.get(3, TimeUnit.SECONDS); // Wait with a timeout of 5 seconds + System.out.println("result is..." + result); + System.err.println("Printing the send..." ); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + System.err.println("Printing the exception..." 
+ Arrays.toString(e.getStackTrace())); +// logger.error + System.err.println("Failed publishing message to kafka topic: " + prodRecord.topic() + " with UUID: " + prodRecord.value()); + throw new KafkaProducerException("Failed publishing message to kafka topic: " + prodRecord.topic() + " with UUID: " + prodRecord.value()); + } } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java index 6859787a1..376e99efb 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java @@ -1,5 +1,7 @@ package gov.cdc.dataingestion.rawmessage.service; +import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; import gov.cdc.dataingestion.report.repository.IRawELRRepository; @@ -28,35 +30,73 @@ public class RawELRService { @Value("${service.timezone}") private String tz = "UTC"; + private final String errorStatus = "Sending event to elr_raw kafka topic failed"; + private final IRawELRRepository rawELRRepository; private final KafkaProducerService kafkaProducerService; + private final IElrDeadLetterRepository iElrDeadLetterRepository; - public String submission(RawERLDto rawERLDto, String version) { + public String submission(RawERLDto rawERLDto, String version) throws KafkaProducerException { RawERLModel created = rawELRRepository.save(convert(rawERLDto)); - if(rawERLDto.getType().equalsIgnoreCase(HL7_ELR)) { - kafkaProducerService.sendMessageFromController( - created.getId(), - topicName, - rawERLDto.getType(), - 0, - rawERLDto.getValidationActive(), - version); - } - 
if(rawERLDto.getType().equalsIgnoreCase(XML_ELR)) { - kafkaProducerService.sendElrXmlMessageFromController( - created.getId(), - rawXmlTopicName, - rawERLDto.getType(), - 0, - rawERLDto.getPayload(), - version); + int dltOccurrence = 0; + try { + if(rawERLDto.getType().equalsIgnoreCase(HL7_ELR)) { + kafkaProducerService.sendMessageFromController( + created.getId(), + topicName, + rawERLDto.getType(), + dltOccurrence, + rawERLDto.getValidationActive(), + version); + } + if(rawERLDto.getType().equalsIgnoreCase(XML_ELR)) { + kafkaProducerService.sendElrXmlMessageFromController( + created.getId(), + rawXmlTopicName, + rawERLDto.getType(), + dltOccurrence, + rawERLDto.getPayload(), + version); + } + } catch (KafkaProducerException e) { + iElrDeadLetterRepository.addErrorStatusForRawId(created.getId(), topicName, created.getType(), created.getPayload(), errorStatus, dltOccurrence + 1); + throw new KafkaProducerException("Failed publishing message to kafka topic: " + topicName + " with UUID: " + created.getId()); } return created.getId(); } + public void updateRawMessageAfterRetry(RawERLDto rawElrDto, String version) throws KafkaProducerException { + rawELRRepository.updateRawMessageWithNewVersion(rawElrDto.getId(), "Reprocessed"); + int dltOccurrence = 1; + try { + if(rawElrDto.getType().equalsIgnoreCase(HL7_ELR)) { + kafkaProducerService.sendMessageFromController( + rawElrDto.getId(), + topicName, + rawElrDto.getType(), + dltOccurrence + 1, + rawElrDto.getValidationActive(), + version); + } + if(rawElrDto.getType().equalsIgnoreCase(XML_ELR)) { + kafkaProducerService.sendElrXmlMessageFromController( + rawElrDto.getId(), + rawXmlTopicName, + rawElrDto.getType(), + dltOccurrence + 1, + rawElrDto.getPayload(), + version); + } + } catch (KafkaProducerException e) { + iElrDeadLetterRepository.addErrorStatusForRawId(rawElrDto.getId(), topicName, rawElrDto.getType(), rawElrDto.getPayload(), errorStatus, dltOccurrence + 1); + throw new KafkaProducerException("Failed publishing 
message again to kafka topic: " + topicName + " with UUID: " + rawElrDto.getId()); + } + } + public RawERLDto getById(String id) { RawERLModel rawERLModel = rawELRRepository.getById(id); +// rawELRRepository. return convert(rawERLModel); } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java index ea25748e3..80e0c85df 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java @@ -1,9 +1,17 @@ package gov.cdc.dataingestion.report.repository; import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import jakarta.transaction.Transactional; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; @Repository public interface IRawELRRepository extends JpaRepository { + + @Modifying + @Transactional + @Query(value = "UPDATE elr_raw SET status = :status WHERE id = :id", nativeQuery = true) + void updateRawMessageWithNewVersion(String id, String status); } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java index 8d40494c0..8c3bec485 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidator.java @@ -2,6 +2,7 @@ import 
gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; import gov.cdc.dataingestion.exception.DuplicateHL7FileFoundException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; @@ -39,7 +40,7 @@ public HL7DuplicateValidator(IValidatedELRRepository iValidatedELRRepository, Ka } @Override - public void validateHL7Document(ValidatedELRModel hl7ValidatedModel) throws DuplicateHL7FileFoundException { + public void validateHL7Document(ValidatedELRModel hl7ValidatedModel) throws DuplicateHL7FileFoundException, KafkaProducerException { String hashedString = null; try { MessageDigest digestString = MessageDigest.getInstance("SHA-256"); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java index ba8049b58..3938a7c38 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7DuplicateValidator.java @@ -1,8 +1,9 @@ package gov.cdc.dataingestion.validation.integration.validator.interfaces; import gov.cdc.dataingestion.exception.DuplicateHL7FileFoundException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; public interface IHL7DuplicateValidator { - void validateHL7Document(ValidatedELRModel hl7ValidatedModel) throws DuplicateHL7FileFoundException; + void validateHL7Document(ValidatedELRModel hl7ValidatedModel) 
throws DuplicateHL7FileFoundException, KafkaProducerException; } diff --git a/data-ingestion-service/src/main/resources/application.yaml b/data-ingestion-service/src/main/resources/application.yaml index 8a36bd5e0..5f2446ebb 100644 --- a/data-ingestion-service/src/main/resources/application.yaml +++ b/data-ingestion-service/src/main/resources/application.yaml @@ -34,6 +34,13 @@ ecr: processing: batchSize: 100 # How many records are processed at a time interval: 3000 # How often (in milliseconds) the database is queried for records to process + +dlt: + scheduler: + enabled: true + cron: + expression: 0/10 * * * * * + --- spring: config: From 6bebded5c3edb6ee88aafe031cf36d8b5cda3d04 Mon Sep 17 00:00:00 2001 From: Ragul Shanmugam Date: Thu, 6 Mar 2025 15:47:14 -0800 Subject: [PATCH 2/6] Updated repository for dlt --- data-ingestion-service/build.gradle | 2 +- .../deadletter/repository/IElrDeadLetterRepository.java | 5 +++++ .../kafka/integration/service/KafkaProducerService.java | 2 +- .../rawmessage/controller/ElrReportsController.java | 3 ++- .../cdc/dataingestion/rawmessage/service/RawELRService.java | 2 +- 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/data-ingestion-service/build.gradle b/data-ingestion-service/build.gradle index 977f53fdc..6bec6212a 100644 --- a/data-ingestion-service/build.gradle +++ b/data-ingestion-service/build.gradle @@ -225,7 +225,7 @@ dependencies { implementation 'io.prometheus:simpleclient_hotspot:0.16.0' implementation 'io.prometheus:simpleclient_httpserver:0.16.0' implementation 'io.prometheus:simpleclient_pushgateway:0.16.0' - implementation 'io.micrometer:micrometer-registry-prometheus:1.13.4' + implementation 'io.micrometer:micrometer-registry-prometheus:1.15.0-M2' // Apache camel implementation 'org.apache.camel.springboot:camel-spring-boot-starter:4.8.0' diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java 
b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java index e6cf9d790..128f6d6ec 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java @@ -20,6 +20,11 @@ public interface IElrDeadLetterRepository extends JpaRepository prodRecord) throws KafkaProducerException { try { // kafkaTemplate.send(prodRecord).get(3, TimeUnit.SECONDS); - String result = future.get(3, TimeUnit.SECONDS); // Wait with a timeout of 5 seconds + String result = future.get(3, TimeUnit.SECONDS); System.out.println("result is..." + result); System.err.println("Printing the send..." ); } catch (TimeoutException | InterruptedException | ExecutionException e) { diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java index 90ae3ca82..0b4a159bf 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java @@ -1,6 +1,7 @@ package gov.cdc.dataingestion.rawmessage.controller; import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; import gov.cdc.dataingestion.rawmessage.service.RawELRService; @@ -70,7 +71,7 @@ public ElrReportsController(RawELRService rawELRService, ) @PostMapping(consumes = MediaType.TEXT_PLAIN_VALUE, path = "/api/elrs") public ResponseEntity save(@RequestBody final String payload, @RequestHeader("msgType") 
String type, - @RequestHeader(name = "version", defaultValue = "1") String version) { + @RequestHeader(name = "version", defaultValue = "1") String version) throws KafkaProducerException { if (type.isEmpty()) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Required headers should not be null"); } else { diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java index 376e99efb..003433e29 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java @@ -89,7 +89,7 @@ public void updateRawMessageAfterRetry(RawERLDto rawElrDto, String version) thro version); } } catch (KafkaProducerException e) { - iElrDeadLetterRepository.addErrorStatusForRawId(rawElrDto.getId(), topicName, rawElrDto.getType(), rawElrDto.getPayload(), errorStatus, dltOccurrence + 1); + iElrDeadLetterRepository.updateDltOccurrenceForRawId(rawElrDto.getId(), dltOccurrence + 1); throw new KafkaProducerException("Failed publishing message again to kafka topic: " + topicName + " with UUID: " + rawElrDto.getId()); } } From 962185e0ad64b5119b0c311d757129fdcf3b30d6 Mon Sep 17 00:00:00 2001 From: Ragul Shanmugam Date: Tue, 11 Mar 2025 17:10:49 -0700 Subject: [PATCH 3/6] Unit tests added --- .../camel/routes/HL7FileProcessComponent.java | 19 +-- .../repository/IElrDeadLetterRepository.java | 8 +- .../service/ElrDeadLetterService.java | 38 +++-- .../deadletter/service/SchedulerConfig.java | 2 +- .../service/KafkaConsumerService.java | 12 +- .../service/KafkaProducerService.java | 24 +-- .../controller/ElrReportsController.java | 28 ++-- .../dto/{RawERLDto.java => RawElrDto.java} | 3 +- ...{RawELRService.java => RawElrService.java} | 84 +++++------ .../report/repository/IRawELRRepository.java | 17 --- 
.../report/repository/IRawElrRepository.java | 9 ++ .../{RawERLModel.java => RawElrModel.java} | 13 +- .../service/ReportStatusService.java | 10 +- .../integration/validator/HL7v2Validator.java | 6 +- .../validator/interfaces/IHL7v2Validator.java | 4 +- .../routes/HL7FileProcessComponentTest.java | 20 +-- .../service/ElrDeadLetterServiceTest.java | 113 +++++++++++--- .../service/SchedulerConfigTest.java | 53 +++++++ .../service/KafkaConsumerServiceTest.java | 21 +-- .../service/KafkaProducerServiceTest.java | 119 +++++++++++++-- .../rawmessage/RawELRServiceTest.java | 103 ------------- .../rawmessage/RawElrServiceTest.java | 140 ++++++++++++++++++ .../ElrReportsControllerMockTest.java | 15 +- .../controller/ElrReportsControllerTest.java | 37 ++--- .../service/ReportStatusServiceTest.java | 106 ++++++------- .../validator/HL7DuplicateValidatorTest.java | 5 +- .../validator/HL7v2ValidatorTests.java | 12 +- 27 files changed, 635 insertions(+), 386 deletions(-) rename data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/{RawERLDto.java => RawElrDto.java} (88%) rename data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/{RawELRService.java => RawElrService.java} (57%) delete mode 100644 data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java create mode 100644 data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawElrRepository.java rename data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/{RawERLModel.java => RawElrModel.java} (89%) create mode 100644 data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfigTest.java delete mode 100644 data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawELRServiceTest.java create mode 100644 data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java diff --git 
a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java index 671220789..b582850cd 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponent.java @@ -1,7 +1,7 @@ package gov.cdc.dataingestion.camel.routes; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import org.apache.camel.Handler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,10 +19,10 @@ public class HL7FileProcessComponent { private static Logger logger = LoggerFactory.getLogger(HL7FileProcessComponent.class); String msgType = "HL7"; - private RawELRService rawELRService; + private RawElrService rawELRService; @Autowired - public HL7FileProcessComponent(RawELRService rawELRService){ + public HL7FileProcessComponent(RawElrService rawELRService){ this.rawELRService=rawELRService; } @Handler @@ -34,11 +34,12 @@ public String process(String body) { String hl7Str = body; logger.debug("HL7 Message:{}", hl7Str); if (hl7Str != null && !hl7Str.trim().isEmpty()) { - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(msgType); - rawERLDto.setValidationActive(true); - rawERLDto.setPayload(hl7Str); - elrId = rawELRService.submission(rawERLDto,version); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(msgType); + rawElrDto.setValidationActive(true); + rawElrDto.setPayload(hl7Str); + rawElrDto.setVersion(version); + elrId = rawELRService.submission(rawElrDto); } } catch (Exception e) { logger.error(e.getMessage()); diff --git 
a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java index 128f6d6ec..e29417f0f 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/repository/IElrDeadLetterRepository.java @@ -22,14 +22,14 @@ public interface IElrDeadLetterRepository extends JpaRepository getAllErrorDltRecordFromKafka(); + List getAllErrorDltRecordForKafkaError(); } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java index 2dd204264..128d6c385 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java @@ -9,10 +9,10 @@ import gov.cdc.dataingestion.exception.DeadLetterTopicException; import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; import 
gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import lombok.extern.slf4j.Slf4j; @@ -35,10 +35,10 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) public class ElrDeadLetterService { private final IElrDeadLetterRepository dltRepository; - private final IRawELRRepository rawELRRepository; + private final IRawElrRepository rawELRRepository; private final IValidatedELRRepository validatedELRRepository; private final KafkaProducerService kafkaProducerService; - private final RawELRService rawELRService; + private final RawElrService rawELRService; @Value("${service.timezone}") private String tz = "UTC"; @@ -64,9 +64,9 @@ public class ElrDeadLetterService { public ElrDeadLetterService( IElrDeadLetterRepository dltRepository, - IRawELRRepository rawELRRepository, + IRawElrRepository rawELRRepository, IValidatedELRRepository validatedELRRepository, - KafkaProducerService kafkaProducerService, RawELRService rawELRService) { + KafkaProducerService kafkaProducerService, RawElrService rawELRService) { this.dltRepository = dltRepository; this.rawELRRepository = rawELRRepository; this.validatedELRRepository = validatedELRRepository; @@ -116,7 +116,7 @@ public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) thr if (!rawRecord.isPresent()) { throw new DeadLetterTopicException(DEAD_LETTER_NULL_EXCEPTION); } - RawERLModel rawModel = rawRecord.get(); + RawElrModel rawModel = rawRecord.get(); rawModel.setPayload(body); rawModel.setUpdatedOn(getCurrentTimeStamp(tz)); @@ -211,27 +211,25 @@ private boolean isValidUUID(String uuidString) { } public void processFailedMessagesFromKafka() throws KafkaProducerException { - List dltMessagesList = dltRepository.getAllErrorDltRecordFromKafka(); - System.out.println("Running dlt scheduler...."); - dltMessagesList.forEach(System.err::println); + List dltMessagesList = dltRepository.getAllErrorDltRecordForKafkaError(); if(!dltMessagesList.isEmpty()) { Iterator iterator = 
dltMessagesList.iterator(); while (iterator.hasNext()) { ElrDeadLetterModel message = iterator.next(); - System.err.println(message); - RawERLDto rawELRDto = new RawERLDto(); - rawELRDto.setId(message.getErrorMessageId()); - rawELRDto.setType(getElrMessageType(message.getDltStatus())); - rawELRDto.setPayload(message.getMessage()); - rawELRDto.setValidationActive(true); + System.err.println(message.getErrorMessageId()); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setId(message.getErrorMessageId()); + rawElrDto.setType(getElrMessageType(message.getDltStatus())); + rawElrDto.setPayload(message.getMessage()); + rawElrDto.setValidationActive(true); dltRepository.updateErrorStatusForRawId(message.getErrorMessageId(), "PROCESSED"); - rawELRService.updateRawMessageAfterRetry(rawELRDto, "2"); + rawELRService.updateRawMessageAfterRetry(rawElrDto, 2); iterator.remove(); } } } - private String getElrMessageType(String dltStatus) { + String getElrMessageType(String dltStatus) { String delimiter = "KAFKA_ERROR"; int delimiterIndex = dltStatus.indexOf(delimiter); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java index a217fc4fa..e86ece51c 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java @@ -13,7 +13,7 @@ public class SchedulerConfig { @Value("${dlt.scheduler.enabled}") private boolean isSchedulerEnabled; - private ElrDeadLetterService elrDeadLetterService; + private final ElrDeadLetterService elrDeadLetterService; public SchedulerConfig(ElrDeadLetterService elrDeadLetterService) { this.elrDeadLetterService = elrDeadLetterService; diff --git 
a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java index db5c56e96..d871a60e7 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java @@ -19,8 +19,8 @@ import gov.cdc.dataingestion.nbs.converters.Hl7ToRhapsodysXmlConverter; import gov.cdc.dataingestion.nbs.repository.model.NbsInterfaceModel; import gov.cdc.dataingestion.nbs.services.NbsRepositoryServiceProvider; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData; import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator; @@ -86,7 +86,7 @@ public class KafkaConsumerService { private String prepFhirTopic = "fhir_prep"; private final KafkaProducerService kafkaProducerService; private final IHL7v2Validator iHl7v2Validator; - private final IRawELRRepository iRawELRRepository; + private final IRawElrRepository iRawELRRepository; private final IValidatedELRRepository iValidatedELRRepository; private final IHL7DuplicateValidator iHL7DuplicateValidator; private final NbsRepositoryServiceProvider nbsRepositoryServiceProvider; @@ -105,7 +105,7 @@ public class KafkaConsumerService { //region CONSTRUCTOR public KafkaConsumerService( IValidatedELRRepository iValidatedELRRepository, - IRawELRRepository iRawELRRepository, + IRawElrRepository iRawELRRepository, KafkaProducerService 
kafkaProducerService, IHL7v2Validator iHl7v2Validator, IHL7DuplicateValidator iHL7DuplicateValidator, @@ -565,8 +565,8 @@ private void xmlConversionHandler(String message, String operation, String dataP xmlConversionHandlerProcessing(message, operation, dataProcessingEnable); } private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception, KafkaProducerException { - Optional rawElrResponse = this.iRawELRRepository.findById(message); - RawERLModel elrModel; + Optional rawElrResponse = this.iRawELRRepository.findById(message); + RawElrModel elrModel; if (!rawElrResponse.isEmpty()) { elrModel = rawElrResponse.get(); } else { diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java index 2a81ead76..cbcae4262 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java @@ -6,14 +6,13 @@ import gov.cdc.dataingestion.constant.enums.EnumKafkaOperation; import gov.cdc.dataingestion.exception.ConversionPrepareException; import gov.cdc.dataingestion.exception.KafkaProducerException; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.stereotype.Service; -import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.*; @@ -35,9 +34,9 @@ public 
class KafkaProducerService { private final KafkaTemplate kafkaTemplate; - private final IRawELRRepository iRawELRRepository; + private final IRawElrRepository iRawELRRepository; - public KafkaProducerService(KafkaTemplate kafkaTemplate, IRawELRRepository iRawELRRepository) { + public KafkaProducerService(KafkaTemplate kafkaTemplate, IRawElrRepository iRawELRRepository) { this.kafkaTemplate = kafkaTemplate; this.iRawELRRepository = iRawELRRepository; } @@ -175,27 +174,12 @@ public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String v sendMessage(prodRecord); } - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(() -> { - Thread.sleep(1000000000); - return "Task completed"; - }); - - - private void sendMessage(ProducerRecord prodRecord) throws KafkaProducerException { try { -// kafkaTemplate.send(prodRecord).get(3, TimeUnit.SECONDS); - String result = future.get(3, TimeUnit.SECONDS); - System.out.println("result is..." + result); - System.err.println("Printing the send..." ); + kafkaTemplate.send(prodRecord).get(3, TimeUnit.SECONDS); } catch (TimeoutException | InterruptedException | ExecutionException e) { - System.err.println("Printing the exception..." 
+ Arrays.toString(e.getStackTrace())); -// logger.error - System.err.println("Failed publishing message to kafka topic: " + prodRecord.topic() + " with UUID: " + prodRecord.value()); throw new KafkaProducerException("Failed publishing message to kafka topic: " + prodRecord.topic() + " with UUID: " + prodRecord.value()); } } - } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java index 0b4a159bf..1afe7f0c4 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsController.java @@ -3,8 +3,8 @@ import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import gov.cdc.dataingestion.validation.services.interfaces.IHL7Service; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -39,12 +39,12 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) public class ElrReportsController { - private final RawELRService rawELRService; + private final RawElrService rawELRService; private final CustomMetricsBuilder customMetricsBuilder; private IHL7Service hl7Service; @Autowired - public ElrReportsController(RawELRService rawELRService, + public ElrReportsController(RawElrService rawELRService, CustomMetricsBuilder customMetricsBuilder, IHL7Service hl7Service) { 
this.rawELRService = rawELRService; @@ -78,20 +78,22 @@ public ResponseEntity save(@RequestBody final String payload, @RequestHe type = type.toUpperCase(); } - RawERLDto rawERLDto = new RawERLDto(); + RawElrDto rawElrDto = new RawElrDto(); customMetricsBuilder.incrementMessagesProcessed(); if (type.equalsIgnoreCase(HL7_ELR)) { - rawERLDto.setType(type); - rawERLDto.setPayload(payload); - rawERLDto.setValidationActive(true); - return ResponseEntity.ok(rawELRService.submission(rawERLDto, version)); + rawElrDto.setType(type); + rawElrDto.setPayload(payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion(version); + return ResponseEntity.ok(rawELRService.submission(rawElrDto)); } else if (type.equalsIgnoreCase(XML_ELR)) { - rawERLDto.setType(type); - rawERLDto.setPayload(payload); - rawERLDto.setValidationActive(true); - return ResponseEntity.ok(rawELRService.submission(rawERLDto, version)); + rawElrDto.setType(type); + rawElrDto.setPayload(payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion(version); + return ResponseEntity.ok(rawELRService.submission(rawElrDto)); } else { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Please provide valid value for msgType header"); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawERLDto.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawElrDto.java similarity index 88% rename from data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawERLDto.java rename to data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawElrDto.java index 1e525ed40..dde60c6d4 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawERLDto.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/dto/RawElrDto.java @@ -10,11 +10,12 @@ 1135 - todos complaint * */ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) -public 
class RawERLDto { +public class RawElrDto { private String id; private String type; private String payload; + private String version; private Boolean validationActive = false; } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java similarity index 57% rename from data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java rename to data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java index 003433e29..832155d68 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawELRService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java @@ -3,9 +3,9 @@ import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -18,7 +18,7 @@ @Service @RequiredArgsConstructor @Slf4j -public class RawELRService { +public class RawElrService { private static final String CREATED_BY = "admin"; @Value("${kafka.raw.topic}") @@ -30,45 +30,41 @@ public class RawELRService { @Value("${service.timezone}") private String tz = "UTC"; - private final String errorStatus = "Sending event to elr_raw kafka topic failed"; - 
- private final IRawELRRepository rawELRRepository; + private final IRawElrRepository rawElrRepository; private final KafkaProducerService kafkaProducerService; private final IElrDeadLetterRepository iElrDeadLetterRepository; - - public String submission(RawERLDto rawERLDto, String version) throws KafkaProducerException { - RawERLModel created = rawELRRepository.save(convert(rawERLDto)); + public String submission(RawElrDto rawElrDto) throws KafkaProducerException { + RawElrModel created = rawElrRepository.save(convert(rawElrDto)); int dltOccurrence = 0; try { - if(rawERLDto.getType().equalsIgnoreCase(HL7_ELR)) { + if(rawElrDto.getType().equalsIgnoreCase(HL7_ELR)) { kafkaProducerService.sendMessageFromController( created.getId(), topicName, - rawERLDto.getType(), + rawElrDto.getType(), dltOccurrence, - rawERLDto.getValidationActive(), - version); + rawElrDto.getValidationActive(), + rawElrDto.getVersion()); } - if(rawERLDto.getType().equalsIgnoreCase(XML_ELR)) { + if(rawElrDto.getType().equalsIgnoreCase(XML_ELR)) { kafkaProducerService.sendElrXmlMessageFromController( created.getId(), rawXmlTopicName, - rawERLDto.getType(), + rawElrDto.getType(), dltOccurrence, - rawERLDto.getPayload(), - version); + rawElrDto.getPayload(), + rawElrDto.getVersion()); } } catch (KafkaProducerException e) { + String errorStatus = "Sending event to elr_raw kafka topic failed"; iElrDeadLetterRepository.addErrorStatusForRawId(created.getId(), topicName, created.getType(), created.getPayload(), errorStatus, dltOccurrence + 1); throw new KafkaProducerException("Failed publishing message to kafka topic: " + topicName + " with UUID: " + created.getId()); } return created.getId(); } - public void updateRawMessageAfterRetry(RawERLDto rawElrDto, String version) throws KafkaProducerException { - rawELRRepository.updateRawMessageWithNewVersion(rawElrDto.getId(), "Reprocessed"); - int dltOccurrence = 1; + public void updateRawMessageAfterRetry(RawElrDto rawElrDto, int dltOccurrence) throws 
KafkaProducerException { try { if(rawElrDto.getType().equalsIgnoreCase(HL7_ELR)) { kafkaProducerService.sendMessageFromController( @@ -77,7 +73,7 @@ public void updateRawMessageAfterRetry(RawERLDto rawElrDto, String version) thro rawElrDto.getType(), dltOccurrence + 1, rawElrDto.getValidationActive(), - version); + rawElrDto.getVersion()); } if(rawElrDto.getType().equalsIgnoreCase(XML_ELR)) { kafkaProducerService.sendElrXmlMessageFromController( @@ -86,38 +82,36 @@ public void updateRawMessageAfterRetry(RawERLDto rawElrDto, String version) thro rawElrDto.getType(), dltOccurrence + 1, rawElrDto.getPayload(), - version); + rawElrDto.getVersion()); } } catch (KafkaProducerException e) { - iElrDeadLetterRepository.updateDltOccurrenceForRawId(rawElrDto.getId(), dltOccurrence + 1); + iElrDeadLetterRepository.updateDltOccurrenceForRawId(rawElrDto.getId(), dltOccurrence + 1, "ERROR"); throw new KafkaProducerException("Failed publishing message again to kafka topic: " + topicName + " with UUID: " + rawElrDto.getId()); } } - public RawERLDto getById(String id) { - RawERLModel rawERLModel = rawELRRepository.getById(id); -// rawELRRepository. 
- return convert(rawERLModel); + public RawElrDto getById(String id) { + RawElrModel rawElrModel = rawElrRepository.getById(id); + return convert(rawElrModel); } - private RawERLModel convert(RawERLDto rawERLDto) { - - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setType(rawERLDto.getType()); - rawERLModel.setPayload(rawERLDto.getPayload()); - rawERLModel.setCreatedOn(getCurrentTimeStamp(tz)); - rawERLModel.setUpdatedOn(getCurrentTimeStamp(tz)); - rawERLModel.setCreatedBy(CREATED_BY); - rawERLModel.setUpdatedBy(CREATED_BY); - return rawERLModel; + private RawElrModel convert(RawElrDto rawElrDto) { + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setType(rawElrDto.getType()); + rawElrModel.setPayload(rawElrDto.getPayload()); + rawElrModel.setVersion(rawElrDto.getVersion()); + rawElrModel.setCreatedOn(getCurrentTimeStamp(tz)); + rawElrModel.setUpdatedOn(getCurrentTimeStamp(tz)); + rawElrModel.setCreatedBy(CREATED_BY); + rawElrModel.setUpdatedBy(CREATED_BY); + return rawElrModel; } - private RawERLDto convert(RawERLModel rawERLModel) { - - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setId(rawERLModel.getId()); - rawERLDto.setType(rawERLModel.getType()); - rawERLDto.setPayload(rawERLModel.getPayload()); - return rawERLDto; + private RawElrDto convert(RawElrModel rawElrModel) { + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setId(rawElrModel.getId()); + rawElrDto.setType(rawElrModel.getType()); + rawElrDto.setPayload(rawElrModel.getPayload()); + return rawElrDto; } } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java deleted file mode 100644 index 80e0c85df..000000000 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawELRRepository.java +++ /dev/null @@ -1,17 +0,0 @@ -package gov.cdc.dataingestion.report.repository; - -import 
gov.cdc.dataingestion.report.repository.model.RawERLModel; -import jakarta.transaction.Transactional; -import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.data.jpa.repository.Modifying; -import org.springframework.data.jpa.repository.Query; -import org.springframework.stereotype.Repository; - -@Repository -public interface IRawELRRepository extends JpaRepository { - - @Modifying - @Transactional - @Query(value = "UPDATE elr_raw SET status = :status WHERE id = :id", nativeQuery = true) - void updateRawMessageWithNewVersion(String id, String status); -} \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawElrRepository.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawElrRepository.java new file mode 100644 index 000000000..78adc53df --- /dev/null +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/IRawElrRepository.java @@ -0,0 +1,9 @@ +package gov.cdc.dataingestion.report.repository; + +import gov.cdc.dataingestion.report.repository.model.RawElrModel; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface IRawElrRepository extends JpaRepository { +} \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawERLModel.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawElrModel.java similarity index 89% rename from data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawERLModel.java rename to data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawElrModel.java index a0dec6a66..e06c83cc3 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawERLModel.java +++ 
b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/report/repository/model/RawElrModel.java @@ -14,7 +14,7 @@ 1135 - todos complaint * */ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) -public class RawERLModel { +public class RawElrModel { @Id @GenericGenerator(name = "generator", strategy = "guid", parameters = {}) @@ -26,6 +26,9 @@ public class RawERLModel { private String type; private String payload; + @Column(name = "version") + private String version; + @Column(name = "created_on") private Timestamp createdOn; @@ -93,4 +96,12 @@ public String getUpdatedBy() { public void setUpdatedBy(String updatedBy) { this.updatedBy = updatedBy; } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java index 801071999..e258fb0b4 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusService.java @@ -7,8 +7,8 @@ import gov.cdc.dataingestion.odse.repository.IEdxActivityParentLogRepository; import gov.cdc.dataingestion.odse.repository.model.EdxActivityDetailLog; import gov.cdc.dataingestion.odse.repository.model.EdxActivityLog; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.DltMessageStatus; import gov.cdc.dataingestion.reportstatus.model.EdxActivityLogStatus; import gov.cdc.dataingestion.reportstatus.model.MessageStatus; 
@@ -39,7 +39,7 @@ public class ReportStatusService { private final IEdxActivityParentLogRepository iEdxActivityParentLogRepository; private final NbsInterfaceRepository nbsInterfaceRepository; - private final IRawELRRepository iRawELRRepository; + private final IRawElrRepository iRawELRRepository; private final IValidatedELRRepository iValidatedELRRepository; private final IElrDeadLetterRepository iElrDeadLetterRepository; private final IEdxActivityLogRepository iEdxActivityLogRepository; @@ -51,7 +51,7 @@ public class ReportStatusService { public ReportStatusService(IReportStatusRepository iReportStatusRepository, IEdxActivityParentLogRepository iEdxActivityParentLogRepository, NbsInterfaceRepository nbsInterfaceRepository, - IRawELRRepository iRawELRRepository, + IRawElrRepository iRawELRRepository, IValidatedELRRepository iValidatedELRRepository, IElrDeadLetterRepository iElrDeadLetterRepository, IEdxActivityLogRepository iEdxActivityLogRepository) { @@ -67,7 +67,7 @@ public ReportStatusService(IReportStatusRepository iReportStatusRepository, @SuppressWarnings("java:S3776") public MessageStatus getMessageStatus(String rawMessageID) { MessageStatus msgStatus = new MessageStatus(); - Optional rawMessageData = iRawELRRepository.findById(rawMessageID); + Optional rawMessageData = iRawELRRepository.findById(rawMessageID); if (!rawMessageData.isEmpty()) { msgStatus.getRawInfo().setRawMessageId(rawMessageData.get().getId()); //msgStatus.getRawInfo().setRawPayload(Base64.getEncoder().encodeToString(rawMessageData.get().getPayload().getBytes())); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java index 05681cc64..d20888ddd 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java +++ 
b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2Validator.java @@ -3,7 +3,7 @@ import gov.cdc.dataingestion.constant.enums.EnumMessageType; import gov.cdc.dataingestion.hl7.helper.HL7Helper; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7v2Validator; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; @@ -32,8 +32,8 @@ public String processFhsMessage (String message) { return this.hl7Helper.processFhsMessage(message); } - public ValidatedELRModel messageValidation(String id, RawERLModel rawERLModel, String topicName, boolean validationActive) throws DiHL7Exception { - String replaceSpecialCharacters = messageStringValidation(rawERLModel.getPayload()); + public ValidatedELRModel messageValidation(String id, RawElrModel rawElrModel, String topicName, boolean validationActive) throws DiHL7Exception { + String replaceSpecialCharacters = messageStringValidation(rawElrModel.getPayload()); // validationActive check is obsoleted if (validationActive) { diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java index 610ba1ebb..0aed61cc8 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/validation/integration/validator/interfaces/IHL7v2Validator.java @@ -1,10 +1,10 @@ package gov.cdc.dataingestion.validation.integration.validator.interfaces; import 
gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; public interface IHL7v2Validator { - ValidatedELRModel messageValidation(String message, RawERLModel rawERLModel, String topicName, boolean validationActive) throws DiHL7Exception; + ValidatedELRModel messageValidation(String message, RawElrModel rawElrModel, String topicName, boolean validationActive) throws DiHL7Exception; String hl7MessageValidation(String message) throws DiHL7Exception; String messageStringValidation(String message) throws DiHL7Exception; String processFhsMessage (String message); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java index 6f2aedf70..e58aa911d 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/camel/routes/HL7FileProcessComponentTest.java @@ -1,7 +1,8 @@ package gov.cdc.dataingestion.camel.routes; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.exception.KafkaProducerException; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -19,7 +20,7 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) class HL7FileProcessComponentTest { @Mock - private RawELRService rawELRService; + private RawElrService rawELRService; @InjectMocks private 
HL7FileProcessComponent hL7FileProcessComponent; @@ -30,16 +31,17 @@ public void setUp() { } @Test - void testSaveHL7Message() { + void testSaveHL7Message() throws KafkaProducerException { String hl7Payload = "testmessage"; String messageType = "HL7"; - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(hl7Payload); - rawERLDto.setValidationActive(true); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(hl7Payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion("1"); - when(rawELRService.submission(rawERLDto,"1")).thenReturn("OK"); + when(rawELRService.submission(rawElrDto)).thenReturn("OK"); String status = hL7FileProcessComponent.process(hl7Payload); Assertions.assertEquals("OK",status); } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java index e8a55cdbb..419ff417d 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java @@ -5,11 +5,15 @@ import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; import gov.cdc.dataingestion.exception.DeadLetterTopicException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; +import 
gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -27,7 +31,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; + /** 1118 - require constructor complaint 125 - comment complaint @@ -41,7 +46,7 @@ class ElrDeadLetterServiceTest { private IElrDeadLetterRepository dltRepository; @Mock - private IRawELRRepository rawELRRepository; + private IRawElrRepository rawELRRepository; @Mock private IValidatedELRRepository validatedELRRepository; @@ -52,12 +57,18 @@ class ElrDeadLetterServiceTest { @InjectMocks private ElrDeadLetterService elrDeadLetterService; + @Mock + private RawElrService rawElrService; + + @Mock + private RawElrDto rawElrDto; + private String guidForTesting = "8DC5E410-4A2E-4018-8C28-A4F6AB99E802"; @BeforeEach public void setUpEach() { MockitoAnnotations.openMocks(this); - elrDeadLetterService = new ElrDeadLetterService(dltRepository, rawELRRepository, validatedELRRepository, kafkaProducerService); + elrDeadLetterService = new ElrDeadLetterService(dltRepository, rawELRRepository, validatedELRRepository, kafkaProducerService, rawElrService); } @@ -107,7 +118,7 @@ void testGetAllErrorDltRecord_Success() { List listData = new ArrayList<>(); listData.add(model); - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setPayload("HL7 message"); @@ -124,7 +135,7 @@ void testGetAllErrorDltRecord_NoDataFound() { } 
@Test - void testUpdateAndReprocessingMessage_RawElr_Success() throws DeadLetterTopicException { + void testUpdateAndReprocessingMessage_RawElr_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -133,31 +144,26 @@ void testUpdateAndReprocessingMessage_RawElr_Success() throws DeadLetterTopicExc elrDltModel.setErrorMessageSource("elr_raw"); - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setPayload("HL7 message"); - rawERLModel.setId(elrDltModel.getErrorMessageId()); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setPayload("HL7 message"); + rawElrModel.setId(elrDltModel.getErrorMessageId()); elrDltModel.setDltStatus(EnumElrDltStatus.ERROR.name()); when(dltRepository.findById(elrDltModel.getErrorMessageId())) .thenReturn(Optional.of(elrDltModel)); when(rawELRRepository.findById(elrDltModel.getErrorMessageId())) - .thenReturn(Optional.of(rawERLModel)); - when(rawELRRepository.save(rawERLModel)).thenReturn(rawERLModel); + .thenReturn(Optional.of(rawElrModel)); + when(rawELRRepository.save(rawElrModel)).thenReturn(rawElrModel); when(dltRepository.save(any(ElrDeadLetterModel.class))).thenReturn(elrDltModel); var result = elrDeadLetterService.updateAndReprocessingMessage(primaryIdForTesting, "HL7 message"); assertEquals("HL7 message", result.getMessage()); assertEquals(1, result.getDltOccurrence()); - - - - - } @Test - void testUpdateAndReprocessingMessage_ValidatedElr_Success() throws DeadLetterTopicException { + void testUpdateAndReprocessingMessage_ValidatedElr_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -186,7 +192,7 @@ void testUpdateAndReprocessingMessage_ValidatedElr_Success() throws DeadLetterTo } @Test - void testUpdateAndReprocessingMessage_FhirPrep_Success() throws 
DeadLetterTopicException { + void testUpdateAndReprocessingMessage_FhirPrep_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -209,7 +215,7 @@ void testUpdateAndReprocessingMessage_FhirPrep_Success() throws DeadLetterTopicE } @Test - void testUpdateAndReprocessingMessage_XmlPrep_Success() throws DeadLetterTopicException { + void testUpdateAndReprocessingMessage_XmlPrep_Success() throws DeadLetterTopicException, KafkaProducerException { String primaryIdForTesting = guidForTesting; ElrDeadLetterModel elrDltModel = new ElrDeadLetterModel(); @@ -272,4 +278,71 @@ void testSaveDltRecord() { assertEquals(savedDto.getUpdatedBy(), dto.getUpdatedBy()); } + @Test + void testProcessFailedMessagesFromKafka_WithMessages() throws KafkaProducerException { + List dltMessagesList = getElrDeadLetterModels(); + + when(dltRepository.getAllErrorDltRecordForKafkaError()).thenReturn(dltMessagesList); + + elrDeadLetterService.processFailedMessagesFromKafka(); + + verify(dltRepository, times(2)).updateErrorStatusForRawId(anyString(), anyString()); + verify(rawElrService, times(2)).updateRawMessageAfterRetry(any(RawElrDto.class), anyInt()); //This line is correct. 
+ + verify(dltRepository).updateErrorStatusForRawId("test1", "PROCESSED"); + verify(dltRepository).updateErrorStatusForRawId("test2", "PROCESSED"); + } + + private static @NotNull List getElrDeadLetterModels() { + List dltMessagesList = new ArrayList<>(); + ElrDeadLetterModel message1 = new ElrDeadLetterModel(); + message1.setErrorMessageId("test1"); + message1.setDltStatus("KAFKA_ERROR_HL7"); + message1.setMessage("test_hl7_payload"); + dltMessagesList.add(message1); + + ElrDeadLetterModel message2 = new ElrDeadLetterModel(); + message2.setErrorMessageId("test2"); + message2.setDltStatus("KAFKA_ERROR_HL7_XML"); + message2.setMessage("test_hl7_xml_payload"); + dltMessagesList.add(message2); + return dltMessagesList; + } + + @Test + void testProcessFailedMessagesFromKafka_NoMessages() throws KafkaProducerException { + when(dltRepository.getAllErrorDltRecordForKafkaError()).thenReturn(new ArrayList<>()); + + elrDeadLetterService.processFailedMessagesFromKafka(); + + verify(dltRepository, times(0)).updateErrorStatusForRawId(anyString(), anyString()); + verify(rawElrService, times(0)).updateRawMessageAfterRetry(any(RawElrDto.class), anyInt()); + } + + @Test + void testGetElrMessageType_WithDelimiter() { + String dltStatus = "KAFKA_ERROR_HL7"; + + String result = elrDeadLetterService.getElrMessageType(dltStatus); + + assertEquals("HL7", result); + } + + @Test + void testGetElrMessageType_WithDelimiterXml() { + String dltStatus = "KAFKA_ERROR_HL7_XML"; + + String result = elrDeadLetterService.getElrMessageType(dltStatus); + + assertEquals("HL7_XML", result); + } + + @Test + void testGetElrMessageType_NoTypeDelimiter() { + String dltStatus = "SOMEPREFIXKAFKAERROR"; + + String result = elrDeadLetterService.getElrMessageType(dltStatus); + + assertEquals("", result); + } } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfigTest.java 
b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfigTest.java new file mode 100644 index 000000000..f24501889 --- /dev/null +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfigTest.java @@ -0,0 +1,53 @@ +package gov.cdc.dataingestion.deadletter.service; + +import gov.cdc.dataingestion.exception.KafkaProducerException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.test.util.ReflectionTestUtils; + +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@EnableScheduling +class SchedulerConfigTest { + @Test + void testScheduleTask_SchedulerEnabled() throws KafkaProducerException { + ElrDeadLetterService mockService = mock(ElrDeadLetterService.class); + SchedulerConfig schedulerConfig = new SchedulerConfig(mockService); + ReflectionTestUtils.setField(schedulerConfig, "isSchedulerEnabled", true); + + schedulerConfig.scheduleTask(); + + verify(mockService, times(1)).processFailedMessagesFromKafka(); + } + + @Test + void testScheduleTask_SchedulerDisabled() throws KafkaProducerException { + ElrDeadLetterService mockService = mock(ElrDeadLetterService.class); + SchedulerConfig schedulerConfig = new SchedulerConfig(mockService); + ReflectionTestUtils.setField(schedulerConfig, "isSchedulerEnabled", false); + + schedulerConfig.scheduleTask(); + + verify(mockService, never()).processFailedMessagesFromKafka(); + } + + @Test + void testScheduleTask_ServiceThrowsException() throws KafkaProducerException { + ElrDeadLetterService mockService = mock(ElrDeadLetterService.class); + doThrow(new KafkaProducerException("Test exception")) + .when(mockService).processFailedMessagesFromKafka(); + SchedulerConfig schedulerConfig = new SchedulerConfig(mockService); + 
ReflectionTestUtils.setField(schedulerConfig, "isSchedulerEnabled", true); + + try { + schedulerConfig.scheduleTask(); + } catch (KafkaProducerException e) { + verify(mockService, times(1)).processFailedMessagesFromKafka(); + return; + } + throw new AssertionError("Expected KafkaProducerException was not thrown."); + } +} \ No newline at end of file diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java index 04395fd03..4708e770c 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java @@ -7,6 +7,7 @@ import gov.cdc.dataingestion.deadletter.model.ElrDeadLetterDto; import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.exception.XmlConversionException; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; import gov.cdc.dataingestion.kafka.integration.service.KafkaConsumerService; @@ -15,8 +16,8 @@ import gov.cdc.dataingestion.nbs.repository.model.NbsInterfaceModel; import gov.cdc.dataingestion.nbs.services.NbsRepositoryServiceProvider; import gov.cdc.dataingestion.nbs.services.EcrMsgQueryService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData; import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository; import 
gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator; @@ -73,7 +74,7 @@ class KafkaConsumerServiceTest { @Mock private IHL7v2Validator iHl7v2Validator; @Mock - private IRawELRRepository iRawELRRepository; + private IRawElrRepository iRawELRRepository; @Mock private IValidatedELRRepository iValidatedELRRepository; @Mock @@ -188,7 +189,7 @@ void rawConsumerTest() throws DiHL7Exception { ConsumerRecord firstRecord = records.iterator().next(); String value = firstRecord.value(); - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); @@ -227,7 +228,7 @@ void rawConsumerTestRawRecordNotFound() { ConsumerRecord firstRecord = records.iterator().next(); String value = firstRecord.value(); - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); @@ -422,7 +423,7 @@ void xmlPreparationConsumerTest() { @Test - void xmlPreparationConsumerTestNewFlow() throws XmlConversionException { + void xmlPreparationConsumerTestNewFlow() throws XmlConversionException, KafkaProducerException { // Produce a test message to the topic @@ -454,7 +455,7 @@ void xmlPreparationConsumerTestNewFlow() throws XmlConversionException { } @Test - void xmlPreparationConsumerTestNewFlow_Exception() { + void xmlPreparationConsumerTestNewFlow_Exception() throws KafkaProducerException { when(iValidatedELRRepository.findById(any())).thenReturn(Optional.empty()); kafkaConsumerService.xmlConversionHandlerProcessing("123", EnumKafkaOperation.INJECTION.name(), "true"); @@ -463,7 +464,7 @@ void xmlPreparationConsumerTestNewFlow_Exception() { } @Test - void xmlPreparationConsumerTestNewFlow_KafkaExistResult() { + void xmlPreparationConsumerTestNewFlow_KafkaExistResult() throws KafkaProducerException { // Produce a test message to the topic @@ -651,7 +652,7 @@ void dltHandlerLogicPipelineCustom() { 
initialDataInsertionAndSelection(rawTopic); String message = guidForTesting; - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); rawModel.setPayload(testHL7Message); @@ -669,7 +670,7 @@ void dltHandlerLogicForRawPipeline() { initialDataInsertionAndSelection(rawTopic); String message = guidForTesting; - RawERLModel rawModel = new RawERLModel(); + RawElrModel rawModel = new RawElrModel(); rawModel.setId(guidForTesting); rawModel.setType("HL7"); rawModel.setPayload(testHL7Message); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java index ab3aee50c..0219e521e 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaProducerServiceTest.java @@ -2,10 +2,12 @@ import gov.cdc.dataingestion.constant.TopicPreparationType; import gov.cdc.dataingestion.exception.ConversionPrepareException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -13,14 +15,20 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import org.testcontainers.junit.jupiter.Testcontainers; +import java.lang.reflect.InvocationTargetException; +import 
java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; + @ExtendWith(MockitoExtension.class) @Testcontainers /** @@ -46,34 +54,49 @@ public static void tearDown() { } @Test - void testSendMessageFromController() { + void testSendMessageFromController() throws KafkaProducerException { String msg = "test message"; String topic = "test-topic"; String msgType = "test-type"; Integer dltOccurrence = 1; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromController(msg, topic, msgType, dltOccurrence, false, "false"); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageAfterValidatingMessage() { + void testSendMessageAfterValidatingMessage() throws KafkaProducerException { String topic = "test-topic"; ValidatedELRModel model = new ValidatedELRModel(); model.setMessageType("test"); model.setId("test"); model.setMessageVersion("1"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageAfterValidatingMessage(model, topic, 1, "false"); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessagePreparationTopicXML() throws ConversionPrepareException { + void testSendMessagePreparationTopicXML() throws ConversionPrepareException, KafkaProducerException { var topicType = 
TopicPreparationType.XML; String topic = "test-topic"; ValidatedELRModel model = new ValidatedELRModel(); model.setMessageType("test"); model.setId("test"); model.setMessageVersion("1"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessagePreparationTopic(model, topic, topicType, 1, "false"); @@ -81,13 +104,18 @@ void testSendMessagePreparationTopicXML() throws ConversionPrepareException { } @Test - void testSendMessagePreparationTopicFHIR() throws ConversionPrepareException { + void testSendMessagePreparationTopicFHIR() throws ConversionPrepareException, KafkaProducerException { var topicType = TopicPreparationType.FHIR; String topic = "test-topic"; ValidatedELRModel model = new ValidatedELRModel(); model.setMessageType("test"); model.setId("test"); model.setMessageVersion("1"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessagePreparationTopic(model, topic, topicType, 1, "false"); @@ -95,8 +123,28 @@ void testSendMessagePreparationTopicFHIR() throws ConversionPrepareException { } @Test - void sendMessageDlt() { + void testSendElrXmlMessageFromController() throws KafkaProducerException { + String msg = "test message"; + String topic = "test-topic"; + String msgType = "test-type"; + Integer dltOccurrence = 1; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + + kafkaProducerService.sendElrXmlMessageFromController(msg, topic, msgType, dltOccurrence, "payload", "false"); + verify(kafkaTemplate, 
times(1)).send(any(ProducerRecord.class)); + } + + @Test + void sendMessageDlt() throws KafkaProducerException { String topic = "test-topic"; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageDlt("test","test", topic, 1, "error", topic); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); @@ -104,54 +152,101 @@ void sendMessageDlt() { @Test - void testSendMessageAfterConvertedToXml() { + void testSendMessageAfterConvertedToXml() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageAfterConvertedToXml( msg, topic, 1 ); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageAfterCheckingDuplicateHL7() { + void testSendMessageAfterCheckingDuplicateHL7() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; ValidatedELRModel model = new ValidatedELRModel(); model.setRawId("test"); + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageAfterCheckingDuplicateHL7(model, topic, 1 ); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageFromDltController() { + void testSendMessageFromDltController() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; String msgType = "HL7"; Integer occurrence = 0; + CompletableFuture> future = new CompletableFuture<>(); + 
future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromController(msg, topic,msgType, occurrence, false, "false"); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageFromCSVController() { + void testSendMessageFromCSVController() throws KafkaProducerException { String topic = "test-topic"; List msgNested = new ArrayList<>(); msgNested.add("test"); List> msg = new ArrayList<>(); msg.add(msgNested); String msgType = "HL7"; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromCSVController(msg, topic,msgType); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } @Test - void testSendMessageFromDltController_Success() { + void testSendMessageFromDltController_Success() throws KafkaProducerException { String topic = "test-topic"; String msg = "test"; String msgType = "HL7"; Integer occurrence = 0; + CompletableFuture> future = new CompletableFuture<>(); + future.complete(new SendResult<>(new ProducerRecord<>(topic, "test"), null)); + + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + kafkaProducerService.sendMessageFromDltController(msg, topic,msgType, occurrence); verify(kafkaTemplate, times(1)).send(any(ProducerRecord.class)); } + + @Test + void testSendMessageException() throws Exception { + ProducerRecord prodRecord = new ProducerRecord<>("test-topic", "test-key", "test-value"); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new TimeoutException("Timeout")); + when(kafkaTemplate.send(any(ProducerRecord.class))).thenReturn(future); + + Method sendMessageMethod = 
KafkaProducerService.class.getDeclaredMethod("sendMessage", ProducerRecord.class); + sendMessageMethod.setAccessible(true); + + KafkaProducerException exception = assertThrows(KafkaProducerException.class, () -> { + try { + sendMessageMethod.invoke(kafkaProducerService, prodRecord); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + }); + + Assertions.assertEquals("Failed publishing message to kafka topic: test-topic with UUID: test-value", exception.getMessage()); + } } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawELRServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawELRServiceTest.java deleted file mode 100644 index f8e39c727..000000000 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawELRServiceTest.java +++ /dev/null @@ -1,103 +0,0 @@ -package gov.cdc.dataingestion.rawmessage; - -import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -import java.sql.Timestamp; -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - -class RawELRServiceTest { - @Mock - private IRawELRRepository rawELRRepository; - - @Mock - private KafkaProducerService kafkaProducerService; - - @InjectMocks - private RawELRService target; - - private String guidForTesting = UUID.randomUUID().toString(); - - - @BeforeEach - public void setUpEach() { - MockitoAnnotations.openMocks(this); - 
target = new RawELRService(rawELRRepository, kafkaProducerService); - } - - - - @Test - void testSaveHL7_Success() { - RawERLDto modelDto = new RawERLDto(); - modelDto.setPayload("test"); - modelDto.setType("HL7"); - RawERLModel model = new RawERLModel(); - model.setId("test"); - model.setCreatedOn(new Timestamp(System.currentTimeMillis())); - model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); - model.setCreatedBy("test"); - model.setUpdatedBy("test"); - when(rawELRRepository.save(any())).thenReturn(model); - Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); - var result = target.submission(modelDto, "1"); - - Assertions.assertNotNull(result); - Assertions.assertEquals("test",result); - - } - - @Test - void testSaveElrXml_Success() { - RawERLDto modelDto = new RawERLDto(); - modelDto.setPayload("test"); - modelDto.setType("HL7-XML"); - RawERLModel model = new RawERLModel(); - model.setId("test"); - model.setCreatedOn(new Timestamp(System.currentTimeMillis())); - model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); - model.setCreatedBy("test"); - model.setUpdatedBy("test"); - when(rawELRRepository.save(any())).thenReturn(model); - Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); - var result = target.submission(modelDto, "1"); - - Assertions.assertNotNull(result); - Assertions.assertEquals("test",result); - - } - - @Test - void getById_Success() { - RawERLDto modelDto = new RawERLDto(); - modelDto.setPayload("test"); - modelDto.setType("HL7"); - modelDto.setId(guidForTesting); - - RawERLModel model = new RawERLModel(); - model.setId(guidForTesting); - model.setCreatedOn(new Timestamp(System.currentTimeMillis())); - model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); - model.setCreatedBy("test"); - 
model.setUpdatedBy("test"); - - when(rawELRRepository.getById(any())).thenReturn(model); - - var result = target.getById(modelDto.getId()); - - Assertions.assertNotNull(result); - } -} diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java new file mode 100644 index 000000000..161067880 --- /dev/null +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java @@ -0,0 +1,140 @@ +package gov.cdc.dataingestion.rawmessage; + +import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository; +import gov.cdc.dataingestion.exception.KafkaProducerException; +import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.sql.Timestamp; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +class RawElrServiceTest { + @Mock + private IRawElrRepository rawELRRepository; + + @Mock + private KafkaProducerService kafkaProducerService; + + @Mock + private IElrDeadLetterRepository iElrDeadLetterRepository; + + @InjectMocks + private RawElrService target; + + private String guidForTesting = UUID.randomUUID().toString(); + + + @BeforeEach + public void setUpEach() { + MockitoAnnotations.openMocks(this); + target = new RawElrService(rawELRRepository, kafkaProducerService, iElrDeadLetterRepository); + } + + 
@Test + void testSaveHL7_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setPayload("test"); + modelDto.setType("HL7"); + RawElrModel model = new RawElrModel(); + model.setId("test"); + model.setVersion("1"); + model.setCreatedOn(new Timestamp(System.currentTimeMillis())); + model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); + model.setCreatedBy("test"); + model.setUpdatedBy("test"); + when(rawELRRepository.save(any())).thenReturn(model); + Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + var result = target.submission(modelDto); + + Assertions.assertNotNull(result); + Assertions.assertEquals("test",result); + } + + @Test + void testSaveElrXml_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setPayload("test"); + modelDto.setType("HL7-XML"); + RawElrModel model = new RawElrModel(); + model.setId("test"); + model.setVersion("1"); + model.setCreatedOn(new Timestamp(System.currentTimeMillis())); + model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); + model.setCreatedBy("test"); + model.setUpdatedBy("test"); + when(rawELRRepository.save(any())).thenReturn(model); + Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + var result = target.submission(modelDto); + + Assertions.assertNotNull(result); + Assertions.assertEquals("test",result); + } + + @Test + void getById_Success() { + RawElrDto modelDto = new RawElrDto(); + modelDto.setPayload("test"); + modelDto.setType("HL7"); + modelDto.setId(guidForTesting); + + RawElrModel model = new RawElrModel(); + model.setId(guidForTesting); + model.setCreatedOn(new Timestamp(System.currentTimeMillis())); + model.setUpdatedOn(new Timestamp(System.currentTimeMillis())); + 
model.setCreatedBy("test"); + model.setUpdatedBy("test"); + + when(rawELRRepository.getById(any())).thenReturn(model); + + var result = target.getById(modelDto.getId()); + + Assertions.assertNotNull(result); + } + + @Test + void testUpdateRawMessageAfterRetry_HL7_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setId("test-retry"); + modelDto.setType("HL7"); + modelDto.setValidationActive(true); + modelDto.setVersion("1"); + Mockito.doNothing().when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + target.updateRawMessageAfterRetry(modelDto, 1); + Mockito.verify(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(2), Mockito.any(), Mockito.any()); + } + + @Test + void testUpdateRawMessageAfterRetry_XML_Success() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setId("test-retry-xml"); + modelDto.setType("HL7-XML"); + modelDto.setPayload("xml payload"); + modelDto.setVersion("1"); + Mockito.doNothing().when(kafkaProducerService).sendElrXmlMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + target.updateRawMessageAfterRetry(modelDto, 1); + Mockito.verify(kafkaProducerService).sendElrXmlMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(2), Mockito.any(), Mockito.any()); + } + + @Test + void testUpdateRawMessageAfterRetry_KafkaProducerException() throws KafkaProducerException { + RawElrDto modelDto = new RawElrDto(); + modelDto.setId("retry-fail"); + modelDto.setType("HL7"); + Mockito.doThrow(new KafkaProducerException("Retry failed")).when(kafkaProducerService).sendMessageFromController(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Assertions.assertThrows(KafkaProducerException.class, () -> 
target.updateRawMessageAfterRetry(modelDto, 1)); + Mockito.verify(iElrDeadLetterRepository).updateDltOccurrenceForRawId(Mockito.any(), Mockito.eq(2), Mockito.eq("ERROR")); + } +} diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java index 6100d1e75..76e403d6d 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerMockTest.java @@ -1,9 +1,10 @@ package gov.cdc.dataingestion.rawmessage.controller; import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import gov.cdc.dataingestion.validation.services.interfaces.IHL7Service; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,7 +31,7 @@ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) class ElrReportsControllerMockTest { @Mock - private RawELRService rawELRService; + private RawElrService rawELRService; @Mock private CustomMetricsBuilder customMetricsBuilder; @@ -48,13 +49,13 @@ void setUp() { } @Test - void testSave_HL7_ELR_Type() { + void testSave_HL7_ELR_Type() throws KafkaProducerException { String payload = "HL7 message"; String type = HL7_ELR; String version = "1"; String expectedResponse = "Submission successful"; - when(rawELRService.submission(any(RawERLDto.class), 
eq(version))).thenReturn(expectedResponse); + when(rawELRService.submission(any(RawElrDto.class))).thenReturn(expectedResponse); ResponseEntity response = elrReportsController.save(payload, type, version); @@ -64,13 +65,13 @@ void testSave_HL7_ELR_Type() { } @Test - void testSave_XML_ELR_Type() { + void testSave_XML_ELR_Type() throws KafkaProducerException { String payload = "XML message"; String type = XML_ELR; String version = "1"; String expectedResponse = "Submission successful"; - when(rawELRService.submission(any(RawERLDto.class), eq(version))).thenReturn(expectedResponse); + when(rawELRService.submission(any(RawElrDto.class))).thenReturn(expectedResponse); ResponseEntity response = elrReportsController.save(payload, type, version); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java index 6ad3c164e..784de53da 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/controller/ElrReportsControllerTest.java @@ -1,8 +1,8 @@ package gov.cdc.dataingestion.rawmessage.controller; import gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; -import gov.cdc.dataingestion.rawmessage.dto.RawERLDto; -import gov.cdc.dataingestion.rawmessage.service.RawELRService; +import gov.cdc.dataingestion.rawmessage.dto.RawElrDto; +import gov.cdc.dataingestion.rawmessage.service.RawElrService; import gov.cdc.dataingestion.validation.services.interfaces.IHL7Service; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -31,7 +31,7 @@ class ElrReportsControllerTest { @Autowired private MockMvc mockMvc; @MockBean - private RawELRService rawELRService; + private RawElrService rawELRService; @MockBean private 
CustomMetricsBuilder customMetricsBuilder; @MockBean @@ -47,12 +47,13 @@ void testSaveHL7Message() throws Exception { .with(SecurityMockMvcRequestPostProcessors.jwt())) .andExpect(MockMvcResultMatchers.status().isOk()); - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(hl7Payload); - rawERLDto.setValidationActive(true); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(hl7Payload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion("1"); - verify(rawELRService).submission(rawERLDto, "1"); + verify(rawELRService).submission(rawElrDto); } @@ -67,23 +68,25 @@ void testSaveElrXmlMessage() throws Exception { .with(SecurityMockMvcRequestPostProcessors.jwt())) .andExpect(MockMvcResultMatchers.status().isOk()); - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(xmlPayload); - rawERLDto.setValidationActive(true); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(xmlPayload); + rawElrDto.setValidationActive(true); + rawElrDto.setVersion("1"); - verify(rawELRService).submission(rawERLDto, "1"); + verify(rawELRService).submission(rawElrDto); } @Test void testSaveHL7Message_no_ValidationActivate() throws Exception { String payload = "Test payload"; String messageType = "HL7"; - RawERLDto rawERLDto = new RawERLDto(); - rawERLDto.setType(messageType); - rawERLDto.setPayload(payload); + RawElrDto rawElrDto = new RawElrDto(); + rawElrDto.setType(messageType); + rawElrDto.setPayload(payload); + rawElrDto.setVersion("1"); - when(rawELRService.submission(rawERLDto, "1")).thenReturn("OK"); + when(rawELRService.submission(rawElrDto)).thenReturn("OK"); mockMvc.perform(MockMvcRequestBuilders.post("/api/elrs") .param("id", "1").with(SecurityMockMvcRequestPostProcessors.jwt()) .header("msgType", messageType) diff --git 
a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java index 006cec5e2..7d7334291 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/reportstatus/service/ReportStatusServiceTest.java @@ -8,8 +8,8 @@ import gov.cdc.dataingestion.odse.repository.IEdxActivityParentLogRepository; import gov.cdc.dataingestion.odse.repository.model.EdxActivityDetailLog; import gov.cdc.dataingestion.odse.repository.model.EdxActivityLog; -import gov.cdc.dataingestion.report.repository.IRawELRRepository; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.report.repository.IRawElrRepository; +import gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData; import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; @@ -45,7 +45,7 @@ class ReportStatusServiceTest { @Mock private NbsInterfaceRepository nbsInterfaceRepositoryMock; @Mock - private IRawELRRepository iRawELRRepository; + private IRawElrRepository iRawELRRepository; @Mock private IValidatedELRRepository iValidatedELRRepository; @Mock @@ -79,12 +79,12 @@ void tearDown() { void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist() { String id = "test"; Integer nbsId = 123456; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + 
rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); @@ -99,7 +99,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist() { NbsInterfaceModel nbsModel = new NbsInterfaceModel(); nbsModel.setRecordStatusCd("Success"); nbsModel.setPayload("payload"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional.of(reportStatusIdModel)); when(nbsInterfaceRepositoryMock.findByNbsInterfaceUid(nbsId)).thenReturn(Optional.of(nbsModel)); @@ -130,12 +130,12 @@ void testGetMessageDetailStatus_RawNotExist() { @Test void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ElrDeadLetterModel dltModel = new ElrDeadLetterModel(); dltModel.setErrorMessageId(id); dltModel.setDltStatus("ERROR"); @@ -143,7 +143,7 @@ void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltExist() { dltModel.setErrorMessageSource("origin"); dltModel.setErrorStackTraceShort("short"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); 
when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional. empty()); @@ -163,14 +163,14 @@ void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltExist() { @Test void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltNotExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); - - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); + + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional. empty()); @@ -189,12 +189,12 @@ void testGetMessageDetailStatus_RawExist_ValidateNotExist_DltNotExist() { void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsNotExist() { String id = "test"; Integer nbsId = 123456; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); @@ -209,7 +209,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsNotExist() NbsInterfaceModel nbsModel = new NbsInterfaceModel(); nbsModel.setRecordStatusCd("Success"); nbsModel.setPayload("payload"); - 
when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional.of(reportStatusIdModel)); when(nbsInterfaceRepositoryMock.findByNbsInterfaceUid(nbsId)).thenReturn(Optional.empty()); @@ -223,12 +223,12 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsNotExist() @Test void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); validatedELRModel.setRawMessage("payload"); @@ -241,7 +241,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltExist() dltModel.setErrorMessageSource("origin"); dltModel.setErrorStackTraceShort("short"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional. 
empty()); @@ -259,12 +259,12 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltExist() @Test void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltNotExist() { String id = "test"; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); validatedELRModel.setRawMessage("payload"); @@ -272,7 +272,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportNotExist_DltNotExis - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional. 
empty()); @@ -349,12 +349,12 @@ void testDummyGetStatusForReportSuccessModelCoverage() { void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist_OdseExist() { String id = "123"; Integer nbsId = 123456; - RawERLModel rawERLModel = new RawERLModel(); - rawERLModel.setId(id); - rawERLModel.setPayload("payload"); - rawERLModel.setCreatedOn(getCurrentTimeStamp("UTC")); - rawERLModel.setCreatedBy("admin"); - rawERLModel.setType("HL7"); + RawElrModel rawElrModel = new RawElrModel(); + rawElrModel.setId(id); + rawElrModel.setPayload("payload"); + rawElrModel.setCreatedOn(getCurrentTimeStamp("UTC")); + rawElrModel.setCreatedBy("admin"); + rawElrModel.setType("HL7"); ValidatedELRModel validatedELRModel = new ValidatedELRModel(); validatedELRModel.setId("validate-id"); @@ -380,7 +380,7 @@ void testGetMessageDetailStatus_RawExist_ValidateExist_ReportExist_NbsExist_Odse nbsModel.setRecordStatusCd("Failure"); nbsModel.setPayload("payload"); - when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawERLModel)); + when(iRawELRRepository.findById(id)).thenReturn(Optional.of(rawElrModel)); when(iValidatedELRRepository.findByRawId(id)).thenReturn(Optional.of(validatedELRModel)); when(iReportStatusRepositoryMock.findByRawMessageId(id)).thenReturn(Optional.of(reportStatusIdModel)); when(nbsInterfaceRepositoryMock.findByNbsInterfaceUid(nbsId)).thenReturn(Optional.of(nbsModel)); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java index fdd8843a5..1263e2193 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7DuplicateValidatorTest.java @@ -2,6 +2,7 @@ import 
gov.cdc.dataingestion.custommetrics.CustomMetricsBuilder; import gov.cdc.dataingestion.exception.DuplicateHL7FileFoundException; +import gov.cdc.dataingestion.exception.KafkaProducerException; import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService; import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository; import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; @@ -44,7 +45,7 @@ void tearDown() { } @Test - void testValidateHL7DocumentSuccess() throws DuplicateHL7FileFoundException { + void testValidateHL7DocumentSuccess() throws DuplicateHL7FileFoundException, KafkaProducerException { ValidatedELRModel validatedELRModel = getValidatedELRModel(); Mockito.when(iValidatedELRRepositoryMock.findByHashedHL7String(any())) @@ -56,7 +57,7 @@ void testValidateHL7DocumentSuccess() throws DuplicateHL7FileFoundException { } @Test - void testValidateHL7DocumentThrowsException() { + void testValidateHL7DocumentThrowsException() throws KafkaProducerException { String hashedString = "843588fcbfbdca29f9807f81455bbd3ae6935dae6152bcac6851e9568c885c66"; ValidatedELRModel validatedELRModel = getValidatedELRModel(); diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java index 91c30352f..90f813b29 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/validation/integration/validator/HL7v2ValidatorTests.java @@ -3,7 +3,7 @@ import gov.cdc.dataingestion.constant.enums.EnumMessageType; import gov.cdc.dataingestion.hl7.helper.HL7Helper; import gov.cdc.dataingestion.hl7.helper.integration.exception.DiHL7Exception; -import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import 
gov.cdc.dataingestion.report.repository.model.RawElrModel; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7v2Validator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -41,7 +41,7 @@ void MessageValidation_Success_ValidMessage_NotContainNewLine() throws DiHL7Exce String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); model.setCreatedOn(null); @@ -70,7 +70,7 @@ void MessageValidation_Success_ValidMessage_NotContainNewLine_231() throws DiHL7 String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); model.setCreatedOn(null); @@ -106,7 +106,7 @@ void MessageValidation_Success_ValidMessage_ContainNewLine() throws DiHL7Excepti String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); @@ -140,7 +140,7 @@ void MessageValidation_Exception_NotSupportedVersion() { String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); @@ -162,7 +162,7 @@ void MessageValidation_InvalidMessage_ThrowException() { String data = "Invalid Message"; String id = "1"; - RawERLModel model = new RawERLModel(); + RawElrModel model = new RawElrModel(); model.setPayload(data); model.setId(id); From 956387c26f0ad2b265e5082f268dfaaa6c9bc5ba Mon Sep 17 00:00:00 2001 From: Ragul Shanmugam Date: Tue, 11 Mar 2025 17:37:18 -0700 Subject: [PATCH 4/6] Added cron config as env variable and sql scripts added --- .../dataingestion/deadletter/service/SchedulerConfig.java | 2 +- .../src/main/resources/application.yaml | 5 ++--- .../src/main/resources/db/di-service-003.sql | 8 +++++--- .../src/main/resources/db/di-service-004.sql | 0 4 files changed, 8 insertions(+), 7 deletions(-) create mode 100644 
data-ingestion-service/src/main/resources/db/di-service-004.sql diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java index e86ece51c..4a6ea7253 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/SchedulerConfig.java @@ -19,7 +19,7 @@ public SchedulerConfig(ElrDeadLetterService elrDeadLetterService) { this.elrDeadLetterService = elrDeadLetterService; } - @Scheduled(cron = "${dlt.scheduler.cron.expression}") + @Scheduled(cron = "${dlt.scheduler.cron}") public void scheduleTask() throws KafkaProducerException { if (isSchedulerEnabled) { elrDeadLetterService.processFailedMessagesFromKafka(); diff --git a/data-ingestion-service/src/main/resources/application.yaml b/data-ingestion-service/src/main/resources/application.yaml index 5f2446ebb..c804441d2 100644 --- a/data-ingestion-service/src/main/resources/application.yaml +++ b/data-ingestion-service/src/main/resources/application.yaml @@ -37,9 +37,8 @@ ecr: dlt: scheduler: - enabled: true - cron: - expression: 0/10 * * * * * + enabled: ${SCHEDULER_ENABLED:true} + cron: ${SCHEDULER_CRON:0/30 * * * * *} --- spring: diff --git a/data-ingestion-service/src/main/resources/db/di-service-003.sql b/data-ingestion-service/src/main/resources/db/di-service-003.sql index f5cb1a266..f318d0401 100644 --- a/data-ingestion-service/src/main/resources/db/di-service-003.sql +++ b/data-ingestion-service/src/main/resources/db/di-service-003.sql @@ -1,3 +1,5 @@ -ALTER TABLE NBS_MSGOUTE.dbo.NBS_interface - ADD original_payload_RR TEXT, - original_doc_type_cd_RR varchar(100); \ No newline at end of file +ALTER TABLE NBS_DataIngest.dbo.elr_raw + ADD version VARCHAR(1); + +ALTER TABLE NBS_DataIngest.dbo.elr_dlt +ALTER COLUMN dlt_status 
NVARCHAR(30); \ No newline at end of file diff --git a/data-ingestion-service/src/main/resources/db/di-service-004.sql b/data-ingestion-service/src/main/resources/db/di-service-004.sql new file mode 100644 index 000000000..e69de29bb From f0c777586d0c45f98de8787d900788a55c2a3876 Mon Sep 17 00:00:00 2001 From: Ragul Shanmugam Date: Tue, 11 Mar 2025 18:07:11 -0700 Subject: [PATCH 5/6] Fixed sonar issues --- .../service/ElrDeadLetterService.java | 1 - .../service/KafkaConsumerService.java | 6 ++-- .../service/KafkaProducerService.java | 19 +++++++---- .../rawmessage/service/RawElrService.java | 2 +- .../service/ElrDeadLetterServiceTest.java | 34 ++++++------------- .../rawmessage/RawElrServiceTest.java | 4 +-- 6 files changed, 29 insertions(+), 37 deletions(-) diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java index 128d6c385..f825986b7 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterService.java @@ -216,7 +216,6 @@ public void processFailedMessagesFromKafka() throws KafkaProducerException { Iterator iterator = dltMessagesList.iterator(); while (iterator.hasNext()) { ElrDeadLetterModel message = iterator.next(); - System.err.println(message.getErrorMessageId()); RawElrDto rawElrDto = new RawElrDto(); rawElrDto.setId(message.getErrorMessageId()); rawElrDto.setType(getElrMessageType(message.getDltStatus())); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java index d871a60e7..e7f47b033 100644 --- 
a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java @@ -254,7 +254,7 @@ public void handleMessageForElrXml(String message, kafkaProducerService.sendMessageAfterConvertedToXml( String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0); } catch (KafkaProducerException e) { - throw new RuntimeException(e); + throw new RuntimeException(e); //NOSONAR } } else { @@ -262,7 +262,7 @@ public void handleMessageForElrXml(String message, kafkaProducerService.sendMessageAfterConvertedToXml( nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); } catch (KafkaProducerException e) { - throw new RuntimeException(e); + throw new RuntimeException(e); //NOSONAR } } }); @@ -329,7 +329,7 @@ public void handleMessageForXmlConversionElr(String message, try { xmlConversionHandler(message, operation, dataProcessingEnable); } catch (KafkaProducerException e) { - throw new RuntimeException(e); + throw new RuntimeException(e); //NOSONAR } }); } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java index cbcae4262..b7ff6bf5e 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaProducerService.java @@ -6,7 +6,6 @@ import gov.cdc.dataingestion.constant.enums.EnumKafkaOperation; import gov.cdc.dataingestion.exception.ConversionPrepareException; import gov.cdc.dataingestion.exception.KafkaProducerException; -import gov.cdc.dataingestion.report.repository.IRawElrRepository; import 
gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; @@ -15,7 +14,9 @@ import java.util.List; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Service /** @@ -34,11 +35,9 @@ public class KafkaProducerService { private final KafkaTemplate kafkaTemplate; - private final IRawElrRepository iRawELRRepository; - public KafkaProducerService(KafkaTemplate kafkaTemplate, IRawElrRepository iRawELRRepository) { + public KafkaProducerService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; - this.iRawELRRepository = iRawELRRepository; } public void sendMessageFromController(String msg, @@ -90,6 +89,9 @@ public void sendMessageFromDltController( } + /** + * @deprecated This method is deprecated and will be removed in a future release. 
+ */ @Deprecated @SuppressWarnings("java:S1133") public void sendMessageFromCSVController(List> msg, String topic, String msgType) throws KafkaProducerException { @@ -177,7 +179,12 @@ public void sendMessageAfterCheckingDuplicateHL7(ValidatedELRModel msg, String v private void sendMessage(ProducerRecord prodRecord) throws KafkaProducerException { try { kafkaTemplate.send(prodRecord).get(3, TimeUnit.SECONDS); - } catch (TimeoutException | InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new KafkaProducerException("Thread was interrupted while sending Kafka message to topic: " + + prodRecord.topic() + " with UUID: " + prodRecord.value()); + } + catch (TimeoutException | ExecutionException e) { throw new KafkaProducerException("Failed publishing message to kafka topic: " + prodRecord.topic() + " with UUID: " + prodRecord.value()); } } diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java index 832155d68..070b3880d 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/rawmessage/service/RawElrService.java @@ -91,7 +91,7 @@ public void updateRawMessageAfterRetry(RawElrDto rawElrDto, int dltOccurrence) t } public RawElrDto getById(String id) { - RawElrModel rawElrModel = rawElrRepository.getById(id); + RawElrModel rawElrModel = rawElrRepository.getReferenceById(id); return convert(rawElrModel); } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java index 419ff417d..5402b000d 100644 --- 
a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/deadletter/service/ElrDeadLetterServiceTest.java @@ -18,6 +18,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -319,30 +321,14 @@ void testProcessFailedMessagesFromKafka_NoMessages() throws KafkaProducerExcepti verify(rawElrService, times(0)).updateRawMessageAfterRetry(any(RawElrDto.class), anyInt()); } - @Test - void testGetElrMessageType_WithDelimiter() { - String dltStatus = "KAFKA_ERROR_HL7"; - - String result = elrDeadLetterService.getElrMessageType(dltStatus); - - assertEquals("HL7", result); - } - - @Test - void testGetElrMessageType_WithDelimiterXml() { - String dltStatus = "KAFKA_ERROR_HL7_XML"; - + @ParameterizedTest + @CsvSource({ + "KAFKA_ERROR_HL7, HL7", + "KAFKA_ERROR_HL7_XML, HL7_XML", + "SOMEPREFIXKAFKAERROR, ''" + }) + void testGetElrMessageType(String dltStatus, String expectedResult) { String result = elrDeadLetterService.getElrMessageType(dltStatus); - - assertEquals("HL7_XML", result); - } - - @Test - void testGetElrMessageType_NoTypeDelimiter() { - String dltStatus = "SOMEPREFIXKAFKAERROR"; - - String result = elrDeadLetterService.getElrMessageType(dltStatus); - - assertEquals("", result); + assertEquals(expectedResult, result); } } diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java index 161067880..e6cd8a2f5 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java +++ 
b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/rawmessage/RawElrServiceTest.java @@ -38,7 +38,7 @@ class RawElrServiceTest { @BeforeEach - public void setUpEach() { + void setUpEach() { MockitoAnnotations.openMocks(this); target = new RawElrService(rawELRRepository, kafkaProducerService, iElrDeadLetterRepository); } @@ -97,7 +97,7 @@ void getById_Success() { model.setCreatedBy("test"); model.setUpdatedBy("test"); - when(rawELRRepository.getById(any())).thenReturn(model); + when(rawELRRepository.getReferenceById(any())).thenReturn(model); var result = target.getById(modelDto.getId()); From 552d5afca702cf1d3553a34e2859d7e0af4dfc32 Mon Sep 17 00:00:00 2001 From: Ragul Shanmugam Date: Wed, 12 Mar 2025 08:23:13 -0700 Subject: [PATCH 6/6] PR comments fix --- .../src/main/resources/application.yaml | 2 +- .../src/main/resources/db/di-service-003.sql | 8 +++----- .../src/main/resources/db/di-service-004.sql | 5 +++++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/data-ingestion-service/src/main/resources/application.yaml b/data-ingestion-service/src/main/resources/application.yaml index c804441d2..098003e7c 100644 --- a/data-ingestion-service/src/main/resources/application.yaml +++ b/data-ingestion-service/src/main/resources/application.yaml @@ -37,7 +37,7 @@ ecr: dlt: scheduler: - enabled: ${SCHEDULER_ENABLED:true} + enabled: ${SCHEDULER_ENABLED:false} cron: ${SCHEDULER_CRON:0/30 * * * * *} --- diff --git a/data-ingestion-service/src/main/resources/db/di-service-003.sql b/data-ingestion-service/src/main/resources/db/di-service-003.sql index f318d0401..0cf826653 100644 --- a/data-ingestion-service/src/main/resources/db/di-service-003.sql +++ b/data-ingestion-service/src/main/resources/db/di-service-003.sql @@ -1,5 +1,3 @@ -ALTER TABLE NBS_DataIngest.dbo.elr_raw - ADD version VARCHAR(1); - -ALTER TABLE NBS_DataIngest.dbo.elr_dlt -ALTER COLUMN dlt_status NVARCHAR(30); \ No newline at end of file +ALTER TABLE NBS_MSGOUTE.dbo.NBS_interface + ADD 
original_payload_RR TEXT, + original_doc_type_cd_RR varchar(100); \ No newline at end of file diff --git a/data-ingestion-service/src/main/resources/db/di-service-004.sql b/data-ingestion-service/src/main/resources/db/di-service-004.sql index e69de29bb..f318d0401 100644 --- a/data-ingestion-service/src/main/resources/db/di-service-004.sql +++ b/data-ingestion-service/src/main/resources/db/di-service-004.sql @@ -0,0 +1,5 @@ +ALTER TABLE NBS_DataIngest.dbo.elr_raw + ADD version VARCHAR(1); + +ALTER TABLE NBS_DataIngest.dbo.elr_dlt +ALTER COLUMN dlt_status NVARCHAR(30); \ No newline at end of file