diff --git a/build.gradle b/build.gradle index 550e2f0c..86ee71f8 100644 --- a/build.gradle +++ b/build.gradle @@ -33,6 +33,7 @@ buildscript { } plugins { + id 'java' id 'java-library' id "com.diffplug.spotless" version "6.19.0" apply false id 'jacoco' @@ -85,7 +86,6 @@ allprojects { } } - group 'org.opensearch.sdk' publishing { @@ -129,6 +129,15 @@ repositories { maven { url "https://d1nvenhzbhpy0q.cloudfront.net/snapshots/lucene/"} } +subprojects { + repositories { + mavenLocal() + mavenCentral() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } + maven { url "https://d1nvenhzbhpy0q.cloudfront.net/snapshots/lucene/"} + } +} + dependencies { def opensearchVersion = "${opensearch_version}" @@ -149,6 +158,7 @@ dependencies { def slf4jVersion = "1.7.36" api("org.opensearch:opensearch:${opensearchVersion}") + implementation("org.apache.logging.log4j:log4j-api:${log4jVersion}") implementation("org.apache.logging.log4j:log4j-core:${log4jVersion}") implementation("org.apache.logging.log4j:log4j-jul:${log4jVersion}") @@ -172,17 +182,17 @@ dependencies { implementation("com.fasterxml.jackson.datatype:jackson-datatype-guava:${jacksonDatabindVersion}") implementation("com.fasterxml.jackson.core:jackson-annotations:${jacksonDatabindVersion}") - implementation("com.google.guava:guava:${guavaVersion}") + api("com.google.guava:guava:${guavaVersion}") implementation("javax.inject:javax.inject:${javaxVersion}") implementation("com.google.guava:failureaccess:${guavaFailureAccessVersion}") implementation("aopalliance:aopalliance:${aopallianceVersion}") constraints { - implementation("com.google.guava:guava:${guavaVersion}") { + api("com.google.guava:guava:${guavaVersion}") { because 'versions below 30.0 have active CVE' } } - implementation("com.google.inject:guice:${guiceVersion}") + api("com.google.inject:guice:${guiceVersion}") testImplementation("org.junit.jupiter:junit-jupiter-api:${junit5Version}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${junit5Version}") testImplementation("org.opensearch.test:framework:${opensearchVersion}") @@ -204,13 +214,6 @@ dependencies { } } -// this task runs the helloworld sample extension -task helloWorld(type: JavaExec) { - group = 'Execution' - description = 'Run HelloWorld Extension.' - mainClass = 'org.opensearch.sdk.sample.helloworld.HelloWorldExtension' - classpath = sourceSets.main.runtimeClasspath -} task getVersion() { doLast { diff --git a/sample/.gitignore b/sample/.gitignore new file mode 100644 index 00000000..f68d1099 --- /dev/null +++ b/sample/.gitignore @@ -0,0 +1,29 @@ +### IntelliJ IDEA ### +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/sample/build.gradle b/sample/build.gradle new file mode 100644 index 00000000..b11c6435 --- /dev/null +++ b/sample/build.gradle @@ -0,0 +1,99 @@ +import org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestFramework + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +plugins { + id 'java' + id "com.diffplug.spotless" version "6.19.0" apply false + id 'jacoco' + id "com.form.diff-coverage" version "0.9.5" + // for javadocs and checks spotless doesn't do + id 'checkstyle' +} + +ext { + projectSubstitutions = [:] + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE.txt') +} + + +apply plugin: 'application' +apply plugin: 'maven-publish' + +// Temporary to keep "gradle run" working +// TODO: change this to an extension designed for testing instead of duplicating a sample +// https://github.com/opensearch-project/opensearch-sdk-java/issues/175 +mainClassName = 'org.opensearch.sdk.sample.helloworld.HelloWorldExtension' + + +group 'org.opensearch.sdk.sample' +version '2.0.0-SNAPSHOT' + +java { + withSourcesJar() + withJavadocJar() +} + +publishing { + publications { + group = "${group}" + version = "${version}" + mavenJava(MavenPublication) { + from components.java + } + sourceCompatibility = 11 + targetCompatibility = 11 + } + + repositories { + maven { + name = "Snapshots" // optional target repository name + url = "https://aws.oss.sonatype.org/content/repositories/snapshots" + credentials { + username "$System.env.SONATYPE_USERNAME" + password "$System.env.SONATYPE_PASSWORD" + } + } + } +} + +test { + getTestFrameworkProperty().convention(getProviderFactory().provider(() -> new JUnitPlatformTestFramework(it.getFilter(), false))) + jvmArgs '--enable-preview' + systemProperty 'tests.security.manager', 'false' +} + +dependencies { + def opensearchVersion = "3.0.0-SNAPSHOT" + def jobSchedulerVersion = "3.0.0.0-SNAPSHOT" + def junit5Version = "5.9.3" + def junitPlatform = "1.9.3" + + implementation project(':') + implementation("org.opensearch:opensearch-job-scheduler:${jobSchedulerVersion}") + implementation("org.opensearch:opensearch-job-scheduler-spi:${jobSchedulerVersion}") + + testImplementation project(':').sourceSets.test.output + testImplementation("org.junit.jupiter:junit-jupiter-api:${junit5Version}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${junit5Version}") + testImplementation("org.opensearch.test:framework:${opensearchVersion}") + testRuntimeOnly("org.junit.platform:junit-platform-launcher:${junitPlatform}") +} + +// this task runs the helloworld sample extension +task helloWorld(type: JavaExec) { + group = 'Execution' + description = 'Run HelloWorld Extension.' + mainClass = 'org.opensearch.sdk.sample.helloworld.HelloWorldExtension' + classpath = sourceSets.main.runtimeClasspath +} diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/ExampleCustomSettingConfig.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/ExampleCustomSettingConfig.java similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/ExampleCustomSettingConfig.java rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/ExampleCustomSettingConfig.java diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/HelloWorldExtension.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/HelloWorldExtension.java similarity index 67% rename from src/main/java/org/opensearch/sdk/sample/helloworld/HelloWorldExtension.java rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/HelloWorldExtension.java index f4b8566d..0af4457e 100644 --- a/src/main/java/org/opensearch/sdk/sample/helloworld/HelloWorldExtension.java +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/HelloWorldExtension.java @@ -11,19 +11,29 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import com.google.common.collect.ImmutableList; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.common.settings.Setting; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.sdk.BaseExtension; import org.opensearch.sdk.Extension; import org.opensearch.sdk.ExtensionSettings; import org.opensearch.sdk.ExtensionsRunner; +import org.opensearch.sdk.SDKClient; import org.opensearch.sdk.api.ActionExtension; import org.opensearch.sdk.rest.ExtensionRestHandler; import org.opensearch.sdk.sample.helloworld.rest.RestHelloAction; import org.opensearch.sdk.sample.helloworld.rest.RestRemoteHelloAction; +import org.opensearch.sdk.sample.helloworld.schedule.GreetJob; +import org.opensearch.sdk.sample.helloworld.transport.HWJobParameterAction; +import org.opensearch.sdk.sample.helloworld.transport.HWJobParameterTransportAction; +import org.opensearch.sdk.sample.helloworld.transport.HWJobRunnerAction; +import org.opensearch.sdk.sample.helloworld.transport.HWJobRunnerTransportAction; import org.opensearch.sdk.sample.helloworld.transport.SampleAction; import org.opensearch.sdk.sample.helloworld.transport.SampleTransportAction; @@ -61,7 +71,30 @@ public List getExtensionRestHandlers() { @Override public List> getActions() { - return Arrays.asList(new ActionHandler<>(SampleAction.INSTANCE, SampleTransportAction.class)); + return Arrays.asList( + new ActionHandler<>(SampleAction.INSTANCE, SampleTransportAction.class), + new ActionHandler<>(HWJobRunnerAction.INSTANCE, HWJobRunnerTransportAction.class), + new ActionHandler<>(HWJobParameterAction.INSTANCE, HWJobParameterTransportAction.class) + ); + } + + @Override + public List getNamedXContent() { + return ImmutableList.of(GreetJob.XCONTENT_REGISTRY); + } + + @Deprecated + private SDKClient.SDKRestClient createRestClient(ExtensionsRunner runner) { + @SuppressWarnings("resource") + SDKClient.SDKRestClient client = runner.getSdkClient().initializeRestClient(); + return client; + } + + @Override + public Collection createComponents(ExtensionsRunner runner) { + SDKClient.SDKRestClient sdkRestClient = createRestClient(runner); + + return Collections.singletonList(sdkRestClient); } /** diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestHelloAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestHelloAction.java similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestHelloAction.java rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestHelloAction.java diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestRemoteHelloAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestRemoteHelloAction.java new file mode 100644 index 00000000..bb444839 --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestRemoteHelloAction.java @@ -0,0 +1,275 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.rest; + +import com.google.common.io.Resources; +import jakarta.json.stream.JsonParser; +import org.apache.commons.codec.Charsets; +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.ActionListener; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.WarningFailureException; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.json.jsonb.JsonbJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.mapping.TypeMapping; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.GetMappingResponse; +import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.action.RemoteExtensionActionResponse; +import org.opensearch.extensions.rest.ExtensionRestResponse; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.rest.NamedRoute; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.RestStatus; +import org.opensearch.sdk.ExtensionsRunner; +import org.opensearch.sdk.SDKClient; +import org.opensearch.sdk.action.RemoteExtensionAction; +import org.opensearch.sdk.action.RemoteExtensionActionRequest; +import org.opensearch.sdk.rest.BaseExtensionRestHandler; +import org.opensearch.sdk.sample.helloworld.schedule.GreetJob; +import org.opensearch.sdk.sample.helloworld.transport.HWJobParameterAction; +import org.opensearch.sdk.sample.helloworld.transport.HWJobRunnerAction; +import org.opensearch.sdk.sample.helloworld.transport.SampleAction; +import org.opensearch.sdk.sample.helloworld.transport.SampleRequest; +import org.opensearch.sdk.sample.helloworld.transport.SampleResponse; + +import java.io.IOException; +import java.io.StringReader; +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.opensearch.rest.RestRequest.Method.GET; +import static org.opensearch.rest.RestStatus.OK; + +/** + * Sample REST Handler demonstrating proxy actions to another extension + */ +public class RestRemoteHelloAction extends BaseExtensionRestHandler { + private ExtensionsRunner extensionsRunner; + + /** + * Instantiate this action + * + * @param runner The ExtensionsRunner instance + */ + public RestRemoteHelloAction(ExtensionsRunner runner) { + this.extensionsRunner = runner; + } + + @Override + public List routes() { + return List.of( + new NamedRoute.Builder().method(GET) + .path("/hello/{name}") + .handler(handleRemoteGetRequest) + .uniqueName(routePrefix("remote_greet_with_name")) + .legacyActionNames(Collections.emptySet()) + .build(), + new NamedRoute.Builder().method(GET) + .path("/schedule/hello") + .handler(handleScheduleRequest) + .uniqueName(routePrefix("scheduled_greet")) + .legacyActionNames(Collections.emptySet()) + .build() + ); + } + + private void registerJobDetails(SDKClient.SDKRestClient client) throws IOException { + + XContentBuilder requestBody = JsonXContent.contentBuilder(); + requestBody.startObject(); + requestBody.field(GetJobDetailsRequest.JOB_INDEX, GreetJob.HELLO_WORLD_JOB_INDEX); + requestBody.field(GetJobDetailsRequest.JOB_TYPE, GreetJob.PARSE_FIELD_NAME); + requestBody.field(GetJobDetailsRequest.JOB_PARAMETER_ACTION, HWJobParameterAction.class.getName()); + requestBody.field(GetJobDetailsRequest.JOB_RUNNER_ACTION, HWJobRunnerAction.class.getName()); + requestBody.field(GetJobDetailsRequest.EXTENSION_UNIQUE_ID, extensionsRunner.getSdkTransportService().getUniqueId()); + requestBody.endObject(); + + Request request = new Request("PUT", String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_job_details")); + request.setJsonEntity(Strings.toString(requestBody)); + + Response response = client.performRequest(request); + boolean registeredJobDetails = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false; + } + + /** + * Get hello world job index mapping json content. + * + * @return hello world job index mapping + * @throws IOException IOException if mapping file can't be read correctly + */ + public static String getHelloWorldJobMappings() throws IOException { + URL url = RestRemoteHelloAction.class.getClassLoader().getResource("mappings/hello-world-jobs.json"); + return Resources.toString(url, Charsets.UTF_8); + } + + private JsonpMapper setupMapper(int rand) { + // Randomly choose json-b or jackson + if (rand % 2 == 0) { + return new JsonbJsonpMapper() { + @Override + public boolean ignoreUnknownFields() { + return false; + } + }; + } else { + return new JacksonJsonpMapper() { + @Override + public boolean ignoreUnknownFields() { + return false; + } + }; + } + } + + /** + * Deserializes json string into a java object + * + * @param json The JSON string + * @param clazz The class to deserialize to + * @param The class to deserialize to + * + * @return Object instance of clazz requested + */ + public T fromJson(String json, Class clazz) { + int rand = new Random().nextInt(); + JsonpMapper mapper = setupMapper(rand); + JsonParser parser = mapper.jsonProvider().createParser(new StringReader(json)); + return mapper.deserialize(parser, clazz); + } + + private Function handleScheduleRequest = (request) -> { + SDKClient client = extensionsRunner.getSdkClient(); + SDKClient.SDKRestClient restClient = client.initializeRestClient(); + OpenSearchClient javaClient = client.initializeJavaClient(); + + try { + registerJobDetails(restClient); + } catch (WarningFailureException e) { + // ignore + } catch (Exception e) { + if (e instanceof ResourceAlreadyExistsException + || e.getCause() instanceof ResourceAlreadyExistsException + || e.getMessage().contains("resource_already_exists_exception")) { + // ignore + } else { + // Catch all other OpenSearchExceptions + return new ExtensionRestResponse(request, RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + + try { + String mappingsJson = getHelloWorldJobMappings(); + GetMappingResponse response = fromJson(mappingsJson, GetMappingResponse.class); + TypeMapping mappings = response.get(GreetJob.HELLO_WORLD_JOB_INDEX).mappings(); + + CreateIndexRequest cir = new CreateIndexRequest.Builder().index(GreetJob.HELLO_WORLD_JOB_INDEX).mappings(mappings).build(); + + OpenSearchIndicesClient indicesClient = javaClient.indices(); + indicesClient.create(cir); + } catch (IOException e) { + return new ExtensionRestResponse(request, RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } catch (WarningFailureException e) { + // TODO This is failing on ConvertResponse. Ignoring. + /* + * org.opensearch.transport.RemoteTransportException: [hello-world][127.0.0.1:4532][internal:extensions/restexecuteonextensiontaction] + * Caused by: org.opensearch.common.io.stream.NotSerializableExceptionWrapper: warning_failure_exception: method [PUT], host [https://127.0.0.1:9200], URI [/.hello-world-jobs], status line [HTTP/2.0 200 OK] + * Warnings: [index name [.hello-world-jobs] starts with a dot '.', in the next major version, index names starting with a dot are reserved for hidden indices and system indices, this request accesses system indices: [.opendistro_security], but in a future major version, direct access to system indices will be prevented by default] + * {"acknowledged":true,"shards_acknowledged":true,"index":".hello-world-jobs"} + */ + } catch (OpenSearchException e) { + if (e instanceof ResourceAlreadyExistsException || e.getCause() instanceof ResourceAlreadyExistsException) {} else { + // Catch all other OpenSearchExceptions + return new ExtensionRestResponse(request, RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } catch (Exception e) { + if (e.getMessage().contains("resource_already_exists_exception")) {} else { + // Catch all other OpenSearchExceptions + return new ExtensionRestResponse(request, RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + + Schedule schedule = new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES); + Duration duration = Duration.of(1, ChronoUnit.MINUTES); + + GreetJob job = new GreetJob("hw", schedule, true, Instant.now(), null, Instant.now(), duration.getSeconds()); + + try { + // Reference: AnomalyDetector - IndexAnomalyDetectorJobActionHandler.indexAnomalyDetectorJob + IndexRequest ir = new IndexRequest.Builder().index(GreetJob.HELLO_WORLD_JOB_INDEX).document(job.toPojo()).build(); + javaClient.index(ir); + } catch (IOException e) { + return new ExtensionRestResponse(request, RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + + return new ExtensionRestResponse(request, OK, "GreetJob successfully scheduled"); + }; + + private Function handleRemoteGetRequest = (request) -> { + SDKClient client = extensionsRunner.getSdkClient(); + + String name = request.param("name"); + // Create a request using class on remote + // This class happens to be local for simplicity but is a class on the remote extension + SampleRequest sampleRequest = new SampleRequest(name); + + // Serialize this request in a proxy action request + // This requires that the remote extension has a corresponding transport action registered + // This Action class happens to be local for simplicity but is a class on the remote extension + RemoteExtensionActionRequest proxyActionRequest = new RemoteExtensionActionRequest(SampleAction.INSTANCE, sampleRequest); + + // TODO: We need async client.execute to hide these action listener details and return the future directly + // https://github.com/opensearch-project/opensearch-sdk-java/issues/584 + CompletableFuture futureResponse = new CompletableFuture<>(); + client.execute( + RemoteExtensionAction.INSTANCE, + proxyActionRequest, + ActionListener.wrap(r -> futureResponse.complete(r), e -> futureResponse.completeExceptionally(e)) + ); + try { + RemoteExtensionActionResponse response = futureResponse.orTimeout( + ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, + TimeUnit.SECONDS + ).get(); + if (!response.isSuccess()) { + return new ExtensionRestResponse(request, OK, "Remote extension response failed: " + response.getResponseBytesAsString()); + } + // Parse out the expected response class from the bytes + SampleResponse sampleResponse = new SampleResponse(StreamInput.wrap(response.getResponseBytes())); + return new ExtensionRestResponse(request, OK, "Received greeting from remote extension: " + sampleResponse.getGreeting()); + } catch (Exception e) { + return exceptionalRequest(request, e); + } + }; + +} diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/schedule/GreetJob.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/schedule/GreetJob.java new file mode 100644 index 00000000..01c66837 --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/schedule/GreetJob.java @@ -0,0 +1,380 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.schedule; + +import com.google.common.base.Objects; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.CronSchedule; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; + +import java.io.IOException; +import java.time.Instant; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * Sample scheduled job for the HelloWorld Extension + */ +public class GreetJob implements Writeable, ToXContentObject, ScheduledJobParameter { + enum ScheduleType { + CRON, + INTERVAL + } + + public static final String PARSE_FIELD_NAME = "GreetJob"; + public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry( + GreetJob.class, + new ParseField(PARSE_FIELD_NAME), + it -> parse(it) + ); + + public static final String HELLO_WORLD_JOB_INDEX = ".hello-world-jobs"; + public static final String NAME_FIELD = "name"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds"; + + public static final String SCHEDULE_FIELD = "schedule"; + public static final String IS_ENABLED_FIELD = "enabled"; + public static final String ENABLED_TIME_FIELD = "enabled_time"; + public static final String DISABLED_TIME_FIELD = "disabled_time"; + + private final String name; + private final Schedule schedule; + private final Boolean isEnabled; + private final Instant enabledTime; + private final Instant disabledTime; + private final Instant lastUpdateTime; + private final Long lockDurationSeconds; + + /** + * + * @param name name of the scheduled job + * @param schedule The schedule, cron or interval, the job run will run with + * @param isEnabled Flag to indices whether this job is enabled + * @param enabledTime Timestamp when the job was last enabled + * @param disabledTime Timestamp when the job was last disabled + * @param lastUpdateTime Timestamp when the job was last updated + * @param lockDurationSeconds Time in seconds for how long this job should acquire a lock + */ + public GreetJob( + String name, + Schedule schedule, + Boolean isEnabled, + Instant enabledTime, + Instant disabledTime, + Instant lastUpdateTime, + Long lockDurationSeconds + ) { + this.name = name; + this.schedule = schedule; + this.isEnabled = isEnabled; + this.enabledTime = enabledTime; + this.disabledTime = disabledTime; + this.lastUpdateTime = lastUpdateTime; + this.lockDurationSeconds = lockDurationSeconds; + } + + /** + * + * @param input The input stream + * @throws IOException Thrown if there is an error parsing the input stream into a GreetJob + */ + public GreetJob(StreamInput input) throws IOException { + name = input.readString(); + if (input.readEnum(ScheduleType.class) == ScheduleType.CRON) { + schedule = new CronSchedule(input); + } else { + schedule = new IntervalSchedule(input); + } + isEnabled = input.readBoolean(); + enabledTime = input.readInstant(); + disabledTime = input.readInstant(); + lastUpdateTime = input.readInstant(); + lockDurationSeconds = input.readLong(); + } + + /** + * + * @param builder An XContentBuilder instance + * @param params TOXContent.Params + * @return + * @throws IOException + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject() + .field(NAME_FIELD, name) + .field(SCHEDULE_FIELD, schedule) + .field(IS_ENABLED_FIELD, isEnabled) + .field(ENABLED_TIME_FIELD, enabledTime.toEpochMilli()) + .field(LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli()) + .field(LOCK_DURATION_SECONDS, lockDurationSeconds); + if (disabledTime != null) { + xContentBuilder.field(DISABLED_TIME_FIELD, disabledTime.toEpochMilli()); + } + return xContentBuilder.endObject(); + } + + /** + * + * @param output The output stream + * @throws IOException + */ + @Override + public void writeTo(StreamOutput output) throws IOException { + output.writeString(name); + if (schedule instanceof CronSchedule) { + output.writeEnum(ScheduleType.CRON); + } else { + output.writeEnum(ScheduleType.INTERVAL); + } + schedule.writeTo(output); + output.writeBoolean(isEnabled); + output.writeInstant(enabledTime); + output.writeInstant(disabledTime); + output.writeInstant(lastUpdateTime); + output.writeLong(lockDurationSeconds); + } + + /** + * + * @param parser Parser that takes builds a GreetJob from XContent + * @return An instance of a GreetJob + * @throws IOException + */ + public static GreetJob parse(XContentParser parser) throws IOException { + String name = null; + Schedule schedule = null; + // we cannot set it to null as isEnabled() would do the unboxing and results in null pointer exception + Boolean isEnabled = Boolean.FALSE; + Instant enabledTime = null; + Instant disabledTime = null; + Instant lastUpdateTime = null; + Long lockDurationSeconds = 5L; + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case NAME_FIELD: + name = parser.text(); + break; + case SCHEDULE_FIELD: + schedule = ScheduleParser.parse(parser); + break; + case IS_ENABLED_FIELD: + isEnabled = parser.booleanValue(); + break; + case ENABLED_TIME_FIELD: + enabledTime = toInstant(parser); + break; + case DISABLED_TIME_FIELD: + disabledTime = toInstant(parser); + break; + case LAST_UPDATE_TIME_FIELD: + lastUpdateTime = toInstant(parser); + break; + case LOCK_DURATION_SECONDS: + lockDurationSeconds = parser.longValue(); + break; + default: + parser.skipChildren(); + break; + } + } + return new GreetJob(name, schedule, isEnabled, enabledTime, disabledTime, lastUpdateTime, lockDurationSeconds); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GreetJob that = (GreetJob) o; + return Objects.equal(getName(), that.getName()) + && Objects.equal(getSchedule(), that.getSchedule()) + && Objects.equal(isEnabled(), that.isEnabled()) + && Objects.equal(getEnabledTime(), that.getEnabledTime()) + && Objects.equal(getDisabledTime(), that.getDisabledTime()) + && Objects.equal(getLastUpdateTime(), that.getLastUpdateTime()) + && Objects.equal(getLockDurationSeconds(), that.getLockDurationSeconds()); + } + + @Override + public int hashCode() { + return Objects.hashCode(name, schedule, isEnabled, enabledTime, lastUpdateTime); + } + + @Override + public String getName() { + return name; + } + + @Override + public Schedule getSchedule() { + return schedule; + } + + @Override + public boolean isEnabled() { + return isEnabled; + } + + @Override + public Instant getEnabledTime() { + return enabledTime; + } + + public Instant getDisabledTime() { + return disabledTime; + } + + @Override + public Instant getLastUpdateTime() { + return lastUpdateTime; + } + + @Override + public Long getLockDurationSeconds() { + return lockDurationSeconds; + } + + /** + * Parse content parser to {@link Instant}. + * + * @param parser json based content parser + * @return instance of {@link Instant} + * @throws IOException IOException if content can't be parsed correctly + */ + public static Instant toInstant(XContentParser parser) throws IOException { + if (parser.currentToken() == null || parser.currentToken() == XContentParser.Token.VALUE_NULL) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + return null; + } + + /** + * + * @return Returns a plain old java object of a GreetJob for writing to the hello world jobs index + */ + public GreetJobPojo toPojo() { + GreetJobPojo.SchedulePojo.IntervalPojo interval = null; + if (this.schedule instanceof IntervalSchedule) { + interval = new GreetJobPojo.SchedulePojo.IntervalPojo( + ((IntervalSchedule) this.schedule).getUnit().toString(), + ((IntervalSchedule) this.schedule).getInterval(), + ((IntervalSchedule) this.schedule).getStartTime().toEpochMilli() + ); + } + return new GreetJobPojo( + this.enabledTime.toEpochMilli(), + this.lastUpdateTime.toEpochMilli(), + this.name, + this.lockDurationSeconds.intValue(), + this.isEnabled.booleanValue(), + new GreetJobPojo.SchedulePojo(interval) + ); + } + + /** + * A plain java representation of a GreetJob using only primitives + */ + public static class GreetJobPojo { + public long enabled_time; + public long last_update_time; + + public String name; + + public int lock_duration_seconds; + + public boolean enabled; + + public SchedulePojo schedule; + + /** + * + * @param enabledTime + * @param lastUpdateTime + * @param name + * @param lockDurationSeconds + * @param enabled + * @param schedule + */ + public GreetJobPojo( + long enabledTime, + long lastUpdateTime, + String name, + int lockDurationSeconds, + boolean enabled, + SchedulePojo schedule + ) { + this.enabled_time = enabledTime; + this.last_update_time = lastUpdateTime; + this.name = name; + this.lock_duration_seconds = lockDurationSeconds; + this.enabled = enabled; + this.schedule = schedule; + } + + /** + * A plain java representation of a Schedule using only primitives + */ + public static class SchedulePojo { + + public IntervalPojo interval; + + /** + * + * @param interval An Interval instance + */ + public SchedulePojo(IntervalPojo interval) { + this.interval = interval; + } + + /** + * A plain java representation of a Interval using only primitives + */ + public static class IntervalPojo { + public String unit; + + public int period; + + public long start_time; + + /** + * + * @param unit Unit of time + * @param period Number of units between job execution + * @param start_time The time when the interval first started + */ + public IntervalPojo(String unit, int period, long start_time) { + this.unit = unit; + this.period = period; + this.start_time = start_time; + } + } + } + } +} diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.json b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.json similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.json rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.json diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.yaml b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.yaml similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.yaml rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/spec/openapi.yaml diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobParameterAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobParameterAction.java new file mode 100644 index 00000000..78d09714 --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobParameterAction.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.jobscheduler.transport.response.JobParameterResponse; + +/** + * Hello World Job Parameter Action + */ +public class HWJobParameterAction extends ActionType { + public static final String NAME = "extensions:hw/greet_job_parameter"; + public static final HWJobParameterAction INSTANCE = new HWJobParameterAction(); + + private HWJobParameterAction() { + super(NAME, JobParameterResponse::new); + } +} diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobParameterTransportAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobParameterTransportAction.java new file mode 100644 index 00000000..e5b05243 --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobParameterTransportAction.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.transport; + +import com.google.inject.Inject; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportAction; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.model.ExtensionJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.transport.request.JobParameterRequest; +import org.opensearch.jobscheduler.transport.response.JobParameterResponse; +import org.opensearch.sdk.SDKNamedXContentRegistry; +import org.opensearch.sdk.sample.helloworld.schedule.GreetJob; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskManager; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.sdk.sample.helloworld.util.RestHandlerUtils.wrapRestActionListener; + +/** + * Hello World Job Parameter Transport Action + */ +public class HWJobParameterTransportAction extends TransportAction { + + private static final Logger LOG = LogManager.getLogger(HWJobParameterTransportAction.class); + + private final SDKNamedXContentRegistry xContentRegistry; + + /** + * Instantiate this action + * + * @param actionFilters Action filters + * @param taskManager The task manager + * @param xContentRegistry The xContentRegistry + */ + @Inject + protected HWJobParameterTransportAction( + ActionFilters actionFilters, + TaskManager taskManager, + SDKNamedXContentRegistry xContentRegistry + ) { + super(HWJobParameterAction.NAME, actionFilters, taskManager); + this.xContentRegistry = xContentRegistry; + } + + @Override + protected void doExecute(Task task, JobParameterRequest request, ActionListener actionListener) { + + String errorMessage = "Failed to parse the Job Parameter"; + ActionListener listener = wrapRestActionListener(actionListener, errorMessage); + try { + XContentParser parser = XContentHelper.createParser( + xContentRegistry.getRegistry(), + LoggingDeprecationHandler.INSTANCE, + request.getJobSource(), + XContentType.JSON + ); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + ScheduledJobParameter scheduledJobParameter = GreetJob.parse(parser); + JobParameterResponse jobParameterResponse = new JobParameterResponse(new ExtensionJobParameter(scheduledJobParameter)); + listener.onResponse(jobParameterResponse); + } catch (Exception e) { + LOG.error(e); + listener.onFailure(e); + } + } +} diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobRunnerAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobRunnerAction.java new file mode 100644 index 00000000..d9c41e97 --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobRunnerAction.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.jobscheduler.transport.response.JobRunnerResponse; + +/** + * Hello World Job Runner Action + */ +public class HWJobRunnerAction extends ActionType { + + public static final String NAME = "extensions:hw/greet_job_runner"; + public static final HWJobRunnerAction INSTANCE = new HWJobRunnerAction(); + + private HWJobRunnerAction() { + super(NAME, JobRunnerResponse::new); + } +} diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobRunnerTransportAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobRunnerTransportAction.java new file mode 100644 index 00000000..bc3f290c --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HWJobRunnerTransportAction.java @@ -0,0 +1,164 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.transport; + +import com.google.inject.Inject; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportAction; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.transport.request.JobRunnerRequest; +import org.opensearch.jobscheduler.transport.response.JobRunnerResponse; +import org.opensearch.sdk.SDKClient.SDKRestClient; +import org.opensearch.sdk.SDKNamedXContentRegistry; +import org.opensearch.sdk.sample.helloworld.schedule.GreetJob; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskManager; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.sdk.sample.helloworld.util.RestHandlerUtils.wrapRestActionListener; + +/** + * Hello World Job Runner Transport Action + */ +public class HWJobRunnerTransportAction extends TransportAction { + + private static final Logger LOG = LogManager.getLogger(HWJobRunnerTransportAction.class); + + private SDKRestClient client; + private final SDKNamedXContentRegistry xContentRegistry; + + /** + * Instantiate this action + * + * @param actionFilters Action filters + * @param taskManager The task manager + * @param xContentRegistry xContentRegistry + * @param client SDKRestClient + */ + @Inject + protected HWJobRunnerTransportAction( + ActionFilters actionFilters, + TaskManager taskManager, + SDKNamedXContentRegistry xContentRegistry, + SDKRestClient client + ) { + super(HWJobRunnerAction.NAME, actionFilters, taskManager); + this.client = client; + this.xContentRegistry = xContentRegistry; + } + + @Override + protected void doExecute(Task task, JobRunnerRequest request, ActionListener actionListener) { + String errorMessage = "Failed to run the Job"; + ActionListener listener = wrapRestActionListener(actionListener, errorMessage); + try { + JobExecutionContext jobExecutionContext = request.getJobExecutionContext(); + String jobParameterDocumentId = jobExecutionContext.getJobId(); + if (jobParameterDocumentId == null || jobParameterDocumentId.isEmpty()) { + listener.onFailure(new IllegalArgumentException("jobParameterDocumentId cannot be empty or null")); + } else { + CompletableFuture inProgressFuture = new CompletableFuture<>(); + findById(jobParameterDocumentId, new ActionListener<>() { + @Override + public void onResponse(GreetJob anomalyDetectorJob) { + inProgressFuture.complete(anomalyDetectorJob); + } + + @Override + public void onFailure(Exception e) { + logger.info("could not find GreetJob with id " + jobParameterDocumentId, e); + inProgressFuture.completeExceptionally(e); + } + }); + + try { + GreetJob scheduledJobParameter = inProgressFuture.orTimeout(10000, TimeUnit.MILLISECONDS).join(); + + JobRunnerResponse jobRunnerResponse; + if (scheduledJobParameter != null && validateJobExecutionContext(jobExecutionContext)) { + jobRunnerResponse = new JobRunnerResponse(true); + } else { + jobRunnerResponse = new JobRunnerResponse(false); + } + listener.onResponse(jobRunnerResponse); + if (jobRunnerResponse.getJobRunnerStatus()) { + HelloWorldJobRunner.getJobRunnerInstance().runJob(scheduledJobParameter, jobExecutionContext); + } + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.info(" Request timed out with an exception ", e); + } else { + throw e; + } + } catch (Exception e) { + logger.info(" Could not find Job Parameter due to exception ", e); + } + + } + } catch (Exception e) { + LOG.error(e); + listener.onFailure(e); + } + } + + private void findById(String jobParameterId, ActionListener listener) { + GetRequest getRequest = new GetRequest(GreetJob.HELLO_WORLD_JOB_INDEX, jobParameterId); + try { + client.get(getRequest, ActionListener.wrap(response -> { + if (!response.isExists()) { + listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(xContentRegistry.getRegistry(), LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + listener.onResponse(GreetJob.parse(parser)); + } catch (IOException e) { + logger.error("IOException occurred finding GreetJob for jobParameterId " + jobParameterId, e); + listener.onFailure(e); + } + } + }, exception -> { + logger.error("Exception occurred finding GreetJob for jobParameterId " + jobParameterId, exception); + listener.onFailure(exception); + })); + } catch (Exception e) { + logger.error("Error occurred finding greet job with jobParameterId " + jobParameterId, e); + listener.onFailure(e); + } + + } + + private boolean validateJobExecutionContext(JobExecutionContext jobExecutionContext) { + if (jobExecutionContext != null + && jobExecutionContext.getJobId() != null + && !jobExecutionContext.getJobId().isEmpty() + && jobExecutionContext.getJobIndexName() != null + && !jobExecutionContext.getJobIndexName().isEmpty() + && jobExecutionContext.getExpectedExecutionTime() != null + && jobExecutionContext.getJobVersion() != null) { + return true; + } + return false; + } +} diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HelloWorldJobRunner.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HelloWorldJobRunner.java new file mode 100644 index 00000000..a46965bd --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/HelloWorldJobRunner.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; + +/** + * Hello World Job Runner + */ +public class HelloWorldJobRunner implements ScheduledJobRunner { + private static final Logger log = LogManager.getLogger(HelloWorldJobRunner.class); + + private static HelloWorldJobRunner INSTANCE; + + /** + * + * @return Return or create an instance of this job runner + */ + public static HelloWorldJobRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (HelloWorldJobRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new HelloWorldJobRunner(); + return INSTANCE; + } + } + + @Override + public void runJob(ScheduledJobParameter job, JobExecutionContext context) { + System.out.println("Hello, world!"); + } +} diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleAction.java similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleAction.java rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleAction.java diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleRequest.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleRequest.java similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleRequest.java rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleRequest.java diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleResponse.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleResponse.java similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleResponse.java rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleResponse.java diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleTransportAction.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleTransportAction.java similarity index 100% rename from src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleTransportAction.java rename to sample/src/main/java/org/opensearch/sdk/sample/helloworld/transport/SampleTransportAction.java diff --git a/sample/src/main/java/org/opensearch/sdk/sample/helloworld/util/RestHandlerUtils.java b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/util/RestHandlerUtils.java new file mode 100644 index 00000000..84172973 --- /dev/null +++ b/sample/src/main/java/org/opensearch/sdk/sample/helloworld/util/RestHandlerUtils.java @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sdk.sample.helloworld.util; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.indices.InvalidIndexNameException; +import org.opensearch.rest.RestStatus; + +import static org.opensearch.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR; + +/** + * Utility functions for REST handlers. + */ +public final class RestHandlerUtils { + private static final Logger logger = LogManager.getLogger(RestHandlerUtils.class); + + public static final ToXContent.MapParams XCONTENT_WITH_TYPE = new ToXContent.MapParams(ImmutableMap.of("with_type", "true")); + + private RestHandlerUtils() {} + + /** + * Wrap action listener to avoid return verbose error message and wrong 500 error to user. + * Suggestion for exception handling in HW: + * 1. If the error is caused by wrong input, throw IllegalArgumentException exception. + * 2. For other errors, please use OpenSearchStatusException. + * + * TODO: tune this function for wrapped exception, return root exception error message + * + * @param actionListener action listener + * @param generalErrorMessage general error message + * @param action listener response type + * @return wrapped action listener + */ + public static ActionListener wrapRestActionListener(ActionListener actionListener, String generalErrorMessage) { + return ActionListener.wrap(r -> { actionListener.onResponse(r); }, e -> { + logger.error("Wrap exception before sending back to user", e); + Throwable cause = Throwables.getRootCause(e); + if (isProperExceptionToReturn(e)) { + actionListener.onFailure(e); + } else if (isProperExceptionToReturn(cause)) { + actionListener.onFailure((Exception) cause); + } else { + RestStatus status = isBadRequest(e) ? BAD_REQUEST : INTERNAL_SERVER_ERROR; + String errorMessage = generalErrorMessage; + if (isBadRequest(e)) { + errorMessage = e.getMessage(); + } else if (cause != null && isBadRequest(cause)) { + errorMessage = cause.getMessage(); + } + actionListener.onFailure(new OpenSearchStatusException(errorMessage, status)); + } + }); + } + + /** + * @param e Throwable + * @return Return a boolean whether there was an exception thrown on a request + */ + public static boolean isBadRequest(Throwable e) { + if (e == null) { + return false; + } + return e instanceof IllegalArgumentException; + } + + /** + * @param e Throwable + * @return Return boolean if throwable is of a proper exception type + */ + public static boolean isProperExceptionToReturn(Throwable e) { + if (e == null) { + return false; + } + return e instanceof OpenSearchStatusException || e instanceof IndexNotFoundException || e instanceof InvalidIndexNameException; + } +} diff --git a/sample/src/main/resources/mappings/hello-world-jobs.json b/sample/src/main/resources/mappings/hello-world-jobs.json new file mode 100644 index 00000000..43cfec8c --- /dev/null +++ b/sample/src/main/resources/mappings/hello-world-jobs.json @@ -0,0 +1,50 @@ +{ + ".hello-world-jobs": { + "mappings": { + "properties": { + "schema_version": { + "type": "integer" + }, + "name": { + "type": "keyword" + }, + "schedule": { + "properties": { + "interval": { + "properties": { + "start_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "period": { + "type": "integer" + }, + "unit": { + "type": "keyword" + } + } + } + } + }, + "enabled": { + "type": "boolean" + }, + "enabled_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "disabled_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_update_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "lock_duration_seconds": { + "type": "long" + } + } + } + } +} diff --git a/sample/src/main/resources/sample/helloworld-settings.yml b/sample/src/main/resources/sample/helloworld-settings.yml new file mode 100644 index 00000000..50881050 --- /dev/null +++ b/sample/src/main/resources/sample/helloworld-settings.yml @@ -0,0 +1,5 @@ +extensionName: hw +hostAddress: 127.0.0.1 +hostPort: 4532 +opensearchAddress: 127.0.0.1 +opensearchPort: 9200 diff --git a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java b/sample/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java similarity index 94% rename from src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java rename to sample/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java index 451de7be..4b85d044 100644 --- a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java +++ b/sample/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java @@ -29,6 +29,7 @@ import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.transport.rest_client.RestClientTransport; import org.opensearch.common.settings.Settings; +import org.opensearch.sdk.SDKNamedXContentRegistry; import org.opensearch.sdk.api.ActionExtension.ActionHandler; import org.opensearch.sdk.rest.ExtensionRestHandler; import org.opensearch.sdk.sample.helloworld.transport.SampleAction; @@ -78,11 +79,17 @@ public void setUp() throws Exception { Settings settings = Settings.builder().put(ExtensionsRunner.NODE_NAME_SETTING, "test").build(); ThreadPool threadPool = new ThreadPool(settings); TaskManager taskManager = new TaskManager(settings, threadPool, Collections.emptySet()); + SDKNamedXContentRegistry xContentRegistry = SDKNamedXContentRegistry.EMPTY; this.sdkClient = new SDKClient(extensionSettings); - this.injector = Guice.createInjector(new SDKActionModule(extension), b -> { + sdkClient.initialize(Map.of()); + SDKRestClient restClient = sdkClient.initializeRestClient("localhost", 9200); + SDKActionModule actionModule = new SDKActionModule(extension); + this.injector = Guice.createInjector(actionModule, b -> { b.bind(ThreadPool.class).toInstance(threadPool); b.bind(TaskManager.class).toInstance(taskManager); b.bind(SDKClient.class).toInstance(sdkClient); + b.bind(SDKNamedXContentRegistry.class).toInstance(xContentRegistry); + b.bind(SDKClient.SDKRestClient.class).toInstance(restClient); }); initializeSdkClient(); this.sdkRestClient = sdkClient.initializeRestClient("localhost", 9200); @@ -152,13 +159,13 @@ public void testExtensionRestHandlers() { List extensionRestHandlers = extension.getExtensionRestHandlers(); assertEquals(2, extensionRestHandlers.size()); assertEquals(4, extensionRestHandlers.get(0).routes().size()); - assertEquals(1, extensionRestHandlers.get(1).routes().size()); + assertEquals(2, extensionRestHandlers.get(1).routes().size()); } @Test public void testGetActions() { List> actions = extension.getActions(); - assertEquals(1, actions.size()); + assertEquals(3, actions.size()); } @Test diff --git a/src/test/java/org/opensearch/sdk/sample/helloworld/rest/TestRestHelloAction.java b/sample/src/test/java/org/opensearch/sdk/sample/helloworld/rest/TestRestHelloAction.java similarity index 100% rename from src/test/java/org/opensearch/sdk/sample/helloworld/rest/TestRestHelloAction.java rename to sample/src/test/java/org/opensearch/sdk/sample/helloworld/rest/TestRestHelloAction.java diff --git a/src/test/java/org/opensearch/sdk/sample/helloworld/transport/TestSampleAction.java b/sample/src/test/java/org/opensearch/sdk/sample/helloworld/transport/TestSampleAction.java similarity index 100% rename from src/test/java/org/opensearch/sdk/sample/helloworld/transport/TestSampleAction.java rename to sample/src/test/java/org/opensearch/sdk/sample/helloworld/transport/TestSampleAction.java diff --git a/settings.gradle b/settings.gradle index c15ed9a9..90b24732 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1,5 @@ rootProject.name = 'opensearch-sdk-java' + + +include "sample" +project(":sample").name = rootProject.name + "-sample-extension" diff --git a/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestRemoteHelloAction.java b/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestRemoteHelloAction.java deleted file mode 100644 index d3c12e42..00000000 --- a/src/main/java/org/opensearch/sdk/sample/helloworld/rest/RestRemoteHelloAction.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.sdk.sample.helloworld.rest; - -import org.opensearch.action.ActionListener; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.extensions.ExtensionsManager; -import org.opensearch.extensions.action.RemoteExtensionActionResponse; -import org.opensearch.extensions.rest.ExtensionRestResponse; -import org.opensearch.rest.NamedRoute; -import org.opensearch.rest.RestRequest; -import org.opensearch.rest.RestResponse; -import org.opensearch.sdk.ExtensionsRunner; -import org.opensearch.sdk.SDKClient; -import org.opensearch.sdk.action.RemoteExtensionAction; -import org.opensearch.sdk.action.RemoteExtensionActionRequest; -import org.opensearch.sdk.rest.BaseExtensionRestHandler; -import org.opensearch.sdk.sample.helloworld.transport.SampleAction; -import org.opensearch.sdk.sample.helloworld.transport.SampleRequest; -import org.opensearch.sdk.sample.helloworld.transport.SampleResponse; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import static org.opensearch.rest.RestRequest.Method.GET; -import static org.opensearch.rest.RestStatus.OK; - -/** - * Sample REST Handler demonstrating proxy actions to another extension - */ -public class RestRemoteHelloAction extends BaseExtensionRestHandler { - - private ExtensionsRunner extensionsRunner; - - /** - * Instantiate this action - * - * @param runner The ExtensionsRunner instance - */ - public RestRemoteHelloAction(ExtensionsRunner runner) { - this.extensionsRunner = runner; - } - - @Override - public List routes() { - return List.of( - - new NamedRoute.Builder().method(GET) - .path("/hello/{name}") - .handler(handleRemoteGetRequest) - .uniqueName(routePrefix("remote_greet_with_name")) - .legacyActionNames(Collections.emptySet()) - .build() - ); - } - - private Function handleRemoteGetRequest = (request) -> { - SDKClient client = extensionsRunner.getSdkClient(); - - String name = request.param("name"); - // Create a request using class on remote - // This class happens to be local for simplicity but is a class on the remote extension - SampleRequest sampleRequest = new SampleRequest(name); - - // Serialize this request in a proxy action request - // This requires that the remote extension has a corresponding transport action registered - // This Action class happens to be local for simplicity but is a class on the remote extension - RemoteExtensionActionRequest proxyActionRequest = new RemoteExtensionActionRequest(SampleAction.INSTANCE, sampleRequest); - - // TODO: We need async client.execute to hide these action listener details and return the future directly - // https://github.com/opensearch-project/opensearch-sdk-java/issues/584 - CompletableFuture futureResponse = new CompletableFuture<>(); - client.execute( - RemoteExtensionAction.INSTANCE, - proxyActionRequest, - ActionListener.wrap(r -> futureResponse.complete(r), e -> futureResponse.completeExceptionally(e)) - ); - try { - RemoteExtensionActionResponse response = futureResponse.orTimeout( - ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, - TimeUnit.SECONDS - ).get(); - if (!response.isSuccess()) { - return new ExtensionRestResponse(request, OK, "Remote extension response failed: " + response.getResponseBytesAsString()); - } - // Parse out the expected response class from the bytes - SampleResponse sampleResponse = new SampleResponse(StreamInput.wrap(response.getResponseBytes())); - return new ExtensionRestResponse(request, OK, "Received greeting from remote extension: " + sampleResponse.getGreeting()); - } catch (Exception e) { - return exceptionalRequest(request, e); - } - }; - -} diff --git a/src/main/resources/mappings/hello-world-jobs.json b/src/main/resources/mappings/hello-world-jobs.json new file mode 100644 index 00000000..43cfec8c --- /dev/null +++ b/src/main/resources/mappings/hello-world-jobs.json @@ -0,0 +1,50 @@ +{ + ".hello-world-jobs": { + "mappings": { + "properties": { + "schema_version": { + "type": "integer" + }, + "name": { + "type": "keyword" + }, + "schedule": { + "properties": { + "interval": { + "properties": { + "start_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "period": { + "type": "integer" + }, + "unit": { + "type": "keyword" + } + } + } + } + }, + "enabled": { + "type": "boolean" + }, + "enabled_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "disabled_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_update_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "lock_duration_seconds": { + "type": "long" + } + } + } + } +} diff --git a/src/main/resources/sample/helloworld-settings.yml b/src/main/resources/sample/helloworld-settings.yml deleted file mode 100644 index 15cdbdcf..00000000 --- a/src/main/resources/sample/helloworld-settings.yml +++ /dev/null @@ -1,11 +0,0 @@ -extensionName: hello-world -hostAddress: 127.0.0.1 -hostPort: 4500 -opensearchAddress: 127.0.0.1 -opensearchPort: 9200 -#ssl.transport.enabled: true -#ssl.transport.pemcert_filepath: certs/extension-01.pem -#ssl.transport.pemkey_filepath: certs/extension-01-key.pem -#ssl.transport.pemtrustedcas_filepath: certs/root-ca.pem -#ssl.transport.enforce_hostname_verification: false -#path.home: