diff --git a/server/pom.xml b/server/pom.xml
index ad67d9e5..8520de2f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -20,6 +20,16 @@
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>batch</artifactId>
+            <version>2.29.52</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>auth</artifactId>
+            <version>2.29.52</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter</artifactId>
@@ -71,6 +81,11 @@
             <artifactId>ogcapi-features-java</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>au.org.aodn.ogcapi</groupId>
+            <artifactId>ogcapi-processes-java</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>co.elastic.clients</groupId>
             <artifactId>elasticsearch-java</artifactId>
diff --git a/server/src/main/java/au/org/aodn/ogcapi/server/core/configuration/AwsConfig.java b/server/src/main/java/au/org/aodn/ogcapi/server/core/configuration/AwsConfig.java
new file mode 100644
index 00000000..27093e93
--- /dev/null
+++ b/server/src/main/java/au/org/aodn/ogcapi/server/core/configuration/AwsConfig.java
@@ -0,0 +1,31 @@
+package au.org.aodn.ogcapi.server.core.configuration;
+
+import au.org.aodn.ogcapi.server.processes.RestServices;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.batch.BatchClient;
+
+@Configuration
+public class AwsConfig {
+
+    @Value("${aws.region}")
+    private String awsRegion;
+
+    @Bean
+    public BatchClient batchClient() {
+        // Credentials are resolved from the standard AWS environment variables
+        // (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and, optionally, AWS_SESSION_TOKEN).
+        return BatchClient
+                .builder()
+                .region(Region.of(awsRegion))
+                .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
+                .build();
+    }
+
+    @Bean
+    public RestServices awsBatchService(BatchClient batchClient, ObjectMapper objectMapper) {
+        return new RestServices(batchClient, objectMapper);
+    }
+}
diff --git a/server/src/main/java/au/org/aodn/ogcapi/server/core/model/InlineValue.java b/server/src/main/java/au/org/aodn/ogcapi/server/core/model/InlineValue.java
new file mode 100644
index 00000000..bfeeb12c
--- /dev/null
+++ b/server/src/main/java/au/org/aodn/ogcapi/server/core/model/InlineValue.java
@@ -0,0 +1,5 @@
+package au.org.aodn.ogcapi.server.core.model;
+
+import au.org.aodn.ogcapi.processes.model.InlineOrRefData;
+
+public record InlineValue(String message) implements InlineOrRefData {}
diff --git a/server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/DatasetDownloadEnums.java b/server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/DatasetDownloadEnums.java
new file mode 100644
index 00000000..81a9f497
--- /dev/null
+++ b/server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/DatasetDownloadEnums.java
@@ -0,0 +1,49 @@
+package au.org.aodn.ogcapi.server.core.model.enumeration;
+
+import lombok.Getter;
+
+/**
+ * The enum values are used as the names of AWS Batch environment variables.
+ */
+public class DatasetDownloadEnums {
+
+    @Getter
+    public enum Condition {
+        UUID("UUID"),
+        START_DATE("START_DATE"),
+        END_DATE("END_DATE"),
+        MIN_LATITUDE("MIN_LAT"),
+        MAX_LATITUDE("MAX_LAT"),
+        MIN_LONGITUDE("MIN_LON"),
+        MAX_LONGITUDE("MAX_LON"),
+        RECIPIENT("RECIPIENT");
+
+        private final String value;
+
+        Condition(String value) {
+            this.value = value;
+        }
+    }
+
+    @Getter
+    public enum JobDefinition {
+        GENERATE_CSV_DATA_FILE("generate-csv-data-file");
+
+        private final String value;
+
+        JobDefinition(String value) {
+            this.value = value;
+        }
+    }
+
+    @Getter
+    public enum JobQueue {
+        GENERATING_CSV_DATA_FILE("generate-csv-data-file");
+
+        private final String value;
+
+        JobQueue(String value) {
+            this.value = value;
+        }
+    }
+}
diff --git a/server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/ProcessIdEnum.java b/server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/ProcessIdEnum.java
new file mode 100644
index 00000000..117b4c06
--- /dev/null
+++ b/server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/ProcessIdEnum.java
@@ -0,0 +1,15 @@
+package au.org.aodn.ogcapi.server.core.model.enumeration;
+
+import lombok.Getter;
+
+@Getter
+public enum ProcessIdEnum {
+    DOWNLOAD_DATASET("download"),
+    ;
+
+    private final String value;
+
+    ProcessIdEnum(String value) {
+        this.value = value;
+    }
+}
diff --git a/server/src/main/java/au/org/aodn/ogcapi/server/processes/RestApi.java b/server/src/main/java/au/org/aodn/ogcapi/server/processes/RestApi.java
new file mode 100644
index 00000000..c56913dc
--- /dev/null
+++ b/server/src/main/java/au/org/aodn/ogcapi/server/processes/RestApi.java
@@ -0,0 +1,90 @@
+package au.org.aodn.ogcapi.server.processes;
+
+
+import au.org.aodn.ogcapi.processes.api.ProcessesApi;
+import au.org.aodn.ogcapi.processes.model.Execute;
+import au.org.aodn.ogcapi.processes.model.InlineResponse200;
+import au.org.aodn.ogcapi.processes.model.ProcessList;
+import au.org.aodn.ogcapi.processes.model.Results;
+import au.org.aodn.ogcapi.server.core.model.InlineValue;
+import au.org.aodn.ogcapi.server.core.model.enumeration.ProcessIdEnum;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.enums.ParameterIn;
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.Valid;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+@Slf4j
+@RestController("ProcessesRestApi")
+@RequestMapping(value = "/api/v1/ogc")
+public class RestApi implements ProcessesApi {
+
+    @Autowired
+    private RestServices restServices;
+
+    @Override
+    // The produces value in the interface declaration includes "/_", which may sometimes
+    // cause an exception to be thrown, so the produces value is re-declared here.
+    @RequestMapping(value = "/processes/{processID}/execution",
+            produces = { "application/json", "text/html" },
+            consumes = { "application/json" },
+            method = RequestMethod.POST)
+    public ResponseEntity<InlineResponse200> execute(
+            @Parameter(in = ParameterIn.PATH, required=true, schema=@Schema())
+            @PathVariable("processID")
+            String processID,
+            @Parameter(in = ParameterIn.DEFAULT, description = "Mandatory execute request JSON", required=true, schema=@Schema())
+            @Valid
+            @RequestBody Execute body) {
+
+        if (processID.equals(ProcessIdEnum.DOWNLOAD_DATASET.getValue())) {
+            try {
+                var response = restServices.downloadData(
+                        (String) body.getInputs().get("collectionId"),
+                        (String) body.getInputs().get("start_date"),
+                        (String) body.getInputs().get("end_date"),
+                        (String) body.getInputs().get("min_lat"),
+                        (String) body.getInputs().get("min_lon"),
+                        (String) body.getInputs().get("max_lat"),
+                        (String) body.getInputs().get("max_lon"),
+                        (String) body.getInputs().get("recipient")
+                );
+
+                var value = new InlineValue(response.getBody());
+                var results = new Results();
+                results.put("message", value);
+
+                return ResponseEntity.ok(results);
+
+            } catch (Exception e) {
+                log.error(e.getMessage());
+                var response = new Results();
+                var value = new InlineValue("Error while getting dataset");
+                response.put("error", value);
+
+                return ResponseEntity.badRequest().body(response);
+            }
+        }
+
+        var response = new Results();
+        var value = new InlineValue("Unknown process ID: " + processID);
+        response.put("error", value);
+
+        return ResponseEntity.badRequest().body(response);
+    }
+
+
+    @Override
+    public ResponseEntity getProcessDescription(String processID) {
+        return ResponseEntity.status(HttpStatus.NOT_IMPLEMENTED).build();
+    }
+
+    @Override
+    public ResponseEntity<ProcessList> getProcesses() {
+        return ResponseEntity.status(HttpStatus.NOT_IMPLEMENTED).build();
+    }
+}
diff --git a/server/src/main/java/au/org/aodn/ogcapi/server/processes/RestServices.java b/server/src/main/java/au/org/aodn/ogcapi/server/processes/RestServices.java
new file mode 100644
index 00000000..57599f3e
--- /dev/null
+++ b/server/src/main/java/au/org/aodn/ogcapi/server/processes/RestServices.java
@@ -0,0 +1,121 @@
+package au.org.aodn.ogcapi.server.processes;
+
+import au.org.aodn.ogcapi.server.core.model.enumeration.DatasetDownloadEnums;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.ResponseEntity;
+import software.amazon.awssdk.services.batch.BatchClient;
+import software.amazon.awssdk.services.batch.model.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class RestServices {
+
+    private final BatchClient batchClient;
+    private final ObjectMapper objectMapper;
+
+    public RestServices(BatchClient batchClient, ObjectMapper objectMapper) {
+        this.batchClient = batchClient;
+        this.objectMapper = objectMapper;
+    }
+
+    public ResponseEntity<String> downloadData(
+            String id,
+            String startDate,
+            String endDate,
+            String minLat,
+            String minLon,
+            String maxLat,
+            String maxLon,
+            String recipient
+    ) {
+        try {
+
+            Map<String, String> parameters = new HashMap<>();
+            parameters.put(DatasetDownloadEnums.Condition.UUID.getValue(), id);
+            parameters.put(DatasetDownloadEnums.Condition.START_DATE.getValue(), startDate);
+            parameters.put(DatasetDownloadEnums.Condition.END_DATE.getValue(), endDate);
+            parameters.put(DatasetDownloadEnums.Condition.MIN_LATITUDE.getValue(), minLat);
+            parameters.put(DatasetDownloadEnums.Condition.MIN_LONGITUDE.getValue(), minLon);
+            parameters.put(DatasetDownloadEnums.Condition.MAX_LATITUDE.getValue(), maxLat);
+            parameters.put(DatasetDownloadEnums.Condition.MAX_LONGITUDE.getValue(), maxLon);
+            parameters.put(DatasetDownloadEnums.Condition.RECIPIENT.getValue(), recipient);
+
+
+            String jobId = submitJob(
+                    "generating-data-file-for-" + recipient.replaceAll("[^a-zA-Z0-9-_]", "-"),
+                    DatasetDownloadEnums.JobQueue.GENERATING_CSV_DATA_FILE.getValue(),
+                    DatasetDownloadEnums.JobDefinition.GENERATE_CSV_DATA_FILE.getValue(),
+                    parameters);
+            log.info("Job submitted with ID: " + jobId);
+            return ResponseEntity.ok("Job submitted with ID: " + jobId);
+        } catch (Exception e) {
+
+            log.error("Error while getting dataset");
+            log.error(e.getMessage());
+            return ResponseEntity.badRequest().body("Error while getting dataset");
+        }
+    }
+
+    private String submitJob(String jobName, String jobQueue, String jobDefinition, Map<String, String> parameters) {
+
+        // Pass the request values both as Batch job parameters and as container environment variables.
+        List<KeyValuePair> environmentVariables = parameters.entrySet().stream()
+                .map(entry ->
+                        KeyValuePair
+                                .builder()
+                                .name(entry.getKey())
+                                .value(entry.getValue())
+                                .build()
+                )
+                .toList();
+
+        SubmitJobRequest submitJobRequest = SubmitJobRequest.builder()
+                .jobName(jobName)
+                .jobQueue(jobQueue)
+                .jobDefinition(jobDefinition)
+                .parameters(parameters)
+                .containerOverrides(co -> co.environment(environmentVariables))
+                .build();
+
+        SubmitJobResponse submitJobResponse = batchClient.submitJob(submitJobRequest);
+        return submitJobResponse.jobId();
+    }
+
+
+    // TODO: This feature doesn't work yet. It is not urgent and will be implemented in the future.
+    public boolean isJobQueueValid(String jobQueueName) throws IOException {
+
+        var remoteJobQueueDetail = getRemoteJobQueueBy(jobQueueName);
+        var localJobQueueDetail = getLocalJobQueueDetailBy(jobQueueName);
+
+        return remoteJobQueueDetail.equals(localJobQueueDetail);
+    }
+
+    public JobQueueDetail getRemoteJobQueueBy(String name) {
+        var request = DescribeJobQueuesRequest
+                .builder()
+                .jobQueues(name)
+                .build();
+
+        var jobQueues = batchClient.describeJobQueues(request).jobQueues();
+
+        if (jobQueues != null && jobQueues.size() == 1) {
+            return jobQueues.get(0);
+        }
+        return null;
+    }
+
+    public JobQueueDetail getLocalJobQueueDetailBy(String jobQueueName) throws IOException {
+        var configJsonPath = "server/src/main/java/au/org/aodn/ogcapi/server/processes/config/" + jobQueueName + ".json";
+        var jsonFile = new File(configJsonPath);
+        var jsonStr = objectMapper.writeValueAsString(objectMapper.readValue(jsonFile, Object.class));
+        return objectMapper.readValue(jsonStr, JobQueueDetail.class);
+    }
+
+}
diff --git a/server/src/main/java/au/org/aodn/ogcapi/server/processes/config/generate-csv-data-file.json b/server/src/main/java/au/org/aodn/ogcapi/server/processes/config/generate-csv-data-file.json
new file mode 100644
index 00000000..e7ffadfb
--- /dev/null
+++ b/server/src/main/java/au/org/aodn/ogcapi/server/processes/config/generate-csv-data-file.json
@@ -0,0 +1,18 @@
+{
+  "jobQueueName": "generate-csv-data-file",
+  "jobQueueArn": "arn:aws:batch:ap-southeast-2:704910415367:job-queue/generate-csv-data-file",
+  "state": "ENABLED",
+  "status": "VALID",
+  "statusReason": "JobQueue Healthy",
+  "priority": 1,
+  "computeEnvironmentOrder": [
+    {
+      "order": 1,
+      "computeEnvironment": "arn:aws:batch:ap-southeast-2:704910415367:compute-environment/generate-csv-data-file"
+    }
+  ],
+  "serviceEnvironmentOrder": [],
+  "jobQueueType": "ECS_FARGATE",
+  "tags": {},
+  "jobStateTimeLimitActions": []
+}
diff --git a/server/src/main/resources/application.yaml b/server/src/main/resources/application.yaml
index 7c866e21..07470c4c 100644
--- a/server/src/main/resources/application.yaml
+++ b/server/src/main/resources/application.yaml
@@ -19,6 +19,9 @@ elasticsearch:
     path: search_suggestions
     fields: abstract_phrases, parameter_vocabs_sayt, platform_vocabs_sayt, organisation_vocabs_sayt
 
+aws:
+  region: ap-southeast-2
+
 api:
   host: http://localhost:${server.port}
diff --git a/server/src/test/java/au/org/aodn/ogcapi/server/processes/RestApiTest.java b/server/src/test/java/au/org/aodn/ogcapi/server/processes/RestApiTest.java
new file mode 100644
index 00000000..1b7cd560
--- /dev/null
+++ b/server/src/test/java/au/org/aodn/ogcapi/server/processes/RestApiTest.java
@@ -0,0 +1,87 @@
+package au.org.aodn.ogcapi.server.processes;
+
+import au.org.aodn.ogcapi.processes.model.Execute;
+import au.org.aodn.ogcapi.processes.model.InlineResponse200;
+import au.org.aodn.ogcapi.processes.model.Results;
+import au.org.aodn.ogcapi.server.core.model.InlineValue;
+import au.org.aodn.ogcapi.server.core.model.enumeration.ProcessIdEnum;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.http.ResponseEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class RestApiTest {
+
+    @Mock
+    private RestServices restServices;
+
+    @InjectMocks
+    private RestApi restApi;
+
+    private Execute executeRequest;
+
+    @BeforeEach
+    public void setUp() {
+        executeRequest = new Execute();
+        Map<String, Object> inputs = new HashMap<>();
+        inputs.put("collectionId", "test-collection-id");
+        inputs.put("start_date", "2023-01-01");
+        inputs.put("end_date", "2023-01-31");
+        inputs.put("min_lat", "-10.0");
+        inputs.put("min_lon", "110.0");
+        inputs.put("max_lat", "10.0");
+        inputs.put("max_lon", "150.0");
+        inputs.put("recipient", "test@example.com");
+        executeRequest.setInputs(inputs);
+    }
+
+    @Test
+    public void testExecuteDownloadDatasetSuccess() {
+        when(restServices.downloadData(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()))
+                .thenReturn(ResponseEntity.ok("Job submitted with ID: test-job-id"));
+
+        ResponseEntity<InlineResponse200> response = restApi.execute(ProcessIdEnum.DOWNLOAD_DATASET.getValue(), executeRequest);
+
+        assertEquals(200, response.getStatusCode().value());
+        Results results = (Results) response.getBody();
+        assert results != null;
+        InlineValue message = (InlineValue) results.get("message");
+        assertEquals("Job submitted with ID: test-job-id", message.message());
+    }
+
+    @Test
+    public void testExecuteDownloadDatasetError() {
+        when(restServices.downloadData(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()))
+                .thenThrow(new RuntimeException("Error while getting dataset"));
+
+        ResponseEntity<InlineResponse200> response = restApi.execute(ProcessIdEnum.DOWNLOAD_DATASET.getValue(), executeRequest);
+
+        assertEquals(400, response.getStatusCode().value());
+        Results results = (Results) response.getBody();
+        assert results != null;
+        InlineValue error = (InlineValue) results.get("error");
+        assertEquals("Error while getting dataset", error.message());
+    }
+
+    @Test
+    public void testExecuteUnknownProcessId() {
+        ResponseEntity<InlineResponse200> response = restApi.execute("unknown-process-id", executeRequest);
+
+        assertEquals(400, response.getStatusCode().value());
+        Results results = (Results) response.getBody();
+        assert results != null;
+        InlineValue error = (InlineValue) results.get("error");
+        assertEquals("Unknown process ID: unknown-process-id", error.message());
+    }
+}
diff --git a/server/src/test/java/au/org/aodn/ogcapi/server/processes/RestServicesTest.java b/server/src/test/java/au/org/aodn/ogcapi/server/processes/RestServicesTest.java
new file mode 100644
index 00000000..e9dcac18
--- /dev/null
+++ b/server/src/test/java/au/org/aodn/ogcapi/server/processes/RestServicesTest.java
@@ -0,0 +1,59 @@
+package au.org.aodn.ogcapi.server.processes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.http.ResponseEntity;
+import software.amazon.awssdk.services.batch.BatchClient;
+import software.amazon.awssdk.services.batch.model.SubmitJobRequest;
+import software.amazon.awssdk.services.batch.model.SubmitJobResponse;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+public class RestServicesTest {
+
+    @Mock
+    private BatchClient batchClient;
+
+    @InjectMocks
+    private RestServices restServices;
+
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+    }
+
+    @Test
+    public void testDownloadDataSuccess() {
+        // Arrange
+        String jobId = "12345";
+        SubmitJobResponse submitJobResponse = SubmitJobResponse.builder().jobId(jobId).build();
+        when(batchClient.submitJob(any(SubmitJobRequest.class))).thenReturn(submitJobResponse);
+
+        // Act
+        ResponseEntity<String> response = restServices.downloadData(
+                "id", "2021-01-01", "2021-01-31", "10.0", "20.0", "30.0", "40.0", "recipient@example.com");
+
+        // Assert
+        assertEquals(ResponseEntity.ok("Job submitted with ID: " + jobId), response);
+        verify(batchClient, times(1)).submitJob(any(SubmitJobRequest.class));
+    }
+
+    @Test
+    public void testDownloadDataFailure() {
+        // Arrange
+        when(batchClient.submitJob(any(SubmitJobRequest.class))).thenThrow(new RuntimeException("AWS Batch error"));
+
+        // Act
+        ResponseEntity<String> response = restServices.downloadData(
+                "id", "2021-01-01", "2021-01-31", "10.0", "20.0", "30.0", "40.0", "recipient@example.com");
+
+        // Assert
+        assertEquals(ResponseEntity.badRequest().body("Error while getting dataset"), response);
+        verify(batchClient, times(1)).submitJob(any(SubmitJobRequest.class));
+    }
+}
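For reference, a minimal execute request against the new endpoint could look like the following. This is an illustrative sketch only: it assumes the service runs locally on port 8080 and that the generated Execute model serialises its inputs under an "inputs" member, as in the OGC API - Processes execute request; the values mirror those used in RestApiTest.

POST http://localhost:8080/api/v1/ogc/processes/download/execution
Content-Type: application/json

{
  "inputs": {
    "collectionId": "test-collection-id",
    "start_date": "2023-01-01",
    "end_date": "2023-01-31",
    "min_lat": "-10.0",
    "min_lon": "110.0",
    "max_lat": "10.0",
    "max_lon": "150.0",
    "recipient": "test@example.com"
  }
}

Likewise, the generate-csv-data-file job definition that submitJob() targets is provisioned outside this repository. A hypothetical fragment (the image and command are placeholders, not part of this change) showing how the submitted job parameters can be consumed through Ref:: placeholders while the container overrides arrive as plain environment variables:

{
  "jobDefinitionName": "generate-csv-data-file",
  "type": "container",
  "parameters": { "UUID": "", "RECIPIENT": "" },
  "containerProperties": {
    "image": "example/generate-csv-data-file:latest",
    "command": ["python", "generate.py", "--uuid", "Ref::UUID", "--recipient", "Ref::RECIPIENT"]
  }
}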