Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ public boolean isMergeCompleted() {
return statusRepository.isMergeCompleted();
}

/**
* Checks if any reindex operation is currently in progress (merge, upload or staging).
*
* @return true if any entity type has a status of MERGE_IN_PROGRESS, UPLOAD_IN_PROGRESS or STAGING_IN_PROGRESS
*/
public boolean isReindexInProgress() {
return statusRepository.getReindexStatuses().stream()
.anyMatch(status -> status.getStatus() == ReindexStatus.MERGE_IN_PROGRESS
|| status.getStatus() == ReindexStatus.UPLOAD_IN_PROGRESS
|| status.getStatus() == ReindexStatus.STAGING_IN_PROGRESS);
}

private List<ReindexStatusEntity> constructNewStatusRecords(List<ReindexEntityType> entityTypes,
String targetTenantId) {
return entityTypes.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.folio.search.service.InstanceChildrenResourceService;
import org.folio.search.service.ResourceService;
import org.folio.search.service.reindex.ReindexConstants;
import org.folio.search.service.reindex.ReindexStatusService;
import org.folio.search.service.reindex.jdbc.InstanceChildResourceRepository;
import org.folio.search.service.reindex.jdbc.ItemRepository;
import org.folio.search.service.reindex.jdbc.MergeInstanceRepository;
Expand All @@ -42,6 +43,7 @@ public class ScheduledInstanceSubResourcesService {
private final TenantRepository tenantRepository;
private final Map<ReindexEntityType, ReindexJdbcRepository> repositories;
private final SubResourcesLockRepository subResourcesLockRepository;
private final ReindexStatusService reindexStatusService;
private final SystemUserScopedExecutionService executionService;
private final int subResourceBatchSize;
private final long staleLockThresholdMs;
Expand All @@ -51,6 +53,7 @@ public ScheduledInstanceSubResourcesService(ResourceService resourceService,
TenantRepository tenantRepository,
List<ReindexJdbcRepository> repositories,
SubResourcesLockRepository subResourcesLockRepository,
ReindexStatusService reindexStatusService,
SystemUserScopedExecutionService executionService,
MergeInstanceRepository instanceRepository,
ItemRepository itemRepository,
Expand All @@ -59,6 +62,7 @@ public ScheduledInstanceSubResourcesService(ResourceService resourceService,
this.tenantRepository = tenantRepository;
this.repositories = buildRepositoriesMap(repositories, instanceRepository, itemRepository);
this.subResourcesLockRepository = subResourcesLockRepository;
this.reindexStatusService = reindexStatusService;
this.executionService = executionService;
this.subResourceBatchSize = searchConfigurationProperties.getIndexing().getSubResourceBatchSize();
this.staleLockThresholdMs = searchConfigurationProperties.getIndexing().getStaleLockThresholdMs();
Expand Down Expand Up @@ -115,13 +119,28 @@ private void processEntityTypeWithLock(ReindexEntityType entityType, String tena
}

private void handleLockAcquisitionFailure(ReindexEntityType entityType, String tenant) {
if (isReindexInProgress()) {
log.info("persistChildren::Skipping stale lock check for entity type {} in tenant {} - reindex is in progress",
entityType, tenant);
return;
}

if (subResourcesLockRepository.checkAndReleaseStaleLock(entityType, tenant, staleLockThresholdMs)) {
log.warn("persistChildren::Released stale lock for entity type {} in tenant {}. "
+ "Lock was older than threshold of {} ms",
entityType, tenant, staleLockThresholdMs);
}
}

private boolean isReindexInProgress() {
try {
return reindexStatusService.isReindexInProgress();
} catch (Exception e) {
log.warn("persistChildren::Failed to check reindex status, assuming no reindex in progress", e);
return false;
}
}

private void processSubResources(ReindexEntityType entityType, String tenant, Timestamp timestamp) {
SubResourceResult result = null;
String lastId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
Expand Down Expand Up @@ -359,4 +361,33 @@ void recreateMergeStatusRecords_shouldClearCache() {
assertThat(result).isEqualTo("new_tenant");
verify(statusRepository, times(2)).getTargetTenantId();
}

@ParameterizedTest
@EnumSource(value = ReindexStatus.class, names = {"MERGE_IN_PROGRESS", "UPLOAD_IN_PROGRESS", "STAGING_IN_PROGRESS"})
void isReindexInProgress_true(ReindexStatus inProgressStatus) {
// given
when(statusRepository.getReindexStatuses()).thenReturn(List.of(
new ReindexStatusEntity(ReindexEntityType.INSTANCE, inProgressStatus),
new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED)));

// act
var actual = service.isReindexInProgress();

// assert
assertThat(actual).isTrue();
}

@Test
void isReindexInProgress_false() {
// given
when(statusRepository.getReindexStatuses()).thenReturn(List.of(
new ReindexStatusEntity(ReindexEntityType.INSTANCE, ReindexStatus.UPLOAD_COMPLETED),
new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED)));

// act
var actual = service.isReindexInProgress();

// assert
assertThat(actual).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.service.InstanceChildrenResourceService;
import org.folio.search.service.ResourceService;
import org.folio.search.service.reindex.ReindexStatusService;
import org.folio.search.service.reindex.jdbc.ItemRepository;
import org.folio.search.service.reindex.jdbc.MergeInstanceRepository;
import org.folio.search.service.reindex.jdbc.SubResourceResult;
Expand All @@ -45,6 +46,7 @@ class ScheduledInstanceSubResourcesServiceTest {
private @Mock ResourceService resourceService;
private @Mock TenantRepository tenantRepository;
private @Mock SubResourcesLockRepository subResourcesLockRepository;
private @Mock ReindexStatusService reindexStatusService;
private @Mock SystemUserScopedExecutionService executionService;
private @Mock InstanceChildrenResourceService instanceChildrenResourceService;
private @Mock SubjectRepository subjectRepository;
Expand All @@ -65,6 +67,7 @@ void setUp() {
tenantRepository,
List.of(subjectRepository),
subResourcesLockRepository,
reindexStatusService,
executionService,
instanceRepository,
itemRepository,
Expand Down Expand Up @@ -179,13 +182,15 @@ void persistChildren_ShouldReleaseStaleLockWhenLockAcquisitionFails() {
.when(executionService).executeSystemUserScoped(anyString(), any());
when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID));
when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty());
when(reindexStatusService.isReindexInProgress()).thenReturn(false);
when(subResourcesLockRepository.checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong())).thenReturn(true);

// Act
service.persistChildren();

// Assert
verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID));
verify(reindexStatusService, times(3)).isReindexInProgress();
verify(subResourcesLockRepository, times(3)).checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong());
verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any());
verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt());
Expand All @@ -199,19 +204,42 @@ void persistChildren_ShouldNotProcessWhenLockFailsAndNoStaleLock() {
.when(executionService).executeSystemUserScoped(anyString(), any());
when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID));
when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty());
when(reindexStatusService.isReindexInProgress()).thenReturn(false);
when(subResourcesLockRepository.checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong())).thenReturn(false);

