From 4686c8e3a6ca47bb114458efb80ca23fc7cb6a6b Mon Sep 17 00:00:00 2001 From: xs3508198 <1131259489@qq.com> Date: Tue, 11 Feb 2025 10:54:25 +0800 Subject: [PATCH 1/2] feat: add mcp es calling functionality --- jcommon/mcp/mcp-elasticsearch/pom.xml | 160 +++++++++++++ .../ElasticsearchMcpApplication.java | 15 ++ .../config/McpStdioTransportConfig.java | 17 ++ .../function/ElasticsearchFunction.java | 212 ++++++++++++++++++ .../server/ElasticsearchMcpServer.java | 47 ++++ .../src/main/resources/application.properties | 2 + .../ElasticsearchMcpApplicationTests.java | 130 +++++++++++ 7 files changed, 583 insertions(+) create mode 100644 jcommon/mcp/mcp-elasticsearch/pom.xml create mode 100644 jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplication.java create mode 100644 jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/config/McpStdioTransportConfig.java create mode 100644 jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java create mode 100644 jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/server/ElasticsearchMcpServer.java create mode 100644 jcommon/mcp/mcp-elasticsearch/src/main/resources/application.properties create mode 100644 jcommon/mcp/mcp-elasticsearch/src/test/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplicationTests.java diff --git a/jcommon/mcp/mcp-elasticsearch/pom.xml b/jcommon/mcp/mcp-elasticsearch/pom.xml new file mode 100644 index 0000000000..5d21fa306d --- /dev/null +++ b/jcommon/mcp/mcp-elasticsearch/pom.xml @@ -0,0 +1,160 @@ + + + 4.0.0 + + run.mone + mcp + 1.6.1-jdk21-SNAPSHOT + + + run.mone.mcp.es + mcp-elasticsearch + 0.0.1-SNAPSHOT + mcp-es + mcp-es + + + + + + + + + + + + + + + 21 + + + + + + org.springframework + spring-core + 6.2.1 + + + + org.springframework + spring-context + 6.2.1 + + + + org.springframework + spring-webmvc + 6.2.1 + + + + run.mone + hive + 1.6.1-jdk21-SNAPSHOT + + + + com.fasterxml.jackson.core + jackson-databind + 2.18.2 + + + + com.fasterxml.jackson.core + jackson-core + 2.18.2 + + + + com.google.code.gson + gson + 2.11.0 + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 7.17.21 + + + + io.projectreactor + reactor-core + 3.7.0 + + + + org.springframework.boot + spring-boot-starter-web + 3.4.1 + + + + ch.qos.logback + logback-classic + ${logback.version} + + + + ch.qos.logback + logback-core + ${logback.version} + + + + org.apache.logging.log4j + log4j-api + 2.17.1 + + + org.apache.logging.log4j + log4j-core + 2.17.1 + + + + + + + + + + maven-compiler-plugin + 3.11.0 + + 21 + 21 + true + UTF-8 + + ${project.basedir}/src/main/java + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.7.14 + + run.mone.mcp.elasticsearch.ElasticsearchMcpApplication + app + + + + + repackage + + + + + + + + + + + diff --git a/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplication.java b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplication.java new file mode 100644 index 0000000000..f270822b10 --- /dev/null +++ b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplication.java @@ -0,0 +1,15 @@ +package run.mone.mcp.elasticsearch; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +@SpringBootApplication +@ComponentScan("run.mone.mcp.elasticsearch") +public class ElasticsearchMcpApplication { + + public static void main(String[] args) { + SpringApplication.run(ElasticsearchMcpApplication.class, args); + } + +} diff --git a/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/config/McpStdioTransportConfig.java b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/config/McpStdioTransportConfig.java new file mode 100644 index 0000000000..f88c6c15df --- /dev/null +++ b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/config/McpStdioTransportConfig.java @@ -0,0 +1,17 @@ +package run.mone.mcp.elasticsearch.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import run.mone.hive.mcp.server.transport.StdioServerTransport; + +@Configuration +@ConditionalOnProperty(name = "stdio.enabled", havingValue = "true") +public class McpStdioTransportConfig { + + @Bean + StdioServerTransport stdioServerTransport(ObjectMapper mapper){ + return new StdioServerTransport(mapper); + } +} diff --git a/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java new file mode 100644 index 0000000000..edd4036d36 --- /dev/null +++ b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java @@ -0,0 +1,212 @@ +package run.mone.mcp.elasticsearch.function; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHost; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xcontent.XContentType; +import run.mone.hive.mcp.spec.McpSchema; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +@Data +@Slf4j +public class ElasticsearchFunction implements Function, McpSchema.CallToolResult> { + + private String name = "elasticsearchOperation"; + + private String desc = "Elasticsearch operations including index management, document CRUD, and search queries"; + + private String toolScheme = """ + { + "type": "object", + "properties": { + "operation": { + "type": "string", + "enum": ["index", "get", "search", "update", "delete", "createIndex", "deleteIndex", "existsIndex"], + "description": "The operation to perform on Elasticsearch" + }, + "index": { + "type": "string", + "description": "The index name to operate on" + }, + "id": { + "type": "string", + "description": "Document ID" + }, + "document": { + "type": "object", + "description": "Document content for indexing/updating" + }, + "query": { + "type": "object", + "description": "Search query DSL" + }, + "mappings": { + "type": "object", + "description": "Index mappings definition" + } + }, + "required": ["operation", "index"] + } + """; + + private RestHighLevelClient client; + + public ElasticsearchFunction(){ + this.client = new RestHighLevelClient( + RestClient.builder(new HttpHost("localhost", 9200, "http")) + ); + } + + @Override + public McpSchema.CallToolResult apply(Map args) { + String operation = (String) args.get("operation"); + String index = (String) args.get("index"); + + try{ + String res = switch (operation){ + case "index" -> handleIndexRequest(args); + case "get" -> handleGetRequest(args); + case "search" -> handleSearchRequest(args); + case "update" -> handleUpdateRequest(args); + case "delete" -> handleDeleteRequest(args); + case "createIndex" -> handleCreateIndex(args); + case "deleteIndex" -> handleDeleteIndex(index); + case "existsIndex" -> handleExistsIndex(index); + default -> "no this operation"; + }; + return new McpSchema.CallToolResult(List.of(new McpSchema.TextContent(res)), false); + } catch (IOException e) { + log.error("Error performing Elasticsearch operation: ", e); + return new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("Error: " + e.getMessage())), true); + }finally{ + closeClient(); + } + } + + private void closeClient() { + if (client != null) { + try { + client.close(); + } catch (IOException e) { + log.error("Failed to close Elasticsearch client: ", e); + } + } + } + + private String handleIndexRequest(Map args) throws IOException { + IndexRequest request = new IndexRequest((String) args.get("index")); + if(args.containsKey("id")){ + request.id((String) args.get("id")); + } + request.source((Map) args.get("document"), XContentType.JSON); + IndexResponse response = client.index(request, RequestOptions.DEFAULT); + return response.toString(); + } + + private String handleGetRequest(Map args) throws IOException { + GetRequest request = new GetRequest((String) args.get("index"), (String) args.get("id")); + GetResponse response = client.get(request, RequestOptions.DEFAULT); + return response.getSourceAsString(); + } + + private String handleSearchRequest(Map args) throws IOException { + SearchRequest request = new SearchRequest((String) args.get("index")); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + + //构建查询DSL + if(args.containsKey("query")){ + QueryBuilder queryBuilder = buildQueryBuilder((Map) args.get("query")); + sourceBuilder.query(queryBuilder); + } + request.source(sourceBuilder); + SearchResponse response = client.search(request, RequestOptions.DEFAULT); + return processSearchResults(response); + } + + private QueryBuilder buildQueryBuilder(Map query) { + if (query.containsKey("match_all")){ + return QueryBuilders.matchAllQuery(); + }else if (query.containsKey("field") && query.containsKey("value")){ + String field = (String) query.get("field"); + String value = (String) query.get("value"); + return QueryBuilders.matchQuery(field, value); + }else { + throw new IllegalArgumentException("Unsupported query format. Expected 'match_all' or 'field' and 'value'."); + } + + } + + private String processSearchResults(SearchResponse response) { + StringBuilder sb = new StringBuilder(); + for (SearchHit hit : response.getHits().getHits()) { + sb.append("ID: ").append(hit.getId()) + .append(" Score: ").append(hit.getScore()) + .append(" Source: ").append(hit.getSourceAsString()) + .append("\n"); + } + return sb.toString(); + } + + private String handleUpdateRequest(Map args) throws IOException { + UpdateRequest request = new UpdateRequest((String) args.get("index"), (String) args.get("id") + ).doc((Map) args.get("document"), XContentType.JSON); + + UpdateResponse response = client.update(request, RequestOptions.DEFAULT); + return response.toString(); + } + + private String handleDeleteRequest(Map args) throws IOException { + DeleteRequest request = new DeleteRequest((String) args.get("index"), (String) args.get("id")); + DeleteResponse response = client.delete(request, RequestOptions.DEFAULT); + return response.toString(); + } + + private String handleCreateIndex(Map args) throws IOException { + CreateIndexRequest request = new CreateIndexRequest((String) args.get("index")); + if (args.containsKey("mappings")) { + request.mapping((Map) args.get("mappings")); + } + client.indices().create(request, RequestOptions.DEFAULT); + return "Index created successfully"; + } + + private String handleDeleteIndex(String index) throws IOException { + DeleteIndexRequest request = new DeleteIndexRequest(index); + client.indices().delete(request, RequestOptions.DEFAULT); + return "Index deleted successfully"; + } + + private String handleExistsIndex(String index) throws IOException { + boolean exists = client.indices().exists( + new GetIndexRequest(index), RequestOptions.DEFAULT + ); + return "Index exists: " + exists; + } + + + +} diff --git a/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/server/ElasticsearchMcpServer.java b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/server/ElasticsearchMcpServer.java new file mode 100644 index 0000000000..af8b084844 --- /dev/null +++ b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/server/ElasticsearchMcpServer.java @@ -0,0 +1,47 @@ +package run.mone.mcp.elasticsearch.server; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.springframework.stereotype.Component; +import run.mone.hive.mcp.server.McpServer; +import run.mone.hive.mcp.server.McpServer.ToolRegistration; +import run.mone.hive.mcp.server.McpSyncServer; +import run.mone.hive.mcp.spec.McpSchema; +import run.mone.hive.mcp.spec.ServerMcpTransport; +import run.mone.hive.mcp.spec.McpSchema.Tool; +import run.mone.mcp.elasticsearch.function.ElasticsearchFunction; + +@Component +public class ElasticsearchMcpServer { + private ServerMcpTransport transport; + private McpSyncServer syncServer; + + public ElasticsearchMcpServer(ServerMcpTransport transport) { + this.transport = transport; + } + public McpSyncServer start() { + McpSyncServer syncServer = McpServer.using(transport) + .serverInfo("elasticsearch_mcp", "1.0.0") + .capabilities(McpSchema.ServerCapabilities.builder() + .tools(true) + .logging() + .build()) + .sync(); + ElasticsearchFunction function = new ElasticsearchFunction(); + var toolRegistration = new ToolRegistration(new Tool(function.getName(), function.getDesc(), function.getToolScheme()), function); + syncServer.addTool(toolRegistration); + return syncServer; + } + + @PostConstruct + public void init() { + this.syncServer = start(); + } + + @PreDestroy + public void stop() { + if (this.syncServer != null) { + this.syncServer.closeGracefully(); + } + } +} diff --git a/jcommon/mcp/mcp-elasticsearch/src/main/resources/application.properties b/jcommon/mcp/mcp-elasticsearch/src/main/resources/application.properties new file mode 100644 index 0000000000..06fca475bb --- /dev/null +++ b/jcommon/mcp/mcp-elasticsearch/src/main/resources/application.properties @@ -0,0 +1,2 @@ +stdio.enabled=true +spring.main.web-application-type=none diff --git a/jcommon/mcp/mcp-elasticsearch/src/test/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplicationTests.java b/jcommon/mcp/mcp-elasticsearch/src/test/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplicationTests.java new file mode 100644 index 0000000000..beb82875e1 --- /dev/null +++ b/jcommon/mcp/mcp-elasticsearch/src/test/java/run/mone/mcp/elasticsearch/ElasticsearchMcpApplicationTests.java @@ -0,0 +1,130 @@ +package run.mone.mcp.elasticsearch; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import run.mone.hive.mcp.spec.McpSchema; +import run.mone.mcp.elasticsearch.function.ElasticsearchFunction; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@SpringBootTest +class ElasticsearchMcpApplicationTests { + + @Test + void contextLoads() { + } + + @Test + void test1() throws IOException { + RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder(new HttpHost("localhost", 9200, "http")) + ); + Map args = new HashMap<>(); + args.put("index", "users"); + args.put("id", "1"); + args.put("operation", "get"); + GetRequest request = new GetRequest((String) args.get("index"), (String) args.get("id")); + GetResponse response = client.get(request, RequestOptions.DEFAULT); + System.out.println("+++++"); + System.out.println(response.getSourceAsString()); + } + + //查询指定id与index的数据 + @Test + void test2() { + ElasticsearchFunction elasticsearchFunction = new ElasticsearchFunction(); + Map args = new HashMap<>(); + args.put("operation", "get"); + args.put("index", "users"); + args.put("id", "1"); + McpSchema.CallToolResult apply = elasticsearchFunction.apply(args); + apply.content().forEach(System.out::println); + } + + //查询所有数据 + @Test + void test3() { + ElasticsearchFunction elasticsearchFunction = new ElasticsearchFunction(); + Map args = new HashMap<>(); + args.put("operation", "search"); // 指定操作为 search + args.put("index", "users222"); // + args.put("query", Collections.singletonMap("match_all", Collections.emptyMap())); + McpSchema.CallToolResult apply = elasticsearchFunction.apply(args); + apply.content().forEach(System.out::println); + } + + //插入数据 + @Test + void test4() { + ElasticsearchFunction elasticsearchFunction = new ElasticsearchFunction(); + Map args = new HashMap<>(); + args.put("operation", "index"); + args.put("index", "users222"); + args.put("id", "5"); + Map document = new HashMap<>(); + document.put("name", "John Doe"); + document.put("age", "30"); + document.put("city", "New York"); + args.put("document", document); + McpSchema.CallToolResult apply = elasticsearchFunction.apply(args); + apply.content().forEach(System.out::println); + } + + + @Test + void test5() { + ElasticsearchFunction function = new ElasticsearchFunction(); + Map args = new HashMap<>(); + args.put("operation", "update"); + args.put("index", "users"); + args.put("id", "1"); + args.put("document", Map.of("age", 35)); + McpSchema.CallToolResult apply = function.apply(args); + apply.content().forEach(System.out::println); + + } + + @Test + void test6() { + ElasticsearchFunction function = new ElasticsearchFunction(); + + Map args = new HashMap<>(); + args.put("operation", "delete"); + args.put("index", "users"); + args.put("id", "4"); + McpSchema.CallToolResult apply = function.apply(args); + apply.content().forEach(System.out::println); + } + + @Test + void test7() { + ElasticsearchFunction function = new ElasticsearchFunction(); + Map args = new HashMap<>(); + args.put("operation", "createIndex"); + args.put("index", "new_users"); + args.put("mappings", Map.of( + "properties", Map.of( + "name", Map.of("type", "text"), + "age", Map.of("type", "integer") + ) + )); + McpSchema.CallToolResult apply = function.apply(args); + apply.content().forEach(System.out::println); + } + + +} + + + + + From 37fbdc7361130a55f78163fb77fb03159bcdeb9b Mon Sep 17 00:00:00 2001 From: xs3508198 <1131259489@qq.com> Date: Tue, 11 Feb 2025 14:36:59 +0800 Subject: [PATCH 2/2] feat: add mcp es calling functionality --- .../mcp/elasticsearch/function/ElasticsearchFunction.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java index edd4036d36..50c1e1d518 100644 --- a/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java +++ b/jcommon/mcp/mcp-elasticsearch/src/main/java/run/mone/mcp/elasticsearch/function/ElasticsearchFunction.java @@ -76,8 +76,11 @@ public class ElasticsearchFunction implements Function, McpS private RestHighLevelClient client; public ElasticsearchFunction(){ + String hostname = System.getenv().getOrDefault("HOSTNAME", "localhost"); + int port = Integer.parseInt(System.getenv().getOrDefault("PORT", "9200")); + String scheme = System.getenv().getOrDefault("SCHEME", "http"); this.client = new RestHighLevelClient( - RestClient.builder(new HttpHost("localhost", 9200, "http")) + RestClient.builder(new HttpHost(hostname, port, scheme)) ); }