From c144b300627060d94c8e59aa8b3b9b1d6f368c0f Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Wed, 13 Aug 2025 11:30:52 -0400 Subject: [PATCH] Introduce PluginSubject and replace usages of stashContext in TransportGetAlertsAction Signed-off-by: Craig Perkins --- .../org/opensearch/alerting/AlertingPlugin.kt | 21 ++++- .../transport/TransportGetAlertsAction.kt | 94 ++++++++++--------- .../opensearch/alerting/util/PluginClient.kt | 64 +++++++++++++ build.gradle | 8 ++ 4 files changed, 139 insertions(+), 48 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/util/PluginClient.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 4ad7b6361..919ab9604 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -84,6 +84,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction import org.opensearch.alerting.util.DocLevelMonitorQueries +import org.opensearch.alerting.util.PluginClient import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes @@ -116,6 +117,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment +import org.opensearch.identity.PluginSubject import org.opensearch.index.IndexModule import org.opensearch.indices.SystemIndexDescriptor import org.opensearch.monitor.jvm.JvmStats @@ -125,6 +127,7 @@ import org.opensearch.painless.spi.PainlessExtension import org.opensearch.percolator.PercolatorPluginExt import org.opensearch.plugins.ActionPlugin import org.opensearch.plugins.ExtensiblePlugin +import org.opensearch.plugins.IdentityAwarePlugin import org.opensearch.plugins.ReloadablePlugin import org.opensearch.plugins.ScriptPlugin import org.opensearch.plugins.SearchPlugin @@ -146,7 +149,7 @@ import java.util.function.Supplier * [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects. */ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, - SearchPlugin, SystemIndexPlugin, PercolatorPluginExt() { + SearchPlugin, SystemIndexPlugin, IdentityAwarePlugin, PercolatorPluginExt() { override fun getContextAllowlists(): Map, List> { val whitelist = AllowlistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt") @@ -182,6 +185,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator + lateinit var pluginClient: PluginClient var monitorTypeToMonitorRunners: MutableMap = mutableMapOf() override fun getRestHandlers( @@ -283,6 +287,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ): Collection { // Need to figure out how to use the OpenSearch DI classes rather than handwiring things here. val settings = environment.settings() + pluginClient = PluginClient(client) val lockService = LockService(client, clusterService) alertIndices = AlertIndices(settings, client, threadPool, clusterService) val alertService = AlertService(client, xContentRegistry, alertIndices) @@ -320,7 +325,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService) docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService) scheduler = JobScheduler(threadPool, runner) - sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) + sweeper = JobSweeper( + environment.settings(), client, clusterService, threadPool, xContentRegistry, + scheduler, ALERTING_JOB_TYPES + ) destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices) this.threadPool = threadPool this.clusterService = clusterService @@ -351,10 +359,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R destinationMigrationCoordinator, lockService, alertService, - triggerService + triggerService, + pluginClient ) } + override fun assignSubject(pluginSubject: PluginSubject) { + // When security is not installed, the pluginSubject will still be assigned. + requireNotNull(pluginSubject) + pluginClient.setSubject(pluginSubject) + } + override fun getSettings(): List> { return listOf( ScheduledJobSettings.REQUEST_TIMEOUT, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt index 1fc3ef83a..ce2caf9cc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt @@ -5,22 +5,16 @@ package org.opensearch.alerting.transport -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionRequest import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.opensearchapi.addFilter -import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings -import org.opensearch.alerting.util.use import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings @@ -51,9 +45,10 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.io.IOException +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletableFuture.completedFuture private val log = LogManager.getLogger(TransportGetAlertsAction::class.java) -private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportGetAlertsAction @Inject constructor( transportService: TransportService, @@ -147,19 +142,16 @@ class TransportGetAlertsAction @Inject constructor( .size(tableProp.size) .from(tableProp.startIndex) - client.threadPool().threadContext.stashContext().use { - scope.launch { - try { - val alertIndex = resolveAlertsIndexName(getAlertsRequest) - getAlerts(alertIndex, searchSourceBuilder, actionListener, user) - } catch (t: Exception) { - log.error("Failed to get alerts", t) - if (t is AlertingException) { - actionListener.onFailure(t) - } else { - actionListener.onFailure(AlertingException.wrap(t)) - } - } + try { + resolveAlertsIndexNameAsync(getAlertsRequest).thenAccept { alertIndex -> + getAlerts(alertIndex, searchSourceBuilder, actionListener, user) + } + } catch (t: Exception) { + log.error("Failed to get alerts", t) + if (t is AlertingException) { + actionListener.onFailure(t) + } else { + actionListener.onFailure(AlertingException.wrap(t)) } } } @@ -179,43 +171,55 @@ class TransportGetAlertsAction @Inject constructor( } } + private fun normalizeAlertIndex(index: String): String = + if (index == AlertIndices.ALERT_INDEX) AlertIndices.ALL_ALERT_INDEX_PATTERN else index + /** Precedence order for resolving alert index to be queried: 1. alertIndex param. 2. alert index mentioned in monitor data sources. 3. Default alert indices pattern */ - suspend fun resolveAlertsIndexName(getAlertsRequest: GetAlertsRequest): String { - var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN - if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) { - alertIndex = getAlertsRequest.alertIndex!! - } else if (getAlertsRequest.monitorId.isNullOrEmpty() == false) { - val retrievedMonitor = getMonitor(getAlertsRequest) - if (retrievedMonitor != null) { - alertIndex = retrievedMonitor.dataSources.alertsIndex + fun resolveAlertsIndexNameAsync(getAlertsRequest: GetAlertsRequest): CompletableFuture { + // 1) Explicit param wins + if (!getAlertsRequest.alertIndex.isNullOrBlank()) { + return completedFuture(normalizeAlertIndex(getAlertsRequest.alertIndex!!)) + } + + // 2) Lookup via monitor if monitorId present + if (!getAlertsRequest.monitorId.isNullOrBlank()) { + return getMonitorAsync(getAlertsRequest).thenApply { monitor -> + monitor?.dataSources?.alertsIndex?.let(::normalizeAlertIndex) + ?: AlertIndices.ALL_ALERT_INDEX_PATTERN } } - return if (alertIndex == AlertIndices.ALERT_INDEX) - AlertIndices.ALL_ALERT_INDEX_PATTERN - else - alertIndex + + // 3) Default + return completedFuture(AlertIndices.ALL_ALERT_INDEX_PATTERN) } - private suspend fun getMonitor(getAlertsRequest: GetAlertsRequest): Monitor? { + private fun getMonitorAsync(getAlertsRequest: GetAlertsRequest): CompletableFuture { + val future = CompletableFuture() val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getAlertsRequest.monitorId!!) - try { - val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) } - if (!getResponse.isExists) { - return null + client.getAsync(getRequest).thenAccept { getResponse -> + try { + if (!getResponse.isExists) { + future.complete(null) + return@thenAccept + } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON + ) + val monitor = ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor + future.complete(monitor) + } catch (e: Exception) { + future.completeExceptionally(e) } - val xcp = XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - getResponse.sourceAsBytesRef, XContentType.JSON - ) - return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor - } catch (t: Exception) { - log.error("Failure in fetching monitor ${getAlertsRequest.monitorId} to resolve alert index in get alerts action", t) - return null + }.exceptionally { t -> + future.completeExceptionally(t) + null } + return future } fun getAlerts( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/PluginClient.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/PluginClient.kt new file mode 100644 index 000000000..9e843e191 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/PluginClient.kt @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.alerting.util + +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionType +import org.opensearch.common.CheckedRunnable +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.identity.Subject +import org.opensearch.transport.client.Client +import org.opensearch.transport.client.FilterClient + +/** + * A special client for executing transport actions as this plugin's system subject. + */ +class PluginClient : FilterClient { + private var subject: Subject? = null + + constructor(delegate: Client) : super(delegate) + + constructor(delegate: Client, subject: Subject) : super(delegate) { + this.subject = subject + } + + fun setSubject(subject: Subject) { + this.subject = subject + } + + override fun doExecute( + action: ActionType?, + request: Request?, + listener: ActionListener? + ) { + checkNotNull(subject) { "PluginClient is not initialized." } + threadPool().getThreadContext().newStoredContext(false).use { ctx -> + subject!!.runAs( + CheckedRunnable { + Companion.logger.info( + "Running transport action with subject: {}", + subject!!.getPrincipal().getName() + ) + super.doExecute( + action, + request, + ActionListener.runBefore(listener, CheckedRunnable { ctx.restore() }) + ) + } + ) + } + } + + companion object { + private val logger: Logger = LogManager.getLogger(PluginClient::class.java) + } +} diff --git a/build.gradle b/build.gradle index 57b5f186d..14a23c198 100644 --- a/build.gradle +++ b/build.gradle @@ -71,6 +71,10 @@ task ktlint(type: JavaExec, group: "verification") { description = "Check Kotlin code style." main = "com.pinterest.ktlint.Main" classpath = configurations.ktlint + jvmArgs = [ + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED" + ] args "alerting/**/*.kt", "elastic-api/**/*.kt", "core/**/*.kt" } @@ -78,6 +82,10 @@ task ktlintFormat(type: JavaExec, group: "formatting") { description = "Fix Kotlin code style deviations." main = "com.pinterest.ktlint.Main" classpath = configurations.ktlint + jvmArgs = [ + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED" + ] args "-F", "alerting/**/*.kt", "elastic-api/**/*.kt", "core/**/*.kt" }