Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public WebApp war() {
public void setup() throws Exception {
final InitialContext initialContext = new InitialContext();
initialContext.bind("java:/app/JobServiceIT/DS.jobstore", dataSource);
initialContext.bind("java:/app/JobServiceIT/max.inProgress.job.count", "30");
initialContext.bind("java:/app/JobServiceIT/worker.job.count", "10");
initEventDatabase();
}

Expand Down Expand Up @@ -155,7 +157,15 @@ public void shouldUpdateJobsThatHaveAWorkerIdAndBeenIdleForMoreThanOneHour() thr
userTransaction.commit();

detectDuplicates();
assertThat(testJobJdbcRepository.jobsProcessed(), CoreMatchers.is(30));
/*
Given
- 20 unassigned jobs (eligible to be assigned) - SET 1
- 10 assigned with lockTime expired (eligible to be assigned) - SET 2
- 10 assigned with lockTime not expired (not eligible to be assigned, with in active period of 1 hour) - SET 3
After processing SET 1, in progress jobs count reaches threshold limit of 30 i.e. SET 1 (20) + SET 3 (10)
and hence no more jobs are assigned (even though SET 2 is eligible to be processed)
*/
assertThat(testJobJdbcRepository.jobsProcessed(), CoreMatchers.is(20));
}

@Test
Expand All @@ -164,8 +174,8 @@ public void shouldUpdateJobsThatHaveAWorkerIdAndBeenIdleForMoreThanOneHourAndRea
testJobJdbcRepository.cleanJobTables();
testJobJdbcRepository.createJobs(20);
testJobJdbcRepository.createIdleJobs(5, of(now().minus(65, MINUTES)), now().plus(30, MINUTES));
testJobJdbcRepository.createIdleJobs(5, of(now().minus(65, MINUTES)), now());
testJobJdbcRepository.createIdleJobs(10, of(now().minus(30, MINUTES)), now());
testJobJdbcRepository.createIdleJobs(10, of(now().minus(65, MINUTES)), now());
testJobJdbcRepository.createIdleJobs(5, of(now().minus(30, MINUTES)), now());
userTransaction.commit();

userTransaction.begin();
Expand All @@ -177,6 +187,15 @@ public void shouldUpdateJobsThatHaveAWorkerIdAndBeenIdleForMoreThanOneHourAndRea
userTransaction.commit();

