From 4da229f96713709404f4e2414d13523c5d05abb9 Mon Sep 17 00:00:00 2001 From: Ruslan Lavrov Date: Mon, 2 Mar 2026 20:12:36 +0200 Subject: [PATCH 1/3] Added check to allow DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING event processing even for cancelled job --- .../dataimport/consumers/DataImportKafkaHandler.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java index a2a190b45..ac952d188 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java @@ -87,6 +87,7 @@ import static java.util.Objects.isNull; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.folio.DataImportEventTypes.DI_ERROR; +import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING; import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_REQUEST_ID; import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_USER_ID; import static org.folio.okapi.common.XOkapiHeaders.PERMISSIONS; @@ -98,6 +99,9 @@ public class DataImportKafkaHandler implements AsyncRecordHandler CANCELLED_JOB_ALLOWED_EVENTS = List.of( + DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING.value() + ); private final Vertx vertx; private final ProfileSnapshotCache profileSnapshotCache; @@ -169,7 +173,7 @@ public Future handle(KafkaConsumerRecord kafkaRecord) { String jobExecutionId = eventPayload.getJobExecutionId(); LOGGER.info("Data import event payload has been received with event type: {}, recordId: {} by jobExecution: {} and chunkId: {}", eventPayload.getEventType(), recordId, jobExecutionId, chunkId); - if 
(cancelledJobsIdCache.contains(eventPayload.getJobExecutionId())) { + if (shouldSkipEventProcessing(eventPayload)) { LOGGER.info("Skip processing of event, topic: '{}', tenantId: '{}', jobExecutionId: '{}' recordId: '{}' because the job has been cancelled", kafkaRecord.topic(), eventPayload.getTenant(), eventPayload.getJobExecutionId(), recordId); return Future.succeededFuture(kafkaRecord.key()); @@ -260,6 +264,11 @@ private void registerDataImportProcessingHandlers(Storage storage, HttpClient cl EventManager.registerEventHandler(new MarcBibModifyEventHandler(mappingMetadataCache, new InstanceUpdateDelegate(storage), precedingSucceedingTitlesHelper, client)); } + private boolean shouldSkipEventProcessing(DataImportEventPayload eventPayload) { + return cancelledJobsIdCache.contains(eventPayload.getJobExecutionId()) + && !CANCELLED_JOB_ALLOWED_EVENTS.contains(eventPayload.getEventType()); + } + private void populateWithPermissionsHeader(DataImportEventPayload eventPayload, Map headersMap) { String permissions = headersMap.getOrDefault(PERMISSIONS, headersMap.get(PERMISSIONS.toLowerCase())); if (isNotBlank(permissions)) { From 0a608b84a9e9198fe38e4f0394e0c5949d6c9fbd Mon Sep 17 00:00:00 2001 From: Ruslan Lavrov Date: Mon, 2 Mar 2026 21:52:35 +0200 Subject: [PATCH 2/3] Added test --- .../handlers/DataImportKafkaHandlerTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java index 498f505e5..bc61eb938 100644 --- a/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java @@ -48,6 +48,8 @@ import static com.github.tomakehurst.wiremock.client.WireMock.get; import static org.folio.ActionProfile.Action.CREATE; import static 
org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED; +import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MATCHED; +import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING; import static org.folio.inventory.dataimport.consumers.DataImportKafkaHandler.PROFILE_SNAPSHOT_ID_KEY; import static org.folio.okapi.common.XOkapiHeaders.PERMISSIONS; import static org.folio.rest.jaxrs.model.EntityType.INSTANCE; @@ -216,6 +218,7 @@ public void shouldReturnSucceededFutureAndSkipEventProcessingIfEventPayloadConta DataImportEventPayload dataImportEventPayload = new DataImportEventPayload() .withJobExecutionId(cancelledJobId) + .withEventType(DI_INCOMING_MARC_BIB_RECORD_PARSED.value()) .withTenant(TENANT_ID) .withOkapiUrl(mockServer.baseUrl()) .withContext(new HashMap<>(Map.of(PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId()))); @@ -246,4 +249,47 @@ public void shouldReturnSucceededFutureAndSkipEventProcessingIfEventPayloadConta })); } + @Test + public void shouldProcessEventIfEventPayloadContainsCancelledJobExecutionIdButEventTypeIsDiSrsMarcBibRecordModifiedReadyForPostProcessing(TestContext context) { + // given + String expectedKafkaRecordKey = "test_key"; + String cancelledJobId = UUID.randomUUID().toString(); + cancelledJobsIdCache.put(cancelledJobId); + + DataImportEventPayload dataImportEventPayload = new DataImportEventPayload() + .withEventType(DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING.value()) + .withJobExecutionId(cancelledJobId) + .withTenant(TENANT_ID) + .withOkapiUrl(mockServer.baseUrl()) + .withContext(new HashMap<>(Map.of(PROFILE_SNAPSHOT_ID_KEY, profileSnapshotWrapper.getId()))) + .withEventsChain(List.of(DI_SRS_MARC_BIB_RECORD_MATCHED.value())); + context.assertEquals( + DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING.value(), dataImportEventPayload.getEventType()); + + Event event = new 
Event().withId("01").withEventPayload(Json.encode(dataImportEventPayload)); + List headers = List.of( + KafkaHeader.header(RECORD_ID_HEADER, UUID.randomUUID().toString()), + KafkaHeader.header(CHUNK_ID_HEADER, UUID.randomUUID().toString()) + ); + when(kafkaRecord.key()).thenReturn(expectedKafkaRecordKey); + when(kafkaRecord.value()).thenReturn(Json.encode(event)); + when(kafkaRecord.headers()).thenReturn(headers); + + EventHandler mockedEventHandler = mock(EventHandler.class); + when(mockedEventHandler.isEligible(any(DataImportEventPayload.class))).thenReturn(true); + when(mockedEventHandler.handle(any(DataImportEventPayload.class))) + .thenReturn(CompletableFuture.completedFuture(dataImportEventPayload)); + EventManager.registerEventHandler(mockedEventHandler); + + // when + Future future = dataImportKafkaHandler.handle(kafkaRecord); + + // then + future.onComplete(context.asyncAssertSuccess(actualKafkaRecordKey -> { + context.assertEquals(expectedKafkaRecordKey, actualKafkaRecordKey); + verify(mockedEventHandler).isEligible(any(DataImportEventPayload.class)); + verify(mockedEventHandler).handle(any(DataImportEventPayload.class)); + })); + } + } From dbdbd24a2203995d2bc031a2ad2dbefb3e9ab94b Mon Sep 17 00:00:00 2001 From: Ruslan Lavrov Date: Tue, 3 Mar 2026 23:00:01 +0200 Subject: [PATCH 3/3] news --- NEWS.md | 1 + .../dataimport/consumers/DataImportKafkaHandler.java | 12 +++++++----- .../handlers/DataImportKafkaHandlerTest.java | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/NEWS.md b/NEWS.md index 1398844e6..2004f2d91 100644 --- a/NEWS.md +++ b/NEWS.md @@ -22,6 +22,7 @@ * Make authority-storage, request-storage,instance-authority-links dependencies optional [MODINV-1346](https://folio-org.atlassian.net/browse/MODINV-1346) * Remove LinkUpdateReport sending on successful processing [MODINV-1348](https://folio-org.atlassian.net/browse/MODINV-1348) * Move consumption of authority DI events from mod-inventory to mod-entities-links 
[APPAUTHREC-3](https://folio-org.atlassian.net/browse/APPAUTHREC-3) +* Cancelled data import jobs update SRS records without updating Instance records [MODINV-1353](https://folio-org.atlassian.net/browse/MODINV-1353) ## 21.1.0 2025-03-13 * Update deduplication logic in mod-inventory [MODINV-1151](https://folio-org.atlassian.net/browse/MODINV-1151) diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java index ac952d188..41e01d724 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/DataImportKafkaHandler.java @@ -81,6 +81,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static java.lang.String.format; @@ -99,7 +100,7 @@ public class DataImportKafkaHandler implements AsyncRecordHandler CANCELLED_JOB_ALLOWED_EVENTS = List.of( + private static final Set CANCELLED_JOB_ALLOWED_EVENTS = Set.of( DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING.value() ); @@ -225,6 +226,7 @@ private void registerDataImportProcessingHandlers(Storage storage, HttpClient cl HoldingsPreloader holdingsPreloader = new HoldingsPreloader(ordersPreloaderHelper); ItemPreloader itemPreloader = new ItemPreloader(ordersPreloaderHelper); SnapshotService snapshotService = new SnapshotService(client); + PostgresClientFactory postgresClientFactory = new PostgresClientFactory(vertx); MatchValueLoaderFactory.register(new InstanceLoader(storage, instancePreloader)); MatchValueLoaderFactory.register(new ItemLoader(storage, itemPreloader)); @@ -252,10 +254,10 @@ private void registerDataImportProcessingHandlers(Storage storage, HttpClient cl ))); EventManager.registerEventHandler(new MatchAuthorityEventHandler(mappingMetadataCache, consortiumService)); - EventManager.registerEventHandler(new
CreateItemEventHandler(storage, mappingMetadataCache, new ItemIdStorageService(new EntityIdStorageDaoImpl(new PostgresClientFactory(vertx))), orderHelperService)); - EventManager.registerEventHandler(new CreateHoldingEventHandler(storage, mappingMetadataCache, new HoldingsIdStorageService(new EntityIdStorageDaoImpl(new PostgresClientFactory(vertx))), orderHelperService, consortiumService)); - EventManager.registerEventHandler(new CreateInstanceEventHandler(storage, precedingSucceedingTitlesHelper, mappingMetadataCache, new InstanceIdStorageService(new EntityIdStorageDaoImpl(new PostgresClientFactory(vertx))), orderHelperService, snapshotService, client)); - EventManager.registerEventHandler(new CreateMarcHoldingsEventHandler(storage, mappingMetadataCache, new HoldingsIdStorageService(new EntityIdStorageDaoImpl(new PostgresClientFactory(vertx))), new HoldingsCollectionService(), consortiumService)); + EventManager.registerEventHandler(new CreateItemEventHandler(storage, mappingMetadataCache, new ItemIdStorageService(new EntityIdStorageDaoImpl(postgresClientFactory)), orderHelperService)); + EventManager.registerEventHandler(new CreateHoldingEventHandler(storage, mappingMetadataCache, new HoldingsIdStorageService(new EntityIdStorageDaoImpl(postgresClientFactory)), orderHelperService, consortiumService)); + EventManager.registerEventHandler(new CreateInstanceEventHandler(storage, precedingSucceedingTitlesHelper, mappingMetadataCache, new InstanceIdStorageService(new EntityIdStorageDaoImpl(postgresClientFactory)), orderHelperService, snapshotService, client)); + EventManager.registerEventHandler(new CreateMarcHoldingsEventHandler(storage, mappingMetadataCache, new HoldingsIdStorageService(new EntityIdStorageDaoImpl(postgresClientFactory)), new HoldingsCollectionService(), consortiumService)); EventManager.registerEventHandler(new UpdateMarcHoldingsEventHandler(storage, mappingMetadataCache, new KafkaEventPublisher(kafkaConfig, vertx, 100))); 
EventManager.registerEventHandler(new UpdateItemEventHandler(storage, mappingMetadataCache)); EventManager.registerEventHandler(new UpdateHoldingEventHandler(storage, mappingMetadataCache)); diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java index bc61eb938..1c0410d08 100644 --- a/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/handlers/DataImportKafkaHandlerTest.java @@ -187,7 +187,7 @@ public void shouldReturnFailedFutureWhenProcessingCoreHandlerFailed(TestContext DataImportEventPayload dataImportEventPayload = new DataImportEventPayload() .withJobExecutionId(UUID.randomUUID().toString()) .withEventType(DI_INCOMING_MARC_BIB_RECORD_PARSED.value()) - .withTenant("diku") + .withTenant(TENANT_ID) .withOkapiUrl(mockServer.baseUrl()) .withToken("test-token") .withContext(new HashMap<>(Map.of("JOB_PROFILE_SNAPSHOT_ID", profileSnapshotWrapper.getId())));