diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt index fbf3ac423..880b0c83e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt @@ -11,13 +11,10 @@ import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionRequest -import org.opensearch.action.delete.DeleteRequest -import org.opensearch.action.search.SearchRequest -import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN -import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject @@ -38,7 +35,9 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.query.QueryBuilders +import org.opensearch.remote.metadata.client.DeleteDataObjectRequest import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -127,12 +126,16 @@ class TransportDeleteAlertingCommentAction @Inject constructor( // or if the user is Admin val canDelete = user == null || user.name == comment.user?.name || isAdmin(user) - val deleteRequest = DeleteRequest(sourceIndex, commentId) + val deleteRequest = DeleteDataObjectRequest.builder() + .index(sourceIndex) + .id(commentId) + .tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)) + .build() if (canDelete) { - log.debug("Deleting the comment with id ${deleteRequest.id()}") - val deleteResponse = client.suspendUntil { delete(deleteRequest, it) } - actionListener.onResponse(DeleteCommentResponse(deleteResponse.id)) + log.debug("Deleting the comment with id $commentId") + val deleteResponse = sdkClient.deleteDataObject(deleteRequest) + actionListener.onResponse(DeleteCommentResponse(deleteResponse.id())) } else { actionListener.onFailure( AlertingException("Not allowed to delete this comment!", RestStatus.FORBIDDEN, IllegalStateException()) @@ -153,12 +156,15 @@ class TransportDeleteAlertingCommentAction @Inject constructor( .version(true) .seqNoAndPrimaryTerm(true) .query(queryBuilder) - val searchRequest = SearchRequest() - .source(searchSourceBuilder) + val searchRequest = SearchDataObjectRequest.builder() .indices(ALL_COMMENTS_INDEX_PATTERN) + .tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)) + .searchSourceBuilder(searchSourceBuilder) + .build() - val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } - val comments = searchResponse.hits.map { hit -> + val sdkResponse = sdkClient.searchDataObject(searchRequest) + val searchResponse = sdkResponse.searchResponse() + val comments = searchResponse?.hits?.map { hit -> val xcp = XContentHelper.createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, @@ -171,7 +177,7 @@ class TransportDeleteAlertingCommentAction @Inject constructor( comment } - if (comments.isEmpty()) { + if (comments.isNullOrEmpty()) { actionListener.onFailure( AlertingException.wrap( OpenSearchStatusException("Comment not found", RestStatus.NOT_FOUND), diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 27c55b3db..39035e53e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -11,11 +11,10 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.action.ExecuteMonitorAction @@ -39,13 +38,14 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.remote.metadata.client.GetDataObjectRequest import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.time.Instant import java.util.Locale - private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) @@ -103,35 +103,41 @@ class TransportExecuteMonitorAction @Inject constructor( } if (execMonitorRequest.monitorId != null) { - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId) - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure( - AlertingException.wrap( - OpenSearchStatusException("Can't find monitor with id: ${response.id}", RestStatus.NOT_FOUND) - ) + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val getRequest = GetDataObjectRequest.builder() + .index(ScheduledJob.SCHEDULED_JOBS_INDEX) + .id(execMonitorRequest.monitorId) + .tenantId(tenantId) + .build() + sdkClient.getDataObjectAsync(getRequest).whenComplete { response, throwable -> + if (throwable != null) { + actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable))) + return@whenComplete + } + try { + val getResponse = response.getResponse() + if (getResponse == null || !getResponse.isExists) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Can't find monitor with id: ${execMonitorRequest.monitorId}", RestStatus.NOT_FOUND) ) - return - } - if (!response.isSourceEmpty) { - XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON - ).use { xcp -> - val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor - executeMonitor(monitor) - } - } + ) + return@whenComplete } - - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) + if (!getResponse.isSourceEmpty) { + XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON + ).use { xcp -> + val monitor = ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor + executeMonitor(monitor) + } } + } catch (e: Exception) { + log.error("Failed to get monitor ${execMonitorRequest.monitorId} for execution", e) + actionListener.onFailure(AlertingException.wrap(e)) } - ) + } } else { val monitor = when (user?.name.isNullOrEmpty()) { true -> execMonitorRequest.monitor as Monitor diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt index 6b37ba9a4..a936ecedb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt @@ -10,15 +10,11 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionRequest -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse -import org.opensearch.action.search.SearchRequest -import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.opensearchapi.addFilter -import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.use import org.opensearch.cluster.service.ClusterService @@ -44,7 +40,10 @@ import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders +import org.opensearch.remote.metadata.client.GetDataObjectRequest import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortBuilders import org.opensearch.search.sort.SortOrder @@ -203,10 +202,16 @@ class TransportGetAlertsAction @Inject constructor( } private suspend fun getMonitor(getAlertsRequest: GetAlertsRequest): Monitor? { - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getAlertsRequest.monitorId!!) + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val getRequest = GetDataObjectRequest.builder() + .index(ScheduledJob.SCHEDULED_JOBS_INDEX) + .id(getAlertsRequest.monitorId!!) + .tenantId(tenantId) + .build() try { - val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) } - if (!getResponse.isExists) { + val response = sdkClient.getDataObject(getRequest) + val getResponse = response.getResponse() + if (getResponse == null || !getResponse.isExists) { return null } val xcp = XContentHelper.createParser( @@ -246,33 +251,40 @@ class TransportGetAlertsAction @Inject constructor( } fun search(alertIndex: String, searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener) { - val searchRequest = SearchRequest() + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val sdkSearchRequest = SearchDataObjectRequest.builder() .indices(alertIndex) - .source(searchSourceBuilder) + .tenantId(tenantId) + .searchSourceBuilder(searchSourceBuilder) + .build() - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - val totalAlertCount = response.hits.totalHits?.value?.toInt() - val alerts = response.hits.map { hit -> - val xcp = XContentHelper.createParser( - xContentRegistry, - LoggingDeprecationHandler.INSTANCE, - hit.sourceRef, - XContentType.JSON - ) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - val alert = Alert.parse(xcp, hit.id, hit.version) - alert - } - actionListener.onResponse(GetAlertsResponse(alerts, totalAlertCount)) + sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable -> + if (throwable != null) { + actionListener.onFailure(SdkClientUtils.unwrapAndConvertToException(throwable)) + return@whenComplete + } + try { + val searchResponse = response.searchResponse() + if (searchResponse == null) { + actionListener.onResponse(GetAlertsResponse(emptyList(), 0)) + return@whenComplete } - - override fun onFailure(t: Exception) { - actionListener.onFailure(t) + val totalAlertCount = searchResponse.hits.totalHits?.value?.toInt() + val alerts = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + Alert.parse(xcp, hit.id, hit.version) } + actionListener.onResponse(GetAlertsResponse(alerts, totalAlertCount)) + } catch (e: Exception) { + log.error("Failed to search alerts", e) + actionListener.onFailure(AlertingException.wrap(e)) } - ) + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt index 8d8d9f917..c43768f98 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt @@ -14,9 +14,9 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.opensearchapi.addFilter -import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.use import org.opensearch.cluster.service.ClusterService @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortBuilders import org.opensearch.search.sort.SortOrder @@ -207,17 +208,20 @@ class TransportGetWorkflowAlertsAction @Inject constructor( actionListener: ActionListener, ) { try { - val searchRequest = SearchRequest() + val searchRequest = SearchDataObjectRequest.builder() .indices(alertIndex) - .source(searchSourceBuilder) + .tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)) + .searchSourceBuilder(searchSourceBuilder) + .build() val alerts = mutableListOf() val associatedAlerts = mutableListOf() - val response: SearchResponse = client.suspendUntil { search(searchRequest, it) } - val totalAlertCount = response.hits.totalHits?.value?.toInt() - alerts.addAll( - parseAlertsFromSearchResponse(response) - ) + val sdkResponse = sdkClient.searchDataObject(searchRequest) + val response = sdkResponse.searchResponse() + val totalAlertCount = response?.hits?.totalHits?.value?.toInt() + if (response != null) { + alerts.addAll(parseAlertsFromSearchResponse(response)) + } if (alerts.isNotEmpty() && getWorkflowAlertsRequest.getAssociatedAlerts == true) getAssociatedAlerts( associatedAlerts, @@ -256,8 +260,16 @@ class TransportGetWorkflowAlertsAction @Inject constructor( queryBuilder.must(QueryBuilders.termsQuery("_id", associatedAlertIds)) queryBuilder.must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.AUDIT.name)) searchRequest.source().query(queryBuilder) - val response: SearchResponse = client.suspendUntil { search(searchRequest, it) } - associatedAlerts.addAll(parseAlertsFromSearchResponse(response)) + val sdkSearchRequest = SearchDataObjectRequest.builder() + .indices(*searchRequest.indices()) + .tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)) + .searchSourceBuilder(searchRequest.source()) + .build() + val sdkResponse = sdkClient.searchDataObject(sdkSearchRequest) + val response = sdkResponse.searchResponse() + if (response != null) { + associatedAlerts.addAll(parseAlertsFromSearchResponse(response)) + } } catch (e: Exception) { log.error("Failed to get associated alerts in get workflow alerts action", e) }