From ec7c4c7a2d6d858d199a6faf510b986fd7db7d67 Mon Sep 17 00:00:00 2001 From: Manaswini Ragamouni Date: Thu, 26 Mar 2026 18:32:47 +0000 Subject: [PATCH] Use SdkClient for persistence in Delete, Search Monitor, GetDestinations, SearchComments Replace direct client.get/search calls with sdkClient equivalents: - TransportDeleteMonitorAction: sdkClient.getDataObject() in getMonitor() - TransportSearchMonitorAction: sdkClient.searchDataObjectAsync() in search() - TransportGetDestinationsAction: sdkClient.searchDataObjectAsync() in search() - TransportSearchAlertingCommentAction: sdkClient.searchDataObjectAsync() in search() Walk full cause chain for IndexNotFoundException handling. Add unit tests for SearchMonitor and GetDestinations SDK paths. Signed-off-by: Manaswini Ragamouni --- .../transport/TransportDeleteMonitorAction.kt | 38 +++--- .../TransportGetDestinationsAction.kt | 64 +++++----- .../TransportSearchAlertingCommentAction.kt | 33 +++-- .../transport/TransportSearchMonitorAction.kt | 49 ++++---- .../TransportGetDestinationsActionTests.kt | 112 +++++++++++++++++ .../TransportSearchMonitorActionTests.kt | 116 ++++++++++++++++++ 6 files changed, 337 insertions(+), 75 deletions(-) create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsActionTests.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorActionTests.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 464992cfe..dfecbad17 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -11,12 +11,10 @@ import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionRequest -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest.RefreshPolicy -import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.cluster.service.ClusterService @@ -36,6 +34,7 @@ import org.opensearch.commons.utils.recreateObject import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.remote.metadata.client.GetDataObjectRequest import org.opensearch.remote.metadata.client.SdkClient import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -118,21 +117,32 @@ class TransportDeleteMonitorAction @Inject constructor( } private suspend fun getMonitor(): Monitor { - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val getRequest = GetDataObjectRequest.builder() + .index(ScheduledJob.SCHEDULED_JOBS_INDEX) + .id(monitorId) + .tenantId(tenantId) + .build() - val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } - if (getResponse.isExists == false) { - actionListener.onFailure( - AlertingException.wrap( - OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND) + try { + val response = sdkClient.getDataObject(getRequest) + val getResponse = response.getResponse() + if (getResponse == null || !getResponse.isExists) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND) + ) ) + } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse!!.sourceAsBytesRef, XContentType.JSON ) + return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor + } catch (e: Exception) { + log.error("GetMonitor operation failed for $monitorId", e) + throw OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND, e) } - val xcp = XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - getResponse.sourceAsBytesRef, XContentType.JSON - ) - return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt index df3c12ce4..01f432a47 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt @@ -6,10 +6,9 @@ package org.opensearch.alerting.transport import org.apache.logging.log4j.LogManager -import org.opensearch.action.search.SearchRequest -import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetDestinationsRequest import org.opensearch.alerting.action.GetDestinationsResponse @@ -34,6 +33,8 @@ import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.search.sort.SortBuilders @@ -42,7 +43,6 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.io.IOException - private val log = LogManager.getLogger(TransportGetDestinationsAction::class.java) class TransportGetDestinationsAction @Inject constructor( @@ -136,34 +136,42 @@ class TransportGetDestinationsAction @Inject constructor( } fun search(searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener) { - val searchRequest = SearchRequest() - .source(searchSourceBuilder) + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val sdkSearchRequest = SearchDataObjectRequest.builder() .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - val totalDestinationCount = response.hits.totalHits?.value?.toInt() - val destinations = mutableListOf() - for (hit in response.hits) { - val id = hit.id - val version = hit.version - val seqNo = hit.seqNo.toInt() - val primaryTerm = hit.primaryTerm.toInt() - val xcp = XContentType.JSON.xContent() - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - destinations.add(Destination.parse(xcp, id, version, seqNo, primaryTerm)) - } - actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, totalDestinationCount, destinations)) + .tenantId(tenantId) + .searchSourceBuilder(searchSourceBuilder) + .build() + + sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable -> + if (throwable != null) { + actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable))) + return@whenComplete + } + try { + val searchResponse = response.searchResponse() + if (searchResponse == null) { + actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, 0, emptyList())) + return@whenComplete } - - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) + val totalDestinationCount = searchResponse.hits.totalHits?.value?.toInt() + val destinations = mutableListOf() + for (hit in searchResponse.hits) { + val id = hit.id + val version = hit.version + val seqNo = hit.seqNo.toInt() + val primaryTerm = hit.primaryTerm.toInt() + val xcp = XContentType.JSON.xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + destinations.add(Destination.parse(xcp, id, version, seqNo, primaryTerm)) } + actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, totalDestinationCount, destinations)) + } catch (e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) } - ) + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt index fd30e9bf4..3a4581e68 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt @@ -15,6 +15,7 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings @@ -41,12 +42,13 @@ import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.io.IOException - private val log = LogManager.getLogger(TransportSearchAlertingCommentAction::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) @@ -137,18 +139,25 @@ class TransportSearchAlertingCommentAction @Inject constructor( } fun search(searchRequest: SearchRequest, actionListener: ActionListener) { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - actionListener.onResponse(response) - } - - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val sdkSearchRequest = SearchDataObjectRequest.builder() + .indices(*searchRequest.indices()) + .tenantId(tenantId) + .searchSourceBuilder(searchRequest.source()) + .build() + + sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable -> + if (throwable != null) { + actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable))) + return@whenComplete + } + val searchResponse = response.searchResponse() + if (searchResponse != null) { + actionListener.onResponse(searchResponse) + } else { + actionListener.onFailure(AlertingException.wrap(OpenSearchStatusException("Failed to search comments", RestStatus.INTERNAL_SERVER_ERROR))) } - ) + } } // retrieve the IDs of all Alerts after filtering by current User's diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index 307f9eb7e..771133628 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -15,6 +15,7 @@ import org.opensearch.action.search.SearchResponse.Clusters import org.opensearch.action.search.ShardSearchFailure import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.use @@ -37,17 +38,17 @@ import org.opensearch.index.query.ExistsQueryBuilder import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilders import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.search.SearchHits import org.opensearch.search.aggregations.InternalAggregations import org.opensearch.search.internal.InternalSearchResponse import org.opensearch.search.profile.SearchProfileShardResults import org.opensearch.search.suggest.Suggest import org.opensearch.tasks.Task -import org.opensearch.transport.RemoteTransportException import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.util.Collections - private val log = LogManager.getLogger(TransportSearchMonitorAction::class.java) class TransportSearchMonitorAction @Inject constructor( @@ -139,34 +140,40 @@ class TransportSearchMonitorAction @Inject constructor( // Checks if the exception is caused by an IndexNotFoundException (directly or nested). private fun isIndexNotFoundException(e: Exception): Boolean { - if (e is IndexNotFoundException) return true - if (e is RemoteTransportException) { - val cause = e.cause + var cause: Throwable? = e + while (cause != null) { if (cause is IndexNotFoundException) return true + cause = cause.cause } return false } fun search(searchRequest: SearchRequest, actionListener: ActionListener) { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - actionListener.onResponse(response) - } + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val sdkSearchRequest = SearchDataObjectRequest.builder() + .indices(*searchRequest.indices()) + .tenantId(tenantId) + .searchSourceBuilder(searchRequest.source()) + .build() - override fun onFailure(ex: Exception) { - if (isIndexNotFoundException(ex)) { - log.error("Index not found while searching monitor", ex) - val emptyResponse = getEmptySearchResponse() - actionListener.onResponse(emptyResponse) - } else { - log.error("Unexpected error while searching monitor", ex) - actionListener.onFailure(AlertingException.wrap(ex)) - } + sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable -> + if (throwable != null) { + val cause = SdkClientUtils.unwrapAndConvertToException(throwable) + if (isIndexNotFoundException(cause)) { + actionListener.onResponse(getEmptySearchResponse()) + } else { + log.error("Unexpected error while searching monitor", cause) + actionListener.onFailure(AlertingException.wrap(cause)) } + return@whenComplete } - ) + val searchResponse = response.searchResponse() + if (searchResponse != null) { + actionListener.onResponse(searchResponse) + } else { + actionListener.onResponse(getEmptySearchResponse()) + } + } } private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsActionTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsActionTests.kt new file mode 100644 index 000000000..5fbfd479f --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsActionTests.kt @@ -0,0 +1,112 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.junit.Before +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito +import org.mockito.Mockito.verify +import org.opensearch.action.support.ActionFilters +import org.opensearch.alerting.AlertingPlugin.Companion.TENANT_ID_HEADER +import org.opensearch.alerting.action.GetDestinationsResponse +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.ClusterSettings +import org.opensearch.common.settings.Setting +import org.opensearch.common.settings.Settings +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.core.action.ActionListener +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.client.SearchDataObjectResponse +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.threadpool.ThreadPool +import org.opensearch.transport.TransportService +import org.opensearch.transport.client.Client +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import org.mockito.Mockito.`when` as whenever +class TransportGetDestinationsActionTests : OpenSearchTestCase() { + + private lateinit var client: Client + private lateinit var sdkClient: SdkClient + private lateinit var clusterService: ClusterService + private lateinit var threadPool: ThreadPool + private lateinit var threadContext: ThreadContext + + @Before + fun setup() { + client = Mockito.mock(Client::class.java) + sdkClient = Mockito.mock(SdkClient::class.java) + clusterService = Mockito.mock(ClusterService::class.java) + threadPool = Mockito.mock(ThreadPool::class.java) + threadContext = ThreadContext(Settings.EMPTY) + + whenever(client.threadPool()).thenReturn(threadPool) + whenever(threadPool.threadContext).thenReturn(threadContext) + + val settingSet = hashSetOf>() + settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + settingSet.add(AlertingSettings.FILTER_BY_BACKEND_ROLES) + val clusterSettings = ClusterSettings(Settings.EMPTY, settingSet) + whenever(clusterService.clusterSettings).thenReturn(clusterSettings) + } + + fun `test search passes tenantId to SDK`() { + val expectedTenantId = "test-tenant:test-scope" + threadContext.putHeader(TENANT_ID_HEADER, expectedTenantId) + + val future: CompletionStage = + CompletableFuture.completedFuture(SearchDataObjectResponse(null as org.opensearch.action.search.SearchResponse?)) + whenever(sdkClient.searchDataObjectAsync(any(SearchDataObjectRequest::class.java))).thenReturn(future) + + val action = TransportGetDestinationsAction( + Mockito.mock(TransportService::class.java), + client, + clusterService, + Mockito.mock(ActionFilters::class.java), + Settings.EMPTY, + Mockito.mock(NamedXContentRegistry::class.java), + sdkClient + ) + + @Suppress("UNCHECKED_CAST") + val listener = Mockito.mock(ActionListener::class.java) as ActionListener + action.search(SearchSourceBuilder(), listener) + + val captor = ArgumentCaptor.forClass(SearchDataObjectRequest::class.java) + verify(sdkClient).searchDataObjectAsync(captor.capture()) + assertEquals(expectedTenantId, captor.value.tenantId()) + } + + fun `test search SDK exception propagated to listener`() { + threadContext.putHeader(TENANT_ID_HEADER, "test-tenant:test-scope") + + val future: CompletionStage = CompletableFuture().also { + it.completeExceptionally(RuntimeException("SDK search failed")) + } + whenever(sdkClient.searchDataObjectAsync(any(SearchDataObjectRequest::class.java))).thenReturn(future) + + val action = TransportGetDestinationsAction( + Mockito.mock(TransportService::class.java), + client, + clusterService, + Mockito.mock(ActionFilters::class.java), + Settings.EMPTY, + Mockito.mock(NamedXContentRegistry::class.java), + sdkClient + ) + + @Suppress("UNCHECKED_CAST") + val listener = Mockito.mock(ActionListener::class.java) as ActionListener + action.search(SearchSourceBuilder(), listener) + + verify(listener).onFailure(any()) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorActionTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorActionTests.kt new file mode 100644 index 000000000..03cc893b1 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorActionTests.kt @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.junit.Before +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito +import org.mockito.Mockito.verify +import org.opensearch.action.support.ActionFilters +import org.opensearch.alerting.AlertingPlugin.Companion.TENANT_ID_HEADER +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.ClusterSettings +import org.opensearch.common.settings.Setting +import org.opensearch.common.settings.Settings +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.core.action.ActionListener +import org.opensearch.core.common.io.stream.NamedWriteableRegistry +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.client.SearchDataObjectResponse +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.threadpool.ThreadPool +import org.opensearch.transport.TransportService +import org.opensearch.transport.client.Client +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import org.mockito.Mockito.`when` as whenever +class TransportSearchMonitorActionTests : OpenSearchTestCase() { + + private lateinit var client: Client + private lateinit var sdkClient: SdkClient + private lateinit var clusterService: ClusterService + private lateinit var threadPool: ThreadPool + private lateinit var threadContext: ThreadContext + + @Before + fun setup() { + client = Mockito.mock(Client::class.java) + sdkClient = Mockito.mock(SdkClient::class.java) + clusterService = Mockito.mock(ClusterService::class.java) + threadPool = Mockito.mock(ThreadPool::class.java) + threadContext = ThreadContext(Settings.EMPTY) + + whenever(client.threadPool()).thenReturn(threadPool) + whenever(threadPool.threadContext).thenReturn(threadContext) + + val settingSet = hashSetOf>() + settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + settingSet.add(AlertingSettings.FILTER_BY_BACKEND_ROLES) + val clusterSettings = ClusterSettings(Settings.EMPTY, settingSet) + whenever(clusterService.clusterSettings).thenReturn(clusterSettings) + } + + fun `test search passes tenantId to SDK`() { + val expectedTenantId = "test-tenant:test-scope" + threadContext.putHeader(TENANT_ID_HEADER, expectedTenantId) + + val future: CompletionStage = + CompletableFuture.completedFuture(SearchDataObjectResponse(null as org.opensearch.action.search.SearchResponse?)) + whenever(sdkClient.searchDataObjectAsync(any(SearchDataObjectRequest::class.java))).thenReturn(future) + + val action = TransportSearchMonitorAction( + Mockito.mock(TransportService::class.java), + Settings.EMPTY, + client, + clusterService, + Mockito.mock(ActionFilters::class.java), + Mockito.mock(NamedWriteableRegistry::class.java), + sdkClient + ) + + @Suppress("UNCHECKED_CAST") + val listener = Mockito.mock(ActionListener::class.java) as ActionListener + action.search( + org.opensearch.action.search.SearchRequest(".opendistro-alerting-config"), + listener + ) + + val captor = ArgumentCaptor.forClass(SearchDataObjectRequest::class.java) + verify(sdkClient).searchDataObjectAsync(captor.capture()) + assertEquals(expectedTenantId, captor.value.tenantId()) + } + + fun `test search SDK exception propagated to listener`() { + threadContext.putHeader(TENANT_ID_HEADER, "test-tenant:test-scope") + + val future: CompletionStage = CompletableFuture().also { + it.completeExceptionally(RuntimeException("SDK search failed")) + } + whenever(sdkClient.searchDataObjectAsync(any(SearchDataObjectRequest::class.java))).thenReturn(future) + + val action = TransportSearchMonitorAction( + Mockito.mock(TransportService::class.java), + Settings.EMPTY, + client, + clusterService, + Mockito.mock(ActionFilters::class.java), + Mockito.mock(NamedWriteableRegistry::class.java), + sdkClient + ) + + @Suppress("UNCHECKED_CAST") + val listener = Mockito.mock(ActionListener::class.java) as ActionListener + action.search( + org.opensearch.action.search.SearchRequest(".opendistro-alerting-config"), + listener + ) + + verify(listener).onFailure(any()) + } +}