Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionRequest
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
Expand All @@ -38,7 +35,9 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.DeleteDataObjectRequest
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -127,12 +126,16 @@ class TransportDeleteAlertingCommentAction @Inject constructor(
// or if the user is Admin
val canDelete = user == null || user.name == comment.user?.name || isAdmin(user)

val deleteRequest = DeleteRequest(sourceIndex, commentId)
val deleteRequest = DeleteDataObjectRequest.builder()
.index(sourceIndex)
.id(commentId)
.tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER))
.build()

if (canDelete) {
log.debug("Deleting the comment with id ${deleteRequest.id()}")
val deleteResponse = client.suspendUntil { delete(deleteRequest, it) }
actionListener.onResponse(DeleteCommentResponse(deleteResponse.id))
log.debug("Deleting the comment with id $commentId")
val deleteResponse = sdkClient.deleteDataObject(deleteRequest)
actionListener.onResponse(DeleteCommentResponse(deleteResponse.id()))
} else {
actionListener.onFailure(
AlertingException("Not allowed to delete this comment!", RestStatus.FORBIDDEN, IllegalStateException())
Expand All @@ -153,12 +156,15 @@ class TransportDeleteAlertingCommentAction @Inject constructor(
.version(true)
.seqNoAndPrimaryTerm(true)
.query(queryBuilder)
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
val searchRequest = SearchDataObjectRequest.builder()
.indices(ALL_COMMENTS_INDEX_PATTERN)
.tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER))
.searchSourceBuilder(searchSourceBuilder)
.build()

val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
val comments = searchResponse.hits.map { hit ->
val sdkResponse = sdkClient.searchDataObject(searchRequest)
val searchResponse = sdkResponse.searchResponse()
val comments = searchResponse?.hits?.map { hit ->
val xcp = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
Expand All @@ -171,7 +177,7 @@ class TransportDeleteAlertingCommentAction @Inject constructor(
comment
}

if (comments.isEmpty()) {
if (comments.isNullOrEmpty()) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Comment not found", RestStatus.NOT_FOUND),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.action.ExecuteMonitorAction
Expand All @@ -39,13 +38,14 @@ import org.opensearch.commons.authuser.User
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.remote.metadata.client.GetDataObjectRequest
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client
import java.time.Instant
import java.util.Locale

private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

Expand Down Expand Up @@ -103,35 +103,41 @@ class TransportExecuteMonitorAction @Inject constructor(
}

if (execMonitorRequest.monitorId != null) {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId)
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Can't find monitor with id: ${response.id}", RestStatus.NOT_FOUND)
)
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val getRequest = GetDataObjectRequest.builder()
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
.id(execMonitorRequest.monitorId)
.tenantId(tenantId)
.build()
sdkClient.getDataObjectAsync(getRequest).whenComplete { response, throwable ->
if (throwable != null) {
actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable)))
return@whenComplete
}
try {
val getResponse = response.getResponse()
if (getResponse == null || !getResponse.isExists) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Can't find monitor with id: ${execMonitorRequest.monitorId}", RestStatus.NOT_FOUND)
)
return
}
if (!response.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
executeMonitor(monitor)
}
}
)
return@whenComplete
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
if (!getResponse.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
val monitor = ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
executeMonitor(monitor)
}
}
} catch (e: Exception) {
log.error("Failed to get monitor ${execMonitorRequest.monitorId} for execution", e)
actionListener.onFailure(AlertingException.wrap(e))
}
)
}
} else {
val monitor = when (user?.name.isNullOrEmpty()) {
true -> execMonitorRequest.monitor as Monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,11 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.use
import org.opensearch.cluster.service.ClusterService
Expand All @@ -44,7 +40,10 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.GetDataObjectRequest
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortBuilders
import org.opensearch.search.sort.SortOrder
Expand Down Expand Up @@ -203,10 +202,16 @@ class TransportGetAlertsAction @Inject constructor(
}

private suspend fun getMonitor(getAlertsRequest: GetAlertsRequest): Monitor? {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getAlertsRequest.monitorId!!)
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val getRequest = GetDataObjectRequest.builder()
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
.id(getAlertsRequest.monitorId!!)
.tenantId(tenantId)
.build()
try {
val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) }
if (!getResponse.isExists) {
val response = sdkClient.getDataObject(getRequest)
val getResponse = response.getResponse()
if (getResponse == null || !getResponse.isExists) {
return null
}
val xcp = XContentHelper.createParser(
Expand Down Expand Up @@ -246,33 +251,40 @@ class TransportGetAlertsAction @Inject constructor(
}

fun search(alertIndex: String, searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener<GetAlertsResponse>) {
val searchRequest = SearchRequest()
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val sdkSearchRequest = SearchDataObjectRequest.builder()
.indices(alertIndex)
.source(searchSourceBuilder)
.tenantId(tenantId)
.searchSourceBuilder(searchSourceBuilder)
.build()

client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
val totalAlertCount = response.hits.totalHits?.value?.toInt()
val alerts = response.hits.map { hit ->
val xcp = XContentHelper.createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceRef,
XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val alert = Alert.parse(xcp, hit.id, hit.version)
alert
}
actionListener.onResponse(GetAlertsResponse(alerts, totalAlertCount))
sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable ->
if (throwable != null) {
actionListener.onFailure(SdkClientUtils.unwrapAndConvertToException(throwable))
return@whenComplete
}
try {
val searchResponse = response.searchResponse()
if (searchResponse == null) {
actionListener.onResponse(GetAlertsResponse(emptyList(), 0))
return@whenComplete
}

override fun onFailure(t: Exception) {
actionListener.onFailure(t)
val totalAlertCount = searchResponse.hits.totalHits?.value?.toInt()
val alerts = searchResponse.hits.map { hit ->
val xcp = XContentHelper.createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceRef,
XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
Alert.parse(xcp, hit.id, hit.version)
}
actionListener.onResponse(GetAlertsResponse(alerts, totalAlertCount))
} catch (e: Exception) {
log.error("Failed to search alerts", e)
actionListener.onFailure(AlertingException.wrap(e))
}
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.use
import org.opensearch.cluster.service.ClusterService
Expand All @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortBuilders
import org.opensearch.search.sort.SortOrder
Expand Down Expand Up @@ -207,17 +208,20 @@ class TransportGetWorkflowAlertsAction @Inject constructor(
actionListener: ActionListener<GetWorkflowAlertsResponse>,
) {
try {
val searchRequest = SearchRequest()
val searchRequest = SearchDataObjectRequest.builder()
.indices(alertIndex)
.source(searchSourceBuilder)
.tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER))
.searchSourceBuilder(searchSourceBuilder)
.build()
val alerts = mutableListOf<Alert>()
val associatedAlerts = mutableListOf<Alert>()

val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
val totalAlertCount = response.hits.totalHits?.value?.toInt()
alerts.addAll(
parseAlertsFromSearchResponse(response)
)
val sdkResponse = sdkClient.searchDataObject(searchRequest)
val response = sdkResponse.searchResponse()
val totalAlertCount = response?.hits?.totalHits?.value?.toInt()
if (response != null) {
alerts.addAll(parseAlertsFromSearchResponse(response))
}
if (alerts.isNotEmpty() && getWorkflowAlertsRequest.getAssociatedAlerts == true)
getAssociatedAlerts(
associatedAlerts,
Expand Down Expand Up @@ -256,8 +260,16 @@ class TransportGetWorkflowAlertsAction @Inject constructor(
queryBuilder.must(QueryBuilders.termsQuery("_id", associatedAlertIds))
queryBuilder.must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.AUDIT.name))
searchRequest.source().query(queryBuilder)
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
associatedAlerts.addAll(parseAlertsFromSearchResponse(response))
val sdkSearchRequest = SearchDataObjectRequest.builder()
.indices(*searchRequest.indices())
.tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER))
.searchSourceBuilder(searchRequest.source())
.build()
val sdkResponse = sdkClient.searchDataObject(sdkSearchRequest)
val response = sdkResponse.searchResponse()
if (response != null) {
associatedAlerts.addAll(parseAlertsFromSearchResponse(response))
}
} catch (e: Exception) {
log.error("Failed to get associated alerts in get workflow alerts action", e)
}
Expand Down
Loading