Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.PluginClient
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
Expand Down Expand Up @@ -116,6 +117,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.identity.PluginSubject
import org.opensearch.index.IndexModule
import org.opensearch.indices.SystemIndexDescriptor
import org.opensearch.monitor.jvm.JvmStats
Expand All @@ -125,6 +127,7 @@ import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.IdentityAwarePlugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand All @@ -146,7 +149,7 @@ import java.util.function.Supplier
* [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin,
SearchPlugin, SystemIndexPlugin, PercolatorPluginExt() {
SearchPlugin, SystemIndexPlugin, IdentityAwarePlugin, PercolatorPluginExt() {

override fun getContextAllowlists(): Map<ScriptContext<*>, List<Allowlist>> {
val whitelist = AllowlistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt")
Expand Down Expand Up @@ -182,6 +185,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
lateinit var pluginClient: PluginClient
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
Expand Down Expand Up @@ -283,6 +287,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
): Collection<Any> {
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
pluginClient = PluginClient(client)
val lockService = LockService(client, clusterService)
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
val alertService = AlertService(client, xContentRegistry, alertIndices)
Expand Down Expand Up @@ -320,7 +325,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
sweeper = JobSweeper(
environment.settings(), client, clusterService, threadPool, xContentRegistry,
scheduler, ALERTING_JOB_TYPES
)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
this.threadPool = threadPool
this.clusterService = clusterService
Expand Down Expand Up @@ -351,10 +359,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
destinationMigrationCoordinator,
lockService,
alertService,
triggerService
triggerService,
pluginClient
)
}

override fun assignSubject(pluginSubject: PluginSubject) {
// When security is not installed, the pluginSubject will still be assigned.
requireNotNull(pluginSubject)
pluginClient.setSubject(pluginSubject)
}

override fun getSettings(): List<Setting<*>> {
return listOf(
ScheduledJobSettings.REQUEST_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,16 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.use
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -51,9 +45,10 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client
import java.io.IOException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletableFuture.completedFuture

private val log = LogManager.getLogger(TransportGetAlertsAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

class TransportGetAlertsAction @Inject constructor(
transportService: TransportService,
Expand Down Expand Up @@ -147,19 +142,16 @@ class TransportGetAlertsAction @Inject constructor(
.size(tableProp.size)
.from(tableProp.startIndex)

client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
val alertIndex = resolveAlertsIndexName(getAlertsRequest)
getAlerts(alertIndex, searchSourceBuilder, actionListener, user)
} catch (t: Exception) {
log.error("Failed to get alerts", t)
if (t is AlertingException) {
actionListener.onFailure(t)
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
try {
resolveAlertsIndexNameAsync(getAlertsRequest).thenAccept { alertIndex ->
getAlerts(alertIndex, searchSourceBuilder, actionListener, user)
}
} catch (t: Exception) {
log.error("Failed to get alerts", t)
if (t is AlertingException) {
actionListener.onFailure(t)
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
}
Expand All @@ -179,43 +171,55 @@ class TransportGetAlertsAction @Inject constructor(
}
}

private fun normalizeAlertIndex(index: String): String =
if (index == AlertIndices.ALERT_INDEX) AlertIndices.ALL_ALERT_INDEX_PATTERN else index

/** Precedence order for resolving alert index to be queried:
1. alertIndex param.
2. alert index mentioned in monitor data sources.
3. Default alert indices pattern
*/
suspend fun resolveAlertsIndexName(getAlertsRequest: GetAlertsRequest): String {
var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN
if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) {
alertIndex = getAlertsRequest.alertIndex!!
} else if (getAlertsRequest.monitorId.isNullOrEmpty() == false) {
val retrievedMonitor = getMonitor(getAlertsRequest)
if (retrievedMonitor != null) {
alertIndex = retrievedMonitor.dataSources.alertsIndex
fun resolveAlertsIndexNameAsync(getAlertsRequest: GetAlertsRequest): CompletableFuture<String> {
// 1) Explicit param wins
if (!getAlertsRequest.alertIndex.isNullOrBlank()) {
return completedFuture(normalizeAlertIndex(getAlertsRequest.alertIndex!!))
}

// 2) Lookup via monitor if monitorId present
if (!getAlertsRequest.monitorId.isNullOrBlank()) {
return getMonitorAsync(getAlertsRequest).thenApply { monitor ->
monitor?.dataSources?.alertsIndex?.let(::normalizeAlertIndex)
?: AlertIndices.ALL_ALERT_INDEX_PATTERN
}
}
return if (alertIndex == AlertIndices.ALERT_INDEX)
AlertIndices.ALL_ALERT_INDEX_PATTERN
else
alertIndex

// 3) Default
return completedFuture(AlertIndices.ALL_ALERT_INDEX_PATTERN)
}

private suspend fun getMonitor(getAlertsRequest: GetAlertsRequest): Monitor? {
private fun getMonitorAsync(getAlertsRequest: GetAlertsRequest): CompletableFuture<Monitor?> {
val future = CompletableFuture<Monitor?>()
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getAlertsRequest.monitorId!!)
try {
val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) }
if (!getResponse.isExists) {
return null
client.getAsync(getRequest).thenAccept { getResponse ->
try {
if (!getResponse.isExists) {
future.complete(null)
return@thenAccept
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
val monitor = ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
future.complete(monitor)
} catch (e: Exception) {
future.completeExceptionally(e)
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
} catch (t: Exception) {
log.error("Failure in fetching monitor ${getAlertsRequest.monitorId} to resolve alert index in get alerts action", t)
return null
}.exceptionally { t ->
future.completeExceptionally(t)
null
}
return future
}

fun getAlerts(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.alerting.util

import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionType
import org.opensearch.common.CheckedRunnable
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.identity.Subject
import org.opensearch.transport.client.Client
import org.opensearch.transport.client.FilterClient

/**
* A special client for executing transport actions as this plugin's system subject.
*/
class PluginClient : FilterClient {
private var subject: Subject? = null

constructor(delegate: Client) : super(delegate)

constructor(delegate: Client, subject: Subject) : super(delegate) {
this.subject = subject
}

fun setSubject(subject: Subject) {
this.subject = subject
}

override fun <Request : ActionRequest?, Response : ActionResponse?> doExecute(
action: ActionType<Response?>?,
request: Request?,
listener: ActionListener<Response?>?
) {
checkNotNull(subject) { "PluginClient is not initialized." }
threadPool().getThreadContext().newStoredContext(false).use { ctx ->
subject!!.runAs<RuntimeException?>(
CheckedRunnable {
Companion.logger.info(
"Running transport action with subject: {}",
subject!!.getPrincipal().getName()
)
super.doExecute<Request?, Response?>(
action,
request,
ActionListener.runBefore<Response?>(listener, CheckedRunnable<RuntimeException> { ctx.restore() })
)
}
)
}
}

companion object {
private val logger: Logger = LogManager.getLogger(PluginClient::class.java)
}
}
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,21 @@ task ktlint(type: JavaExec, group: "verification") {
description = "Check Kotlin code style."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
jvmArgs = [
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED"
]
args "alerting/**/*.kt", "elastic-api/**/*.kt", "core/**/*.kt"
}

task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
jvmArgs = [
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED"
]
args "-F", "alerting/**/*.kt", "elastic-api/**/*.kt", "core/**/*.kt"
}

Expand Down
Loading