From 0bc3c9b1d90dfc4536e9cf8eff76031ab09eac75 Mon Sep 17 00:00:00 2001 From: thundax-lyp <731378491@qq.com> Date: Fri, 11 Jul 2025 15:11:41 +0800 Subject: [PATCH 1/7] fix(server): add non null check in retrievalSyncTask --- .../service/task/sync/builder/RetrievalSyncTask.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java index 41279dc68..477b222e5 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java @@ -77,6 +77,10 @@ public SchedulerEnum.TaskStatus submit(TaskExecuteContext context) { context.addTraceLog("update index schema index_ids:%s", retrievals); List retrievalList = JSON.parseObject(retrievals, new TypeReference>() {}); Project project = projectService.queryById(job.getProjectId()); + if (project == null) { + context.addTraceLog("project not exist"); + return SchedulerEnum.TaskStatus.FINISH; + } for (Long id : retrievalList) { Retrieval retrieval = retrievalService.getById(id); context.addTraceLog("update index(%s) schema", retrieval.getName()); From d569e9826b2f38982b82a4327d047ad3ebe21e04 Mon Sep 17 00:00:00 2001 From: thundax-lyp <731378491@qq.com> Date: Fri, 11 Jul 2025 15:57:01 +0800 Subject: [PATCH 2/7] fix(server): delete `scheduler-xxx` while deleting a project --- .../openspg/server/infra/dao/mapper/ProjectDOMapper.java | 6 ++++++ .../dao/repository/common/ProjectRepositoryImpl.java | 3 +++ .../dao/src/main/resources/mapper/ProjectDOMapper.xml | 9 +++++++++ 3 files changed, 18 insertions(+) diff --git a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java index ef1a85b7b..8bdd24e23 100644 --- a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java +++ b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java @@ -42,6 +42,12 @@ public interface ProjectDOMapper { void deleteFromKgBuilderJob(Long id); + void deleteFromKgSchedulerInstance(Long id); + + void deleteFromKgSchedulerJob(Long id); + + void deleteFromKgSchedulerTask(Long id); + void deleteFromKgResourcePermission(Long projectId); int insert(ProjectDO record); diff --git a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java index 81875669d..fc2740df7 100644 --- a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java +++ b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java @@ -85,6 +85,9 @@ public Integer deleteById(Long projectId) { projectDOMapper.deleteFromKgReasonTask(projectId); projectDOMapper.deleteFromKgReasonTutorial(projectId); projectDOMapper.deleteFromKgBuilderJob(projectId); + projectDOMapper.deleteFromKgSchedulerInstance(projectId); + projectDOMapper.deleteFromKgSchedulerJob(projectId); + projectDOMapper.deleteFromKgSchedulerTask(projectId); projectDOMapper.deleteFromKgResourcePermission(projectId); return projectDOMapper.deleteByPrimaryKey(projectId); } diff --git a/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml b/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml index ed04293c9..9a750e7fa 100644 --- a/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml +++ b/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml @@ -138,6 +138,15 @@ DELETE FROM kg_builder_job WHERE project_id = #{id,jdbcType=BIGINT}; + + DELETE FROM kg_scheduler_instance WHERE project_id = #{id,jdbcType=BIGINT}; + + + DELETE FROM kg_scheduler_job WHERE project_id = #{id,jdbcType=BIGINT}; + + + DELETE FROM kg_scheduler_task WHERE project_id = #{id,jdbcType=BIGINT}; + DELETE FROM kg_resource_permission WHERE resource_id = #{id,jdbcType=BIGINT} and resource_tag = 'KNOWLEDGE_BASE'; From 43661867e55544289b1a334aa70e4d65b7870f2f Mon Sep 17 00:00:00 2001 From: thundax-lyp <731378491@qq.com> Date: Mon, 14 Jul 2025 10:42:58 +0800 Subject: [PATCH 3/7] fix(server): replace Neo4jSinkWriter by GraphStoreSinkWriter in controller --- .../http/server/openapi/GraphController.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java index 509ad45a1..a9ae1c4e5 100644 --- a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java +++ b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java @@ -18,12 +18,10 @@ import com.antgroup.openspg.builder.core.runtime.BuilderContext; import com.antgroup.openspg.builder.core.runtime.impl.DefaultBuilderCatalog; import com.antgroup.openspg.builder.model.pipeline.config.GraphStoreSinkNodeConfig; -import com.antgroup.openspg.builder.model.pipeline.config.Neo4jSinkNodeConfig; import com.antgroup.openspg.builder.model.record.BaseRecord; import com.antgroup.openspg.builder.model.record.RecordAlterOperationEnum; import com.antgroup.openspg.builder.model.record.SubGraphRecord; import com.antgroup.openspg.builder.runner.local.physical.sink.impl.GraphStoreSinkWriter; -import com.antgroup.openspg.builder.runner.local.physical.sink.impl.Neo4jSinkWriter; import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier; import com.antgroup.openspg.core.schema.model.type.BaseSPGType; import com.antgroup.openspg.core.schema.model.type.ConceptList; @@ -41,10 +39,9 @@ import com.antgroup.openspg.server.biz.schema.SchemaManager; import com.antgroup.openspg.server.biz.service.GraphManager; import com.google.common.collect.Lists; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; + +import java.util.*; + import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -179,12 +176,12 @@ public void check() { public Boolean action() { ProjectSchema projectSchema = schemaManager.getProjectSchema(request.getProjectId()); boolean enableLeadTo = - (request.getEnableLeadTo() == null) ? false : request.getEnableLeadTo(); + request.getEnableLeadTo() != null && request.getEnableLeadTo(); Map conceptLists = getConceptLists(enableLeadTo, projectSchema); - Neo4jSinkWriter writer = - new Neo4jSinkWriter( - UUID.randomUUID().toString(), "图存储", new Neo4jSinkNodeConfig(true)); + GraphStoreSinkWriter writer = + new GraphStoreSinkWriter( + UUID.randomUUID().toString(), "图存储", new GraphStoreSinkNodeConfig(true)); BuilderContext context = new BuilderContext() .setProjectId(request.getProjectId()) @@ -201,12 +198,13 @@ public Boolean action() { SubGraphRecord subGraph = JSON.parseObject(JSON.toJSONString(request.getSubGraph()), SubGraphRecord.class); - writer.writeToNeo4j(subGraph); + List records = Lists.newArrayList(); + records.add(subGraph); + writer.write(records); + if (context.isEnableLeadTo()) { ReasonProcessor reasonProcessor = new ReasonProcessor(); reasonProcessor.init(context); - List records = Lists.newArrayList(); - records.add(subGraph); List reasonResults = reasonProcessor.process(records); if (CollectionUtils.isNotEmpty(reasonResults)) { GraphStoreSinkWriter sinkWriter = From 0eb5c49ea2865d1bd8b192a8566fb7bd0dc6aca7 Mon Sep 17 00:00:00 2001 From: thundax-lyp <731378491@qq.com> Date: Mon, 14 Jul 2025 11:08:34 +0800 Subject: [PATCH 4/7] fix(server): replace Neo4jSinkWriter by GraphStoreSinkWriter in controller --- .../server/api/http/server/openapi/GraphController.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java index a9ae1c4e5..8160ff894 100644 --- a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java +++ b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java @@ -39,9 +39,7 @@ import com.antgroup.openspg.server.biz.schema.SchemaManager; import com.antgroup.openspg.server.biz.service.GraphManager; import com.google.common.collect.Lists; - import java.util.*; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -175,8 +173,7 @@ public void check() { @Override public Boolean action() { ProjectSchema projectSchema = schemaManager.getProjectSchema(request.getProjectId()); - boolean enableLeadTo = - request.getEnableLeadTo() != null && request.getEnableLeadTo(); + boolean enableLeadTo = request.getEnableLeadTo() != null && request.getEnableLeadTo(); Map conceptLists = getConceptLists(enableLeadTo, projectSchema); GraphStoreSinkWriter writer = From be668261151bed0567c7058f6cdc0c4e2b021397 Mon Sep 17 00:00:00 2001 From: thundax-lyp <731378491@qq.com> Date: Mon, 14 Jul 2025 16:57:27 +0800 Subject: [PATCH 5/7] fix(server): #580, remove builtin-properties checker when update schema --- .../core/schema/service/alter/check/PropertyChecker.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java b/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java index f60999081..f7cd8054f 100644 --- a/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java +++ b/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java @@ -65,7 +65,9 @@ public void check(BaseAdvancedType advancedType, SchemaCheckContext context) { } this.checkBasicInfo(spgTypeIdentifier, property, context); - this.checkBuiltInProperty(advancedType.getSpgTypeEnum(), property); + if (!advancedType.isUpdate()) { + this.checkBuiltInProperty(advancedType.getSpgTypeEnum(), property); + } this.checkConstraint(property); if (property.getLogicalRule() != null) { From 56fca1fb1194afe9fb5b13612234d11f0346c8cc Mon Sep 17 00:00:00 2001 From: thundax-lyp <731378491@qq.com> Date: Tue, 15 Jul 2025 14:47:54 +0800 Subject: [PATCH 6/7] rollback --- .../http/server/openapi/GraphController.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java index 8160ff894..509ad45a1 100644 --- a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java +++ b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/GraphController.java @@ -18,10 +18,12 @@ import com.antgroup.openspg.builder.core.runtime.BuilderContext; import com.antgroup.openspg.builder.core.runtime.impl.DefaultBuilderCatalog; import com.antgroup.openspg.builder.model.pipeline.config.GraphStoreSinkNodeConfig; +import com.antgroup.openspg.builder.model.pipeline.config.Neo4jSinkNodeConfig; import com.antgroup.openspg.builder.model.record.BaseRecord; import com.antgroup.openspg.builder.model.record.RecordAlterOperationEnum; import com.antgroup.openspg.builder.model.record.SubGraphRecord; import com.antgroup.openspg.builder.runner.local.physical.sink.impl.GraphStoreSinkWriter; +import com.antgroup.openspg.builder.runner.local.physical.sink.impl.Neo4jSinkWriter; import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier; import com.antgroup.openspg.core.schema.model.type.BaseSPGType; import com.antgroup.openspg.core.schema.model.type.ConceptList; @@ -39,7 +41,10 @@ import com.antgroup.openspg.server.biz.schema.SchemaManager; import com.antgroup.openspg.server.biz.service.GraphManager; import com.google.common.collect.Lists; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -173,12 +178,13 @@ public void check() { @Override public Boolean action() { ProjectSchema projectSchema = schemaManager.getProjectSchema(request.getProjectId()); - boolean enableLeadTo = request.getEnableLeadTo() != null && request.getEnableLeadTo(); + boolean enableLeadTo = + (request.getEnableLeadTo() == null) ? false : request.getEnableLeadTo(); Map conceptLists = getConceptLists(enableLeadTo, projectSchema); - GraphStoreSinkWriter writer = - new GraphStoreSinkWriter( - UUID.randomUUID().toString(), "图存储", new GraphStoreSinkNodeConfig(true)); + Neo4jSinkWriter writer = + new Neo4jSinkWriter( + UUID.randomUUID().toString(), "图存储", new Neo4jSinkNodeConfig(true)); BuilderContext context = new BuilderContext() .setProjectId(request.getProjectId()) @@ -195,13 +201,12 @@ public Boolean action() { SubGraphRecord subGraph = JSON.parseObject(JSON.toJSONString(request.getSubGraph()), SubGraphRecord.class); - List records = Lists.newArrayList(); - records.add(subGraph); - writer.write(records); - + writer.writeToNeo4j(subGraph); if (context.isEnableLeadTo()) { ReasonProcessor reasonProcessor = new ReasonProcessor(); reasonProcessor.init(context); + List records = Lists.newArrayList(); + records.add(subGraph); List reasonResults = reasonProcessor.process(records); if (CollectionUtils.isNotEmpty(reasonResults)) { GraphStoreSinkWriter sinkWriter = From cdeeffd07f0fe01555d6988355618493135c58a2 Mon Sep 17 00:00:00 2001 From: thundax-lyp <731378491@qq.com> Date: Fri, 18 Jul 2025 17:42:04 +0800 Subject: [PATCH 7/7] fix(builder): #585 --- .../physical/sink/impl/Neo4jSinkWriter.java | 21 ++++++------ .../graphstore/util/TypeChecker.java | 32 +++++++++++++++++++ 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java index 955b5a578..f7a5faadb 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java @@ -51,7 +51,7 @@ public class Neo4jSinkWriter extends BaseSinkWriter { private Project project; private static final String DOT = "."; - private static RejectedExecutionHandler handler = + private static final RejectedExecutionHandler handler = (r, executor) -> { try { executor.getQueue().put(r); @@ -59,7 +59,7 @@ public class Neo4jSinkWriter extends BaseSinkWriter { Thread.currentThread().interrupt(); } }; - private static ExecutorService executor = + private static final ExecutorService executor = new ThreadPoolExecutor( NUM_THREADS, NUM_THREADS, @@ -169,7 +169,7 @@ private void awaitAllTasks(List> futures) private void writeNode(SubGraphRecord.Node node) { try { - Long statr = System.currentTimeMillis(); + long start = System.currentTimeMillis(); RecordAlterOperationEnum operation = context.getOperation(); if (StringUtils.isBlank(node.getId()) || StringUtils.isBlank(node.getName()) @@ -184,7 +184,7 @@ private void writeNode(SubGraphRecord.Node node) { List properties = Lists.newArrayList(); for (Map.Entry entry : node.getProperties().entrySet()) { Object entryValue = entry.getValue(); - if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) { + if (!TypeChecker.isBasicType(entryValue)) { entryValue = JSON.toJSONString(entryValue); } properties.add(new LPGPropertyRecord(entry.getKey(), entryValue)); @@ -199,7 +199,7 @@ private void writeNode(SubGraphRecord.Node node) { log.info( String.format( "write Node succeed id:%s cons:%s", - node.getId(), System.currentTimeMillis() - statr)); + node.getId(), System.currentTimeMillis() - start)); } catch (Exception e) { throw new RuntimeException(e); } @@ -214,7 +214,7 @@ private String labelPrefix(String label) { private void writeEdge(SubGraphRecord.Edge edge) { try { - Long statr = System.currentTimeMillis(); + long start = System.currentTimeMillis(); RecordAlterOperationEnum operation = context.getOperation(); if (StringUtils.isBlank(edge.getFrom()) || StringUtils.isBlank(edge.getTo()) @@ -226,7 +226,7 @@ private void writeEdge(SubGraphRecord.Edge edge) { List properties = Lists.newArrayList(); for (Map.Entry entry : edge.getProperties().entrySet()) { Object entryValue = entry.getValue(); - if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) { + if (!TypeChecker.isBasicType(entryValue)) { entryValue = JSON.toJSONString(entryValue); } properties.add(new LPGPropertyRecord(entry.getKey(), entryValue)); @@ -244,9 +244,10 @@ private void writeEdge(SubGraphRecord.Edge edge) { client.deleteEdge(edge.getLabel(), edgeRecords); } log.info( - String.format( - "write Edge succeed from:%s to:%s cons:%s", - edge.getFrom(), edge.getTo(), System.currentTimeMillis() - statr)); + "write Edge succeed from:{} to:{} cons:{}", + edge.getFrom(), + edge.getTo(), + System.currentTimeMillis() - start); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java b/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java index 0bde51d81..3935a0433 100644 --- a/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java +++ b/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java @@ -66,4 +66,36 @@ public static boolean isArrayOrCollectionOfPrimitives(Object obj) { } return false; } + + public static boolean isBasicType(Object obj) { + if (obj == null) { + return false; + } + if (obj instanceof Object[]) { + for (Object element : (Object[]) obj) { + if (!isBasicType(element)) { + return false; + } + } + return true; + + } else if (obj instanceof Collection) { + for (Object element : (Collection) obj) { + if (!isBasicType(element)) { + return false; + } + } + return true; + } + + return obj.getClass().isPrimitive() + || obj instanceof Integer + || obj instanceof Double + || obj instanceof Float + || obj instanceof Long + || obj instanceof Byte + || obj instanceof Boolean + || obj instanceof Character + || obj instanceof CharSequence; + } }