From 943c3996f83525080088d52c08be68fce2b082ec Mon Sep 17 00:00:00 2001 From: santosh kotha Date: Thu, 1 May 2025 15:52:07 +0100 Subject: [PATCH] PEG-333 Add jobstore throttling capability --- .../moj/cpp/jobmanager/it/JobServiceIT.java | 25 +++- .../persistence/JobJdbcRepository.java | 8 +- .../jobstore/persistence/JobRepository.java | 2 +- .../persistence/JobStoreConfiguration.java | 11 ++ .../moj/cpp/jobstore/service/JobService.java | 16 ++- .../persistence/JobJdbcRepositoryTest.java | 136 ++++++++++++++++-- .../JobStoreConfigurationTest.java | 7 + .../cpp/jobstore/service/JobServiceTest.java | 32 +++-- 8 files changed, 198 insertions(+), 39 deletions(-) diff --git a/job-manager/job-manager-it/src/test/java/uk/gov/moj/cpp/jobmanager/it/JobServiceIT.java b/job-manager/job-manager-it/src/test/java/uk/gov/moj/cpp/jobmanager/it/JobServiceIT.java index 3310d096a..1207a53ea 100644 --- a/job-manager/job-manager-it/src/test/java/uk/gov/moj/cpp/jobmanager/it/JobServiceIT.java +++ b/job-manager/job-manager-it/src/test/java/uk/gov/moj/cpp/jobmanager/it/JobServiceIT.java @@ -112,6 +112,8 @@ public WebApp war() { public void setup() throws Exception { final InitialContext initialContext = new InitialContext(); initialContext.bind("java:/app/JobServiceIT/DS.jobstore", dataSource); + initialContext.bind("java:/app/JobServiceIT/max.inProgress.job.count", "30"); + initialContext.bind("java:/app/JobServiceIT/worker.job.count", "10"); initEventDatabase(); } @@ -155,7 +157,15 @@ public void shouldUpdateJobsThatHaveAWorkerIdAndBeenIdleForMoreThanOneHour() thr userTransaction.commit(); detectDuplicates(); - assertThat(testJobJdbcRepository.jobsProcessed(), CoreMatchers.is(30)); + /* + Given + - 20 unassigned jobs (eligible to be assigned) - SET 1 + - 10 assigned with lockTime expired (eligible to be assigned) - SET 2 + - 10 assigned with lockTime not expired (not eligible to be assigned, with in active period of 1 hour) - SET 3 + After processing SET 1, in progress jobs count reaches threshold limit of 30 i.e. SET 1 (20) + SET 3 (10) + and hence no more jobs are assigned (even though SET 2 is eligible to be processed) + */ + assertThat(testJobJdbcRepository.jobsProcessed(), CoreMatchers.is(20)); } @Test @@ -164,8 +174,8 @@ public void shouldUpdateJobsThatHaveAWorkerIdAndBeenIdleForMoreThanOneHourAndRea testJobJdbcRepository.cleanJobTables(); testJobJdbcRepository.createJobs(20); testJobJdbcRepository.createIdleJobs(5, of(now().minus(65, MINUTES)), now().plus(30, MINUTES)); - testJobJdbcRepository.createIdleJobs(5, of(now().minus(65, MINUTES)), now()); - testJobJdbcRepository.createIdleJobs(10, of(now().minus(30, MINUTES)), now()); + testJobJdbcRepository.createIdleJobs(10, of(now().minus(65, MINUTES)), now()); + testJobJdbcRepository.createIdleJobs(5, of(now().minus(30, MINUTES)), now()); userTransaction.commit(); userTransaction.begin(); @@ -177,6 +187,15 @@ public void shouldUpdateJobsThatHaveAWorkerIdAndBeenIdleForMoreThanOneHourAndRea userTransaction.commit(); detectDuplicates(); + /* + Given + - 20 unassigned jobs (eligible to be assigned) - SET 1 + - 10 assigned with lockTime expired (eligible to be assigned) - SET 2 + - 5 assigned with lockTime expired but nextTaskStartTime is 30 mins in future (not eligible to be assigned) - SET 3 + - 5 assigned with lockTime not expired (not eligible to be assigned) - SET 4 + After processing SET 1 and partially SET 2, in progress jobs count reaches threshold limit of 30 i.e. SET 1 (20) + 5 out of SET 2 + SET 4 (5) + and hence no more jobs are assigned (even though 5 out of SET 2 are still eligible to be assigned) + */ assertThat(testJobJdbcRepository.jobsProcessed(), is(25)); } diff --git a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepository.java b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepository.java index c838a72ee..c44f6e7fd 100644 --- a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepository.java +++ b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepository.java @@ -49,7 +49,7 @@ public class JobJdbcRepository implements JobRepository { WHERE (worker_id IS NULL OR worker_lock_time < ?) AND priority = ? AND next_task_start_time < ? - LIMIT ? + LIMIT GREATEST(LEAST(? - (SELECT COUNT(*) FROM job WHERE worker_id IS NOT NULL AND worker_lock_time > ?), ?), 0) FOR UPDATE SKIP LOCKED) AND (worker_id IS NULL OR worker_lock_time < ?) """; @@ -130,7 +130,7 @@ public void updateNextTaskRetryDetails(final UUID jobId, final Timestamp nextTas } @Override - public int lockJobsFor(final UUID workerId, final Priority priority, final int jobCountToLock) { + public int lockJobsFor(final UUID workerId, final Priority priority, final int inProgressJobCountLimit, final int jobCountToLock) { final DataSource jobStoreDataSource = jobStoreDataSourceProvider.getJobStoreDataSource(); logger.debug("Locking jobs for worker: {}", workerId); @@ -143,8 +143,10 @@ public int lockJobsFor(final UUID workerId, final Priority priority, final int j preparedStatementWrapper.setTimestamp(3, oneHourAgo); preparedStatementWrapper.setString(4, priority.toString()); preparedStatementWrapper.setTimestamp(5, toSqlTimestamp(now)); - preparedStatementWrapper.setLong(6, valueOf(jobCountToLock)); + preparedStatementWrapper.setLong(6, valueOf(inProgressJobCountLimit)); preparedStatementWrapper.setTimestamp(7, oneHourAgo); + preparedStatementWrapper.setLong(8, valueOf(jobCountToLock)); + preparedStatementWrapper.setTimestamp(9, oneHourAgo); return preparedStatementWrapper.executeUpdate(); } catch (final SQLException e) { logger.error("Error locking jobs", e); diff --git a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobRepository.java b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobRepository.java index 400c1bfe2..e37ecd262 100644 --- a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobRepository.java +++ b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobRepository.java @@ -15,7 +15,7 @@ public interface JobRepository { void updateNextTaskRetryDetails(final UUID id, final Timestamp nextTaskStartTime, final Integer retryAttemptsRemaining); - int lockJobsFor(final UUID workerId, final Priority priority, final int jobCountToLock); + int lockJobsFor(final UUID workerId, final Priority priority, final int inProgressJobCountLimit, final int jobCountToLock); Stream findJobsLockedTo(final UUID workerId); diff --git a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfiguration.java b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfiguration.java index 4ce20728d..11d0590d2 100644 --- a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfiguration.java +++ b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfiguration.java @@ -30,6 +30,13 @@ public class JobStoreConfiguration { @Value(key = "worker.job.count", defaultValue = "10") private String workerJobCount; + /* When identifying unassigned jobs in order to ignore restrictions due to this configuration + i.e. to maintain backward compatibility default value should be configured to a high value. + */ + @Inject + @Value(key = "max.inProgress.job.count", defaultValue = "10000") + private String maxInProgressJobCount; + @Resource(lookup = "java:module/ModuleName") private String moduleName; @@ -53,6 +60,10 @@ public int getWorkerJobCount() { return parseInt(workerJobCount); } + public int getMaxInProgressJobCount() { + return parseInt(maxInProgressJobCount); + } + public String getModuleName() { return moduleName; } diff --git a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/service/JobService.java b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/service/JobService.java index b410d388a..290168602 100644 --- a/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/service/JobService.java +++ b/job-manager/jobstore-persistence/src/main/java/uk/gov/moj/cpp/jobstore/service/JobService.java @@ -4,19 +4,22 @@ import static java.util.stream.Stream.empty; import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp; +import uk.gov.justice.services.common.configuration.Value; import uk.gov.moj.cpp.jobstore.persistence.Job; import uk.gov.moj.cpp.jobstore.persistence.JobRepository; import uk.gov.moj.cpp.jobstore.persistence.JobStoreConfiguration; import uk.gov.moj.cpp.jobstore.persistence.Priority; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.json.JsonObject; import java.time.ZonedDateTime; import java.util.List; import java.util.UUID; import java.util.stream.Stream; -import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import javax.json.JsonObject; +import static java.lang.Integer.parseInt; +import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp; @ApplicationScoped public class JobService { @@ -30,15 +33,16 @@ public class JobService { public Stream getUnassignedJobsFor(final UUID workerId, final List orderedPriorities) { final int workerJobCount = jobStoreConfiguration.getWorkerJobCount(); + final int maxInProgressJobCount = jobStoreConfiguration.getMaxInProgressJobCount(); final Priority firstPriority = orderedPriorities.get(0); - int rowsAffected = jobRepository.lockJobsFor(workerId, firstPriority, workerJobCount); + int rowsAffected = jobRepository.lockJobsFor(workerId, firstPriority, maxInProgressJobCount, workerJobCount); if (rowsAffected == 0) { final Priority secondPriority = orderedPriorities.get(1); - rowsAffected = jobRepository.lockJobsFor(workerId, secondPriority, workerJobCount); + rowsAffected = jobRepository.lockJobsFor(workerId, secondPriority, maxInProgressJobCount, workerJobCount); if (rowsAffected == 0) { final Priority thirdPriority = orderedPriorities.get(2); - rowsAffected = jobRepository.lockJobsFor(workerId, thirdPriority, workerJobCount); + rowsAffected = jobRepository.lockJobsFor(workerId, thirdPriority, maxInProgressJobCount, workerJobCount); } } if (rowsAffected == 0) { diff --git a/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepositoryTest.java b/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepositoryTest.java index e1e2bea0d..b956cb53e 100644 --- a/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepositoryTest.java +++ b/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobJdbcRepositoryTest.java @@ -1,5 +1,28 @@ package uk.gov.moj.cpp.jobstore.persistence; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; + +import uk.gov.justice.framework.libraries.datasource.providers.jobstore.JobStoreDataSourceProvider; +import uk.gov.justice.framework.libraries.datasource.providers.jobstore.TestJobStoreDataSourceProvider; +import uk.gov.justice.services.common.util.UtcClock; +import uk.gov.justice.services.test.utils.core.jdbc.LiquibaseDatabaseBootstrapper; +import uk.gov.justice.services.test.utils.core.messaging.Poller; + +import javax.json.JsonObject; +import javax.sql.DataSource; +import java.io.StringReader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + import static java.time.ZonedDateTime.now; import static java.time.temporal.ChronoUnit.MILLIS; import static java.util.Optional.empty; @@ -9,9 +32,7 @@ import static javax.json.Json.createReader; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -20,8 +41,6 @@ import static uk.gov.moj.cpp.jobstore.persistence.Priority.LOW; import static uk.gov.moj.cpp.jobstore.persistence.Priority.MEDIUM; -import uk.gov.justice.framework.libraries.datasource.providers.jobstore.JobStoreDataSourceProvider; -import uk.gov.justice.framework.libraries.datasource.providers.jobstore.TestJobStoreDataSourceProvider; import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.test.utils.core.jdbc.LiquibaseDatabaseBootstrapper; import uk.gov.justice.services.test.utils.core.messaging.Poller; @@ -37,6 +56,7 @@ import java.util.UUID; import javax.json.JsonObject; +import javax.sql.DataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -197,15 +217,103 @@ public void shouldUpdateNextTaskRetryDetails() { assertThat(jobs.get(0).getRetryAttemptsRemaining(), is(retryAttemptsRemaining)); } - @Test - public void shouldLockJobsToWorker() throws SQLException { - createJobs(10); - final UUID workerId = randomUUID(); + @Nested + class LockJobsTest { - jdbcRepository.lockJobsFor(workerId, HIGH, 4); + @Test + public void shouldLockJobsToWorkerGivenInProgressJobsNotReachedLimit() { + final int inProgressJobCountLimit = 3; + final int jobCountToLock = 2; + createJobs(5); + final UUID workerId = randomUUID(); - final List jobs = jdbcRepository.findJobsLockedTo(workerId).collect(toList()); - assertThat(jobs.size(), is(4)); + jdbcRepository.lockJobsFor(workerId, HIGH, inProgressJobCountLimit, jobCountToLock); + + final List jobs = jdbcRepository.findJobsLockedTo(workerId).toList(); + assertThat(jobs.size(), is(2)); + } + + @Test + public void shouldLockJobsUpToInProgressLimitGivenInProgressJobsCountNotReachedLimit() { + final int inProgressJobCountLimit = 3; + createJobs(5); + final UUID workerId1 = randomUUID(); + final UUID workerId2 = randomUUID(); + lockJobsTo(workerId1, 1); + + jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 4); + final List jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList(); + assertThat(jobsLockedToWorker2.size(), is(2)); //Only 2 left to reach InProgress limit + } + + @Test + public void multipleWorkersShouldBeAbleToLockJobsUntilInProgressLimitReached() { + final int inProgressJobCountLimit = 3; + createJobs(5); + final UUID workerId1 = randomUUID(); + final UUID workerId2 = randomUUID(); + final UUID workerId3 = randomUUID(); + final UUID workerId4 = randomUUID(); + + jdbcRepository.lockJobsFor(workerId1, HIGH, inProgressJobCountLimit, 1); + final List jobsLockedToWorker1 = jdbcRepository.findJobsLockedTo(workerId1).toList(); + assertThat(jobsLockedToWorker1.size(), is(1)); + + jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 1); + final List jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList(); + assertThat(jobsLockedToWorker2.size(), is(1)); + + jdbcRepository.lockJobsFor(workerId3, HIGH, inProgressJobCountLimit, 1); + final List jobsLockedToWorker3 = jdbcRepository.findJobsLockedTo(workerId3).toList(); + assertThat(jobsLockedToWorker3.size(), is(1)); + + jdbcRepository.lockJobsFor(workerId4, HIGH, inProgressJobCountLimit, 1); + final List jobsLockedToWorker4 = jdbcRepository.findJobsLockedTo(workerId4).toList(); + assertThat(jobsLockedToWorker4.size(), is(0)); + } + + @Test + public void shouldNotLockJobsToWorkerGivenInProgressJobsCountReachedLimit() { + final int inProgressJobCountLimit = 3; + createJobs(5); + final UUID workerId1 = randomUUID(); + final UUID workerId2 = randomUUID(); + lockJobsTo(workerId1, 3); + + jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 1); + final List jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList(); + assertThat(jobsLockedToWorker2.size(), is(0)); + } + + @Test + public void shouldNotLockJobsToWorkerGivenInProgressJobsCountGreaterThanConfiguredThreshold() { + final int inProgressJobCountLimit = 2; + createJobs(5); + final UUID workerId1 = randomUUID(); + final UUID workerId2 = randomUUID(); + lockJobsTo(workerId1, 4); + + jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 2); + final List jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList(); + assertThat(jobsLockedToWorker2.size(), is(0)); + } + + @Test + public void shouldThrowJdbcRepositoryExceptionWhenLockingJobs() throws SQLException { + final PreparedStatementWrapperFactory preparedStatementWrapperFactory = mock(PreparedStatementWrapperFactory.class); + when(preparedStatementWrapperFactory.preparedStatementWrapperOf(any(), any())).thenThrow(SQLException.class); + + jdbcRepository.preparedStatementWrapperFactory = preparedStatementWrapperFactory; + + assertThrows(JdbcRepositoryException.class, () -> jdbcRepository.lockJobsFor(randomUUID(), HIGH, 10, 2)); + } + + private void lockJobsTo(final UUID workerId, final int jobCountToLock) { + //Lock jobs up to max allowed in progress limit + jdbcRepository.lockJobsFor(workerId, HIGH, 1000, jobCountToLock); + final List jobsLocked = jdbcRepository.findJobsLockedTo(workerId).toList(); + assertThat(jobsLocked.size(), is(jobCountToLock)); + } } @Test @@ -225,7 +333,7 @@ public void shouldFindLockedJobsToWorker() throws Exception { final List preTestJobs = jdbcRepository.findJobsLockedTo(worker).collect(toList()); assertThat(preTestJobs.size(), is(1)); - jdbcRepository.lockJobsFor(worker, HIGH, 10); + jdbcRepository.lockJobsFor(worker, HIGH, 1000, 10); final List jobs = jdbcRepository.findJobsLockedTo(worker).collect(toList()); @@ -328,7 +436,7 @@ public void shouldThrowJdbcRepositoryExceptionWhenLockingJobs() throws SQLExcept final PreparedStatementWrapperFactory preparedStatementWrapperFactory = mock(PreparedStatementWrapperFactory.class); when(preparedStatementWrapperFactory.preparedStatementWrapperOf(any(), any())).thenThrow(SQLException.class); jdbcRepository.preparedStatementWrapperFactory = preparedStatementWrapperFactory; - assertThrows(JdbcRepositoryException.class, () -> jdbcRepository.lockJobsFor(randomUUID(), HIGH, 2)); + assertThrows(JdbcRepositoryException.class, () -> jdbcRepository.lockJobsFor(randomUUID(), HIGH, 1000, 2)); } @Test diff --git a/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfigurationTest.java b/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfigurationTest.java index 5771470dd..1814d2423 100644 --- a/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfigurationTest.java +++ b/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/persistence/JobStoreConfigurationTest.java @@ -44,4 +44,11 @@ public void shouldGetTheModuleName() throws Exception { setField(jobStoreConfiguration, "moduleName", "fred-bloggs"); assertThat(jobStoreConfiguration.getModuleName(), is("fred-bloggs")); } + + @Test + public void shouldGetMaxInProgressJobCount() throws Exception { + + setField(jobStoreConfiguration, "maxInProgressJobCount", "10"); + assertThat(jobStoreConfiguration.getMaxInProgressJobCount(), is(10)); + } } \ No newline at end of file diff --git a/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/service/JobServiceTest.java b/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/service/JobServiceTest.java index a8970c24e..f97e13440 100644 --- a/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/service/JobServiceTest.java +++ b/job-manager/jobstore-persistence/src/test/java/uk/gov/moj/cpp/jobstore/service/JobServiceTest.java @@ -58,17 +58,19 @@ public void shouldReturnNextUnassignedJobsForFirstPriority() { final UUID workerId = randomUUID(); final int workerJobCount = 10; + final int inProgressJobCountLimit = 100; final List priorities = List.of(MEDIUM, HIGH, LOW); when(jobStoreConfiguration.getWorkerJobCount()).thenReturn(workerJobCount); + when(jobStoreConfiguration.getMaxInProgressJobCount()).thenReturn(inProgressJobCountLimit); final List jobs = List.of(mock(Job.class), mock(Job.class), mock(Job.class)); - when(jobRepository.lockJobsFor(workerId, priorities.get(0), workerJobCount)).thenReturn(jobs.size()); + when(jobRepository.lockJobsFor(workerId, priorities.get(0), inProgressJobCountLimit, workerJobCount)).thenReturn(jobs.size()); when(jobRepository.findJobsLockedTo(workerId)).thenReturn(jobs.stream()); assertThat(jobService.getUnassignedJobsFor(workerId, priorities).count(), is(3L)); - verify(jobRepository).lockJobsFor(workerId, priorities.get(0), workerJobCount); + verify(jobRepository).lockJobsFor(workerId, priorities.get(0), inProgressJobCountLimit, workerJobCount); } @Test @@ -76,18 +78,20 @@ public void shouldReturnNextUnassignedJobsForSecondPriorityIfNoJobsWithFirstPrio final UUID workerId = randomUUID(); final int workerJobCount = 10; + final int inProgressJobCountLimit = 100; final List priorities = List.of(MEDIUM, HIGH, LOW); when(jobStoreConfiguration.getWorkerJobCount()).thenReturn(workerJobCount); + when(jobStoreConfiguration.getMaxInProgressJobCount()).thenReturn(inProgressJobCountLimit); final List jobs = List.of(mock(Job.class), mock(Job.class), mock(Job.class)); - when(jobRepository.lockJobsFor(workerId, priorities.get(0), workerJobCount)).thenReturn(0); - when(jobRepository.lockJobsFor(workerId, priorities.get(1), workerJobCount)).thenReturn(jobs.size()); + when(jobRepository.lockJobsFor(workerId, priorities.get(0), inProgressJobCountLimit, workerJobCount)).thenReturn(0); + when(jobRepository.lockJobsFor(workerId, priorities.get(1), inProgressJobCountLimit, workerJobCount)).thenReturn(jobs.size()); when(jobRepository.findJobsLockedTo(workerId)).thenReturn(jobs.stream()); assertThat(jobService.getUnassignedJobsFor(workerId, priorities).count(), is(3L)); - verify(jobRepository).lockJobsFor(workerId, priorities.get(1), workerJobCount); + verify(jobRepository).lockJobsFor(workerId, priorities.get(1), inProgressJobCountLimit, workerJobCount); } @Test @@ -95,19 +99,21 @@ public void shouldReturnNextUnassignedJobsForThirdPriorityIfNoJobsWithFirstNorSe final UUID workerId = randomUUID(); final int workerJobCount = 10; + final int inProgressJobCountLimit = 100; final List priorities = List.of(MEDIUM, HIGH, LOW); when(jobStoreConfiguration.getWorkerJobCount()).thenReturn(workerJobCount); + when(jobStoreConfiguration.getMaxInProgressJobCount()).thenReturn(inProgressJobCountLimit); final List jobs = List.of(mock(Job.class), mock(Job.class), mock(Job.class)); - when(jobRepository.lockJobsFor(workerId, priorities.get(0), workerJobCount)).thenReturn(0); - when(jobRepository.lockJobsFor(workerId, priorities.get(1), workerJobCount)).thenReturn(0); - when(jobRepository.lockJobsFor(workerId, priorities.get(2), workerJobCount)).thenReturn(jobs.size()); + when(jobRepository.lockJobsFor(workerId, priorities.get(0), inProgressJobCountLimit, workerJobCount)).thenReturn(0); + when(jobRepository.lockJobsFor(workerId, priorities.get(1), inProgressJobCountLimit, workerJobCount)).thenReturn(0); + when(jobRepository.lockJobsFor(workerId, priorities.get(2), inProgressJobCountLimit, workerJobCount)).thenReturn(jobs.size()); when(jobRepository.findJobsLockedTo(workerId)).thenReturn(jobs.stream()); assertThat(jobService.getUnassignedJobsFor(workerId, priorities).count(), is(3L)); - verify(jobRepository).lockJobsFor(workerId, priorities.get(2), workerJobCount); + verify(jobRepository).lockJobsFor(workerId, priorities.get(2), inProgressJobCountLimit, workerJobCount); } @Test @@ -115,12 +121,14 @@ public void shouldReturnNOUnassignedJobsfNoJobsWithFirstSecondNorThirdePriorityF final UUID workerId = randomUUID(); final int workerJobCount = 10; + final int inProgressJobCountLimit = 100; final List priorities = List.of(MEDIUM, HIGH, LOW); when(jobStoreConfiguration.getWorkerJobCount()).thenReturn(workerJobCount); - when(jobRepository.lockJobsFor(workerId, priorities.get(0), workerJobCount)).thenReturn(0); - when(jobRepository.lockJobsFor(workerId, priorities.get(1), workerJobCount)).thenReturn(0); - when(jobRepository.lockJobsFor(workerId, priorities.get(2), workerJobCount)).thenReturn(0); + when(jobStoreConfiguration.getMaxInProgressJobCount()).thenReturn(inProgressJobCountLimit); + when(jobRepository.lockJobsFor(workerId, priorities.get(0), inProgressJobCountLimit, workerJobCount)).thenReturn(0); + when(jobRepository.lockJobsFor(workerId, priorities.get(1), inProgressJobCountLimit, workerJobCount)).thenReturn(0); + when(jobRepository.lockJobsFor(workerId, priorities.get(2), inProgressJobCountLimit, workerJobCount)).thenReturn(0); assertThat(jobService.getUnassignedJobsFor(workerId, priorities).count(), is(0L)); verifyNoMoreInteractions(jobRepository);