Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class CommonValue {
// warm node
public static String WARM_BOX_TYPE = "warm";
public static final String ML_INDEX_INSIGHT_CONFIG_INDEX = ".plugins-ml-index-insight-config";
public static final String ML_INDEX_INSIGHT_STORAGE_INDEX = ".plugins-ml-index-insight-storage";
public static final String ML_INDEX_INSIGHT_STORAGE_INDEX = "plugins-ml-index-insight-storage";

public static final String ML_MODEL_GROUP_INDEX = ".plugins-ml-model-group";
public static final String ML_MODEL_INDEX = ".plugins-ml-model";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.ml.common.indexInsight.MLIndexInsightType;

import lombok.Builder;
import lombok.Getter;
import org.opensearch.ml.common.indexInsight.MLIndexInsightType;

@Builder
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.util.Map;
import java.util.Set;

import javax.swing.*;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@

package org.opensearch.ml.action.IndexInsight;

import static org.opensearch.ml.common.CommonValue.ML_INDEX_INSIGHT_STORAGE_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_INDEX_INSIGHT_STORAGE_INDEX_MAPPING_PATH;
import static org.opensearch.ml.common.indexInsight.MLIndexInsightType.FIELD_DESCRIPTION;
import static org.opensearch.ml.common.memorycontainer.MemoryContainerConstants.MEMORY_CONTAINER_ID_FIELD;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_INDEX_INSIGHT_FEATURE_ENABLED;
import static org.opensearch.ml.common.indexInsight.MLIndexInsightType.LOG_RELATED_INDEX_CHECK;
import static org.opensearch.ml.common.indexInsight.MLIndexInsightType.STATISTICAL_DATA;

import java.time.Instant;

Expand All @@ -15,20 +21,22 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.ml.common.MLIndex;
import org.opensearch.ml.common.indexInsight.FieldDescriptionTask;
import org.opensearch.ml.common.indexInsight.IndexInsight;
import org.opensearch.ml.common.indexInsight.IndexInsightAccessControllerHelper;
import org.opensearch.ml.common.indexInsight.IndexInsightTask;
import org.opensearch.ml.common.indexInsight.IndexInsightTaskStatus;
import org.opensearch.ml.common.indexInsight.LogRelatedIndexCheckTask;
import org.opensearch.ml.common.indexInsight.MLIndexInsightType;
import org.opensearch.ml.common.indexInsight.StatisticalDataTask;
import org.opensearch.ml.common.memorycontainer.RemoteStore;
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
import org.opensearch.ml.common.transport.indexInsight.MLIndexInsightGetAction;
import org.opensearch.ml.common.transport.indexInsight.MLIndexInsightGetRequest;
import org.opensearch.ml.common.transport.indexInsight.MLIndexInsightGetResponse;
import org.opensearch.ml.engine.indices.MLIndicesHandler;
import org.opensearch.ml.helper.MemoryContainerHelper;
import org.opensearch.ml.helper.RemoteMemoryStoreHelper;
import org.opensearch.ml.indexInsight.FieldDescriptionTask;
import org.opensearch.ml.indexInsight.IndexInsightAccessControllerHelper;
import org.opensearch.ml.common.indexInsight.IndexInsightTask;
import org.opensearch.ml.indexInsight.LogRelatedIndexCheckTask;
import org.opensearch.ml.indexInsight.StatisticalDataTask;
import org.opensearch.ml.utils.TenantAwareHelper;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.tasks.Task;
Expand All @@ -41,14 +49,16 @@
public class GetIndexInsightTransportAction extends HandledTransportAction<ActionRequest, MLIndexInsightGetResponse> {
private static final MLIndexInsightType[] ALL_TYPE_ORDER = {
MLIndexInsightType.STATISTICAL_DATA,
MLIndexInsightType.FIELD_DESCRIPTION,
FIELD_DESCRIPTION,
MLIndexInsightType.LOG_RELATED_INDEX_CHECK };

private final Client client;
private final SdkClient sdkClient;
private final NamedXContentRegistry xContentRegistry;
private final MLFeatureEnabledSetting mlFeatureEnabledSetting;
private final MLIndicesHandler mlIndicesHandler;
private final RemoteMemoryStoreHelper remoteMemoryStoreHelper;
private final MemoryContainerHelper memoryContainerHelper;

@Inject
public GetIndexInsightTransportAction(
Expand All @@ -58,14 +68,18 @@ public GetIndexInsightTransportAction(
MLFeatureEnabledSetting mlFeatureEnabledSetting,
Client client,
SdkClient sdkClient,
MLIndicesHandler mlIndicesHandler
MLIndicesHandler mlIndicesHandler,
RemoteMemoryStoreHelper remoteMemoryStoreHelper,
MemoryContainerHelper memoryContainerHelper
) {
super(MLIndexInsightGetAction.NAME, transportService, actionFilters, MLIndexInsightGetRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
this.sdkClient = sdkClient;
this.mlFeatureEnabledSetting = mlFeatureEnabledSetting;
this.mlIndicesHandler = mlIndicesHandler;
this.remoteMemoryStoreHelper = remoteMemoryStoreHelper;
this.memoryContainerHelper = memoryContainerHelper;
}

@Override
Expand All @@ -85,14 +99,24 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLInde
return;
}
String indexName = mlIndexInsightGetRequest.getIndexName();
mlIndicesHandler.initMLIndexIfAbsent(MLIndex.INDEX_INSIGHT_STORAGE, ActionListener.wrap(r2 -> {
ActionListener<Boolean> actionAfterDryRun = ActionListener.wrap(r -> {
executeTaskAndReturn(mlIndexInsightGetRequest, mlIndexInsightGetRequest.getTenantId(), actionListener);
}, actionListener::onFailure);
IndexInsightAccessControllerHelper.verifyAccessController(client, actionAfterDryRun, indexName);
String memoryContainerId = client.threadPool().getThreadContext().getHeader(MEMORY_CONTAINER_ID_FIELD);
String indexMappings = mlIndicesHandler.getMapping(ML_INDEX_INSIGHT_STORAGE_INDEX_MAPPING_PATH);
memoryContainerHelper.getMemoryContainer(memoryContainerId, ActionListener.wrap(mlMemoryContainer -> {
RemoteStore remoteStore = mlMemoryContainer.getConfiguration().getRemoteStore();
if (remoteStore.getConnectorId() != null) {
remoteMemoryStoreHelper.createRemoteIndex(remoteStore.getConnectorId(), ML_INDEX_INSIGHT_STORAGE_INDEX, indexMappings, ActionListener.<Boolean>wrap(r2 -> {
ActionListener<Boolean> actionAfterDryRun = ActionListener.wrap(r -> {
executeTaskAndReturn(mlIndexInsightGetRequest, mlIndexInsightGetRequest.getTenantId(), actionListener);
}, actionListener::onFailure);
IndexInsightAccessControllerHelper.verifyAccessController(client, actionAfterDryRun, indexName);
}, e -> {
log.error("Failed to create index insight storage", e);
actionListener.onFailure(e);
}));
}

}, e -> {
log.error("Failed to create index insight storage", e);
actionListener.onFailure(e);
log.error("Failed to retrieve memory container", e);
}));
}

Expand Down Expand Up @@ -192,23 +216,29 @@ IndexInsightTask createTask(MLIndexInsightGetRequest request) {
client,
sdkClient,
request.getCmkRoleArn(),
request.getAssumeRoleArn()
request.getAssumeRoleArn(),
remoteMemoryStoreHelper,
memoryContainerHelper
);
case FIELD_DESCRIPTION:
return new FieldDescriptionTask(
request.getIndexName(),
client,
sdkClient,
request.getCmkRoleArn(),
request.getAssumeRoleArn()
request.getAssumeRoleArn(),
remoteMemoryStoreHelper,
memoryContainerHelper
);
case LOG_RELATED_INDEX_CHECK:
return new LogRelatedIndexCheckTask(
request.getIndexName(),
client,
sdkClient,
request.getCmkRoleArn(),
request.getAssumeRoleArn()
request.getAssumeRoleArn(),
remoteMemoryStoreHelper,
memoryContainerHelper
);
default:
throw new IllegalArgumentException("Unsupported task type: " + request.getTargetIndexInsight());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class RemoteMemoryStoreHelper {
public static final String CREATE_INGEST_PIPELINE_ACTION = "create_ingest_pipeline";
public static final String CREATE_INDEX_ACTION = "create_index";
public static final String WRITE_DOC_ACTION = "write_doc";
public static final String WRITE_DOC_ACTION_WITH_ID = "write_doc_withID";
public static final String BULK_LOAD_ACTION = "bulk_load";
public static final String SEARCH_INDEX_ACTION = "search_index";
public static final String GET_DOC_ACTION = "get_doc";
Expand Down Expand Up @@ -471,6 +473,21 @@ public void writeDocument(
}
}

public void writeDocumentWithDocID(
RemoteStore remoteStore,
String docId,
String indexName,
Map<String, Object> documentSource,
ActionListener<IndexResponse> listener
) {
// If connectorId is provided, use the existing method
if (remoteStore.getConnector() != null) {
writeDocument(remoteStore.getConnector(), indexName, documentSource, listener, docId);
} else {
listener.onFailure(new IllegalArgumentException("RemoteStore must have either connectorId or internal connector configured"));
}
}

public void writeDocument(
String connectorId,
String indexName,
Expand Down Expand Up @@ -529,6 +546,38 @@ public void writeDocument(
}
}

public void writeDocument(
Connector connector,
String indexName,
Map<String, Object> documentSource,
ActionListener<IndexResponse> listener,
String docId
) {
try {
// Prepare parameters for connector execution
Map<String, String> parameters = new HashMap<>();
parameters.put(INDEX_NAME_PARAM, indexName);
parameters.put(DOC_ID_PARAM, docId);
parameters.put(INPUT_PARAM, StringUtils.toJsonWithPlainNumbers(documentSource));

// Execute the connector action with write_doc action name
executeConnectorAction(connector, WRITE_DOC_ACTION_WITH_ID, parameters, ActionListener.wrap(response -> {
// Extract document ID from response
XContentParser parser = createParserFromTensorOutput(response);
IndexResponse indexResponse = IndexResponse.fromXContent(parser);
listener.onResponse(indexResponse);
}, e -> {
log.error("Failed to write document to remote index: {}", indexName, e);
listener.onFailure(e);
}));

} catch (Exception e) {
log.error("Error preparing remote document write for index: {}", indexName, e);
listener.onFailure(e);
}
}


/**
* Performs bulk write operations to remote storage using RemoteStore configuration
*/
Expand Down
Loading
Loading