From b9df409e68441b59a7fbd0fe8786e22f2f19fcf6 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Mon, 29 Oct 2018 12:41:22 +0100 Subject: [PATCH 1/2] extend task api --- .../org/elasticsearch/client/TasksClient.java | 15 ++++++ .../client/TasksRequestConverters.java | 9 ++++ .../org/elasticsearch/client/TasksIT.java | 54 +++++++++++++++++++ .../client/TasksRequestConvertersTests.java | 13 +++++ .../node/tasks/get/GetTaskResponse.java | 11 ++++ 5 files changed, 102 insertions(+) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java index 3b957b2defb0d..9d105248e66c1 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java @@ -22,6 +22,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -108,4 +110,17 @@ public void cancelAsync(CancelTasksRequest cancelTasksRequest, RequestOptions op emptySet() ); } + /** + * Get current task using the Task Management API. + * See + * Task Management API on elastic.co + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public GetTaskResponse get(GetTaskRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, TasksRequestConverters::getTask, options, + GetTaskResponse::fromXContent, emptySet()); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java index 45723dcc938c5..1ae170a60488e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java @@ -22,6 +22,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; final class TasksRequestConverters { @@ -54,4 +55,12 @@ static Request listTasks(ListTasksRequest listTaskRequest) { .putParam("group_by", "none"); return request; } + + public static Request getTask(GetTaskRequest req) { + String endpoint = new RequestConverters.EndpointBuilder() + .addPathPartAsIs("_tasks") + .addPathPart(req.getTaskId().toString()) + .build(); + return new Request(HttpGet.METHOD_NAME, endpoint); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java index baa97cfa5b4ef..e6dfd01075de9 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java @@ -19,17 +19,27 @@ package org.elasticsearch.client; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteJobResponse; +import org.elasticsearch.client.ml.PutJobRequest; +import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; import static java.util.Collections.emptyList; +import static org.elasticsearch.client.MachineLearningGetResultsIT.buildJob; +import static org.elasticsearch.client.ml.job.config.JobTests.randomValidJobId; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -83,4 +93,48 @@ public void testCancelTasks() throws IOException { // The actual testing of task cancellation is covered by TasksIT.testTasksCancellation assertThat(response, notNullValue()); } + + public void testGetTask() throws IOException { + TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()); + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(taskId); + + ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, + () -> highLevelClient().tasks().get(getTaskRequest, RequestOptions.DEFAULT)); + + assertThat(exception.status(), equalTo(RestStatus.NOT_FOUND)); + + + ListTasksRequest request = new ListTasksRequest(); + ListTasksResponse response = execute(request, highLevelClient().tasks()::list, highLevelClient().tasks()::listAsync); + + assertThat(response, notNullValue()); + + } + + public void testGetTaskFoundWithError() throws IOException { + + String jobId = randomValidJobId(); + Job job = MachineLearningIT.buildJob(jobId); + MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); + machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + + TaskId taskId = submitTask(jobId); + + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(taskId); + + GetTaskResponse getTaskResponse = highLevelClient().tasks().get(getTaskRequest, RequestOptions.DEFAULT); + + assertTrue(getTaskResponse.getTask().isCompleted()); + } + + private TaskId submitTask(String jobId) throws IOException { + DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId); + deleteJobRequest.setWaitForCompletion(false); + MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); + DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync); + + return response.getTask(); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java index ff6726faee18d..bfdbfc9f76975 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java @@ -22,6 +22,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -112,4 +113,16 @@ public void testListTasks() { assertEquals("TaskId cannot be used for list tasks request", exception.getMessage()); } } + + public void testGetTasks() { + GetTaskRequest request = new GetTaskRequest(); + TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()); + request.setTaskId(taskId); + + Request httpRequest = TasksRequestConverters.getTask(request); + assertThat(httpRequest, notNullValue()); + assertThat(httpRequest.getMethod(), equalTo(HttpGet.METHOD_NAME)); + assertThat(httpRequest.getEntity(), nullValue()); + assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/"+taskId.toString())); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java index 72f26d2d57692..18611a7d849d6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java @@ -20,11 +20,17 @@ package org.elasticsearch.action.admin.cluster.node.tasks.get; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskResult; import java.io.IOException; @@ -75,4 +81,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public String toString() { return Strings.toString(this); } + + public static GetTaskResponse fromXContent(XContentParser parser) { + TaskResult result = TaskResult.PARSER.apply(parser, null); + return new GetTaskResponse(result); + } } From 54b4f4fdadefdff03134d34172775b723e86de41 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Fri, 2 Nov 2018 11:55:48 +0100 Subject: [PATCH 2/2] response and test refactoring --- .../IndexUpgradeSubmissionResponse.java | 68 ++------------- .../client/tasks/TaskSubmissionResponse.java | 84 +++++++++++++++++++ .../org/elasticsearch/client/TasksIT.java | 58 ++++++++----- 3 files changed, 125 insertions(+), 85 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java index 7c0aefc722f7d..21191ec6437e8 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java @@ -18,79 +18,23 @@ */ package org.elasticsearch.client.migration; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.TaskId; import java.io.IOException; -import java.util.Objects; /** * Response object that contains the taskID from submitted IndexUpgradeRequest */ -public class IndexUpgradeSubmissionResponse extends ActionResponse implements ToXContentObject { - - private static final ParseField TASK = new ParseField("task"); - - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "upgrade_submission_response", - true, a-> new IndexUpgradeSubmissionResponse( (TaskId) a[0])); - - static { - PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); - } +public class IndexUpgradeSubmissionResponse extends TaskSubmissionResponse { public static IndexUpgradeSubmissionResponse fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); - } - - private final TaskId task; - - IndexUpgradeSubmissionResponse(@Nullable TaskId task) { - this.task = task; - } - - - /** - * Get the task id - * @return the id of the upgrade task. - */ - public TaskId getTask() { - return task; - } - - @Override - public int hashCode() { - return Objects.hash(task); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other == null || getClass() != other.getClass()) { - return false; - } - - IndexUpgradeSubmissionResponse that = (IndexUpgradeSubmissionResponse) other; - return Objects.equals(task, that.task); + TaskId taskId = PARSER.parse(parser, null); + return new IndexUpgradeSubmissionResponse(taskId); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (task != null) { - builder.field(TASK.getPreferredName(), task.toString()); - } - builder.endObject(); - return builder; + IndexUpgradeSubmissionResponse(TaskId task) { + super(task); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java new file mode 100644 index 0000000000000..d4ee17531a20c --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.tasks; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; + +/** + * Response object that contains the task id + */ +public class TaskSubmissionResponse extends ActionResponse implements ToXContentObject { + + protected static final ParseField TASK = new ParseField("task"); + + protected static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "task_submission_response", true, a -> (TaskId) a[0]); + + static { + PARSER.declareField(ConstructingObjectParser.constructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + + public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException { + TaskId taskId = PARSER.parse(parser, null); + return new TaskSubmissionResponse(taskId); + } + + private final TaskId task; + + protected TaskSubmissionResponse(TaskId task) { + this.task = task; + } + + /** + * Get the task id of the submitted task + */ + public TaskId getTask() { + return task; + } + + @Override + public int hashCode() { + return task.hashCode(); + } + + @Override + public boolean equals(Object other) { + return task.equals(other); + + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (task != null) { + builder.field(TASK.getPreferredName(), task.toString()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java index e6dfd01075de9..149afaf0e4e1c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java @@ -27,19 +27,22 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; -import org.elasticsearch.client.ml.DeleteJobRequest; -import org.elasticsearch.client.ml.DeleteJobResponse; -import org.elasticsearch.client.ml.PutJobRequest; -import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.client.migration.IndexUpgradeRequest; +import org.elasticsearch.client.migration.IndexUpgradeSubmissionResponse; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; +import java.util.Map; +import java.util.function.BooleanSupplier; +import java.util.function.Predicate; +import java.util.function.Supplier; import static java.util.Collections.emptyList; -import static org.elasticsearch.client.MachineLearningGetResultsIT.buildJob; -import static org.elasticsearch.client.ml.job.config.JobTests.randomValidJobId; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -112,29 +115,38 @@ public void testGetTask() throws IOException { } - public void testGetTaskFoundWithError() throws IOException { + public void testGetTaskFoundWithError() throws IOException, InterruptedException { + createIndex("test", Settings.EMPTY); - String jobId = randomValidJobId(); - Job job = MachineLearningIT.buildJob(jobId); - MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); - machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + IndexUpgradeSubmissionResponse submissionResult = highLevelClient().migration() + .submitUpgradeTask(new IndexUpgradeRequest("test"), RequestOptions.DEFAULT); - TaskId taskId = submitTask(jobId); + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(submissionResult.getTask()); + CheckedSupplier getTaskResponse = + () -> highLevelClient().tasks().get(getTaskRequest, RequestOptions.DEFAULT); + Predicate isCompleted = r -> r.getTask().isCompleted(); - GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(taskId); - - GetTaskResponse getTaskResponse = highLevelClient().tasks().get(getTaskRequest, RequestOptions.DEFAULT); - - assertTrue(getTaskResponse.getTask().isCompleted()); + GetTaskResponse response = await(getTaskResponse, isCompleted); + Map errorAsMap = response.getTask().getErrorAsMap(); + assertThat((String) errorAsMap.get("reason"), containsString("cannot be upgraded")); } - private TaskId submitTask(String jobId) throws IOException { - DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId); - deleteJobRequest.setWaitForCompletion(false); - MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); - DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync); + private GetTaskResponse await(CheckedSupplier supplier, Predicate predicate) + throws InterruptedException { + BooleanSupplier isCompleted = () -> predicate.test(call(supplier).get()); + awaitBusy(isCompleted); + return call(supplier).get(); + } - return response.getTask(); + private Supplier call(CheckedSupplier supplier) { + return () -> { + try { + return supplier.get(); + } catch (IOException e) { + logger.warn("Exception while fetching task response", e); + return null; + } + }; } }