From 1cdef0707c567ca875d57508db36778b1286147e Mon Sep 17 00:00:00 2001 From: xinyual Date: Wed, 10 Dec 2025 16:33:04 +0800 Subject: [PATCH 1/2] switch to memory container Signed-off-by: xinyual --- .../org/opensearch/ml/common/CommonValue.java | 2 +- .../AbstractIndexInsightTask.java | 556 ------------------ .../indexInsight/FieldDescriptionTask.java | 338 ----------- .../ml/common/indexInsight/IndexInsight.java | 163 ----- .../IndexInsightAccessControllerHelper.java | 47 -- .../indexInsight/IndexInsightConfig.java | 84 --- .../common/indexInsight/IndexInsightTask.java | 37 -- .../indexInsight/IndexInsightTaskStatus.java | 25 - .../LogRelatedIndexCheckTask.java | 164 ------ .../indexInsight/MLIndexInsightType.java | 26 - .../indexInsight/StatisticalDataTask.java | 407 ------------- .../MLIndexInsightGetRequest.java | 2 +- .../StatisticalDataTaskTests.java | 2 - .../GetIndexInsightTransportAction.java | 66 ++- .../ml/helper/RemoteMemoryStoreHelper.java | 49 ++ .../ml/rest/RestMLExecuteAction.java | 2 + 16 files changed, 101 insertions(+), 1869 deletions(-) delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightAccessControllerHelper.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightConfig.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java delete mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java diff --git a/common/src/main/java/org/opensearch/ml/common/CommonValue.java b/common/src/main/java/org/opensearch/ml/common/CommonValue.java index adddf6b39d..a5c89814fe 100644 --- a/common/src/main/java/org/opensearch/ml/common/CommonValue.java +++ b/common/src/main/java/org/opensearch/ml/common/CommonValue.java @@ -36,7 +36,7 @@ public class CommonValue { // warm node public static String WARM_BOX_TYPE = "warm"; public static final String ML_INDEX_INSIGHT_CONFIG_INDEX = ".plugins-ml-index-insight-config"; - public static final String ML_INDEX_INSIGHT_STORAGE_INDEX = ".plugins-ml-index-insight-storage"; + public static final String ML_INDEX_INSIGHT_STORAGE_INDEX = "plugins-ml-index-insight-storage"; public static final String ML_MODEL_GROUP_INDEX = ".plugins-ml-model-group"; public static final String ML_MODEL_INDEX = ".plugins-ml-model"; diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java deleted file mode 100644 index 4ee18707dc..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java +++ /dev/null @@ -1,556 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import static org.opensearch.ml.common.CommonValue.INDEX_INSIGHT_AGENT_NAME; -import static org.opensearch.ml.common.CommonValue.INDEX_INSIGHT_GENERATING_TIMEOUT; -import static org.opensearch.ml.common.CommonValue.INDEX_INSIGHT_UPDATE_INTERVAL; -import static org.opensearch.ml.common.CommonValue.ML_INDEX_INSIGHT_STORAGE_INDEX; -import static org.opensearch.ml.common.indexInsight.IndexInsight.INDEX_NAME_FIELD; -import static org.opensearch.ml.common.indexInsight.IndexInsight.TASK_TYPE_FIELD; -import static org.opensearch.ml.common.utils.StringUtils.gson; - -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; - -import org.opensearch.OpenSearchStatusException; -import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.get.GetResponse; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.common.Numbers; -import org.opensearch.common.regex.Regex; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.core.action.ActionListener; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.index.query.BoolQueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.index.query.RegexpQueryBuilder; -import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.ml.common.FunctionName; -import org.opensearch.ml.common.MLConfig; -import org.opensearch.ml.common.dataset.remote.RemoteInferenceInputDataSet; -import org.opensearch.ml.common.input.execute.agent.AgentMLInput; -import org.opensearch.ml.common.output.model.ModelTensor; -import org.opensearch.ml.common.output.model.ModelTensorOutput; -import org.opensearch.ml.common.output.model.ModelTensors; -import org.opensearch.ml.common.transport.config.MLConfigGetAction; -import org.opensearch.ml.common.transport.config.MLConfigGetRequest; -import org.opensearch.ml.common.transport.execute.MLExecuteTaskAction; -import org.opensearch.ml.common.transport.execute.MLExecuteTaskRequest; -import org.opensearch.remote.metadata.client.GetDataObjectRequest; -import org.opensearch.remote.metadata.client.PutDataObjectRequest; -import org.opensearch.remote.metadata.client.SdkClient; -import org.opensearch.remote.metadata.client.SearchDataObjectRequest; -import org.opensearch.remote.metadata.common.SdkClientUtils; -import org.opensearch.search.SearchHit; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.transport.client.Client; - -import com.google.common.hash.Hashing; -import com.jayway.jsonpath.JsonPath; - -import lombok.extern.log4j.Log4j2; - -/** - * Abstract base class providing default implementation for IndexInsightTask - */ -@Log4j2 -public abstract class AbstractIndexInsightTask implements IndexInsightTask { - - protected final MLIndexInsightType taskType; - protected final String sourceIndex; - protected final Client client; - protected final SdkClient sdkClient; - protected final String cmkRoleArn; - protected final String cmkAssumeRoleArn; - - protected AbstractIndexInsightTask( - MLIndexInsightType taskType, - String sourceIndex, - Client client, - SdkClient sdkClient, - String cmkRoleArn, - String cmkAssumeRoleArn - ) { - this.taskType = taskType; - this.sourceIndex = sourceIndex; - this.client = client; - this.sdkClient = sdkClient; - this.cmkRoleArn = cmkRoleArn; - this.cmkAssumeRoleArn = cmkAssumeRoleArn; - } - - /** - * Execute the index insight task: - * 1. Check if record exists in storage - * 2. Check status and last updated time - * 3. Check prerequisites - * 4. Run task logic - * 5. Write back to storage - */ - @Override - public void execute(String tenantId, ActionListener listener) { - getIndexInsight(generateDocId(), tenantId, ActionListener.wrap(getResponse -> { - if (getResponse.isExists()) { - handleExistingDoc(getResponse.getSourceAsMap(), tenantId, listener); - } else { - SearchSourceBuilder patternSourceBuilder = buildPatternSourceBuilder(taskType.name()); - try (ThreadContext.StoredContext searchContext = client.threadPool().getThreadContext().stashContext()) { - sdkClient - .searchDataObjectAsync( - SearchDataObjectRequest - .builder() - .tenantId(tenantId) - .indices(ML_INDEX_INSIGHT_STORAGE_INDEX) - .searchSourceBuilder(patternSourceBuilder) - .build() - ) - .whenComplete((r, throwable) -> { - searchContext.restore(); - if (throwable != null) { - Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); - log.error("Failed to get index insight pattern", cause); - beginGeneration(tenantId, listener); - } else { - SearchResponse searchResponse = r.searchResponse(); - SearchHit[] hits = searchResponse.getHits().getHits(); - Map mappedPatternSource = matchPattern(hits, sourceIndex); - if (Objects.isNull(mappedPatternSource)) { - beginGeneration(tenantId, listener); - } else { - handlePatternMatchedDoc(mappedPatternSource, tenantId, listener); - } - } - }); - } catch (Exception e) { - listener.onFailure(e); - } - } - }, listener::onFailure)); - } - - protected void handleExistingDoc(Map source, String tenantId, ActionListener listener) { - String currentStatus = (String) source.get(IndexInsight.STATUS_FIELD); - Object v = source.get(IndexInsight.LAST_UPDATE_FIELD); - Long lastUpdateTime = (v == null) ? null - : (v instanceof Number n) ? n.longValue() - : (v instanceof CharSequence cs && cs.length() > 0) ? Numbers.toLong(cs.toString(), true) - : null; - long currentTime = Instant.now().toEpochMilli(); - - IndexInsightTaskStatus status = IndexInsightTaskStatus.fromString(currentStatus); - switch (status) { - case GENERATING: - // Check if generating timeout - if (lastUpdateTime != null && (currentTime - lastUpdateTime) > INDEX_INSIGHT_GENERATING_TIMEOUT) { - beginGeneration(tenantId, listener); - } else { - // If still generating and not timeout, task is already running - listener - .onFailure( - new OpenSearchStatusException("Index insight is being generated, please wait...", RestStatus.TOO_MANY_REQUESTS) - ); - } - break; - case COMPLETED: - // Check if needs update - if (lastUpdateTime != null && (currentTime - lastUpdateTime) > INDEX_INSIGHT_UPDATE_INTERVAL) { - beginGeneration(tenantId, listener); - } else { - // Return existing result - IndexInsight insight = IndexInsight - .builder() - .index((String) source.get(IndexInsight.INDEX_NAME_FIELD)) - .taskType(MLIndexInsightType.valueOf((String) source.get(IndexInsight.TASK_TYPE_FIELD))) - .content((String) source.get(IndexInsight.CONTENT_FIELD)) - .status(IndexInsightTaskStatus.COMPLETED) - .lastUpdatedTime(Instant.ofEpochMilli(lastUpdateTime)) - .tenantId(tenantId) - .build(); - listener.onResponse(insight); - } - break; - case FAILED: - // Retry failed task - beginGeneration(tenantId, listener); - break; - } - } - - /** - * Handle pattern matched document - */ - protected void handlePatternMatchedDoc(Map patternSource, String tenantId, ActionListener listener) { - String currentStatus = (String) patternSource.get(IndexInsight.STATUS_FIELD); - IndexInsightTaskStatus status = IndexInsightTaskStatus.fromString(currentStatus); - - // If pattern result is not completed, fall back to normal generation - if (status != IndexInsightTaskStatus.COMPLETED) { - beginGeneration(tenantId, listener); - return; - } - - // If pattern result is completed but expired, fall back to normal generation - Long lastUpdateTime = (Long) patternSource.get(IndexInsight.LAST_UPDATE_FIELD); - long currentTime = Instant.now().toEpochMilli(); - if (lastUpdateTime != null && (currentTime - lastUpdateTime) > INDEX_INSIGHT_UPDATE_INTERVAL) { - beginGeneration(tenantId, listener); - return; - } - - // Pattern result is completed and valid - handlePatternResult(patternSource, tenantId, listener); - } - - /** - * Begin the index insight generation process by updating task status to GENERATING and executing the task with prerequisites. - */ - protected void beginGeneration(String tenantId, ActionListener listener) { - IndexInsight indexInsight = IndexInsight - .builder() - .index(sourceIndex) - .tenantId(tenantId) - .taskType(taskType) - .status(IndexInsightTaskStatus.GENERATING) - .lastUpdatedTime(Instant.now()) - .build(); - - writeIndexInsight( - indexInsight, - tenantId, - ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, listener::onFailure) - ); - } - - protected void runWithPrerequisites(String tenantId, ActionListener listener) { - List prerequisites = getPrerequisites(); - AtomicInteger completedCount = new AtomicInteger(0); - if (prerequisites.isEmpty()) { - runTask(tenantId, listener); - return; - } - - // Run all prerequisites - for (MLIndexInsightType prerequisite : prerequisites) { - IndexInsightTask prerequisiteTask = createPrerequisiteTask(prerequisite); - prerequisiteTask.execute(tenantId, ActionListener.wrap(prereqInsight -> { - if (completedCount.incrementAndGet() == prerequisites.size()) { - runTask(tenantId, listener); - } - }, e -> { saveFailedStatus(tenantId, new Exception("Failed to run prerequisite: " + prerequisite, e), listener); })); - } - } - - protected void saveResult(String content, String tenantId, ActionListener listener) { - IndexInsight insight = IndexInsight - .builder() - .index(sourceIndex) - .taskType(taskType) - .content(content) - .status(IndexInsightTaskStatus.COMPLETED) - .lastUpdatedTime(Instant.now()) - .tenantId(tenantId) - .build(); - - writeIndexInsight(insight, tenantId, ActionListener.wrap(r -> { listener.onResponse(insight); }, e -> { - saveFailedStatus(tenantId, e, listener); - })); - } - - protected void saveFailedStatus(String tenantId, Exception error, ActionListener listener) { - IndexInsight indexInsight = IndexInsight - .builder() - .tenantId(tenantId) - .index(sourceIndex) - .taskType(taskType) - .lastUpdatedTime(Instant.now()) - .status(IndexInsightTaskStatus.FAILED) - .build(); - writeIndexInsight( - indexInsight, - tenantId, - ActionListener.wrap(r -> { listener.onFailure(error); }, e -> { listener.onFailure(e); }) - ); - } - - /** - * Generate document ID for current task - */ - protected String generateDocId() { - return generateDocId(taskType); - } - - /** - * Generate document ID for specific task type - */ - protected String generateDocId(MLIndexInsightType taskType) { - String combined = sourceIndex + "_" + taskType.toString(); - return Hashing.sha256().hashString(combined, StandardCharsets.UTF_8).toString(); - } - - /** - * Get insight content from storage for a specific task type - */ - protected void getInsightContentFromContainer( - MLIndexInsightType taskType, - String tenantId, - ActionListener> listener - ) { - String docId = generateDocId(taskType); - getIndexInsight(docId, tenantId, ActionListener.wrap(getResponse -> { - try { - String content = getResponse.isExists() ? getResponse.getSourceAsMap().get(IndexInsight.CONTENT_FIELD).toString() : ""; - Map contentMap = gson.fromJson(content, Map.class); - listener.onResponse(contentMap); - } catch (Exception e) { - // Return empty content on JSON parsing failure - listener.onResponse(new HashMap<>()); - } - }, listener::onFailure)); - } - - protected void handlePatternResult(Map patternSource, String tenantId, ActionListener listener) { - // Default implementation: return pattern result as-is - Long lastUpdateTime = (Long) patternSource.get(IndexInsight.LAST_UPDATE_FIELD); - IndexInsight insight = IndexInsight - .builder() - .index(sourceIndex) - .taskType(taskType) - .content((String) patternSource.get(IndexInsight.CONTENT_FIELD)) - .status(IndexInsightTaskStatus.COMPLETED) - .lastUpdatedTime(Instant.ofEpochMilli(lastUpdateTime)) - .tenantId(tenantId) - .build(); - listener.onResponse(insight); - } - - private void getIndexInsight(String docId, String tenantId, ActionListener listener) { - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - sdkClient - .getDataObjectAsync( - GetDataObjectRequest - .builder() - .tenantId(tenantId) - .index(ML_INDEX_INSIGHT_STORAGE_INDEX) - .id(docId) - .cmkRoleArn(cmkRoleArn) - .assumeRoleArn(cmkAssumeRoleArn) - .build() - ) - .whenComplete((r, throwable) -> { - context.restore(); - if (throwable != null) { - Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); - log.error("Failed to get index insight document", cause); - listener.onFailure(cause); - } else { - try { - GetResponse getResponse = r.getResponse(); - assert getResponse != null; - listener.onResponse(getResponse); - } catch (Exception e) { - listener.onFailure(e); - } - } - }); - } catch (Exception e) { - listener.onFailure(e); - } - } - - private void writeIndexInsight(IndexInsight indexInsight, String tenantId, ActionListener listener) { - String docId = generateDocId(); - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - sdkClient - .putDataObjectAsync( - PutDataObjectRequest - .builder() - .tenantId(tenantId) - .index(ML_INDEX_INSIGHT_STORAGE_INDEX) - .dataObject(indexInsight) - .id(docId) - .cmkRoleArn(cmkRoleArn) - .assumeRoleArn(cmkAssumeRoleArn) - .build() - ) - .whenComplete((r, throwable) -> { - context.restore(); - if (throwable != null) { - Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); - log.error("Failed to write index insight document", cause); - listener.onFailure(cause); - } else { - try { - IndexResponse indexResponse = r.indexResponse(); - assert indexResponse != null; - if (indexResponse.getResult() == DocWriteResponse.Result.CREATED - || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { - listener.onResponse(true); - } else { - listener.onFailure(new RuntimeException("Failed to put generating index insight doc")); - } - } catch (Exception e) { - listener.onFailure(e); - } - } - }); - } catch (Exception e) { - listener.onFailure(e); - } - } - - protected static void getAgentIdToRun(Client client, String tenantId, ActionListener actionListener) { - MLConfigGetRequest mlConfigGetRequest = new MLConfigGetRequest(INDEX_INSIGHT_AGENT_NAME, tenantId); - client.execute(MLConfigGetAction.INSTANCE, mlConfigGetRequest, ActionListener.wrap(r -> { - MLConfig mlConfig = r.getMlConfig(); - actionListener.onResponse(mlConfig.getConfiguration().getAgentId()); - }, actionListener::onFailure)); - } - - /** - * Flatten all the fields in the mappings, insert the field to fieldType mapping to a map - * @param mappingSource the mappings of an index - * @param fieldsToType the result containing the field to fieldType mapping - * @param prefix the parent field path - * @param includeFields whether include the `fields` in a text type field, for some use case like PPLTool, `fields` in a text type field - * cannot be included, but for CreateAnomalyDetectorTool, `fields` must be included. - */ - protected static void extractFieldNamesTypes( - Map mappingSource, - Map fieldsToType, - String prefix, - boolean includeFields - ) { - if (prefix.length() > 0) { - prefix += "."; - } - - for (Map.Entry entry : mappingSource.entrySet()) { - String n = entry.getKey(); - Object v = entry.getValue(); - - if (v instanceof Map) { - Map vMap = (Map) v; - if (vMap.containsKey("type")) { - String fieldType = (String) vMap.getOrDefault("type", ""); - // no need to extract alias into the result, and for object field, extract the subfields only - if (!fieldType.equals("alias") && !fieldType.equals("object")) { - fieldsToType.put(prefix + n, fieldType); - } - } - if (vMap.containsKey("properties")) { - extractFieldNamesTypes((Map) vMap.get("properties"), fieldsToType, prefix + n, includeFields); - } - if (includeFields && vMap.containsKey("fields")) { - extractFieldNamesTypes((Map) vMap.get("fields"), fieldsToType, prefix + n, true); - } - } - } - } - - private static SearchSourceBuilder buildPatternSourceBuilder(String taskType) { - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.size(100); - - RegexpQueryBuilder regexpQuery = QueryBuilders.regexpQuery(INDEX_NAME_FIELD, ".*[*?,].*"); - TermQueryBuilder termQuery = QueryBuilders.termQuery(TASK_TYPE_FIELD, taskType); - BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().filter(regexpQuery).filter(termQuery); - - sourceBuilder.query(boolQuery); - return sourceBuilder; - } - - /** - * Auto-detects LLM response format and extracts the response text if response_filter is not configured - */ - private static String extractModelResponse(Map data) { - if (data.containsKey("choices")) { - return JsonPath.read(data, "$.choices[0].message.content"); - } - if (data.containsKey("content")) { - return JsonPath.read(data, "$.content[0].text"); - } - return JsonPath.read(data, "$.response"); - } - - private static Map matchPattern(SearchHit[] hits, String targetIndex) { - for (SearchHit hit : hits) { - Map source = hit.getSourceAsMap(); - String pattern = (String) source.get(INDEX_NAME_FIELD); - if (Regex.simpleMatch(pattern, targetIndex)) { - return source; - } - } - return null; - } - - /** - * Common method to call LLM with agent and handle response parsing - */ - protected static void callLLMWithAgent( - Client client, - String agentId, - String prompt, - String sourceIndex, - String tenantId, - ActionListener listener - ) { - AgentMLInput agentInput = AgentMLInput - .AgentMLInputBuilder() - .agentId(agentId) - .functionName(FunctionName.AGENT) - .inputDataset(RemoteInferenceInputDataSet.builder().parameters(Collections.singletonMap("prompt", prompt)).build()) - .tenantId(tenantId) - .build(); - - MLExecuteTaskRequest executeRequest = new MLExecuteTaskRequest(FunctionName.AGENT, agentInput); - - client.execute(MLExecuteTaskAction.INSTANCE, executeRequest, ActionListener.wrap(mlResp -> { - try { - ModelTensorOutput out = (ModelTensorOutput) mlResp.getOutput(); - ModelTensors t = out.getMlModelOutputs().get(0); - ModelTensor mt = t.getMlModelTensors().get(0); - String result = mt.getResult(); - String response; - // response_filter is not configured in the LLM connector - if (result.startsWith("{")) { - Map data = gson.fromJson(result, Map.class); - response = extractModelResponse(data); - } else { - // response_filter is configured in the LLM connector - response = result; - } - listener.onResponse(response); - } catch (Exception e) { - log.error("Error parsing LLM response for index {}: {}", sourceIndex, e.getMessage()); - listener.onFailure(e); - } - }, e -> { - log.error("Failed to call LLM for index {}: {}", sourceIndex, e.getMessage()); - listener.onFailure(e); - })); - } - - protected void handleError(String message, Exception e, String tenantId, ActionListener listener, boolean shouldStore) { - log.error(message, sourceIndex, e); - if (shouldStore) { - saveFailedStatus(tenantId, e, listener); - } else { - listener.onFailure(e); - } - } - - protected void handleError(String message, Exception e, String tenantId, ActionListener listener) { - handleError(message, e, tenantId, listener, true); - } -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java deleted file mode 100644 index 1eda0c0390..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.opensearch.ml.common.indexInsight.StatisticalDataTask.EXAMPLE_DOC_KEYWORD; -import static org.opensearch.ml.common.indexInsight.StatisticalDataTask.IMPORTANT_COLUMN_KEYWORD; -import static org.opensearch.ml.common.utils.StringUtils.gson; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.opensearch.action.LatchedActionListener; -import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest; -import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.core.action.ActionListener; -import org.opensearch.remote.metadata.client.SdkClient; -import org.opensearch.transport.client.Client; - -import lombok.extern.log4j.Log4j2; - -/** - * Field Description Task: Generates descriptions for index fields using LLM. - * This task analyzes index mapping and sample data to provide meaningful descriptions - * for each field in the index, helping down-stream tasks understand the purpose and content of fields. - */ -@Log4j2 -public class FieldDescriptionTask extends AbstractIndexInsightTask { - - private static final int BATCH_SIZE = 50; // Hard-coded value for now - - public FieldDescriptionTask(String sourceIndex, Client client, SdkClient sdkClient, String cmkRoleArn, String cmkAssumeRoleArn) { - super(MLIndexInsightType.FIELD_DESCRIPTION, sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn); - } - - @Override - public void runTask(String tenantId, ActionListener listener) { - try { - getInsightContentFromContainer(MLIndexInsightType.STATISTICAL_DATA, tenantId, ActionListener.wrap(statisticalContentMap -> { - getAgentIdToRun( - client, - tenantId, - ActionListener - .wrap(agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); }, listener::onFailure) - ); - }, e -> handleError("Failed to get statistical content for index {}", e, tenantId, listener))); - } catch (Exception e) { - handleError("Failed to execute field description task for index {}", e, tenantId, listener); - } - } - - /** - * Filter pattern-matched field descriptions to only include fields present in current index - */ - private Map filterFieldDescriptions( - Map patternFieldDescriptions, - Map currentIndexFields - ) { - Map filteredDescriptions = new LinkedHashMap<>(); - - if (patternFieldDescriptions == null || currentIndexFields == null) { - return filteredDescriptions; - } - - currentIndexFields - .keySet() - .stream() - .filter(patternFieldDescriptions::containsKey) - .forEach(fieldName -> filteredDescriptions.put(fieldName, patternFieldDescriptions.get(fieldName))); - - return filteredDescriptions; - } - - @Override - protected void handlePatternResult(Map patternSource, String tenantId, ActionListener listener) { - try { - String patternContent = (String) patternSource.get(IndexInsight.CONTENT_FIELD); - Map patternFieldDescriptions = gson.fromJson(patternContent, Map.class); - - // Get current index mapping - GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndex); - - client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { - try { - Map mappings = getMappingsResponse.getMappings(); - if (mappings.isEmpty()) { - beginGeneration(tenantId, listener); - return; - } - - // Extract field names from current index mapping - Map currentFields = new HashMap<>(); - for (MappingMetadata mappingMetadata : mappings.values()) { - Map mappingSource = (Map) mappingMetadata.getSourceAsMap().get("properties"); - if (mappingSource != null) { - extractFieldNamesTypes(mappingSource, currentFields, "", false); - } - } - - Map currentFieldsMap = new HashMap<>(currentFields); - - Map filteredDescriptions = filterFieldDescriptions(patternFieldDescriptions, currentFieldsMap); - - // Create filtered result without storing - Long lastUpdateTime = (Long) patternSource.get(IndexInsight.LAST_UPDATE_FIELD); - IndexInsight insight = IndexInsight - .builder() - .index(sourceIndex) - .taskType(taskType) - .content(gson.toJson(filteredDescriptions)) - .status(IndexInsightTaskStatus.COMPLETED) - .lastUpdatedTime(Instant.ofEpochMilli(lastUpdateTime)) - .build(); - listener.onResponse(insight); - - } catch (Exception e) { - log.error("Failed to process current index mapping for index {}", sourceIndex, e); - listener.onFailure(e); - } - }, e -> { - log.error("Failed to get current index mapping for index {}", sourceIndex, e); - listener.onFailure(e); - })); - - } catch (Exception e) { - log.error("Failed to filter field descriptions for index {}", sourceIndex, e); - listener.onFailure(e); - } - } - - @Override - public List getPrerequisites() { - return Collections.singletonList(MLIndexInsightType.STATISTICAL_DATA); - } - - private void batchProcessFields( - Map statisticalContentMap, - String agentId, - String tenantId, - ActionListener listener - ) { - Map mappingSource; - Object obj = statisticalContentMap.get(IMPORTANT_COLUMN_KEYWORD); - if (!(obj instanceof Map)) { - handleError( - "No mapping properties found for index: {}", - new IllegalStateException("No data distribution found for index: " + sourceIndex), - tenantId, - listener - ); - return; - } - mappingSource = (Map) obj; - - List allFields = List.of(mappingSource.keySet().toArray(new String[0])); - - if (allFields.isEmpty()) { - log.warn("No important fields found for index: {}", sourceIndex); - saveResult("", tenantId, ActionListener.wrap(insight -> { - log.info("Empty field description completed for: {}", sourceIndex); - listener.onResponse(insight); - }, e -> handleError("Failed to save empty field description result for index {}", e, tenantId, listener))); - return; - } - - List> batches = createBatches(allFields, BATCH_SIZE); - CountDownLatch countDownLatch = new CountDownLatch(batches.size()); - Map resultsMap = new ConcurrentHashMap<>(); - AtomicBoolean hasErrors = new AtomicBoolean(false); - ActionListener> resultListener = ActionListener.wrap(batchResult -> { - if (batchResult != null) - resultsMap.putAll(batchResult); - }, e -> { - hasErrors.set(true); - log.error("Batch processing failed for index {}: {}", sourceIndex, e.getMessage()); - }); - LatchedActionListener> latchedActionListener = new LatchedActionListener<>(resultListener, countDownLatch); - for (List batch : batches) { - processBatch(batch, statisticalContentMap, agentId, tenantId, latchedActionListener); - } - try { - countDownLatch.await(60, SECONDS); - if (!hasErrors.get()) { - saveResult(gson.toJson(resultsMap), tenantId, ActionListener.wrap(insight -> { - log.info("Field description completed for: {}", sourceIndex); - listener.onResponse(insight); - }, e -> handleError("Failed to save field description result for index {}", e, tenantId, listener))); - } else { - handleError("Batch processing failed for index {}", new Exception("Batch processing failed"), tenantId, listener); - } - } catch (InterruptedException e) { - log.error("Batch processing interrupted for index: {}", sourceIndex); - handleError("Batch processing interrupted for index {}", e, tenantId, listener); - } - } - - private List> createBatches(List fields, int batchSize) { - List> batches = new ArrayList<>(); - for (int i = 0; i < fields.size(); i += batchSize) { - int end = Math.min(i + batchSize, fields.size()); - batches.add(fields.subList(i, end)); - } - return batches; - } - - private void processBatch( - List batchFields, - Map statisticalContentMap, - String agentId, - String tenantId, - ActionListener> listener - ) { - String prompt = generateBatchPrompt(batchFields, statisticalContentMap); - - callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> { - try { - log.info("Batch LLM call successful for {} fields in index {}", batchFields.size(), sourceIndex); - Map batchResult = parseFieldDescription(response); - listener.onResponse(batchResult); - } catch (Exception e) { - log.error("Error parsing response for batch in index {}: {}", sourceIndex, e.getMessage()); - listener.onFailure(e); - } - }, e -> { listener.onFailure(e); })); - } - - private String generateBatchPrompt(List batchFields, Map statisticalContentMap) { - StringBuilder prompt = new StringBuilder(); - prompt.append("Please analyze the following OpenSearch index fields and provide descriptions:\\n\\n"); - prompt.append("Index Name: ").append(sourceIndex).append("\\n\\n"); - - prompt.append("Fields to describe:\\n"); - for (String field : batchFields) { - prompt.append("- ").append(field).append("\\n"); - } - prompt.append("\\n"); - // Filter statistical data based on current batch fields - Map relevantStatisticalData = extractRelevantStatisticalData(statisticalContentMap, batchFields); - if (!relevantStatisticalData.isEmpty()) { - if (relevantStatisticalData.containsKey(IMPORTANT_COLUMN_KEYWORD)) { - prompt.append("Some Field Distribution:\\n").append(relevantStatisticalData.get(IMPORTANT_COLUMN_KEYWORD)).append("\\n\\n"); - } - if (relevantStatisticalData.containsKey(EXAMPLE_DOC_KEYWORD)) { - prompt.append("Example Documents:\\n").append(relevantStatisticalData.get(EXAMPLE_DOC_KEYWORD)).append("\\n\\n"); - } - } - - prompt - .append( - "For each field listed above, provide a brief description of what it contains and its purpose. The description should not mention specific values from any example documents or include specific examples.\\n" - ); - prompt.append("For each field, provide description in the following format EXACTLY:\\n"); - prompt.append("field_name: description"); - - return prompt.toString(); - } - - private Map extractRelevantStatisticalData(Map statisticalContentMap, List batchFields) { - Map result = new LinkedHashMap<>(); - if (statisticalContentMap == null || statisticalContentMap.isEmpty() || batchFields.isEmpty()) { - return result; - } - - try { - Map distribution = (Map) statisticalContentMap.get(IMPORTANT_COLUMN_KEYWORD); - - // Extract relevant mapping - Map relevantMapping = new LinkedHashMap<>(); - for (String field : batchFields) { - if (distribution != null && distribution.containsKey(field)) { - relevantMapping.put(field, distribution.get(field)); - } - } - if (!relevantMapping.isEmpty()) { - result.put(IMPORTANT_COLUMN_KEYWORD, relevantMapping); - } - - // Extract example docs from distribution - List> exampleDocs = (List>) statisticalContentMap.get(EXAMPLE_DOC_KEYWORD); - if (exampleDocs != null && !exampleDocs.isEmpty()) { - List> filteredExampleDocs = new ArrayList<>(); - for (Map doc : exampleDocs) { - Map filteredDoc = new LinkedHashMap<>(); - for (String field : batchFields) { - if (doc.containsKey(field)) { - filteredDoc.put(field, doc.get(field)); - } - } - filteredExampleDocs.add(filteredDoc); - } - result.put(EXAMPLE_DOC_KEYWORD, filteredExampleDocs); - } - - } catch (Exception e) { - log.warn("Failed to extract relevant statistical data for batch fields: {}", e.getMessage()); - } - return result; - } - - private Map parseFieldDescription(String modelResponse) { - Map field2Desc = new LinkedHashMap<>(); - String[] lines = modelResponse.trim().split("\\n"); - - for (String line : lines) { - line = line.trim(); - String[] parts = line.split(":", 2); - if (parts.length == 2) { - String name = parts[0].trim(); - String desc = parts[1].trim(); - if (!desc.isEmpty()) { - field2Desc.put(name, desc); - } - } - } - - return field2Desc; - } - - @Override - public IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType) { - if (prerequisiteType == MLIndexInsightType.STATISTICAL_DATA) { - return new StatisticalDataTask(sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn); - } - throw new IllegalStateException("Unsupported prerequisite type: " + prerequisiteType); - } - -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java deleted file mode 100644 index e56138258e..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.ml.common.CommonValue.TENANT_ID_FIELD; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.time.Instant; - -import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; - -import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.Getter; - -@EqualsAndHashCode -@Getter -public class IndexInsight implements ToXContentObject, Writeable { - public static final String INDEX_NAME_FIELD = "index_name"; - public static final String LAST_UPDATE_FIELD = "last_updated_time"; - public static final String CONTENT_FIELD = "content"; - public static final String STATUS_FIELD = "status"; - public static final String TASK_TYPE_FIELD = "task_type"; - - private final String index; - private final String content; - private final IndexInsightTaskStatus status; - private final MLIndexInsightType taskType; - private final Instant lastUpdatedTime; - private final String tenantId; - - @Builder(toBuilder = true) - public IndexInsight( - String index, - String content, - IndexInsightTaskStatus status, - MLIndexInsightType taskType, - Instant lastUpdatedTime, - String tenantId - ) { - this.index = index; - this.content = content; - this.status = status; - this.taskType = taskType; - this.lastUpdatedTime = lastUpdatedTime; - this.tenantId = tenantId; - } - - public IndexInsight(StreamInput input) throws IOException { - index = input.readString(); - status = IndexInsightTaskStatus.fromString(input.readString()); - taskType = MLIndexInsightType.fromString(input.readString()); - lastUpdatedTime = input.readInstant(); - content = input.readOptionalString(); - tenantId = input.readOptionalString(); - } - - public static IndexInsight parse(XContentParser parser) throws IOException { - String indexName = null; - String content = null; - IndexInsightTaskStatus status = null; - String taskType = null; - Instant lastUpdatedTime = null; - String tenantId = null; - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); - - switch (fieldName) { - case INDEX_NAME_FIELD: - indexName = parser.text(); - break; - case CONTENT_FIELD: - content = parser.text(); - break; - case STATUS_FIELD: - status = IndexInsightTaskStatus.fromString(parser.text()); - break; - case TASK_TYPE_FIELD: - taskType = parser.text(); - break; - case LAST_UPDATE_FIELD: - lastUpdatedTime = Instant.ofEpochMilli(parser.longValue()); - break; - case TENANT_ID_FIELD: - tenantId = parser.text(); - break; - default: - parser.skipChildren(); - break; - } - } - return IndexInsight - .builder() - .index(indexName) - .content(content) - .status(status) - .taskType(MLIndexInsightType.fromString(taskType)) - .lastUpdatedTime(lastUpdatedTime) - .tenantId(tenantId) - .build(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(index); - out.writeString(status.toString()); - out.writeString(taskType.toString()); - out.writeInstant(lastUpdatedTime); - out.writeOptionalString(content); - out.writeOptionalString(tenantId); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (index != null) { - builder.field(INDEX_NAME_FIELD, index); - } - if (content != null && !content.isEmpty()) { - builder.field(CONTENT_FIELD, content); - } - if (status != null) { - builder.field(STATUS_FIELD, status.toString()); - } - if (tenantId != null) { - builder.field(TENANT_ID_FIELD, tenantId); - } - builder.field(TASK_TYPE_FIELD, taskType.toString()); - builder.field(LAST_UPDATE_FIELD, lastUpdatedTime.toEpochMilli()); - builder.endObject(); - return builder; - } - - public static IndexInsight fromStream(StreamInput in) throws IOException { - return new IndexInsight(in); - } - - @Override - public String toString() { - try { - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.prettyPrint(); - toXContent(builder, ToXContent.EMPTY_PARAMS); - return builder.toString(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightAccessControllerHelper.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightAccessControllerHelper.java deleted file mode 100644 index e0ccaf84e8..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightAccessControllerHelper.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.query.MatchAllQueryBuilder; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.transport.client.Client; - -import lombok.extern.log4j.Log4j2; - -@Log4j2 -public class IndexInsightAccessControllerHelper { - // Verify the access by dry run - public static void verifyAccessController(Client client, ActionListener actionListener, String sourceIndex) { - GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndex); - client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { - if (getMappingsResponse.getMappings().isEmpty()) { - actionListener.onFailure(new IllegalArgumentException("No matching indices found for: " + sourceIndex)); - return; - } - // If mappings exist, do a search to verify access permissions - SearchRequest searchRequest = constructSimpleQueryRequest(sourceIndex); - client.search(searchRequest, ActionListener.wrap(r -> { actionListener.onResponse(true); }, e -> { - log.error(e.getMessage(), e); - actionListener.onFailure(e); - })); - }, e -> { - log.error(e.getMessage(), e); - actionListener.onFailure(e); - })); - } - - public static SearchRequest constructSimpleQueryRequest(String sourceIndex) { - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(new MatchAllQueryBuilder()); - searchSourceBuilder.size(1); - SearchRequest searchRequest = new SearchRequest(sourceIndex); - searchRequest.source(searchSourceBuilder); - return searchRequest; - } -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightConfig.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightConfig.java deleted file mode 100644 index ac6778a1dc..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightConfig.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.ml.common.CommonValue.TENANT_ID_FIELD; - -import java.io.IOException; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; - -import lombok.Builder; -import lombok.Getter; - -@Getter -public class IndexInsightConfig implements ToXContentObject, Writeable { - private final Boolean isEnable; - private final String tenantId; - - public static final String IS_ENABLE_FIELD = "is_enable"; - - @Builder(toBuilder = true) - public IndexInsightConfig(Boolean isEnable, String tenantId) { - this.isEnable = isEnable; - this.tenantId = tenantId; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(isEnable); - out.writeString(tenantId); - } - - public IndexInsightConfig(StreamInput input) throws IOException { - isEnable = input.readBoolean(); - tenantId = input.readString(); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(IS_ENABLE_FIELD, isEnable); - if (tenantId != null) { - builder.field(TENANT_ID_FIELD, tenantId); - } - builder.endObject(); - return builder; - } - - public static IndexInsightConfig parse(XContentParser parser) throws IOException { - Boolean isEnable = null; - String tenantId = null; - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case IS_ENABLE_FIELD: - isEnable = parser.booleanValue(); - break; - case TENANT_ID_FIELD: - tenantId = parser.text(); - break; - default: - parser.skipChildren(); - break; - } - - } - return IndexInsightConfig.builder().isEnable(isEnable).tenantId(tenantId).build(); - } - - public static IndexInsightConfig fromStream(StreamInput in) throws IOException { - return new IndexInsightConfig(in); - } -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java deleted file mode 100644 index 926ab93591..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import java.util.List; - -import org.opensearch.core.action.ActionListener; - -/** - * Interface representing an index insight execution task - */ -public interface IndexInsightTask { - - /** - * Execute the index insight task - */ - void execute(String tenantId, ActionListener listener); - - /** - * Get the prerequisites of this task - */ - List getPrerequisites(); - - /** - * Run the specific task logic - */ - void runTask(String tenantId, ActionListener listener); - - /** - * Create prerequisite task instance - */ - IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType); - -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java deleted file mode 100644 index 12689f7c32..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import java.util.Locale; - -public enum IndexInsightTaskStatus { - GENERATING, - COMPLETED, - FAILED; - - public static IndexInsightTaskStatus fromString(String status) { - if (status == null) { - throw new IllegalArgumentException("Index insight task status can't be null"); - } - try { - return IndexInsightTaskStatus.valueOf(status.toUpperCase(Locale.ROOT)); - } catch (Exception e) { - throw new IllegalArgumentException("Wrong index insight task status"); - } - } -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java deleted file mode 100644 index 8df30a823c..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import static org.opensearch.ml.common.utils.StringUtils.MAPPER; -import static org.opensearch.ml.common.utils.StringUtils.gson; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.opensearch.action.search.SearchRequest; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.query.MatchAllQueryBuilder; -import org.opensearch.remote.metadata.client.SdkClient; -import org.opensearch.search.SearchHit; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.transport.client.Client; - -import com.fasterxml.jackson.core.type.TypeReference; - -import lombok.extern.log4j.Log4j2; - -/** Check whether the index is log-related for downstream task:Log-based RCA analysis -1. Judge whether the index is related to log -2. Whether there is a column containing the whole log message -3. Whether there is a column serve as trace id which combine a set of logs into one flow -4. The Whole return is a fixed format which can be parsed in the following process - */ -@Log4j2 -public class LogRelatedIndexCheckTask extends AbstractIndexInsightTask { - - private String sampleDocString; - - private static final Map DEFAULT_RCA_RESULT = new HashMap<>(); - - static { - DEFAULT_RCA_RESULT.put("is_log_index", false); - DEFAULT_RCA_RESULT.put("log_message_field", null); - DEFAULT_RCA_RESULT.put("trace_id_field", null); - } - - private static final String RCA_TEMPLATE = - """ - I will provide you an index with the types and statistics of each field, and a few sample documents. - - Your task is to analyze the structure and semantics of this index, and determine whether it is suitable for Root Cause Analysis (RCA) on logs. - - Please help me answer the following 3 questions based on the provided information: - - 1. Is this index related to **log data**? - 2. Is there any **field that contains full log messages** (e.g., raw log lines or unstructured log content)? - 3. Is there any **field that can serve as a trace ID**, i.e., grouping multiple logs into the same logical execution or transaction flow? - - The index name is: - {indexName} - - Here are 3 sample documents from this index: - {samples} - - You should infer your answer **based on both field names, their data types, value examples, and overall context**. - Avoid simply repeating the input values. Think logically about what each field represents and how it might be used. - - Return your result in the **following strict JSON format** inside tags, so that it can be parsed later. Only include fields that you are confident about. - - - { - "is_log_index": true/false, - "log_message_field": "field_name" or null, - "trace_id_field": "field_name" or null - } - - - Rules: - - If you cannot confidently find a log message field or trace ID field, use `null`. - - Your judgment should be based on both semantics and field patterns (e.g., field names like "message", "log", "trace", "span", etc). - """; - - public LogRelatedIndexCheckTask(String sourceIndex, Client client, SdkClient sdkClient, String cmkRoleArn, String cmkAssumeRoleArn) { - super(MLIndexInsightType.LOG_RELATED_INDEX_CHECK, sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn); - } - - @Override - public void runTask(String tenantId, ActionListener listener) { - try { - collectSampleDocString(ActionListener.wrap(sampleDocs -> { - getAgentIdToRun( - client, - tenantId, - ActionListener.wrap(agentId -> performLogAnalysis(agentId, tenantId, listener), listener::onFailure) - ); - }, listener::onFailure)); - } catch (Exception e) { - handleError("Failed log related check for {}", e, tenantId, listener); - } - } - - @Override - public List getPrerequisites() { - return Collections.emptyList(); - } - - private void collectSampleDocString(ActionListener listener) { - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.size(3).query(new MatchAllQueryBuilder()); - SearchRequest searchRequest = new SearchRequest(new String[] { sourceIndex }, searchSourceBuilder); - - client.search(searchRequest, ActionListener.wrap(searchResponse -> { - try { - List> samples = Arrays - .stream(searchResponse.getHits().getHits()) - .map(SearchHit::getSourceAsMap) - .toList(); - sampleDocString = gson.toJson(samples); - log.info("Collected sample documents for index: {}", sourceIndex); - listener.onResponse(sampleDocString); - } catch (Exception e) { - log.error("Failed to process sample documents for index: {}", sourceIndex, e); - listener.onFailure(e); - } - }, e -> { - log.error("Failed to collect sample documents for index: {}", sourceIndex, e); - listener.onFailure(e); - })); - } - - private void performLogAnalysis(String agentId, String tenantId, ActionListener listener) { - String prompt = RCA_TEMPLATE.replace("{indexName}", sourceIndex).replace("{samples}", sampleDocString); - - callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> { - try { - Map parsed = parseCheckResponse(response); - saveResult(MAPPER.writeValueAsString(parsed), tenantId, ActionListener.wrap(insight -> { - log.info("Log related check completed for index {}", sourceIndex); - listener.onResponse(insight); - }, e -> handleError("Failed to save log related check result for index {}", e, tenantId, listener))); - } catch (Exception e) { - handleError("Error parsing response of log related check for {}", e, tenantId, listener); - } - }, e -> handleError("Failed to call LLM for log related check: {}", e, tenantId, listener))); - } - - private Map parseCheckResponse(String resp) { - try { - String json = resp.split("", 2)[1].split("", 2)[0].trim(); - return MAPPER.readValue(json, new TypeReference<>() { - }); - } catch (Exception e) { - log.warn("Failed to parse RCA analysis response, returning default values", e); - return DEFAULT_RCA_RESULT; - } - } - - @Override - public IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType) { - throw new IllegalStateException("LogRelatedIndexCheckTask has no prerequisites"); - } - -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java deleted file mode 100644 index e882d8ab2f..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import java.util.Locale; - -public enum MLIndexInsightType { - STATISTICAL_DATA, - FIELD_DESCRIPTION, - LOG_RELATED_INDEX_CHECK, - ALL; - - public static MLIndexInsightType fromString(String type) { - if (type == null) { - throw new IllegalArgumentException("ML index insight type can't be null"); - } - try { - return MLIndexInsightType.valueOf(type.toUpperCase(Locale.ROOT)); - } catch (Exception e) { - throw new IllegalArgumentException("Wrong index insight type"); - } - } -} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java deleted file mode 100644 index f44ee2ef3a..0000000000 --- a/common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.ml.common.indexInsight; - -import static org.opensearch.ml.common.utils.StringUtils.gson; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.ml.common.utils.mergeMetaDataUtils.MergeRuleHelper; -import org.opensearch.remote.metadata.client.SdkClient; -import org.opensearch.search.SearchHit; -import org.opensearch.search.aggregations.Aggregation; -import org.opensearch.search.aggregations.AggregationBuilders; -import org.opensearch.search.aggregations.AggregatorFactories; -import org.opensearch.search.aggregations.bucket.filter.Filters; -import org.opensearch.search.aggregations.bucket.filter.FiltersAggregationBuilder; -import org.opensearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter; -import org.opensearch.search.aggregations.bucket.sampler.Sampler; -import org.opensearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder; -import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder; -import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; -import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; -import org.opensearch.search.aggregations.metrics.TopHits; -import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.search.sort.SortOrder; -import org.opensearch.transport.client.Client; - -import com.google.common.annotations.VisibleForTesting; - -import lombok.extern.log4j.Log4j2; - -/** - * Statistical Data Task: Collects sample documents from the target index for analysis. - * This task serves as the foundation for other index insight tasks by gathering sample data - * that provides context about the index structure and content. - * Will expand to support additional data types beyond sample documents in the future. - */ -@Log4j2 -public class StatisticalDataTask extends AbstractIndexInsightTask { - - private static final int TERM_SIZE = 5; - private static final List PREFIXES = List.of("unique_terms_", "unique_count_", "max_value_", "min_value_"); - private static final List UNIQUE_TERMS_LIST = List.of("text", "keyword", "integer", "long", "short"); - private static final List MIN_MAX_LIST = List.of("integer", "long", "float", "double", "short", "date"); - private static final Double HIGH_PRIORITY_COLUMN_THRESHOLD = 0.001; - private static final int SAMPLE_NUMBER = 100000; - private static final String PARSE_COLUMN_NAME_PATTERN = "(.*?)"; - private static final int FILTER_LLM_NUMBERS = 30; - public static final String NOT_NULL_KEYWORD = "not_null"; - public static final String IMPORTANT_COLUMN_KEYWORD = "important_column_and_distribution"; - public static final String EXAMPLE_DOC_KEYWORD = "example_docs"; - - private static final String UNIQUE_TERM_PREFIX = "unique_terms_"; - private static final String MAX_VALUE_PREFIX = "max_value_"; - private static final String MIN_VALUE_PREFIX = "min_value_"; - private static final String UNIQUE_COUNT_PREFIX = "unique_count_"; - - private static final String PROMPT_TEMPLATE = """ - Now I will give you the sample examples and some field's data distribution of one Opensearch index. - You should help me filter at most %s important columns. - For logs/trace/metric related indices, make sure you contain error/http response/time/latency/metric related columns. - You should contain your response column name inside tag - Here is the information of sample examples and some field's data distribution. - - IndexName: %s - detailed information: %s - """; - - public StatisticalDataTask(String sourceIndex, Client client, SdkClient sdkClient, String cmkRoleArn, String cmkAssumeRoleArn) { - super(MLIndexInsightType.STATISTICAL_DATA, sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn); - } - - @Override - public void runTask(String tenantId, ActionListener listener) { - runTask(tenantId, listener, true); - } - - public void runTask(String tenantId, ActionListener listener, boolean shouldStore) { - try { - collectStatisticalData(tenantId, shouldStore, listener); - } catch (Exception e) { - handleError("Failed to execute statistical data task for index {}", e, tenantId, listener, shouldStore); - } - } - - @Override - protected void handlePatternMatchedDoc(Map patternSource, String tenantId, ActionListener listener) { - // For StatisticalDataTask, run without storing when pattern matched - runTask(tenantId, listener, false); - } - - @Override - public List getPrerequisites() { - return Collections.emptyList(); - } - - private void collectStatisticalData(String tenantId, boolean shouldStore, ActionListener listener) { - GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndex); - - client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { - Map mappings = getMappingsResponse.getMappings(); - if (mappings.isEmpty()) { - listener.onFailure(new IllegalArgumentException("No matching mapping with index name: " + sourceIndex)); - return; - } - - Map allFields = new HashMap<>(); - for (MappingMetadata mappingMetadata : mappings.values()) { - Map mappingSource = (Map) mappingMetadata.getSourceAsMap().get("properties"); - MergeRuleHelper.merge(mappingSource, allFields); - } - Map fieldsToType = new HashMap<>(); - extractFieldNamesTypes(allFields, fieldsToType, "", false); - SearchRequest searchRequest = new SearchRequest(sourceIndex); - searchRequest.source(buildQuery(fieldsToType)); - - client.search(searchRequest, ActionListener.wrap(searchResponse -> { - Set highPriorityColumns = filterColumns(fieldsToType, searchResponse); - Map parsedResult = parseSearchResult(fieldsToType, highPriorityColumns, searchResponse); - filterImportantColumnByLLM(parsedResult, tenantId, ActionListener.wrap(response -> { - Map filteredResponse = new HashMap<>(); - filteredResponse - .put( - EXAMPLE_DOC_KEYWORD, - filterSampleColumns((List>) parsedResult.get(EXAMPLE_DOC_KEYWORD), response) - ); - Map importantColumns = (Map) parsedResult.get(IMPORTANT_COLUMN_KEYWORD); - Map filteredImportantColumns = importantColumns - .entrySet() - .stream() - .filter(entry -> response.isEmpty() || response.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - filteredResponse.put(IMPORTANT_COLUMN_KEYWORD, filteredImportantColumns); - String statisticalContent = gson.toJson(filteredResponse); - - if (shouldStore) { - saveResult(statisticalContent, tenantId, listener); - } else { - // Return IndexInsight directly without storing - IndexInsight insight = IndexInsight - .builder() - .index(sourceIndex) - .taskType(taskType) - .content(statisticalContent) - .status(IndexInsightTaskStatus.COMPLETED) - .lastUpdatedTime(Instant.now()) - .build(); - listener.onResponse(insight); - } - }, listener::onFailure)); - }, e -> handleError("Failed to collect statistical data for index: {}", e, tenantId, listener, shouldStore))); - }, listener::onFailure)); - } - - @Override - public IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType) { - throw new IllegalStateException("StatisticalDataTask has no prerequisites"); - } - - public SearchSourceBuilder buildQuery(Map fields) { - AggregatorFactories.Builder subAggs = new AggregatorFactories.Builder(); - - for (Map.Entry field : fields.entrySet()) { - String name = field.getKey(); - String type = field.getValue(); - String fieldUsed = name; - - if ("text".equals(type)) { - fieldUsed = name + ".keyword"; - } - - if (UNIQUE_TERMS_LIST.contains(type)) { - TermsAggregationBuilder termsAgg = AggregationBuilders.terms(UNIQUE_TERM_PREFIX + name).field(fieldUsed).size(TERM_SIZE); - - CardinalityAggregationBuilder countAgg = AggregationBuilders.cardinality(UNIQUE_COUNT_PREFIX + name).field(fieldUsed); - - subAggs.addAggregator(termsAgg); - subAggs.addAggregator(countAgg); - } - if (MIN_MAX_LIST.contains(type)) { - MinAggregationBuilder minAgg = AggregationBuilders.min(MIN_VALUE_PREFIX + name).field(fieldUsed); - MaxAggregationBuilder maxAgg = AggregationBuilders.max(MAX_VALUE_PREFIX + name).field(fieldUsed); - - subAggs.addAggregator(minAgg); - subAggs.addAggregator(maxAgg); - } - } - - // Add top hits example_docs - TopHitsAggregationBuilder topHitsAgg = AggregationBuilders.topHits(EXAMPLE_DOC_KEYWORD).size(3); - subAggs.addAggregator(topHitsAgg); - - // Add not none count - List keyedFilters = new ArrayList<>(); - for (String fieldName : fields.keySet()) { - keyedFilters.add(new KeyedFilter(fieldName + "_" + NOT_NULL_KEYWORD, QueryBuilders.existsQuery(fieldName))); - } - FiltersAggregationBuilder nonNullAgg = AggregationBuilders.filters(NOT_NULL_KEYWORD, keyedFilters.toArray(new KeyedFilter[0])); - subAggs.addAggregator(nonNullAgg); - - // Wrap everything in a Sampler aggregation - SamplerAggregationBuilder samplerAgg = AggregationBuilders.sampler("sample").shardSize(SAMPLE_NUMBER).subAggregations(subAggs); - - // Build search source - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .sort("_doc", SortOrder.DESC) - .size(0) - .aggregation(samplerAgg); - - return sourceBuilder; - } - - private void filterImportantColumnByLLM(Map parsedResult, String tenantId, ActionListener> listener) { - Map importantColumns = (Map) parsedResult.get(IMPORTANT_COLUMN_KEYWORD); - if (importantColumns.size() <= FILTER_LLM_NUMBERS) { - listener.onResponse(new ArrayList<>()); // Too few columns and don't need to filter - return; - } - String prompt = generateFilterColumnPrompt(parsedResult); - getAgentIdToRun(client, tenantId, ActionListener.wrap(agentId -> { - callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> { - listener.onResponse(parseLLMFilteredResult(response)); - }, e -> { listener.onResponse(new ArrayList<>()); })); - }, e -> { listener.onResponse(new ArrayList<>()); })); - } - - private String generateFilterColumnPrompt(Map parsedResult) { - return String.format(PROMPT_TEMPLATE, FILTER_LLM_NUMBERS, sourceIndex, gson.toJson(parsedResult)); - } - - @VisibleForTesting - List> filterSampleColumns(List> originalDocs, List targetColumns) { - if (targetColumns.isEmpty()) { - return originalDocs; - } - List> results = new ArrayList<>(); - for (Map originalDoc : originalDocs) { - results.add(constructFilterMap("", originalDoc, targetColumns)); - } - return results; - } - - private Map constructFilterMap(String prefix, Map currentNode, List targetColumns) { - Map filterResult = new HashMap<>(); - for (Map.Entry entry : currentNode.entrySet()) { - String currentKey = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey(); - Object currentValue = entry.getValue(); - if (targetColumns.contains(currentKey)) { - filterResult.put(entry.getKey(), currentValue); - } else if (currentValue instanceof Map) { - Map tmpNode = constructFilterMap(currentKey, (Map) currentValue, targetColumns); - if (!tmpNode.isEmpty()) { - filterResult.put(entry.getKey(), tmpNode); - } - } else if (currentValue instanceof List) { - List list = (List) currentValue; - if (!list.isEmpty() && list.get(0) instanceof Map) { - List> newList = new ArrayList<>(); - for (Object item : list) { - Map tmpNode = constructFilterMap(currentKey, (Map) item, targetColumns); - if (!tmpNode.isEmpty()) { - newList.add(tmpNode); - } - } - if (!newList.isEmpty()) { - filterResult.put(entry.getKey(), newList); - } - } - } - } - return filterResult; - } - - private List parseLLMFilteredResult(String LLMResponse) { - try { - Pattern pattern = Pattern.compile(PARSE_COLUMN_NAME_PATTERN); - Matcher matcher = pattern.matcher(LLMResponse); - List columns = new ArrayList<>(); - while (matcher.find()) { - columns.add(matcher.group(1).trim()); - } - return columns; - } catch (Exception e) { - throw new IllegalArgumentException("fail to parse LLM response"); - } - } - - private Map parseSearchResult( - Map allFieldsToType, - Set filteredNames, - SearchResponse searchResponse - ) { - Map aggregationMap = ((Sampler) searchResponse.getAggregations().getAsMap().get("sample")) - .getAggregations() - .getAsMap(); - Map result = new LinkedHashMap<>(); - Map finalResult = new LinkedHashMap<>(); - List exampleDocs = null; - for (Map.Entry entry : aggregationMap.entrySet()) { - String key = entry.getKey(); - Aggregation aggregation = entry.getValue(); - if (key.equals(EXAMPLE_DOC_KEYWORD)) { - SearchHit[] hits = ((TopHits) aggregation).getHits().getHits(); - exampleDocs = new ArrayList<>(hits.length); - for (SearchHit hit : hits) { - exampleDocs.add(hit.getSourceAsMap()); - } - } else { - for (String prefix : PREFIXES) { - if (key.startsWith(prefix)) { - String targetField = key.substring(prefix.length()); - if (!filteredNames.contains(targetField)) { - continue; - } - - String aggregationType = key.substring(0, prefix.length() - 1); - - Map aggregationResult = gson.fromJson(aggregation.toString(), Map.class); - Object targetValue; - try { - if (prefix.equals(UNIQUE_TERM_PREFIX)) { - // assuming result.get(key) is a Map containing "buckets" -> List> - Map aggResult = (Map) aggregationResult.get(key); - List> buckets = aggResult != null - ? (List>) aggResult.get("buckets") - : null; - if (buckets == null) { - continue; - } - - targetValue = buckets.stream().filter(bucket -> bucket != null).map(bucket -> bucket.get("key")).toList(); - } else { - Map aggResult = (Map) aggregationResult.get(key); - if (aggResult.containsKey("value_as_string")) { - targetValue = aggResult.get("value_as_string"); - } else { - targetValue = aggResult.get("value"); - } - } - result.computeIfAbsent(targetField, k -> new HashMap<>(Map.of("type", allFieldsToType.get(targetField)))); - ((Map) result.get(targetField)).put(aggregationType, targetValue); - break; - } catch (Exception e) { - log - .error( - "Failed to parse aggregation result from DSL in statistical index insight for index name: {}", - sourceIndex, - e - ); - } - } - } - } - } - if (exampleDocs != null) { - finalResult.put(EXAMPLE_DOC_KEYWORD, exampleDocs); - } - finalResult.put(IMPORTANT_COLUMN_KEYWORD, result); - return finalResult; - } - - private Set filterColumns(Map allFieldsToType, SearchResponse searchResponse) { - Sampler sampleAggregation = (Sampler) searchResponse.getAggregations().getAsMap().get("sample"); - Map aggregationMap = sampleAggregation.getAggregations().getAsMap(); - long totalDocCount = sampleAggregation.getDocCount(); - Set filteredNames = new HashSet<>(); - Filters aggregation; - try { - aggregation = (Filters) aggregationMap.get(NOT_NULL_KEYWORD); - } catch (Exception e) { - return filteredNames; - } - for (Filters.Bucket bucket : aggregation.getBuckets()) { - String targetField = bucket.getKey().toString(); - targetField = targetField.substring(0, targetField.length() - 1 - NOT_NULL_KEYWORD.length()); - long docCount = bucket.getDocCount(); - if (docCount > HIGH_PRIORITY_COLUMN_THRESHOLD * totalDocCount && allFieldsToType.containsKey(targetField)) { - filteredNames.add(targetField); - } - } - return filteredNames; - } - -} diff --git a/common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightGetRequest.java b/common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightGetRequest.java index ee07107bdc..e2aa0b6519 100644 --- a/common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightGetRequest.java +++ b/common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightGetRequest.java @@ -18,10 +18,10 @@ import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.ml.common.indexInsight.MLIndexInsightType; import lombok.Builder; import lombok.Getter; +import org.opensearch.ml.common.indexInsight.MLIndexInsightType; @Builder @Getter diff --git a/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java b/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java index 5671885e48..0b1c3d259e 100644 --- a/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java +++ b/common/src/test/java/org/opensearch/ml/common/indexInsight/StatisticalDataTaskTests.java @@ -26,8 +26,6 @@ import java.util.Map; import java.util.Set; -import javax.swing.*; - import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; diff --git a/plugin/src/main/java/org/opensearch/ml/action/IndexInsight/GetIndexInsightTransportAction.java b/plugin/src/main/java/org/opensearch/ml/action/IndexInsight/GetIndexInsightTransportAction.java index cafe4b3235..5b96425391 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/IndexInsight/GetIndexInsightTransportAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/IndexInsight/GetIndexInsightTransportAction.java @@ -5,7 +5,13 @@ package org.opensearch.ml.action.IndexInsight; +import static org.opensearch.ml.common.CommonValue.ML_INDEX_INSIGHT_STORAGE_INDEX; +import static org.opensearch.ml.common.CommonValue.ML_INDEX_INSIGHT_STORAGE_INDEX_MAPPING_PATH; +import static org.opensearch.ml.common.indexInsight.MLIndexInsightType.FIELD_DESCRIPTION; +import static org.opensearch.ml.common.memorycontainer.MemoryContainerConstants.MEMORY_CONTAINER_ID_FIELD; import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_INDEX_INSIGHT_FEATURE_ENABLED; +import static org.opensearch.ml.common.indexInsight.MLIndexInsightType.LOG_RELATED_INDEX_CHECK; +import static org.opensearch.ml.common.indexInsight.MLIndexInsightType.STATISTICAL_DATA; import java.time.Instant; @@ -15,20 +21,22 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.ml.common.MLIndex; -import org.opensearch.ml.common.indexInsight.FieldDescriptionTask; import org.opensearch.ml.common.indexInsight.IndexInsight; -import org.opensearch.ml.common.indexInsight.IndexInsightAccessControllerHelper; -import org.opensearch.ml.common.indexInsight.IndexInsightTask; import org.opensearch.ml.common.indexInsight.IndexInsightTaskStatus; -import org.opensearch.ml.common.indexInsight.LogRelatedIndexCheckTask; import org.opensearch.ml.common.indexInsight.MLIndexInsightType; -import org.opensearch.ml.common.indexInsight.StatisticalDataTask; +import org.opensearch.ml.common.memorycontainer.RemoteStore; import org.opensearch.ml.common.settings.MLFeatureEnabledSetting; import org.opensearch.ml.common.transport.indexInsight.MLIndexInsightGetAction; import org.opensearch.ml.common.transport.indexInsight.MLIndexInsightGetRequest; import org.opensearch.ml.common.transport.indexInsight.MLIndexInsightGetResponse; import org.opensearch.ml.engine.indices.MLIndicesHandler; +import org.opensearch.ml.helper.MemoryContainerHelper; +import org.opensearch.ml.helper.RemoteMemoryStoreHelper; +import org.opensearch.ml.indexInsight.FieldDescriptionTask; +import org.opensearch.ml.indexInsight.IndexInsightAccessControllerHelper; +import org.opensearch.ml.common.indexInsight.IndexInsightTask; +import org.opensearch.ml.indexInsight.LogRelatedIndexCheckTask; +import org.opensearch.ml.indexInsight.StatisticalDataTask; import org.opensearch.ml.utils.TenantAwareHelper; import org.opensearch.remote.metadata.client.SdkClient; import org.opensearch.tasks.Task; @@ -41,7 +49,7 @@ public class GetIndexInsightTransportAction extends HandledTransportAction { private static final MLIndexInsightType[] ALL_TYPE_ORDER = { MLIndexInsightType.STATISTICAL_DATA, - MLIndexInsightType.FIELD_DESCRIPTION, + FIELD_DESCRIPTION, MLIndexInsightType.LOG_RELATED_INDEX_CHECK }; private final Client client; @@ -49,6 +57,8 @@ public class GetIndexInsightTransportAction extends HandledTransportAction { - ActionListener actionAfterDryRun = ActionListener.wrap(r -> { - executeTaskAndReturn(mlIndexInsightGetRequest, mlIndexInsightGetRequest.getTenantId(), actionListener); - }, actionListener::onFailure); - IndexInsightAccessControllerHelper.verifyAccessController(client, actionAfterDryRun, indexName); + String memoryContainerId = client.threadPool().getThreadContext().getHeader(MEMORY_CONTAINER_ID_FIELD); + String indexMappings = mlIndicesHandler.getMapping(ML_INDEX_INSIGHT_STORAGE_INDEX_MAPPING_PATH); + memoryContainerHelper.getMemoryContainer(memoryContainerId, ActionListener.wrap(mlMemoryContainer -> { + RemoteStore remoteStore = mlMemoryContainer.getConfiguration().getRemoteStore(); + if (remoteStore.getConnectorId() != null) { + remoteMemoryStoreHelper.createRemoteIndex(remoteStore.getConnectorId(), ML_INDEX_INSIGHT_STORAGE_INDEX, indexMappings, ActionListener.wrap(r2 -> { + ActionListener actionAfterDryRun = ActionListener.wrap(r -> { + executeTaskAndReturn(mlIndexInsightGetRequest, mlIndexInsightGetRequest.getTenantId(), actionListener); + }, actionListener::onFailure); + IndexInsightAccessControllerHelper.verifyAccessController(client, actionAfterDryRun, indexName); + }, e -> { + log.error("Failed to create index insight storage", e); + actionListener.onFailure(e); + })); + } + }, e -> { - log.error("Failed to create index insight storage", e); - actionListener.onFailure(e); + log.error("Failed to retrieve memory container", e); })); } @@ -192,7 +216,9 @@ IndexInsightTask createTask(MLIndexInsightGetRequest request) { client, sdkClient, request.getCmkRoleArn(), - request.getAssumeRoleArn() + request.getAssumeRoleArn(), + remoteMemoryStoreHelper, + memoryContainerHelper ); case FIELD_DESCRIPTION: return new FieldDescriptionTask( @@ -200,7 +226,9 @@ IndexInsightTask createTask(MLIndexInsightGetRequest request) { client, sdkClient, request.getCmkRoleArn(), - request.getAssumeRoleArn() + request.getAssumeRoleArn(), + remoteMemoryStoreHelper, + memoryContainerHelper ); case LOG_RELATED_INDEX_CHECK: return new LogRelatedIndexCheckTask( @@ -208,7 +236,9 @@ IndexInsightTask createTask(MLIndexInsightGetRequest request) { client, sdkClient, request.getCmkRoleArn(), - request.getAssumeRoleArn() + request.getAssumeRoleArn(), + remoteMemoryStoreHelper, + memoryContainerHelper ); default: throw new IllegalArgumentException("Unsupported task type: " + request.getTargetIndexInsight()); diff --git a/plugin/src/main/java/org/opensearch/ml/helper/RemoteMemoryStoreHelper.java b/plugin/src/main/java/org/opensearch/ml/helper/RemoteMemoryStoreHelper.java index 26805ac693..e5b08a7709 100644 --- a/plugin/src/main/java/org/opensearch/ml/helper/RemoteMemoryStoreHelper.java +++ b/plugin/src/main/java/org/opensearch/ml/helper/RemoteMemoryStoreHelper.java @@ -41,6 +41,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.update.UpdateResponse; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; @@ -84,6 +85,7 @@ public class RemoteMemoryStoreHelper { public static final String CREATE_INGEST_PIPELINE_ACTION = "create_ingest_pipeline"; public static final String CREATE_INDEX_ACTION = "create_index"; public static final String WRITE_DOC_ACTION = "write_doc"; + public static final String WRITE_DOC_ACTION_WITH_ID = "write_doc_withID"; public static final String BULK_LOAD_ACTION = "bulk_load"; public static final String SEARCH_INDEX_ACTION = "search_index"; public static final String GET_DOC_ACTION = "get_doc"; @@ -471,6 +473,21 @@ public void writeDocument( } } + public void writeDocumentWithDocID( + RemoteStore remoteStore, + String docId, + String indexName, + Map documentSource, + ActionListener listener + ) { + // If connectorId is provided, use the existing method + if (remoteStore.getConnector() != null) { + writeDocument(remoteStore.getConnector(), indexName, documentSource, listener, docId); + } else { + listener.onFailure(new IllegalArgumentException("RemoteStore must have either connectorId or internal connector configured")); + } + } + public void writeDocument( String connectorId, String indexName, @@ -529,6 +546,38 @@ public void writeDocument( } } + public void writeDocument( + Connector connector, + String indexName, + Map documentSource, + ActionListener listener, + String docId + ) { + try { + // Prepare parameters for connector execution + Map parameters = new HashMap<>(); + parameters.put(INDEX_NAME_PARAM, indexName); + parameters.put(DOC_ID_PARAM, docId); + parameters.put(INPUT_PARAM, StringUtils.toJsonWithPlainNumbers(documentSource)); + + // Execute the connector action with write_doc action name + executeConnectorAction(connector, WRITE_DOC_ACTION_WITH_ID, parameters, ActionListener.wrap(response -> { + // Extract document ID from response + XContentParser parser = createParserFromTensorOutput(response); + IndexResponse indexResponse = IndexResponse.fromXContent(parser); + listener.onResponse(indexResponse); + }, e -> { + log.error("Failed to write document to remote index: {}", indexName, e); + listener.onFailure(e); + })); + + } catch (Exception e) { + log.error("Error preparing remote document write for index: {}", indexName, e); + listener.onFailure(e); + } + } + + /** * Performs bulk write operations to remote storage using RemoteStore configuration */ diff --git a/plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteAction.java b/plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteAction.java index bf6c6a060c..93945a62a0 100644 --- a/plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteAction.java +++ b/plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteAction.java @@ -8,6 +8,7 @@ import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.ml.common.memorycontainer.MemoryContainerConstants.MEMORY_CONTAINER_ID_FIELD; import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_AG_UI_DISABLED_MESSAGE; import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_EXECUTE_TOOL_DISABLED_MESSAGE; import static org.opensearch.ml.plugin.MachineLearningPlugin.ML_BASE_URI; @@ -162,6 +163,7 @@ MLExecuteTaskRequest getRequest(RestRequest request, NodeClient client) throws I ); } putMcpRequestHeaders(request, client); + client.threadPool().getThreadContext().putHeader(MEMORY_CONTAINER_ID_FIELD, request.header(MEMORY_CONTAINER_ID_FIELD)); } } else if (uri.startsWith(ML_BASE_URI + "/tools/")) { if (!mlFeatureEnabledSetting.isToolExecuteEnabled()) { From 022f2dbcbf2a5f733b19b4de9e02de4da1931c24 Mon Sep 17 00:00:00 2001 From: xinyual Date: Wed, 10 Dec 2025 16:37:09 +0800 Subject: [PATCH 2/2] relocate Signed-off-by: xinyual --- .../ml/common/indexInsight/IndexInsight.java | 163 ++++++ .../common/indexInsight/IndexInsightTask.java | 37 ++ .../indexInsight/IndexInsightTaskStatus.java | 25 + .../indexInsight/MLIndexInsightType.java | 26 + .../AbstractIndexInsightTask.java | 534 ++++++++++++++++++ .../ml/indexInsight/FieldDescriptionTask.java | 345 +++++++++++ .../IndexInsightAccessControllerHelper.java | 47 ++ .../ml/indexInsight/IndexInsightConfig.java | 84 +++ .../LogRelatedIndexCheckTask.java | 170 ++++++ .../ml/indexInsight/StatisticalDataTask.java | 414 ++++++++++++++ 10 files changed, 1845 insertions(+) create mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java create mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java create mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java create mode 100644 common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java create mode 100644 plugin/src/main/java/org/opensearch/ml/indexInsight/AbstractIndexInsightTask.java create mode 100644 plugin/src/main/java/org/opensearch/ml/indexInsight/FieldDescriptionTask.java create mode 100644 plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightAccessControllerHelper.java create mode 100644 plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightConfig.java create mode 100644 plugin/src/main/java/org/opensearch/ml/indexInsight/LogRelatedIndexCheckTask.java create mode 100644 plugin/src/main/java/org/opensearch/ml/indexInsight/StatisticalDataTask.java diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java new file mode 100644 index 0000000000..e56138258e --- /dev/null +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsight.java @@ -0,0 +1,163 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.common.indexInsight; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.ml.common.CommonValue.TENANT_ID_FIELD; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Instant; + +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +@EqualsAndHashCode +@Getter +public class IndexInsight implements ToXContentObject, Writeable { + public static final String INDEX_NAME_FIELD = "index_name"; + public static final String LAST_UPDATE_FIELD = "last_updated_time"; + public static final String CONTENT_FIELD = "content"; + public static final String STATUS_FIELD = "status"; + public static final String TASK_TYPE_FIELD = "task_type"; + + private final String index; + private final String content; + private final IndexInsightTaskStatus status; + private final MLIndexInsightType taskType; + private final Instant lastUpdatedTime; + private final String tenantId; + + @Builder(toBuilder = true) + public IndexInsight( + String index, + String content, + IndexInsightTaskStatus status, + MLIndexInsightType taskType, + Instant lastUpdatedTime, + String tenantId + ) { + this.index = index; + this.content = content; + this.status = status; + this.taskType = taskType; + this.lastUpdatedTime = lastUpdatedTime; + this.tenantId = tenantId; + } + + public IndexInsight(StreamInput input) throws IOException { + index = input.readString(); + status = IndexInsightTaskStatus.fromString(input.readString()); + taskType = MLIndexInsightType.fromString(input.readString()); + lastUpdatedTime = input.readInstant(); + content = input.readOptionalString(); + tenantId = input.readOptionalString(); + } + + public static IndexInsight parse(XContentParser parser) throws IOException { + String indexName = null; + String content = null; + IndexInsightTaskStatus status = null; + String taskType = null; + Instant lastUpdatedTime = null; + String tenantId = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case INDEX_NAME_FIELD: + indexName = parser.text(); + break; + case CONTENT_FIELD: + content = parser.text(); + break; + case STATUS_FIELD: + status = IndexInsightTaskStatus.fromString(parser.text()); + break; + case TASK_TYPE_FIELD: + taskType = parser.text(); + break; + case LAST_UPDATE_FIELD: + lastUpdatedTime = Instant.ofEpochMilli(parser.longValue()); + break; + case TENANT_ID_FIELD: + tenantId = parser.text(); + break; + default: + parser.skipChildren(); + break; + } + } + return IndexInsight + .builder() + .index(indexName) + .content(content) + .status(status) + .taskType(MLIndexInsightType.fromString(taskType)) + .lastUpdatedTime(lastUpdatedTime) + .tenantId(tenantId) + .build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(status.toString()); + out.writeString(taskType.toString()); + out.writeInstant(lastUpdatedTime); + out.writeOptionalString(content); + out.writeOptionalString(tenantId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (index != null) { + builder.field(INDEX_NAME_FIELD, index); + } + if (content != null && !content.isEmpty()) { + builder.field(CONTENT_FIELD, content); + } + if (status != null) { + builder.field(STATUS_FIELD, status.toString()); + } + if (tenantId != null) { + builder.field(TENANT_ID_FIELD, tenantId); + } + builder.field(TASK_TYPE_FIELD, taskType.toString()); + builder.field(LAST_UPDATE_FIELD, lastUpdatedTime.toEpochMilli()); + builder.endObject(); + return builder; + } + + public static IndexInsight fromStream(StreamInput in) throws IOException { + return new IndexInsight(in); + } + + @Override + public String toString() { + try { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.prettyPrint(); + toXContent(builder, ToXContent.EMPTY_PARAMS); + return builder.toString(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java new file mode 100644 index 0000000000..926ab93591 --- /dev/null +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTask.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.common.indexInsight; + +import java.util.List; + +import org.opensearch.core.action.ActionListener; + +/** + * Interface representing an index insight execution task + */ +public interface IndexInsightTask { + + /** + * Execute the index insight task + */ + void execute(String tenantId, ActionListener listener); + + /** + * Get the prerequisites of this task + */ + List getPrerequisites(); + + /** + * Run the specific task logic + */ + void runTask(String tenantId, ActionListener listener); + + /** + * Create prerequisite task instance + */ + IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType); + +} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java new file mode 100644 index 0000000000..12689f7c32 --- /dev/null +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/IndexInsightTaskStatus.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.common.indexInsight; + +import java.util.Locale; + +public enum IndexInsightTaskStatus { + GENERATING, + COMPLETED, + FAILED; + + public static IndexInsightTaskStatus fromString(String status) { + if (status == null) { + throw new IllegalArgumentException("Index insight task status can't be null"); + } + try { + return IndexInsightTaskStatus.valueOf(status.toUpperCase(Locale.ROOT)); + } catch (Exception e) { + throw new IllegalArgumentException("Wrong index insight task status"); + } + } +} diff --git a/common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java b/common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java new file mode 100644 index 0000000000..e882d8ab2f --- /dev/null +++ b/common/src/main/java/org/opensearch/ml/common/indexInsight/MLIndexInsightType.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.common.indexInsight; + +import java.util.Locale; + +public enum MLIndexInsightType { + STATISTICAL_DATA, + FIELD_DESCRIPTION, + LOG_RELATED_INDEX_CHECK, + ALL; + + public static MLIndexInsightType fromString(String type) { + if (type == null) { + throw new IllegalArgumentException("ML index insight type can't be null"); + } + try { + return MLIndexInsightType.valueOf(type.toUpperCase(Locale.ROOT)); + } catch (Exception e) { + throw new IllegalArgumentException("Wrong index insight type"); + } + } +} diff --git a/plugin/src/main/java/org/opensearch/ml/indexInsight/AbstractIndexInsightTask.java b/plugin/src/main/java/org/opensearch/ml/indexInsight/AbstractIndexInsightTask.java new file mode 100644 index 0000000000..3280186137 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/ml/indexInsight/AbstractIndexInsightTask.java @@ -0,0 +1,534 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.indexInsight; + +import static org.opensearch.ml.common.CommonValue.INDEX_INSIGHT_AGENT_NAME; +import static org.opensearch.ml.common.CommonValue.INDEX_INSIGHT_GENERATING_TIMEOUT; +import static org.opensearch.ml.common.CommonValue.INDEX_INSIGHT_UPDATE_INTERVAL; +import static org.opensearch.ml.common.CommonValue.ML_INDEX_INSIGHT_STORAGE_INDEX; +import static org.opensearch.ml.common.indexInsight.IndexInsight.INDEX_NAME_FIELD; +import static org.opensearch.ml.common.indexInsight.IndexInsight.TASK_TYPE_FIELD; +import static org.opensearch.ml.common.memorycontainer.MemoryContainerConstants.MEMORY_CONTAINER_ID_FIELD; +import static org.opensearch.ml.common.utils.StringUtils.gson; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.Numbers; +import org.opensearch.common.regex.Regex; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.RegexpQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.ml.common.FunctionName; +import org.opensearch.ml.common.MLConfig; +import org.opensearch.ml.common.dataset.remote.RemoteInferenceInputDataSet; +import org.opensearch.ml.common.indexInsight.IndexInsight; +import org.opensearch.ml.common.indexInsight.IndexInsightTask; +import org.opensearch.ml.common.indexInsight.IndexInsightTaskStatus; +import org.opensearch.ml.common.indexInsight.MLIndexInsightType; +import org.opensearch.ml.common.input.execute.agent.AgentMLInput; +import org.opensearch.ml.common.memorycontainer.RemoteStore; +import org.opensearch.ml.common.output.model.ModelTensor; +import org.opensearch.ml.common.output.model.ModelTensorOutput; +import org.opensearch.ml.common.output.model.ModelTensors; +import org.opensearch.ml.common.transport.config.MLConfigGetAction; +import org.opensearch.ml.common.transport.config.MLConfigGetRequest; +import org.opensearch.ml.common.transport.execute.MLExecuteTaskAction; +import org.opensearch.ml.common.transport.execute.MLExecuteTaskRequest; +import org.opensearch.ml.helper.MemoryContainerHelper; +import org.opensearch.ml.helper.RemoteMemoryStoreHelper; +import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.remote.metadata.client.SearchDataObjectRequest; +import org.opensearch.remote.metadata.common.SdkClientUtils; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.transport.client.Client; + +import com.google.common.hash.Hashing; +import com.jayway.jsonpath.JsonPath; + +import lombok.extern.log4j.Log4j2; + +/** + * Abstract base class providing default implementation for IndexInsightTask + */ +@Log4j2 +public abstract class AbstractIndexInsightTask implements IndexInsightTask { + + protected final MLIndexInsightType taskType; + protected final String sourceIndex; + protected final Client client; + protected final SdkClient sdkClient; + protected final String cmkRoleArn; + protected final String cmkAssumeRoleArn; + protected final RemoteMemoryStoreHelper remoteMemoryStoreHelper; + protected final MemoryContainerHelper memoryContainerHelper; + protected final String memoryContainerId; + + protected AbstractIndexInsightTask( + MLIndexInsightType taskType, + String sourceIndex, + Client client, + SdkClient sdkClient, + String cmkRoleArn, + String cmkAssumeRoleArn, + RemoteMemoryStoreHelper remoteMemoryStoreHelper, + MemoryContainerHelper memoryContainerHelper + ) { + this.taskType = taskType; + this.sourceIndex = sourceIndex; + this.client = client; + this.sdkClient = sdkClient; + this.cmkRoleArn = cmkRoleArn; + this.cmkAssumeRoleArn = cmkAssumeRoleArn; + this.remoteMemoryStoreHelper = remoteMemoryStoreHelper; + this.memoryContainerHelper = memoryContainerHelper; + this.memoryContainerId = client.threadPool().getThreadContext().getHeader(MEMORY_CONTAINER_ID_FIELD); + } + + /** + * Execute the index insight task: + * 1. Check if record exists in storage + * 2. Check status and last updated time + * 3. Check prerequisites + * 4. Run task logic + * 5. Write back to storage + */ + @Override + public void execute(String tenantId, ActionListener listener) { + getIndexInsight(generateDocId(), tenantId, ActionListener.wrap(getResponse -> { + if (getResponse.isExists()) { + handleExistingDoc(getResponse.getSourceAsMap(), tenantId, listener); + } else { + SearchSourceBuilder patternSourceBuilder = buildPatternSourceBuilder(taskType.name()); + try (ThreadContext.StoredContext searchContext = client.threadPool().getThreadContext().stashContext()) { + sdkClient + .searchDataObjectAsync( + SearchDataObjectRequest + .builder() + .tenantId(tenantId) + .indices(ML_INDEX_INSIGHT_STORAGE_INDEX) + .searchSourceBuilder(patternSourceBuilder) + .build() + ) + .whenComplete((r, throwable) -> { + searchContext.restore(); + if (throwable != null) { + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); + log.error("Failed to get index insight pattern", cause); + beginGeneration(tenantId, listener); + } else { + SearchResponse searchResponse = r.searchResponse(); + SearchHit[] hits = searchResponse.getHits().getHits(); + Map mappedPatternSource = matchPattern(hits, sourceIndex); + if (Objects.isNull(mappedPatternSource)) { + beginGeneration(tenantId, listener); + } else { + handlePatternMatchedDoc(mappedPatternSource, tenantId, listener); + } + } + }); + } catch (Exception e) { + listener.onFailure(e); + } + } + }, listener::onFailure)); + } + + protected void handleExistingDoc(Map source, String tenantId, ActionListener listener) { + String currentStatus = (String) source.get(IndexInsight.STATUS_FIELD); + Object v = source.get(IndexInsight.LAST_UPDATE_FIELD); + Long lastUpdateTime = (v == null) ? null + : (v instanceof Number n) ? n.longValue() + : (v instanceof CharSequence cs && cs.length() > 0) ? Numbers.toLong(cs.toString(), true) + : null; + long currentTime = Instant.now().toEpochMilli(); + + IndexInsightTaskStatus status = IndexInsightTaskStatus.fromString(currentStatus); + switch (status) { + case GENERATING: + // Check if generating timeout + if (lastUpdateTime != null && (currentTime - lastUpdateTime) > INDEX_INSIGHT_GENERATING_TIMEOUT) { + beginGeneration(tenantId, listener); + } else { + // If still generating and not timeout, task is already running + listener + .onFailure( + new OpenSearchStatusException("Index insight is being generated, please wait...", RestStatus.TOO_MANY_REQUESTS) + ); + } + break; + case COMPLETED: + // Check if needs update + if (lastUpdateTime != null && (currentTime - lastUpdateTime) > INDEX_INSIGHT_UPDATE_INTERVAL) { + beginGeneration(tenantId, listener); + } else { + // Return existing result + IndexInsight insight = IndexInsight + .builder() + .index((String) source.get(IndexInsight.INDEX_NAME_FIELD)) + .taskType(MLIndexInsightType.valueOf((String) source.get(IndexInsight.TASK_TYPE_FIELD))) + .content((String) source.get(IndexInsight.CONTENT_FIELD)) + .status(IndexInsightTaskStatus.COMPLETED) + .lastUpdatedTime(Instant.ofEpochMilli(lastUpdateTime)) + .tenantId(tenantId) + .build(); + listener.onResponse(insight); + } + break; + case FAILED: + // Retry failed task + beginGeneration(tenantId, listener); + break; + } + } + + /** + * Handle pattern matched document + */ + protected void handlePatternMatchedDoc(Map patternSource, String tenantId, ActionListener listener) { + String currentStatus = (String) patternSource.get(IndexInsight.STATUS_FIELD); + IndexInsightTaskStatus status = IndexInsightTaskStatus.fromString(currentStatus); + + // If pattern result is not completed, fall back to normal generation + if (status != IndexInsightTaskStatus.COMPLETED) { + beginGeneration(tenantId, listener); + return; + } + + // If pattern result is completed but expired, fall back to normal generation + Long lastUpdateTime = (Long) patternSource.get(IndexInsight.LAST_UPDATE_FIELD); + long currentTime = Instant.now().toEpochMilli(); + if (lastUpdateTime != null && (currentTime - lastUpdateTime) > INDEX_INSIGHT_UPDATE_INTERVAL) { + beginGeneration(tenantId, listener); + return; + } + + // Pattern result is completed and valid + handlePatternResult(patternSource, tenantId, listener); + } + + /** + * Begin the index insight generation process by updating task status to GENERATING and executing the task with prerequisites. + */ + protected void beginGeneration(String tenantId, ActionListener listener) { + IndexInsight indexInsight = IndexInsight + .builder() + .index(sourceIndex) + .tenantId(tenantId) + .taskType(taskType) + .status(IndexInsightTaskStatus.GENERATING) + .lastUpdatedTime(Instant.now()) + .build(); + + writeIndexInsight( + indexInsight, + tenantId, + ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, listener::onFailure) + ); + } + + protected void runWithPrerequisites(String tenantId, ActionListener listener) { + List prerequisites = getPrerequisites(); + AtomicInteger completedCount = new AtomicInteger(0); + if (prerequisites.isEmpty()) { + runTask(tenantId, listener); + return; + } + + // Run all prerequisites + for (MLIndexInsightType prerequisite : prerequisites) { + IndexInsightTask prerequisiteTask = createPrerequisiteTask(prerequisite); + prerequisiteTask.execute(tenantId, ActionListener.wrap(prereqInsight -> { + if (completedCount.incrementAndGet() == prerequisites.size()) { + runTask(tenantId, listener); + } + }, e -> { saveFailedStatus(tenantId, new Exception("Failed to run prerequisite: " + prerequisite, e), listener); })); + } + } + + protected void saveResult(String content, String tenantId, ActionListener listener) { + IndexInsight insight = IndexInsight + .builder() + .index(sourceIndex) + .taskType(taskType) + .content(content) + .status(IndexInsightTaskStatus.COMPLETED) + .lastUpdatedTime(Instant.now()) + .tenantId(tenantId) + .build(); + + writeIndexInsight(insight, tenantId, ActionListener.wrap(r -> { listener.onResponse(insight); }, e -> { + saveFailedStatus(tenantId, e, listener); + })); + } + + protected void saveFailedStatus(String tenantId, Exception error, ActionListener listener) { + IndexInsight indexInsight = IndexInsight + .builder() + .tenantId(tenantId) + .index(sourceIndex) + .taskType(taskType) + .lastUpdatedTime(Instant.now()) + .status(IndexInsightTaskStatus.FAILED) + .build(); + writeIndexInsight( + indexInsight, + tenantId, + ActionListener.wrap(r -> { listener.onFailure(error); }, e -> { listener.onFailure(e); }) + ); + } + + /** + * Generate document ID for current task + */ + protected String generateDocId() { + return generateDocId(taskType); + } + + /** + * Generate document ID for specific task type + */ + protected String generateDocId(MLIndexInsightType taskType) { + String combined = sourceIndex + "_" + taskType.toString(); + return Hashing.sha256().hashString(combined, StandardCharsets.UTF_8).toString(); + } + + /** + * Get insight content from storage for a specific task type + */ + protected void getInsightContentFromContainer( + MLIndexInsightType taskType, + String tenantId, + ActionListener> listener + ) { + String docId = generateDocId(taskType); + getIndexInsight(docId, tenantId, ActionListener.wrap(getResponse -> { + try { + String content = getResponse.isExists() ? getResponse.getSourceAsMap().get(IndexInsight.CONTENT_FIELD).toString() : ""; + Map contentMap = gson.fromJson(content, Map.class); + listener.onResponse(contentMap); + } catch (Exception e) { + // Return empty content on JSON parsing failure + listener.onResponse(new HashMap<>()); + } + }, listener::onFailure)); + } + + protected void handlePatternResult(Map patternSource, String tenantId, ActionListener listener) { + // Default implementation: return pattern result as-is + Long lastUpdateTime = (Long) patternSource.get(IndexInsight.LAST_UPDATE_FIELD); + IndexInsight insight = IndexInsight + .builder() + .index(sourceIndex) + .taskType(taskType) + .content((String) patternSource.get(IndexInsight.CONTENT_FIELD)) + .status(IndexInsightTaskStatus.COMPLETED) + .lastUpdatedTime(Instant.ofEpochMilli(lastUpdateTime)) + .tenantId(tenantId) + .build(); + listener.onResponse(insight); + } + + private void getIndexInsight(String docId, String tenantId, ActionListener listener) { + memoryContainerHelper.getMemoryContainer(memoryContainerId, ActionListener.wrap(mlMemoryContainer -> { + RemoteStore remoteStore = mlMemoryContainer.getConfiguration().getRemoteStore(); + remoteMemoryStoreHelper.getDocument(remoteStore, ML_INDEX_INSIGHT_STORAGE_INDEX, docId, ActionListener.wrap(listener::onResponse, e -> { + listener.onFailure(new RuntimeException("Fail to retrieve index insight", e)); + } + + )); + }, e -> { + listener.onFailure(new RuntimeException("Error happening when retrieve memory container", e)); + })); + } + + + private void writeIndexInsight(IndexInsight indexInsight, String tenantId, ActionListener listener) { + String docId = generateDocId(); + IndexRequest indexRequest = new IndexRequest(ML_INDEX_INSIGHT_STORAGE_INDEX).id(docId).source(indexInsight); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + memoryContainerHelper.getMemoryContainer(memoryContainerId, ActionListener.wrap(mlMemoryContainer -> { + RemoteStore remoteStore = mlMemoryContainer.getConfiguration().getRemoteStore(); + remoteMemoryStoreHelper.writeDocumentWithDocID(remoteStore, docId, tenantId, indexRequest.sourceAsMap(), ActionListener.wrap(indexResponse -> { + if (indexResponse.getResult() == DocWriteResponse.Result.CREATED + || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { + listener.onResponse(true); + } else { + listener.onFailure(new RuntimeException("Failed to put generating index insight doc")); + } + }, e -> { + listener.onFailure(new RuntimeException("Error happening when putting doc", e)); + } + + )); + }, e -> { + listener.onFailure(new RuntimeException("Error happening when retrieve memory container", e)); + })); + } + + protected static void getAgentIdToRun(Client client, String tenantId, ActionListener actionListener) { + MLConfigGetRequest mlConfigGetRequest = new MLConfigGetRequest(INDEX_INSIGHT_AGENT_NAME, tenantId); + client.execute(MLConfigGetAction.INSTANCE, mlConfigGetRequest, ActionListener.wrap(r -> { + MLConfig mlConfig = r.getMlConfig(); + actionListener.onResponse(mlConfig.getConfiguration().getAgentId()); + }, actionListener::onFailure)); + } + + /** + * Flatten all the fields in the mappings, insert the field to fieldType mapping to a map + * @param mappingSource the mappings of an index + * @param fieldsToType the result containing the field to fieldType mapping + * @param prefix the parent field path + * @param includeFields whether include the `fields` in a text type field, for some use case like PPLTool, `fields` in a text type field + * cannot be included, but for CreateAnomalyDetectorTool, `fields` must be included. + */ + protected static void extractFieldNamesTypes( + Map mappingSource, + Map fieldsToType, + String prefix, + boolean includeFields + ) { + if (prefix.length() > 0) { + prefix += "."; + } + + for (Map.Entry entry : mappingSource.entrySet()) { + String n = entry.getKey(); + Object v = entry.getValue(); + + if (v instanceof Map) { + Map vMap = (Map) v; + if (vMap.containsKey("type")) { + String fieldType = (String) vMap.getOrDefault("type", ""); + // no need to extract alias into the result, and for object field, extract the subfields only + if (!fieldType.equals("alias") && !fieldType.equals("object")) { + fieldsToType.put(prefix + n, fieldType); + } + } + if (vMap.containsKey("properties")) { + extractFieldNamesTypes((Map) vMap.get("properties"), fieldsToType, prefix + n, includeFields); + } + if (includeFields && vMap.containsKey("fields")) { + extractFieldNamesTypes((Map) vMap.get("fields"), fieldsToType, prefix + n, true); + } + } + } + } + + private static SearchSourceBuilder buildPatternSourceBuilder(String taskType) { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(100); + + RegexpQueryBuilder regexpQuery = QueryBuilders.regexpQuery(INDEX_NAME_FIELD, ".*[*?,].*"); + TermQueryBuilder termQuery = QueryBuilders.termQuery(TASK_TYPE_FIELD, taskType); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().filter(regexpQuery).filter(termQuery); + + sourceBuilder.query(boolQuery); + return sourceBuilder; + } + + /** + * Auto-detects LLM response format and extracts the response text if response_filter is not configured + */ + private static String extractModelResponse(Map data) { + if (data.containsKey("choices")) { + return JsonPath.read(data, "$.choices[0].message.content"); + } + if (data.containsKey("content")) { + return JsonPath.read(data, "$.content[0].text"); + } + return JsonPath.read(data, "$.response"); + } + + private static Map matchPattern(SearchHit[] hits, String targetIndex) { + for (SearchHit hit : hits) { + Map source = hit.getSourceAsMap(); + String pattern = (String) source.get(INDEX_NAME_FIELD); + if (Regex.simpleMatch(pattern, targetIndex)) { + return source; + } + } + return null; + } + + /** + * Common method to call LLM with agent and handle response parsing + */ + protected static void callLLMWithAgent( + Client client, + String agentId, + String prompt, + String sourceIndex, + String tenantId, + ActionListener listener + ) { + AgentMLInput agentInput = AgentMLInput + .AgentMLInputBuilder() + .agentId(agentId) + .functionName(FunctionName.AGENT) + .inputDataset(RemoteInferenceInputDataSet.builder().parameters(Collections.singletonMap("prompt", prompt)).build()) + .tenantId(tenantId) + .build(); + + MLExecuteTaskRequest executeRequest = new MLExecuteTaskRequest(FunctionName.AGENT, agentInput); + + client.execute(MLExecuteTaskAction.INSTANCE, executeRequest, ActionListener.wrap(mlResp -> { + try { + ModelTensorOutput out = (ModelTensorOutput) mlResp.getOutput(); + ModelTensors t = out.getMlModelOutputs().get(0); + ModelTensor mt = t.getMlModelTensors().get(0); + String result = mt.getResult(); + String response; + // response_filter is not configured in the LLM connector + if (result.startsWith("{")) { + Map data = gson.fromJson(result, Map.class); + response = extractModelResponse(data); + } else { + // response_filter is configured in the LLM connector + response = result; + } + listener.onResponse(response); + } catch (Exception e) { + log.error("Error parsing LLM response for index {}: {}", sourceIndex, e.getMessage()); + listener.onFailure(e); + } + }, e -> { + log.error("Failed to call LLM for index {}: {}", sourceIndex, e.getMessage()); + listener.onFailure(e); + })); + } + + protected void handleError(String message, Exception e, String tenantId, ActionListener listener, boolean shouldStore) { + log.error(message, sourceIndex, e); + if (shouldStore) { + saveFailedStatus(tenantId, e, listener); + } else { + listener.onFailure(e); + } + } + + protected void handleError(String message, Exception e, String tenantId, ActionListener listener) { + handleError(message, e, tenantId, listener, true); + } +} diff --git a/plugin/src/main/java/org/opensearch/ml/indexInsight/FieldDescriptionTask.java b/plugin/src/main/java/org/opensearch/ml/indexInsight/FieldDescriptionTask.java new file mode 100644 index 0000000000..ad7f036ed0 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/ml/indexInsight/FieldDescriptionTask.java @@ -0,0 +1,345 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.indexInsight; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.opensearch.ml.indexInsight.StatisticalDataTask.EXAMPLE_DOC_KEYWORD; +import static org.opensearch.ml.indexInsight.StatisticalDataTask.IMPORTANT_COLUMN_KEYWORD; +import static org.opensearch.ml.common.utils.StringUtils.gson; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.core.action.ActionListener; +import org.opensearch.ml.common.indexInsight.IndexInsight; +import org.opensearch.ml.common.indexInsight.IndexInsightTask; +import org.opensearch.ml.common.indexInsight.IndexInsightTaskStatus; +import org.opensearch.ml.common.indexInsight.MLIndexInsightType; +import org.opensearch.ml.helper.MemoryContainerHelper; +import org.opensearch.ml.helper.RemoteMemoryStoreHelper; +import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.transport.client.Client; + +import lombok.extern.log4j.Log4j2; + +/** + * Field Description Task: Generates descriptions for index fields using LLM. + * This task analyzes index mapping and sample data to provide meaningful descriptions + * for each field in the index, helping down-stream tasks understand the purpose and content of fields. + */ +@Log4j2 +public class FieldDescriptionTask extends AbstractIndexInsightTask { + + private static final int BATCH_SIZE = 50; // Hard-coded value for now + + public FieldDescriptionTask(String sourceIndex, Client client, SdkClient sdkClient, String cmkRoleArn, String cmkAssumeRoleArn, RemoteMemoryStoreHelper remoteMemoryStoreHelper, + MemoryContainerHelper memoryContainerHelper) { + super(MLIndexInsightType.FIELD_DESCRIPTION, sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn, remoteMemoryStoreHelper, memoryContainerHelper); + } + + @Override + public void runTask(String tenantId, ActionListener listener) { + try { + getInsightContentFromContainer(MLIndexInsightType.STATISTICAL_DATA, tenantId, ActionListener.wrap(statisticalContentMap -> { + getAgentIdToRun( + client, + tenantId, + ActionListener + .wrap(agentId -> { batchProcessFields(statisticalContentMap, agentId, tenantId, listener); }, listener::onFailure) + ); + }, e -> handleError("Failed to get statistical content for index {}", e, tenantId, listener))); + } catch (Exception e) { + handleError("Failed to execute field description task for index {}", e, tenantId, listener); + } + } + + /** + * Filter pattern-matched field descriptions to only include fields present in current index + */ + private Map filterFieldDescriptions( + Map patternFieldDescriptions, + Map currentIndexFields + ) { + Map filteredDescriptions = new LinkedHashMap<>(); + + if (patternFieldDescriptions == null || currentIndexFields == null) { + return filteredDescriptions; + } + + currentIndexFields + .keySet() + .stream() + .filter(patternFieldDescriptions::containsKey) + .forEach(fieldName -> filteredDescriptions.put(fieldName, patternFieldDescriptions.get(fieldName))); + + return filteredDescriptions; + } + + @Override + protected void handlePatternResult(Map patternSource, String tenantId, ActionListener listener) { + try { + String patternContent = (String) patternSource.get(IndexInsight.CONTENT_FIELD); + Map patternFieldDescriptions = gson.fromJson(patternContent, Map.class); + + // Get current index mapping + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndex); + + client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { + try { + Map mappings = getMappingsResponse.getMappings(); + if (mappings.isEmpty()) { + beginGeneration(tenantId, listener); + return; + } + + // Extract field names from current index mapping + Map currentFields = new HashMap<>(); + for (MappingMetadata mappingMetadata : mappings.values()) { + Map mappingSource = (Map) mappingMetadata.getSourceAsMap().get("properties"); + if (mappingSource != null) { + extractFieldNamesTypes(mappingSource, currentFields, "", false); + } + } + + Map currentFieldsMap = new HashMap<>(currentFields); + + Map filteredDescriptions = filterFieldDescriptions(patternFieldDescriptions, currentFieldsMap); + + // Create filtered result without storing + Long lastUpdateTime = (Long) patternSource.get(IndexInsight.LAST_UPDATE_FIELD); + IndexInsight insight = IndexInsight + .builder() + .index(sourceIndex) + .taskType(taskType) + .content(gson.toJson(filteredDescriptions)) + .status(IndexInsightTaskStatus.COMPLETED) + .lastUpdatedTime(Instant.ofEpochMilli(lastUpdateTime)) + .build(); + listener.onResponse(insight); + + } catch (Exception e) { + log.error("Failed to process current index mapping for index {}", sourceIndex, e); + listener.onFailure(e); + } + }, e -> { + log.error("Failed to get current index mapping for index {}", sourceIndex, e); + listener.onFailure(e); + })); + + } catch (Exception e) { + log.error("Failed to filter field descriptions for index {}", sourceIndex, e); + listener.onFailure(e); + } + } + + @Override + public List getPrerequisites() { + return Collections.singletonList(MLIndexInsightType.STATISTICAL_DATA); + } + + private void batchProcessFields( + Map statisticalContentMap, + String agentId, + String tenantId, + ActionListener listener + ) { + Map mappingSource; + Object obj = statisticalContentMap.get(IMPORTANT_COLUMN_KEYWORD); + if (!(obj instanceof Map)) { + handleError( + "No mapping properties found for index: {}", + new IllegalStateException("No data distribution found for index: " + sourceIndex), + tenantId, + listener + ); + return; + } + mappingSource = (Map) obj; + + List allFields = List.of(mappingSource.keySet().toArray(new String[0])); + + if (allFields.isEmpty()) { + log.warn("No important fields found for index: {}", sourceIndex); + saveResult("", tenantId, ActionListener.wrap(insight -> { + log.info("Empty field description completed for: {}", sourceIndex); + listener.onResponse(insight); + }, e -> handleError("Failed to save empty field description result for index {}", e, tenantId, listener))); + return; + } + + List> batches = createBatches(allFields, BATCH_SIZE); + CountDownLatch countDownLatch = new CountDownLatch(batches.size()); + Map resultsMap = new ConcurrentHashMap<>(); + AtomicBoolean hasErrors = new AtomicBoolean(false); + ActionListener> resultListener = ActionListener.wrap(batchResult -> { + if (batchResult != null) + resultsMap.putAll(batchResult); + }, e -> { + hasErrors.set(true); + log.error("Batch processing failed for index {}: {}", sourceIndex, e.getMessage()); + }); + LatchedActionListener> latchedActionListener = new LatchedActionListener<>(resultListener, countDownLatch); + for (List batch : batches) { + processBatch(batch, statisticalContentMap, agentId, tenantId, latchedActionListener); + } + try { + countDownLatch.await(60, SECONDS); + if (!hasErrors.get()) { + saveResult(gson.toJson(resultsMap), tenantId, ActionListener.wrap(insight -> { + log.info("Field description completed for: {}", sourceIndex); + listener.onResponse(insight); + }, e -> handleError("Failed to save field description result for index {}", e, tenantId, listener))); + } else { + handleError("Batch processing failed for index {}", new Exception("Batch processing failed"), tenantId, listener); + } + } catch (InterruptedException e) { + log.error("Batch processing interrupted for index: {}", sourceIndex); + handleError("Batch processing interrupted for index {}", e, tenantId, listener); + } + } + + private List> createBatches(List fields, int batchSize) { + List> batches = new ArrayList<>(); + for (int i = 0; i < fields.size(); i += batchSize) { + int end = Math.min(i + batchSize, fields.size()); + batches.add(fields.subList(i, end)); + } + return batches; + } + + private void processBatch( + List batchFields, + Map statisticalContentMap, + String agentId, + String tenantId, + ActionListener> listener + ) { + String prompt = generateBatchPrompt(batchFields, statisticalContentMap); + + callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> { + try { + log.info("Batch LLM call successful for {} fields in index {}", batchFields.size(), sourceIndex); + Map batchResult = parseFieldDescription(response); + listener.onResponse(batchResult); + } catch (Exception e) { + log.error("Error parsing response for batch in index {}: {}", sourceIndex, e.getMessage()); + listener.onFailure(e); + } + }, e -> { listener.onFailure(e); })); + } + + private String generateBatchPrompt(List batchFields, Map statisticalContentMap) { + StringBuilder prompt = new StringBuilder(); + prompt.append("Please analyze the following OpenSearch index fields and provide descriptions:\\n\\n"); + prompt.append("Index Name: ").append(sourceIndex).append("\\n\\n"); + + prompt.append("Fields to describe:\\n"); + for (String field : batchFields) { + prompt.append("- ").append(field).append("\\n"); + } + prompt.append("\\n"); + // Filter statistical data based on current batch fields + Map relevantStatisticalData = extractRelevantStatisticalData(statisticalContentMap, batchFields); + if (!relevantStatisticalData.isEmpty()) { + if (relevantStatisticalData.containsKey(IMPORTANT_COLUMN_KEYWORD)) { + prompt.append("Some Field Distribution:\\n").append(relevantStatisticalData.get(IMPORTANT_COLUMN_KEYWORD)).append("\\n\\n"); + } + if (relevantStatisticalData.containsKey(EXAMPLE_DOC_KEYWORD)) { + prompt.append("Example Documents:\\n").append(relevantStatisticalData.get(EXAMPLE_DOC_KEYWORD)).append("\\n\\n"); + } + } + + prompt + .append( + "For each field listed above, provide a brief description of what it contains and its purpose. The description should not mention specific values from any example documents or include specific examples.\\n" + ); + prompt.append("For each field, provide description in the following format EXACTLY:\\n"); + prompt.append("field_name: description"); + + return prompt.toString(); + } + + private Map extractRelevantStatisticalData(Map statisticalContentMap, List batchFields) { + Map result = new LinkedHashMap<>(); + if (statisticalContentMap == null || statisticalContentMap.isEmpty() || batchFields.isEmpty()) { + return result; + } + + try { + Map distribution = (Map) statisticalContentMap.get(IMPORTANT_COLUMN_KEYWORD); + + // Extract relevant mapping + Map relevantMapping = new LinkedHashMap<>(); + for (String field : batchFields) { + if (distribution != null && distribution.containsKey(field)) { + relevantMapping.put(field, distribution.get(field)); + } + } + if (!relevantMapping.isEmpty()) { + result.put(IMPORTANT_COLUMN_KEYWORD, relevantMapping); + } + + // Extract example docs from distribution + List> exampleDocs = (List>) statisticalContentMap.get(EXAMPLE_DOC_KEYWORD); + if (exampleDocs != null && !exampleDocs.isEmpty()) { + List> filteredExampleDocs = new ArrayList<>(); + for (Map doc : exampleDocs) { + Map filteredDoc = new LinkedHashMap<>(); + for (String field : batchFields) { + if (doc.containsKey(field)) { + filteredDoc.put(field, doc.get(field)); + } + } + filteredExampleDocs.add(filteredDoc); + } + result.put(EXAMPLE_DOC_KEYWORD, filteredExampleDocs); + } + + } catch (Exception e) { + log.warn("Failed to extract relevant statistical data for batch fields: {}", e.getMessage()); + } + return result; + } + + private Map parseFieldDescription(String modelResponse) { + Map field2Desc = new LinkedHashMap<>(); + String[] lines = modelResponse.trim().split("\\n"); + + for (String line : lines) { + line = line.trim(); + String[] parts = line.split(":", 2); + if (parts.length == 2) { + String name = parts[0].trim(); + String desc = parts[1].trim(); + if (!desc.isEmpty()) { + field2Desc.put(name, desc); + } + } + } + + return field2Desc; + } + + @Override + public IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType) { + if (prerequisiteType == MLIndexInsightType.STATISTICAL_DATA) { + return new StatisticalDataTask(sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn, remoteMemoryStoreHelper, memoryContainerHelper); + } + throw new IllegalStateException("Unsupported prerequisite type: " + prerequisiteType); + } + +} diff --git a/plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightAccessControllerHelper.java b/plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightAccessControllerHelper.java new file mode 100644 index 0000000000..a246c68386 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightAccessControllerHelper.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.indexInsight; + +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.transport.client.Client; + +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class IndexInsightAccessControllerHelper { + // Verify the access by dry run + public static void verifyAccessController(Client client, ActionListener actionListener, String sourceIndex) { + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndex); + client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { + if (getMappingsResponse.getMappings().isEmpty()) { + actionListener.onFailure(new IllegalArgumentException("No matching indices found for: " + sourceIndex)); + return; + } + // If mappings exist, do a search to verify access permissions + SearchRequest searchRequest = constructSimpleQueryRequest(sourceIndex); + client.search(searchRequest, ActionListener.wrap(r -> { actionListener.onResponse(true); }, e -> { + log.error(e.getMessage(), e); + actionListener.onFailure(e); + })); + }, e -> { + log.error(e.getMessage(), e); + actionListener.onFailure(e); + })); + } + + public static SearchRequest constructSimpleQueryRequest(String sourceIndex) { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(new MatchAllQueryBuilder()); + searchSourceBuilder.size(1); + SearchRequest searchRequest = new SearchRequest(sourceIndex); + searchRequest.source(searchSourceBuilder); + return searchRequest; + } +} diff --git a/plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightConfig.java b/plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightConfig.java new file mode 100644 index 0000000000..4c6b54949f --- /dev/null +++ b/plugin/src/main/java/org/opensearch/ml/indexInsight/IndexInsightConfig.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.indexInsight; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.ml.common.CommonValue.TENANT_ID_FIELD; + +import java.io.IOException; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import lombok.Builder; +import lombok.Getter; + +@Getter +public class IndexInsightConfig implements ToXContentObject, Writeable { + private final Boolean isEnable; + private final String tenantId; + + public static final String IS_ENABLE_FIELD = "is_enable"; + + @Builder(toBuilder = true) + public IndexInsightConfig(Boolean isEnable, String tenantId) { + this.isEnable = isEnable; + this.tenantId = tenantId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(isEnable); + out.writeString(tenantId); + } + + public IndexInsightConfig(StreamInput input) throws IOException { + isEnable = input.readBoolean(); + tenantId = input.readString(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(IS_ENABLE_FIELD, isEnable); + if (tenantId != null) { + builder.field(TENANT_ID_FIELD, tenantId); + } + builder.endObject(); + return builder; + } + + public static IndexInsightConfig parse(XContentParser parser) throws IOException { + Boolean isEnable = null; + String tenantId = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case IS_ENABLE_FIELD: + isEnable = parser.booleanValue(); + break; + case TENANT_ID_FIELD: + tenantId = parser.text(); + break; + default: + parser.skipChildren(); + break; + } + + } + return IndexInsightConfig.builder().isEnable(isEnable).tenantId(tenantId).build(); + } + + public static IndexInsightConfig fromStream(StreamInput in) throws IOException { + return new IndexInsightConfig(in); + } +} diff --git a/plugin/src/main/java/org/opensearch/ml/indexInsight/LogRelatedIndexCheckTask.java b/plugin/src/main/java/org/opensearch/ml/indexInsight/LogRelatedIndexCheckTask.java new file mode 100644 index 0000000000..8a3bbd8195 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/ml/indexInsight/LogRelatedIndexCheckTask.java @@ -0,0 +1,170 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.indexInsight; + +import static org.opensearch.ml.common.utils.StringUtils.MAPPER; +import static org.opensearch.ml.common.utils.StringUtils.gson; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.ml.common.indexInsight.IndexInsight; +import org.opensearch.ml.common.indexInsight.IndexInsightTask; +import org.opensearch.ml.common.indexInsight.MLIndexInsightType; +import org.opensearch.ml.helper.MemoryContainerHelper; +import org.opensearch.ml.helper.RemoteMemoryStoreHelper; +import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.transport.client.Client; + +import com.fasterxml.jackson.core.type.TypeReference; + +import lombok.extern.log4j.Log4j2; + +/** Check whether the index is log-related for downstream task:Log-based RCA analysis +1. Judge whether the index is related to log +2. Whether there is a column containing the whole log message +3. Whether there is a column serve as trace id which combine a set of logs into one flow +4. The Whole return is a fixed format which can be parsed in the following process + */ +@Log4j2 +public class LogRelatedIndexCheckTask extends AbstractIndexInsightTask { + + private String sampleDocString; + + private static final Map DEFAULT_RCA_RESULT = new HashMap<>(); + + static { + DEFAULT_RCA_RESULT.put("is_log_index", false); + DEFAULT_RCA_RESULT.put("log_message_field", null); + DEFAULT_RCA_RESULT.put("trace_id_field", null); + } + + private static final String RCA_TEMPLATE = + """ + I will provide you an index with the types and statistics of each field, and a few sample documents. + + Your task is to analyze the structure and semantics of this index, and determine whether it is suitable for Root Cause Analysis (RCA) on logs. + + Please help me answer the following 3 questions based on the provided information: + + 1. Is this index related to **log data**? + 2. Is there any **field that contains full log messages** (e.g., raw log lines or unstructured log content)? + 3. Is there any **field that can serve as a trace ID**, i.e., grouping multiple logs into the same logical execution or transaction flow? + + The index name is: + {indexName} + + Here are 3 sample documents from this index: + {samples} + + You should infer your answer **based on both field names, their data types, value examples, and overall context**. + Avoid simply repeating the input values. Think logically about what each field represents and how it might be used. + + Return your result in the **following strict JSON format** inside tags, so that it can be parsed later. Only include fields that you are confident about. + + + { + "is_log_index": true/false, + "log_message_field": "field_name" or null, + "trace_id_field": "field_name" or null + } + + + Rules: + - If you cannot confidently find a log message field or trace ID field, use `null`. + - Your judgment should be based on both semantics and field patterns (e.g., field names like "message", "log", "trace", "span", etc). + """; + + public LogRelatedIndexCheckTask(String sourceIndex, Client client, SdkClient sdkClient, String cmkRoleArn, String cmkAssumeRoleArn, RemoteMemoryStoreHelper remoteMemoryStoreHelper, + MemoryContainerHelper memoryContainerHelper) { + super(MLIndexInsightType.LOG_RELATED_INDEX_CHECK, sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn, remoteMemoryStoreHelper, memoryContainerHelper); + } + + @Override + public void runTask(String tenantId, ActionListener listener) { + try { + collectSampleDocString(ActionListener.wrap(sampleDocs -> { + getAgentIdToRun( + client, + tenantId, + ActionListener.wrap(agentId -> performLogAnalysis(agentId, tenantId, listener), listener::onFailure) + ); + }, listener::onFailure)); + } catch (Exception e) { + handleError("Failed log related check for {}", e, tenantId, listener); + } + } + + @Override + public List getPrerequisites() { + return Collections.emptyList(); + } + + private void collectSampleDocString(ActionListener listener) { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(3).query(new MatchAllQueryBuilder()); + SearchRequest searchRequest = new SearchRequest(new String[] { sourceIndex }, searchSourceBuilder); + + client.search(searchRequest, ActionListener.wrap(searchResponse -> { + try { + List> samples = Arrays + .stream(searchResponse.getHits().getHits()) + .map(SearchHit::getSourceAsMap) + .toList(); + sampleDocString = gson.toJson(samples); + log.info("Collected sample documents for index: {}", sourceIndex); + listener.onResponse(sampleDocString); + } catch (Exception e) { + log.error("Failed to process sample documents for index: {}", sourceIndex, e); + listener.onFailure(e); + } + }, e -> { + log.error("Failed to collect sample documents for index: {}", sourceIndex, e); + listener.onFailure(e); + })); + } + + private void performLogAnalysis(String agentId, String tenantId, ActionListener listener) { + String prompt = RCA_TEMPLATE.replace("{indexName}", sourceIndex).replace("{samples}", sampleDocString); + + callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> { + try { + Map parsed = parseCheckResponse(response); + saveResult(MAPPER.writeValueAsString(parsed), tenantId, ActionListener.wrap(insight -> { + log.info("Log related check completed for index {}", sourceIndex); + listener.onResponse(insight); + }, e -> handleError("Failed to save log related check result for index {}", e, tenantId, listener))); + } catch (Exception e) { + handleError("Error parsing response of log related check for {}", e, tenantId, listener); + } + }, e -> handleError("Failed to call LLM for log related check: {}", e, tenantId, listener))); + } + + private Map parseCheckResponse(String resp) { + try { + String json = resp.split("", 2)[1].split("", 2)[0].trim(); + return MAPPER.readValue(json, new TypeReference<>() { + }); + } catch (Exception e) { + log.warn("Failed to parse RCA analysis response, returning default values", e); + return DEFAULT_RCA_RESULT; + } + } + + @Override + public IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType) { + throw new IllegalStateException("LogRelatedIndexCheckTask has no prerequisites"); + } + +} diff --git a/plugin/src/main/java/org/opensearch/ml/indexInsight/StatisticalDataTask.java b/plugin/src/main/java/org/opensearch/ml/indexInsight/StatisticalDataTask.java new file mode 100644 index 0000000000..091cd0fdff --- /dev/null +++ b/plugin/src/main/java/org/opensearch/ml/indexInsight/StatisticalDataTask.java @@ -0,0 +1,414 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.indexInsight; + +import static org.opensearch.ml.common.utils.StringUtils.gson; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.ml.common.indexInsight.IndexInsight; +import org.opensearch.ml.common.indexInsight.IndexInsightTask; +import org.opensearch.ml.common.indexInsight.IndexInsightTaskStatus; +import org.opensearch.ml.common.indexInsight.MLIndexInsightType; +import org.opensearch.ml.common.utils.mergeMetaDataUtils.MergeRuleHelper; +import org.opensearch.ml.helper.MemoryContainerHelper; +import org.opensearch.ml.helper.RemoteMemoryStoreHelper; +import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.search.SearchHit; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.bucket.filter.Filters; +import org.opensearch.search.aggregations.bucket.filter.FiltersAggregationBuilder; +import org.opensearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter; +import org.opensearch.search.aggregations.bucket.sampler.Sampler; +import org.opensearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder; +import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; +import org.opensearch.search.aggregations.metrics.TopHits; +import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.SortOrder; +import org.opensearch.transport.client.Client; + +import com.google.common.annotations.VisibleForTesting; + +import lombok.extern.log4j.Log4j2; + +/** + * Statistical Data Task: Collects sample documents from the target index for analysis. + * This task serves as the foundation for other index insight tasks by gathering sample data + * that provides context about the index structure and content. + * Will expand to support additional data types beyond sample documents in the future. + */ +@Log4j2 +public class StatisticalDataTask extends AbstractIndexInsightTask { + + private static final int TERM_SIZE = 5; + private static final List PREFIXES = List.of("unique_terms_", "unique_count_", "max_value_", "min_value_"); + private static final List UNIQUE_TERMS_LIST = List.of("text", "keyword", "integer", "long", "short"); + private static final List MIN_MAX_LIST = List.of("integer", "long", "float", "double", "short", "date"); + private static final Double HIGH_PRIORITY_COLUMN_THRESHOLD = 0.001; + private static final int SAMPLE_NUMBER = 100000; + private static final String PARSE_COLUMN_NAME_PATTERN = "(.*?)"; + private static final int FILTER_LLM_NUMBERS = 30; + public static final String NOT_NULL_KEYWORD = "not_null"; + public static final String IMPORTANT_COLUMN_KEYWORD = "important_column_and_distribution"; + public static final String EXAMPLE_DOC_KEYWORD = "example_docs"; + + private static final String UNIQUE_TERM_PREFIX = "unique_terms_"; + private static final String MAX_VALUE_PREFIX = "max_value_"; + private static final String MIN_VALUE_PREFIX = "min_value_"; + private static final String UNIQUE_COUNT_PREFIX = "unique_count_"; + + private static final String PROMPT_TEMPLATE = """ + Now I will give you the sample examples and some field's data distribution of one Opensearch index. + You should help me filter at most %s important columns. + For logs/trace/metric related indices, make sure you contain error/http response/time/latency/metric related columns. + You should contain your response column name inside tag + Here is the information of sample examples and some field's data distribution. + + IndexName: %s + detailed information: %s + """; + + public StatisticalDataTask(String sourceIndex, Client client, SdkClient sdkClient, String cmkRoleArn, String cmkAssumeRoleArn, RemoteMemoryStoreHelper remoteMemoryStoreHelper, + MemoryContainerHelper memoryContainerHelper) { + super(MLIndexInsightType.STATISTICAL_DATA, sourceIndex, client, sdkClient, cmkRoleArn, cmkAssumeRoleArn, remoteMemoryStoreHelper, memoryContainerHelper); + } + + @Override + public void runTask(String tenantId, ActionListener listener) { + runTask(tenantId, listener, true); + } + + public void runTask(String tenantId, ActionListener listener, boolean shouldStore) { + try { + collectStatisticalData(tenantId, shouldStore, listener); + } catch (Exception e) { + handleError("Failed to execute statistical data task for index {}", e, tenantId, listener, shouldStore); + } + } + + @Override + protected void handlePatternMatchedDoc(Map patternSource, String tenantId, ActionListener listener) { + // For StatisticalDataTask, run without storing when pattern matched + runTask(tenantId, listener, false); + } + + @Override + public List getPrerequisites() { + return Collections.emptyList(); + } + + private void collectStatisticalData(String tenantId, boolean shouldStore, ActionListener listener) { + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndex); + + client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { + Map mappings = getMappingsResponse.getMappings(); + if (mappings.isEmpty()) { + listener.onFailure(new IllegalArgumentException("No matching mapping with index name: " + sourceIndex)); + return; + } + + Map allFields = new HashMap<>(); + for (MappingMetadata mappingMetadata : mappings.values()) { + Map mappingSource = (Map) mappingMetadata.getSourceAsMap().get("properties"); + MergeRuleHelper.merge(mappingSource, allFields); + } + Map fieldsToType = new HashMap<>(); + extractFieldNamesTypes(allFields, fieldsToType, "", false); + SearchRequest searchRequest = new SearchRequest(sourceIndex); + searchRequest.source(buildQuery(fieldsToType)); + + client.search(searchRequest, ActionListener.wrap(searchResponse -> { + Set highPriorityColumns = filterColumns(fieldsToType, searchResponse); + Map parsedResult = parseSearchResult(fieldsToType, highPriorityColumns, searchResponse); + filterImportantColumnByLLM(parsedResult, tenantId, ActionListener.wrap(response -> { + Map filteredResponse = new HashMap<>(); + filteredResponse + .put( + EXAMPLE_DOC_KEYWORD, + filterSampleColumns((List>) parsedResult.get(EXAMPLE_DOC_KEYWORD), response) + ); + Map importantColumns = (Map) parsedResult.get(IMPORTANT_COLUMN_KEYWORD); + Map filteredImportantColumns = importantColumns + .entrySet() + .stream() + .filter(entry -> response.isEmpty() || response.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + filteredResponse.put(IMPORTANT_COLUMN_KEYWORD, filteredImportantColumns); + String statisticalContent = gson.toJson(filteredResponse); + + if (shouldStore) { + saveResult(statisticalContent, tenantId, listener); + } else { + // Return IndexInsight directly without storing + IndexInsight insight = IndexInsight + .builder() + .index(sourceIndex) + .taskType(taskType) + .content(statisticalContent) + .status(IndexInsightTaskStatus.COMPLETED) + .lastUpdatedTime(Instant.now()) + .build(); + listener.onResponse(insight); + } + }, listener::onFailure)); + }, e -> handleError("Failed to collect statistical data for index: {}", e, tenantId, listener, shouldStore))); + }, listener::onFailure)); + } + + @Override + public IndexInsightTask createPrerequisiteTask(MLIndexInsightType prerequisiteType) { + throw new IllegalStateException("StatisticalDataTask has no prerequisites"); + } + + public SearchSourceBuilder buildQuery(Map fields) { + AggregatorFactories.Builder subAggs = new AggregatorFactories.Builder(); + + for (Map.Entry field : fields.entrySet()) { + String name = field.getKey(); + String type = field.getValue(); + String fieldUsed = name; + + if ("text".equals(type)) { + fieldUsed = name + ".keyword"; + } + + if (UNIQUE_TERMS_LIST.contains(type)) { + TermsAggregationBuilder termsAgg = AggregationBuilders.terms(UNIQUE_TERM_PREFIX + name).field(fieldUsed).size(TERM_SIZE); + + CardinalityAggregationBuilder countAgg = AggregationBuilders.cardinality(UNIQUE_COUNT_PREFIX + name).field(fieldUsed); + + subAggs.addAggregator(termsAgg); + subAggs.addAggregator(countAgg); + } + if (MIN_MAX_LIST.contains(type)) { + MinAggregationBuilder minAgg = AggregationBuilders.min(MIN_VALUE_PREFIX + name).field(fieldUsed); + MaxAggregationBuilder maxAgg = AggregationBuilders.max(MAX_VALUE_PREFIX + name).field(fieldUsed); + + subAggs.addAggregator(minAgg); + subAggs.addAggregator(maxAgg); + } + } + + // Add top hits example_docs + TopHitsAggregationBuilder topHitsAgg = AggregationBuilders.topHits(EXAMPLE_DOC_KEYWORD).size(3); + subAggs.addAggregator(topHitsAgg); + + // Add not none count + List keyedFilters = new ArrayList<>(); + for (String fieldName : fields.keySet()) { + keyedFilters.add(new KeyedFilter(fieldName + "_" + NOT_NULL_KEYWORD, QueryBuilders.existsQuery(fieldName))); + } + FiltersAggregationBuilder nonNullAgg = AggregationBuilders.filters(NOT_NULL_KEYWORD, keyedFilters.toArray(new KeyedFilter[0])); + subAggs.addAggregator(nonNullAgg); + + // Wrap everything in a Sampler aggregation + SamplerAggregationBuilder samplerAgg = AggregationBuilders.sampler("sample").shardSize(SAMPLE_NUMBER).subAggregations(subAggs); + + // Build search source + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .sort("_doc", SortOrder.DESC) + .size(0) + .aggregation(samplerAgg); + + return sourceBuilder; + } + + private void filterImportantColumnByLLM(Map parsedResult, String tenantId, ActionListener> listener) { + Map importantColumns = (Map) parsedResult.get(IMPORTANT_COLUMN_KEYWORD); + if (importantColumns.size() <= FILTER_LLM_NUMBERS) { + listener.onResponse(new ArrayList<>()); // Too few columns and don't need to filter + return; + } + String prompt = generateFilterColumnPrompt(parsedResult); + getAgentIdToRun(client, tenantId, ActionListener.wrap(agentId -> { + callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> { + listener.onResponse(parseLLMFilteredResult(response)); + }, e -> { listener.onResponse(new ArrayList<>()); })); + }, e -> { listener.onResponse(new ArrayList<>()); })); + } + + private String generateFilterColumnPrompt(Map parsedResult) { + return String.format(PROMPT_TEMPLATE, FILTER_LLM_NUMBERS, sourceIndex, gson.toJson(parsedResult)); + } + + @VisibleForTesting + List> filterSampleColumns(List> originalDocs, List targetColumns) { + if (targetColumns.isEmpty()) { + return originalDocs; + } + List> results = new ArrayList<>(); + for (Map originalDoc : originalDocs) { + results.add(constructFilterMap("", originalDoc, targetColumns)); + } + return results; + } + + private Map constructFilterMap(String prefix, Map currentNode, List targetColumns) { + Map filterResult = new HashMap<>(); + for (Map.Entry entry : currentNode.entrySet()) { + String currentKey = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey(); + Object currentValue = entry.getValue(); + if (targetColumns.contains(currentKey)) { + filterResult.put(entry.getKey(), currentValue); + } else if (currentValue instanceof Map) { + Map tmpNode = constructFilterMap(currentKey, (Map) currentValue, targetColumns); + if (!tmpNode.isEmpty()) { + filterResult.put(entry.getKey(), tmpNode); + } + } else if (currentValue instanceof List) { + List list = (List) currentValue; + if (!list.isEmpty() && list.get(0) instanceof Map) { + List> newList = new ArrayList<>(); + for (Object item : list) { + Map tmpNode = constructFilterMap(currentKey, (Map) item, targetColumns); + if (!tmpNode.isEmpty()) { + newList.add(tmpNode); + } + } + if (!newList.isEmpty()) { + filterResult.put(entry.getKey(), newList); + } + } + } + } + return filterResult; + } + + private List parseLLMFilteredResult(String LLMResponse) { + try { + Pattern pattern = Pattern.compile(PARSE_COLUMN_NAME_PATTERN); + Matcher matcher = pattern.matcher(LLMResponse); + List columns = new ArrayList<>(); + while (matcher.find()) { + columns.add(matcher.group(1).trim()); + } + return columns; + } catch (Exception e) { + throw new IllegalArgumentException("fail to parse LLM response"); + } + } + + private Map parseSearchResult( + Map allFieldsToType, + Set filteredNames, + SearchResponse searchResponse + ) { + Map aggregationMap = ((Sampler) searchResponse.getAggregations().getAsMap().get("sample")) + .getAggregations() + .getAsMap(); + Map result = new LinkedHashMap<>(); + Map finalResult = new LinkedHashMap<>(); + List exampleDocs = null; + for (Map.Entry entry : aggregationMap.entrySet()) { + String key = entry.getKey(); + Aggregation aggregation = entry.getValue(); + if (key.equals(EXAMPLE_DOC_KEYWORD)) { + SearchHit[] hits = ((TopHits) aggregation).getHits().getHits(); + exampleDocs = new ArrayList<>(hits.length); + for (SearchHit hit : hits) { + exampleDocs.add(hit.getSourceAsMap()); + } + } else { + for (String prefix : PREFIXES) { + if (key.startsWith(prefix)) { + String targetField = key.substring(prefix.length()); + if (!filteredNames.contains(targetField)) { + continue; + } + + String aggregationType = key.substring(0, prefix.length() - 1); + + Map aggregationResult = gson.fromJson(aggregation.toString(), Map.class); + Object targetValue; + try { + if (prefix.equals(UNIQUE_TERM_PREFIX)) { + // assuming result.get(key) is a Map containing "buckets" -> List> + Map aggResult = (Map) aggregationResult.get(key); + List> buckets = aggResult != null + ? (List>) aggResult.get("buckets") + : null; + if (buckets == null) { + continue; + } + + targetValue = buckets.stream().filter(bucket -> bucket != null).map(bucket -> bucket.get("key")).toList(); + } else { + Map aggResult = (Map) aggregationResult.get(key); + if (aggResult.containsKey("value_as_string")) { + targetValue = aggResult.get("value_as_string"); + } else { + targetValue = aggResult.get("value"); + } + } + result.computeIfAbsent(targetField, k -> new HashMap<>(Map.of("type", allFieldsToType.get(targetField)))); + ((Map) result.get(targetField)).put(aggregationType, targetValue); + break; + } catch (Exception e) { + log + .error( + "Failed to parse aggregation result from DSL in statistical index insight for index name: {}", + sourceIndex, + e + ); + } + } + } + } + } + if (exampleDocs != null) { + finalResult.put(EXAMPLE_DOC_KEYWORD, exampleDocs); + } + finalResult.put(IMPORTANT_COLUMN_KEYWORD, result); + return finalResult; + } + + private Set filterColumns(Map allFieldsToType, SearchResponse searchResponse) { + Sampler sampleAggregation = (Sampler) searchResponse.getAggregations().getAsMap().get("sample"); + Map aggregationMap = sampleAggregation.getAggregations().getAsMap(); + long totalDocCount = sampleAggregation.getDocCount(); + Set filteredNames = new HashSet<>(); + Filters aggregation; + try { + aggregation = (Filters) aggregationMap.get(NOT_NULL_KEYWORD); + } catch (Exception e) { + return filteredNames; + } + for (Filters.Bucket bucket : aggregation.getBuckets()) { + String targetField = bucket.getKey().toString(); + targetField = targetField.substring(0, targetField.length() - 1 - NOT_NULL_KEYWORD.length()); + long docCount = bucket.getDocCount(); + if (docCount > HIGH_PRIORITY_COLUMN_THRESHOLD * totalDocCount && allFieldsToType.containsKey(targetField)) { + filteredNames.add(targetField); + } + } + return filteredNames; + } + +}