diff --git a/src/main/java/org/folio/search/service/reindex/ReindexStatusService.java b/src/main/java/org/folio/search/service/reindex/ReindexStatusService.java index e23cb2754..db5dc40f3 100644 --- a/src/main/java/org/folio/search/service/reindex/ReindexStatusService.java +++ b/src/main/java/org/folio/search/service/reindex/ReindexStatusService.java @@ -108,6 +108,17 @@ public boolean isMergeCompleted() { return statusRepository.isMergeCompleted(); } + /** + * Checks if any reindex operation is currently in progress (merge or upload). + * + * @return true if any entity type has a status of MERGE_IN_PROGRESS or UPLOAD_IN_PROGRESS + */ + public boolean isReindexInProgress() { + return statusRepository.getReindexStatuses().stream() + .anyMatch(status -> status.getStatus() == ReindexStatus.MERGE_IN_PROGRESS + || status.getStatus() == ReindexStatus.UPLOAD_IN_PROGRESS); + } + private List constructNewStatusRecords(List entityTypes, ReindexStatus status) { return entityTypes.stream() diff --git a/src/main/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesService.java b/src/main/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesService.java index abd09c63a..f2f1b4e98 100644 --- a/src/main/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesService.java +++ b/src/main/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesService.java @@ -19,6 +19,7 @@ import org.folio.search.service.InstanceChildrenResourceService; import org.folio.search.service.ResourceService; import org.folio.search.service.reindex.ReindexConstants; +import org.folio.search.service.reindex.ReindexStatusService; import org.folio.search.service.reindex.jdbc.InstanceChildResourceRepository; import org.folio.search.service.reindex.jdbc.ItemRepository; import org.folio.search.service.reindex.jdbc.MergeInstanceRepository; @@ -42,6 +43,7 @@ public class ScheduledInstanceSubResourcesService { private final TenantRepository tenantRepository; private final Map repositories; private final SubResourcesLockRepository subResourcesLockRepository; + private final ReindexStatusService reindexStatusService; private final SystemUserScopedExecutionService executionService; private final int subResourceBatchSize; private final long staleLockThresholdMs; @@ -51,6 +53,7 @@ public ScheduledInstanceSubResourcesService(ResourceService resourceService, TenantRepository tenantRepository, List repositories, SubResourcesLockRepository subResourcesLockRepository, + ReindexStatusService reindexStatusService, SystemUserScopedExecutionService executionService, MergeInstanceRepository instanceRepository, ItemRepository itemRepository, @@ -59,6 +62,7 @@ public ScheduledInstanceSubResourcesService(ResourceService resourceService, this.tenantRepository = tenantRepository; this.repositories = buildRepositoriesMap(repositories, instanceRepository, itemRepository); this.subResourcesLockRepository = subResourcesLockRepository; + this.reindexStatusService = reindexStatusService; this.executionService = executionService; this.subResourceBatchSize = searchConfigurationProperties.getIndexing().getSubResourceBatchSize(); this.staleLockThresholdMs = searchConfigurationProperties.getIndexing().getStaleLockThresholdMs(); @@ -115,6 +119,12 @@ private void processEntityTypeWithLock(ReindexEntityType entityType, String tena } private void handleLockAcquisitionFailure(ReindexEntityType entityType, String tenant) { + if (isReindexInProgress()) { + log.info("persistChildren::Skipping stale lock check for entity type {} in tenant {} - reindex is in progress", + entityType, tenant); + return; + } + if (subResourcesLockRepository.checkAndReleaseStaleLock(entityType, tenant, staleLockThresholdMs)) { log.warn("persistChildren::Released stale lock for entity type {} in tenant {}. " + "Lock was older than threshold of {} ms", @@ -122,6 +132,15 @@ private void handleLockAcquisitionFailure(ReindexEntityType entityType, String t } } + private boolean isReindexInProgress() { + try { + return reindexStatusService.isReindexInProgress(); + } catch (Exception e) { + log.warn("persistChildren::Failed to check reindex status, assuming no reindex in progress", e); + return false; + } + } + private void processSubResources(ReindexEntityType entityType, String tenant, Timestamp timestamp) { SubResourceResult result = null; String lastId = null; diff --git a/src/test/java/org/folio/search/service/reindex/ReindexStatusServiceTest.java b/src/test/java/org/folio/search/service/reindex/ReindexStatusServiceTest.java index b87ddadb8..32655e00b 100644 --- a/src/test/java/org/folio/search/service/reindex/ReindexStatusServiceTest.java +++ b/src/test/java/org/folio/search/service/reindex/ReindexStatusServiceTest.java @@ -179,4 +179,46 @@ void updateReindexMergeInProgress() { // assert verify(statusRepository).setMergeInProgress(entityTypes); } + + @Test + void isReindexInProgress_trueWhenMerge() { + // given + when(statusRepository.getReindexStatuses()).thenReturn(List.of( + new ReindexStatusEntity(ReindexEntityType.INSTANCE, ReindexStatus.MERGE_IN_PROGRESS), + new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED))); + + // act + var actual = service.isReindexInProgress(); + + // assert + assertThat(actual).isTrue(); + } + + @Test + void isReindexInProgress_trueWhenUpload() { + // given + when(statusRepository.getReindexStatuses()).thenReturn(List.of( + new ReindexStatusEntity(ReindexEntityType.INSTANCE, ReindexStatus.UPLOAD_IN_PROGRESS), + new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED))); + + // act + var actual = service.isReindexInProgress(); + + // assert + assertThat(actual).isTrue(); + } + + @Test + void isReindexInProgress_false() { + // given + when(statusRepository.getReindexStatuses()).thenReturn(List.of( + new ReindexStatusEntity(ReindexEntityType.INSTANCE, ReindexStatus.UPLOAD_COMPLETED), + new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED))); + + // act + var actual = service.isReindexInProgress(); + + // assert + assertThat(actual).isFalse(); + } } diff --git a/src/test/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesServiceTest.java b/src/test/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesServiceTest.java index 296bf6551..a36c90fcb 100644 --- a/src/test/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesServiceTest.java +++ b/src/test/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesServiceTest.java @@ -23,6 +23,7 @@ import org.folio.search.model.types.ReindexEntityType; import org.folio.search.service.InstanceChildrenResourceService; import org.folio.search.service.ResourceService; +import org.folio.search.service.reindex.ReindexStatusService; import org.folio.search.service.reindex.jdbc.ItemRepository; import org.folio.search.service.reindex.jdbc.MergeInstanceRepository; import org.folio.search.service.reindex.jdbc.SubResourceResult; @@ -45,6 +46,7 @@ class ScheduledInstanceSubResourcesServiceTest { private @Mock ResourceService resourceService; private @Mock TenantRepository tenantRepository; private @Mock SubResourcesLockRepository subResourcesLockRepository; + private @Mock ReindexStatusService reindexStatusService; private @Mock SystemUserScopedExecutionService executionService; private @Mock InstanceChildrenResourceService instanceChildrenResourceService; private @Mock SubjectRepository subjectRepository; @@ -65,6 +67,7 @@ void setUp() { tenantRepository, List.of(subjectRepository), subResourcesLockRepository, + reindexStatusService, executionService, instanceRepository, itemRepository, @@ -179,6 +182,7 @@ void persistChildren_ShouldReleaseStaleLockWhenLockAcquisitionFails() { .when(executionService).executeSystemUserScoped(anyString(), any()); when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID)); when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty()); + when(reindexStatusService.isReindexInProgress()).thenReturn(false); when(subResourcesLockRepository.checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong())).thenReturn(true); // Act @@ -186,6 +190,7 @@ void persistChildren_ShouldReleaseStaleLockWhenLockAcquisitionFails() { // Assert verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID)); + verify(reindexStatusService, times(3)).isReindexInProgress(); verify(subResourcesLockRepository, times(3)).checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong()); verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any()); verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt()); @@ -199,6 +204,7 @@ void persistChildren_ShouldNotProcessWhenLockFailsAndNoStaleLock() { .when(executionService).executeSystemUserScoped(anyString(), any()); when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID)); when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty()); + when(reindexStatusService.isReindexInProgress()).thenReturn(false); when(subResourcesLockRepository.checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong())).thenReturn(false); // Act @@ -206,12 +212,34 @@ void persistChildren_ShouldNotProcessWhenLockFailsAndNoStaleLock() { // Assert verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID)); + verify(reindexStatusService, times(3)).isReindexInProgress(); verify(subResourcesLockRepository, times(3)).checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong()); verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any()); verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt()); verifyNoInteractions(instanceRepository, itemRepository, resourceService); } + @Test + void persistChildren_ShouldSkipStaleLockCheckWhenReindexInProgress() { + // Arrange + doAnswer(invocation -> invocation.>getArgument(1).call()) + .when(executionService).executeSystemUserScoped(anyString(), any()); + when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID)); + when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty()); + when(reindexStatusService.isReindexInProgress()).thenReturn(true); + + // Act + service.persistChildren(); + + // Assert + verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID)); + verify(reindexStatusService, times(3)).isReindexInProgress(); + verify(subResourcesLockRepository, never()).checkAndReleaseStaleLock(any(), any(), anyLong()); + verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any()); + verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt()); + verifyNoInteractions(instanceRepository, itemRepository, resourceService); + } + private void mockSubResourceResult(String tenantId, Timestamp timestamp) { when(subjectRepository.fetchByTimestamp(tenantId, timestamp, 3)) .thenReturn(new SubResourceResult(List.of(Map.of("id", "1", "tenantId", tenantId)), null));