detectDuplicates();
/*
Given
- 20 unassigned jobs (eligible to be assigned) - SET 1
- 10 assigned with lockTime expired (eligible to be assigned) - SET 2
- 5 assigned with lockTime expired but nextTaskStartTime is 30 mins in future (not eligible to be assigned) - SET 3
- 5 assigned with lockTime not expired (not eligible to be assigned) - SET 4
After processing SET 1 and partially SET 2, in progress jobs count reaches threshold limit of 30 i.e. SET 1 (20) + 5 out of SET 2 + SET 4 (5)
and hence no more jobs are assigned (even though 5 out of SET 2 are still eligible to be assigned)
*/
assertThat(testJobJdbcRepository.jobsProcessed(), is(25));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class JobJdbcRepository implements JobRepository {
WHERE (worker_id IS NULL OR worker_lock_time < ?)
AND priority = ?
AND next_task_start_time < ?
LIMIT ?
LIMIT GREATEST(LEAST(? - (SELECT COUNT(*) FROM job WHERE worker_id IS NOT NULL AND worker_lock_time > ?), ?), 0)
FOR UPDATE SKIP LOCKED)
AND (worker_id IS NULL OR worker_lock_time < ?)
""";
Expand Down Expand Up @@ -130,7 +130,7 @@ public void updateNextTaskRetryDetails(final UUID jobId, final Timestamp nextTas
}

@Override
public int lockJobsFor(final UUID workerId, final Priority priority, final int jobCountToLock) {
public int lockJobsFor(final UUID workerId, final Priority priority, final int inProgressJobCountLimit, final int jobCountToLock) {
final DataSource jobStoreDataSource = jobStoreDataSourceProvider.getJobStoreDataSource();
logger.debug("Locking jobs for worker: {}", workerId);

Expand All @@ -143,8 +143,10 @@ public int lockJobsFor(final UUID workerId, final Priority priority, final int j
preparedStatementWrapper.setTimestamp(3, oneHourAgo);
preparedStatementWrapper.setString(4, priority.toString());
preparedStatementWrapper.setTimestamp(5, toSqlTimestamp(now));
preparedStatementWrapper.setLong(6, valueOf(jobCountToLock));
preparedStatementWrapper.setLong(6, valueOf(inProgressJobCountLimit));
preparedStatementWrapper.setTimestamp(7, oneHourAgo);
preparedStatementWrapper.setLong(8, valueOf(jobCountToLock));
preparedStatementWrapper.setTimestamp(9, oneHourAgo);
return preparedStatementWrapper.executeUpdate();
} catch (final SQLException e) {
logger.error("Error locking jobs", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface JobRepository {

void updateNextTaskRetryDetails(final UUID id, final Timestamp nextTaskStartTime, final Integer retryAttemptsRemaining);

int lockJobsFor(final UUID workerId, final Priority priority, final int jobCountToLock);
int lockJobsFor(final UUID workerId, final Priority priority, final int inProgressJobCountLimit, final int jobCountToLock);

Stream<Job> findJobsLockedTo(final UUID workerId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public class JobStoreConfiguration {
@Value(key = "worker.job.count", defaultValue = "10")
private String workerJobCount;

/* When identifying unassigned jobs in order to ignore restrictions due to this configuration
i.e. to maintain backward compatibility default value should be configured to a high value.
*/
@Inject
@Value(key = "max.inProgress.job.count", defaultValue = "10000")
private String maxInProgressJobCount;

@Resource(lookup = "java:module/ModuleName")
private String moduleName;

Expand All @@ -53,6 +60,10 @@ public int getWorkerJobCount() {
return parseInt(workerJobCount);
}

public int getMaxInProgressJobCount() {
return parseInt(maxInProgressJobCount);
}

public String getModuleName() {
return moduleName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
import static java.util.stream.Stream.empty;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;

import uk.gov.justice.services.common.configuration.Value;
import uk.gov.moj.cpp.jobstore.persistence.Job;
import uk.gov.moj.cpp.jobstore.persistence.JobRepository;
import uk.gov.moj.cpp.jobstore.persistence.JobStoreConfiguration;
import uk.gov.moj.cpp.jobstore.persistence.Priority;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.json.JsonObject;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.json.JsonObject;
import static java.lang.Integer.parseInt;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;

@ApplicationScoped
public class JobService {
Expand All @@ -30,15 +33,16 @@ public class JobService {
public Stream<Job> getUnassignedJobsFor(final UUID workerId, final List<Priority> orderedPriorities) {

final int workerJobCount = jobStoreConfiguration.getWorkerJobCount();
final int maxInProgressJobCount = jobStoreConfiguration.getMaxInProgressJobCount();
final Priority firstPriority = orderedPriorities.get(0);

int rowsAffected = jobRepository.lockJobsFor(workerId, firstPriority, workerJobCount);
int rowsAffected = jobRepository.lockJobsFor(workerId, firstPriority, maxInProgressJobCount, workerJobCount);
if (rowsAffected == 0) {
final Priority secondPriority = orderedPriorities.get(1);
rowsAffected = jobRepository.lockJobsFor(workerId, secondPriority, workerJobCount);
rowsAffected = jobRepository.lockJobsFor(workerId, secondPriority, maxInProgressJobCount, workerJobCount);
if (rowsAffected == 0) {
final Priority thirdPriority = orderedPriorities.get(2);
rowsAffected = jobRepository.lockJobsFor(workerId, thirdPriority, workerJobCount);
rowsAffected = jobRepository.lockJobsFor(workerId, thirdPriority, maxInProgressJobCount, workerJobCount);
}
}
if (rowsAffected == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
package uk.gov.moj.cpp.jobstore.persistence;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;

import uk.gov.justice.framework.libraries.datasource.providers.jobstore.JobStoreDataSourceProvider;
import uk.gov.justice.framework.libraries.datasource.providers.jobstore.TestJobStoreDataSourceProvider;
import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.test.utils.core.jdbc.LiquibaseDatabaseBootstrapper;
import uk.gov.justice.services.test.utils.core.messaging.Poller;

import javax.json.JsonObject;
import javax.sql.DataSource;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import static java.time.ZonedDateTime.now;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.Optional.empty;
Expand All @@ -9,9 +32,7 @@
import static javax.json.Json.createReader;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -20,8 +41,6 @@
import static uk.gov.moj.cpp.jobstore.persistence.Priority.LOW;
import static uk.gov.moj.cpp.jobstore.persistence.Priority.MEDIUM;

import uk.gov.justice.framework.libraries.datasource.providers.jobstore.JobStoreDataSourceProvider;
import uk.gov.justice.framework.libraries.datasource.providers.jobstore.TestJobStoreDataSourceProvider;
import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.test.utils.core.jdbc.LiquibaseDatabaseBootstrapper;
import uk.gov.justice.services.test.utils.core.messaging.Poller;
Expand All @@ -37,6 +56,7 @@
import java.util.UUID;

import javax.json.JsonObject;
import javax.sql.DataSource;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -197,15 +217,103 @@ public void shouldUpdateNextTaskRetryDetails() {
assertThat(jobs.get(0).getRetryAttemptsRemaining(), is(retryAttemptsRemaining));
}

@Test
public void shouldLockJobsToWorker() throws SQLException {
createJobs(10);
final UUID workerId = randomUUID();
@Nested
class LockJobsTest {

jdbcRepository.lockJobsFor(workerId, HIGH, 4);
@Test
public void shouldLockJobsToWorkerGivenInProgressJobsNotReachedLimit() {
final int inProgressJobCountLimit = 3;
final int jobCountToLock = 2;
createJobs(5);
final UUID workerId = randomUUID();

final List<Job> jobs = jdbcRepository.findJobsLockedTo(workerId).collect(toList());
assertThat(jobs.size(), is(4));
jdbcRepository.lockJobsFor(workerId, HIGH, inProgressJobCountLimit, jobCountToLock);

final List<Job> jobs = jdbcRepository.findJobsLockedTo(workerId).toList();
assertThat(jobs.size(), is(2));
}

@Test
public void shouldLockJobsUpToInProgressLimitGivenInProgressJobsCountNotReachedLimit() {
final int inProgressJobCountLimit = 3;
createJobs(5);
final UUID workerId1 = randomUUID();
final UUID workerId2 = randomUUID();
lockJobsTo(workerId1, 1);

jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 4);
final List<Job> jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList();
assertThat(jobsLockedToWorker2.size(), is(2)); //Only 2 left to reach InProgress limit
}

@Test
public void multipleWorkersShouldBeAbleToLockJobsUntilInProgressLimitReached() {
final int inProgressJobCountLimit = 3;
createJobs(5);
final UUID workerId1 = randomUUID();
final UUID workerId2 = randomUUID();
final UUID workerId3 = randomUUID();
final UUID workerId4 = randomUUID();

jdbcRepository.lockJobsFor(workerId1, HIGH, inProgressJobCountLimit, 1);
final List<Job> jobsLockedToWorker1 = jdbcRepository.findJobsLockedTo(workerId1).toList();
assertThat(jobsLockedToWorker1.size(), is(1));

jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 1);
final List<Job> jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList();
assertThat(jobsLockedToWorker2.size(), is(1));

jdbcRepository.lockJobsFor(workerId3, HIGH, inProgressJobCountLimit, 1);
final List<Job> jobsLockedToWorker3 = jdbcRepository.findJobsLockedTo(workerId3).toList();
assertThat(jobsLockedToWorker3.size(), is(1));

jdbcRepository.lockJobsFor(workerId4, HIGH, inProgressJobCountLimit, 1);
final List<Job> jobsLockedToWorker4 = jdbcRepository.findJobsLockedTo(workerId4).toList();
assertThat(jobsLockedToWorker4.size(), is(0));
}

@Test
public void shouldNotLockJobsToWorkerGivenInProgressJobsCountReachedLimit() {
final int inProgressJobCountLimit = 3;
createJobs(5);
final UUID workerId1 = randomUUID();
final UUID workerId2 = randomUUID();
lockJobsTo(workerId1, 3);

jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 1);
final List<Job> jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList();
assertThat(jobsLockedToWorker2.size(), is(0));
}

@Test
public void shouldNotLockJobsToWorkerGivenInProgressJobsCountGreaterThanConfiguredThreshold() {
final int inProgressJobCountLimit = 2;
createJobs(5);
final UUID workerId1 = randomUUID();
final UUID workerId2 = randomUUID();
lockJobsTo(workerId1, 4);

jdbcRepository.lockJobsFor(workerId2, HIGH, inProgressJobCountLimit, 2);
final List<Job> jobsLockedToWorker2 = jdbcRepository.findJobsLockedTo(workerId2).toList();
assertThat(jobsLockedToWorker2.size(), is(0));
}

@Test
public void shouldThrowJdbcRepositoryExceptionWhenLockingJobs() throws SQLException {
final PreparedStatementWrapperFactory preparedStatementWrapperFactory = mock(PreparedStatementWrapperFactory.class);
when(preparedStatementWrapperFactory.preparedStatementWrapperOf(any(), any())).thenThrow(SQLException.class);

jdbcRepository.preparedStatementWrapperFactory = preparedStatementWrapperFactory;

assertThrows(JdbcRepositoryException.class, () -> jdbcRepository.lockJobsFor(randomUUID(), HIGH, 10, 2));
}

private void lockJobsTo(final UUID workerId, final int jobCountToLock) {
//Lock jobs up to max allowed in progress limit
jdbcRepository.lockJobsFor(workerId, HIGH, 1000, jobCountToLock);
final List<Job> jobsLocked = jdbcRepository.findJobsLockedTo(workerId).toList();
assertThat(jobsLocked.size(), is(jobCountToLock));
}
}

@Test
Expand All @@ -225,7 +333,7 @@ public void shouldFindLockedJobsToWorker() throws Exception {
final List<Job> preTestJobs = jdbcRepository.findJobsLockedTo(worker).collect(toList());
assertThat(preTestJobs.size(), is(1));

jdbcRepository.lockJobsFor(worker, HIGH, 10);
jdbcRepository.lockJobsFor(worker, HIGH, 1000, 10);

final List<Job> jobs = jdbcRepository.findJobsLockedTo(worker).collect(toList());

Expand Down Expand Up @@ -328,7 +436,7 @@ public void shouldThrowJdbcRepositoryExceptionWhenLockingJobs() throws SQLExcept
final PreparedStatementWrapperFactory preparedStatementWrapperFactory = mock(PreparedStatementWrapperFactory.class);
when(preparedStatementWrapperFactory.preparedStatementWrapperOf(any(), any())).thenThrow(SQLException.class);
jdbcRepository.preparedStatementWrapperFactory = preparedStatementWrapperFactory;
assertThrows(JdbcRepositoryException.class, () -> jdbcRepository.lockJobsFor(randomUUID(), HIGH, 2));
assertThrows(JdbcRepositoryException.class, () -> jdbcRepository.lockJobsFor(randomUUID(), HIGH, 1000, 2));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ public void shouldGetTheModuleName() throws Exception {
setField(jobStoreConfiguration, "moduleName", "fred-bloggs");
assertThat(jobStoreConfiguration.getModuleName(), is("fred-bloggs"));
}

@Test
public void shouldGetMaxInProgressJobCount() throws Exception {

setField(jobStoreConfiguration, "maxInProgressJobCount", "10");
assertThat(jobStoreConfiguration.getMaxInProgressJobCount(), is(10));
}
}
Loading