Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-ingestion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ dependencies {
implementation 'io.prometheus:simpleclient_hotspot:0.16.0'
implementation 'io.prometheus:simpleclient_httpserver:0.16.0'
implementation 'io.prometheus:simpleclient_pushgateway:0.16.0'
implementation 'io.micrometer:micrometer-registry-prometheus:1.13.4'
implementation 'io.micrometer:micrometer-registry-prometheus:1.15.0-M2'

// Apache camel
implementation 'org.apache.camel.springboot:camel-spring-boot-starter:4.8.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package gov.cdc.dataingestion.camel.routes;

import gov.cdc.dataingestion.rawmessage.dto.RawERLDto;
import gov.cdc.dataingestion.rawmessage.service.RawELRService;
import gov.cdc.dataingestion.rawmessage.dto.RawElrDto;
import gov.cdc.dataingestion.rawmessage.service.RawElrService;
import org.apache.camel.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -19,10 +19,10 @@
public class HL7FileProcessComponent {
private static Logger logger = LoggerFactory.getLogger(HL7FileProcessComponent.class);
String msgType = "HL7";
private RawELRService rawELRService;
private RawElrService rawELRService;

@Autowired
public HL7FileProcessComponent(RawELRService rawELRService){
public HL7FileProcessComponent(RawElrService rawELRService){
this.rawELRService=rawELRService;
}
@Handler
Expand All @@ -34,11 +34,12 @@ public String process(String body) {
String hl7Str = body;
logger.debug("HL7 Message:{}", hl7Str);
if (hl7Str != null && !hl7Str.trim().isEmpty()) {
RawERLDto rawERLDto = new RawERLDto();
rawERLDto.setType(msgType);
rawERLDto.setValidationActive(true);
rawERLDto.setPayload(hl7Str);
elrId = rawELRService.submission(rawERLDto,version);
RawElrDto rawElrDto = new RawElrDto();
rawElrDto.setType(msgType);
rawElrDto.setValidationActive(true);
rawElrDto.setPayload(hl7Str);
rawElrDto.setVersion(version);
elrId = rawELRService.submission(rawElrDto);
}
} catch (Exception e) {
logger.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import gov.cdc.dataingestion.deadletter.model.ElrDeadLetterDto;
import gov.cdc.dataingestion.deadletter.service.ElrDeadLetterService;
import gov.cdc.dataingestion.exception.DeadLetterTopicException;
import gov.cdc.dataingestion.exception.KafkaProducerException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
Expand Down Expand Up @@ -87,7 +88,7 @@ public ResponseEntity<ElrDeadLetterDto> getErrorMessage(@PathVariable("dlt-id")
schema = @Schema(type = "string"))}
)
@PostMapping(consumes = MediaType.TEXT_PLAIN_VALUE, path = "/api/elrs/{dlt-id}")
public ResponseEntity<ElrDeadLetterDto> messageReInject(@PathVariable("dlt-id") String dltId, @RequestBody final String payload) throws DeadLetterTopicException {
public ResponseEntity<ElrDeadLetterDto> messageReInject(@PathVariable("dlt-id") String dltId, @RequestBody final String payload) throws DeadLetterTopicException, KafkaProducerException {
return ResponseEntity.ok(elrDeadLetterService.updateAndReprocessingMessage(dltId, payload));
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package gov.cdc.dataingestion.deadletter.repository;

import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel;
import jakarta.transaction.Transactional;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;

import java.util.List;
Expand All @@ -11,4 +14,22 @@
@Repository
public interface IElrDeadLetterRepository extends JpaRepository<ElrDeadLetterModel, String> {
// Fetches all DLT rows with the given status, in the caller-supplied sort order.
// NOTE(review): Optional-wrapping a List is unusual — Spring Data returns an empty
// list rather than null, so the Optional is never empty; consider returning List.
Optional<List<ElrDeadLetterModel>> findAllDltRecordByDltStatus (String dltStatus, Sort sort);
}

// NOTE(review): the closing brace above appears to be diff residue — the methods
// below belong inside the interface body. Named parameters (:id, :topicName, ...)
// in these native queries rely on the -parameters compiler flag or @Param
// annotations for binding — TODO confirm the build enables one of them.

// Inserts a DLT row for a failed raw message. dlt_status is built as
// 'KAFKA_ERROR_' + :type — presumably SQL Server string concatenation; on other
// dialects '+' is numeric addition, so verify the target database.
@Modifying
@Transactional
@Query(value = "INSERT INTO elr_dlt (error_message_id, error_message_source, error_stack_trace, error_stack_trace_short, dlt_status, dlt_occurrence, message, created_by, updated_by) VALUES(:id, :topicName, :errorStatus, :errorStatus , 'KAFKA_ERROR_' + :type, :dltOccurrence, :payload, 'elr_raw_service', 'elr_raw_service')", nativeQuery = true)
void addErrorStatusForRawId(String id, String topicName, String type, String payload, String errorStatus, int dltOccurrence);

// Updates both the retry counter and the status for an existing DLT row.
@Modifying
@Transactional
@Query(value = "UPDATE elr_dlt SET dlt_occurrence = :dltOccurrence, dlt_status = :errorStatus WHERE error_message_id =:id", nativeQuery = true)
void updateDltOccurrenceForRawId(String id, int dltOccurrence, String errorStatus);

// Updates only the status for an existing DLT row (e.g. marks it PROCESSED).
@Modifying
@Transactional
@Query(value = "UPDATE elr_dlt SET dlt_status = :errorStatus WHERE error_message_id = :id", nativeQuery = true)
void updateErrorStatusForRawId(String id, String errorStatus);

// Returns Kafka-error DLT rows eligible for retry. The occurrence cap (<= 2)
// is hard-coded here and mirrored by the retry caller — keep them in sync.
@Query(value = "SELECT * FROM elr_dlt WHERE dlt_status LIKE '%KAFKA%' AND dlt_occurrence <= 2", nativeQuery = true)
List<ElrDeadLetterModel> getAllErrorDltRecordForKafkaError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
import gov.cdc.dataingestion.deadletter.repository.IElrDeadLetterRepository;
import gov.cdc.dataingestion.deadletter.repository.model.ElrDeadLetterModel;
import gov.cdc.dataingestion.exception.DeadLetterTopicException;
import gov.cdc.dataingestion.exception.KafkaProducerException;
import gov.cdc.dataingestion.kafka.integration.service.KafkaProducerService;
import gov.cdc.dataingestion.report.repository.IRawELRRepository;
import gov.cdc.dataingestion.report.repository.model.RawERLModel;
import gov.cdc.dataingestion.rawmessage.dto.RawElrDto;
import gov.cdc.dataingestion.rawmessage.service.RawElrService;
import gov.cdc.dataingestion.report.repository.IRawElrRepository;
import gov.cdc.dataingestion.report.repository.model.RawElrModel;
import gov.cdc.dataingestion.validation.repository.IValidatedELRRepository;
import gov.cdc.dataingestion.validation.repository.model.ValidatedELRModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.*;

import static gov.cdc.dataingestion.share.helper.TimeStampHelper.getCurrentTimeStamp;

Expand All @@ -35,9 +35,10 @@
@SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"})
public class ElrDeadLetterService {
private final IElrDeadLetterRepository dltRepository;
private final IRawELRRepository rawELRRepository;
private final IRawElrRepository rawELRRepository;
private final IValidatedELRRepository validatedELRRepository;
private final KafkaProducerService kafkaProducerService;
private final RawElrService rawELRService;

@Value("${service.timezone}")
private String tz = "UTC";
Expand All @@ -63,13 +64,14 @@ public class ElrDeadLetterService {

public ElrDeadLetterService(
IElrDeadLetterRepository dltRepository,
IRawELRRepository rawELRRepository,
IRawElrRepository rawELRRepository,
IValidatedELRRepository validatedELRRepository,
KafkaProducerService kafkaProducerService) {
KafkaProducerService kafkaProducerService, RawElrService rawELRService) {
this.dltRepository = dltRepository;
this.rawELRRepository = rawELRRepository;
this.validatedELRRepository = validatedELRRepository;
this.kafkaProducerService = kafkaProducerService;
this.rawELRService = rawELRService;
}

public List<ElrDeadLetterDto> getAllErrorDltRecord() {
Expand Down Expand Up @@ -101,7 +103,7 @@ public ElrDeadLetterDto getDltRecordById(String id) throws DeadLetterTopicExcept



public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) throws DeadLetterTopicException {
public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) throws DeadLetterTopicException, KafkaProducerException {
var existingRecord = getDltRecordById(id);
if(!existingRecord.getDltStatus().equalsIgnoreCase(EnumElrDltStatus.ERROR.name())) {
throw new DeadLetterTopicException("Selected record is in REINJECTED state. Please either wait for the ERROR state to occur or select a different record.");
Expand All @@ -114,7 +116,7 @@ public ElrDeadLetterDto updateAndReprocessingMessage(String id, String body) thr
if (!rawRecord.isPresent()) {
throw new DeadLetterTopicException(DEAD_LETTER_NULL_EXCEPTION);
}
RawERLModel rawModel = rawRecord.get();
RawElrModel rawModel = rawRecord.get();
rawModel.setPayload(body);
rawModel.setUpdatedOn(getCurrentTimeStamp(tz));

Expand Down Expand Up @@ -207,4 +209,33 @@ private boolean isValidUUID(String uuidString) {
return false;
}
}

/**
 * Retries DLT records whose status indicates a Kafka producer error.
 *
 * <p>For each eligible record, rebuilds a {@link RawElrDto} from the stored
 * payload, marks the DLT row PROCESSED, and re-submits the raw message via
 * {@code rawELRService.updateRawMessageAfterRetry} with a fixed occurrence of 2
 * (matching the {@code dlt_occurrence <= 2} cap in the repository query).</p>
 *
 * @throws KafkaProducerException if re-publishing a message to Kafka fails
 */
public void processFailedMessagesFromKafka() throws KafkaProducerException {
    List<ElrDeadLetterModel> dltMessagesList = dltRepository.getAllErrorDltRecordForKafkaError();
    // Plain for-each: the previous version walked an explicit Iterator and called
    // iterator.remove(), but the list is a local snapshot from the repository, so
    // removing elements had no observable effect. A for-each over an empty list is
    // a no-op, so the surrounding isEmpty() guard is unnecessary as well.
    for (ElrDeadLetterModel message : dltMessagesList) {
        RawElrDto rawElrDto = new RawElrDto();
        rawElrDto.setId(message.getErrorMessageId());
        // Message type (e.g. "HL7") is encoded in the DLT status as KAFKA_ERROR_<TYPE>.
        rawElrDto.setType(getElrMessageType(message.getDltStatus()));
        rawElrDto.setPayload(message.getMessage());
        rawElrDto.setValidationActive(true);
        // NOTE(review): the row is marked PROCESSED before the retry is attempted;
        // if updateRawMessageAfterRetry throws, the record is already PROCESSED and
        // will not be retried again. Order kept as-is to preserve existing
        // behavior — confirm this is intentional.
        dltRepository.updateErrorStatusForRawId(message.getErrorMessageId(), "PROCESSED");
        rawELRService.updateRawMessageAfterRetry(rawElrDto, 2);
    }
}

/**
 * Extracts the ELR message type from a DLT status string of the form
 * {@code KAFKA_ERROR_<TYPE>} (e.g. {@code "KAFKA_ERROR_HL7"} yields {@code "HL7"}).
 *
 * @param dltStatus the dlt_status column value
 * @return the message type suffix, or an empty string when the marker is absent
 *         or has no suffix
 */
String getElrMessageType(String dltStatus) {
    final String marker = "KAFKA_ERROR";
    int markerIndex = dltStatus.indexOf(marker);
    if (markerIndex == -1) {
        return "";
    }
    // Skip the marker plus the '_' separator that follows it.
    int suffixStart = markerIndex + marker.length() + 1;
    // Guard: a status of exactly "KAFKA_ERROR" (no "_<TYPE>" suffix) previously
    // threw StringIndexOutOfBoundsException from substring().
    return suffixStart <= dltStatus.length() ? dltStatus.substring(suffixStart) : "";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package gov.cdc.dataingestion.deadletter.service;

import gov.cdc.dataingestion.exception.KafkaProducerException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Scheduled job that periodically re-drives DLT records which failed due to
 * Kafka producer errors. The cron expression and the on/off switch are both
 * externalized ({@code dlt.scheduler.cron}, {@code dlt.scheduler.enabled}).
 */
@Configuration
@EnableScheduling
public class SchedulerConfig {

    // Toggle read from configuration; when false the cron still fires but the
    // task body is skipped.
    @Value("${dlt.scheduler.enabled}")
    private boolean schedulerEnabled;

    private final ElrDeadLetterService elrDeadLetterService;

    public SchedulerConfig(ElrDeadLetterService elrDeadLetterService) {
        this.elrDeadLetterService = elrDeadLetterService;
    }

    /**
     * Entry point invoked on the configured cron schedule.
     *
     * @throws KafkaProducerException if re-publishing a failed message to Kafka fails
     */
    @Scheduled(cron = "${dlt.scheduler.cron}")
    public void scheduleTask() throws KafkaProducerException {
        if (!schedulerEnabled) {
            return; // feature-flagged off
        }
        elrDeadLetterService.processFailedMessagesFromKafka();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gov.cdc.dataingestion.exception;

/**
 * Thrown when publishing a message to a Kafka topic fails.
 *
 * <p>Suppressed Sonar rules, consistent with the other exception classes in this
 * package: S1118 (constructor requirement), S125 (commented-out code),
 * S6126 (string block), S1135 (TODO tracking).</p>
 */
@SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"})
public class KafkaProducerException extends Exception {

    /**
     * @param message description of the publish failure
     */
    public KafkaProducerException(String message) {
        super(message);
    }

    /**
     * Preserves the underlying cause (e.g. the producer's KafkaException) so the
     * original stack trace survives rethrowing and stays diagnosable in logs.
     *
     * @param message description of the publish failure
     * @param cause   the underlying failure
     */
    public KafkaProducerException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import gov.cdc.dataingestion.nbs.converters.Hl7ToRhapsodysXmlConverter;
import gov.cdc.dataingestion.nbs.repository.model.NbsInterfaceModel;
import gov.cdc.dataingestion.nbs.services.NbsRepositoryServiceProvider;
import gov.cdc.dataingestion.report.repository.IRawELRRepository;
import gov.cdc.dataingestion.report.repository.model.RawERLModel;
import gov.cdc.dataingestion.report.repository.IRawElrRepository;
import gov.cdc.dataingestion.report.repository.model.RawElrModel;
import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData;
import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository;
import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator;
Expand Down Expand Up @@ -86,7 +86,7 @@ public class KafkaConsumerService {
private String prepFhirTopic = "fhir_prep";
private final KafkaProducerService kafkaProducerService;
private final IHL7v2Validator iHl7v2Validator;
private final IRawELRRepository iRawELRRepository;
private final IRawElrRepository iRawELRRepository;
private final IValidatedELRRepository iValidatedELRRepository;
private final IHL7DuplicateValidator iHL7DuplicateValidator;
private final NbsRepositoryServiceProvider nbsRepositoryServiceProvider;
Expand All @@ -105,7 +105,7 @@ public class KafkaConsumerService {
//region CONSTRUCTOR
public KafkaConsumerService(
IValidatedELRRepository iValidatedELRRepository,
IRawELRRepository iRawELRRepository,
IRawElrRepository iRawELRRepository,
KafkaProducerService kafkaProducerService,
IHL7v2Validator iHl7v2Validator,
IHL7DuplicateValidator iHL7DuplicateValidator,
Expand Down Expand Up @@ -182,7 +182,7 @@ public void handleMessageForRawElr(String message,
}
try {
validationHandler(message, hl7ValidationActivated, dataProcessingEnable);
} catch (DuplicateHL7FileFoundException | DiHL7Exception e) {
} catch (DuplicateHL7FileFoundException | DiHL7Exception | KafkaProducerException e) {
throw new RuntimeException(e); //NOSONAR
}
});
Expand Down Expand Up @@ -250,12 +250,20 @@ public void handleMessageForElrXml(String message,
iReportStatusRepository.save(reportStatusIdData);

if (dataProcessingApplied) {
kafkaProducerService.sendMessageAfterConvertedToXml(
String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0);
try {
kafkaProducerService.sendMessageAfterConvertedToXml(
String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0);
} catch (KafkaProducerException e) {
throw new RuntimeException(e); //NOSONAR
}
}
else {
kafkaProducerService.sendMessageAfterConvertedToXml(
nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
try {
kafkaProducerService.sendMessageAfterConvertedToXml(
nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
} catch (KafkaProducerException e) {
throw new RuntimeException(e); //NOSONAR
}
}
});

Expand Down Expand Up @@ -318,7 +326,11 @@ public void handleMessageForXmlConversionElr(String message,
@Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) {
timeMetricsBuilder.recordXmlPrepTime(() -> {
log.debug(topicDebugLog, message, topic);
xmlConversionHandler(message, operation, dataProcessingEnable);
try {
xmlConversionHandler(message, operation, dataProcessingEnable);
} catch (KafkaProducerException e) {
throw new RuntimeException(e); //NOSONAR
}
});
}

Expand Down Expand Up @@ -455,7 +467,7 @@ private String getDltErrorSource(String incomingTopic) {
return erroredSource;
}

private void preparationForConversionHandler(String message, String dataProcessingEnable) throws ConversionPrepareException {
private void preparationForConversionHandler(String message, String dataProcessingEnable) throws ConversionPrepareException, KafkaProducerException {
Optional<ValidatedELRModel> validatedElrResponse = this.iValidatedELRRepository.findById(message);
if(validatedElrResponse.isPresent()) {
kafkaProducerService.sendMessagePreparationTopic(validatedElrResponse.get(), prepXmlTopic, TopicPreparationType.XML, 0, dataProcessingEnable);
Expand All @@ -469,7 +481,7 @@ private void preparationForConversionHandler(String message, String dataProcessi
* make this public so we can add unit test for now.
* we need to implementation interface pattern for NBS convert and transformation classes. it better for unit testing
* */
public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) {
public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) throws KafkaProducerException {
String hl7Msg = "";
try {
Optional<ValidatedELRModel> validatedELRModel = iValidatedELRRepository.findById(message);
Expand Down Expand Up @@ -548,13 +560,13 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str
);
}
}
private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) {
private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) throws KafkaProducerException {
log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml");
xmlConversionHandlerProcessing(message, operation, dataProcessingEnable);
}
private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception {
Optional<RawERLModel> rawElrResponse = this.iRawELRRepository.findById(message);
RawERLModel elrModel;
private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception, KafkaProducerException {
Optional<RawElrModel> rawElrResponse = this.iRawELRRepository.findById(message);
RawElrModel elrModel;
if (!rawElrResponse.isEmpty()) {
elrModel = rawElrResponse.get();
} else {
Expand Down
Loading
Loading