// Act
service.persistChildren();

// Assert
verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID));
verify(reindexStatusService, times(3)).isReindexInProgress();
verify(subResourcesLockRepository, times(3)).checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong());
verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any());
verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt());
verifyNoInteractions(instanceRepository, itemRepository, resourceService);
}

@Test
void persistChildren_ShouldSkipStaleLockCheckWhenReindexInProgress() {
// Arrange
doAnswer(invocation -> invocation.<Callable<?>>getArgument(1).call())
.when(executionService).executeSystemUserScoped(anyString(), any());
when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID));
when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty());
when(reindexStatusService.isReindexInProgress()).thenReturn(true);

// Act
service.persistChildren();

// Assert
verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID));
verify(reindexStatusService, times(3)).isReindexInProgress();
verify(subResourcesLockRepository, never()).checkAndReleaseStaleLock(any(), any(), anyLong());
verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any());
verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt());
verifyNoInteractions(instanceRepository, itemRepository, resourceService);
}

private void mockSubResourceResult(String tenantId, Timestamp timestamp) {
when(subjectRepository.fetchByTimestamp(tenantId, timestamp, 3))
.thenReturn(new SubResourceResult(List.of(Map.of("id", "1", "tenantId", tenantId)), null));
Expand Down
Loading