diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java index 955b5a578..f7a5faadb 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java @@ -51,7 +51,7 @@ public class Neo4jSinkWriter extends BaseSinkWriter { private Project project; private static final String DOT = "."; - private static RejectedExecutionHandler handler = + private static final RejectedExecutionHandler handler = (r, executor) -> { try { executor.getQueue().put(r); @@ -59,7 +59,7 @@ public class Neo4jSinkWriter extends BaseSinkWriter { Thread.currentThread().interrupt(); } }; - private static ExecutorService executor = + private static final ExecutorService executor = new ThreadPoolExecutor( NUM_THREADS, NUM_THREADS, @@ -169,7 +169,7 @@ private void awaitAllTasks(List> futures) private void writeNode(SubGraphRecord.Node node) { try { - Long statr = System.currentTimeMillis(); + long start = System.currentTimeMillis(); RecordAlterOperationEnum operation = context.getOperation(); if (StringUtils.isBlank(node.getId()) || StringUtils.isBlank(node.getName()) @@ -184,7 +184,7 @@ private void writeNode(SubGraphRecord.Node node) { List properties = Lists.newArrayList(); for (Map.Entry entry : node.getProperties().entrySet()) { Object entryValue = entry.getValue(); - if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) { + if (!TypeChecker.isBasicType(entryValue)) { entryValue = JSON.toJSONString(entryValue); } properties.add(new LPGPropertyRecord(entry.getKey(), entryValue)); @@ -199,7 +199,7 @@ private void writeNode(SubGraphRecord.Node node) { log.info( String.format( "write Node succeed id:%s cons:%s", - node.getId(), System.currentTimeMillis() - statr)); + node.getId(), System.currentTimeMillis() - start)); } catch (Exception e) { throw new RuntimeException(e); } @@ -214,7 +214,7 @@ private String labelPrefix(String label) { private void writeEdge(SubGraphRecord.Edge edge) { try { - Long statr = System.currentTimeMillis(); + long start = System.currentTimeMillis(); RecordAlterOperationEnum operation = context.getOperation(); if (StringUtils.isBlank(edge.getFrom()) || StringUtils.isBlank(edge.getTo()) @@ -226,7 +226,7 @@ private void writeEdge(SubGraphRecord.Edge edge) { List properties = Lists.newArrayList(); for (Map.Entry entry : edge.getProperties().entrySet()) { Object entryValue = entry.getValue(); - if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) { + if (!TypeChecker.isBasicType(entryValue)) { entryValue = JSON.toJSONString(entryValue); } properties.add(new LPGPropertyRecord(entry.getKey(), entryValue)); @@ -244,9 +244,10 @@ private void writeEdge(SubGraphRecord.Edge edge) { client.deleteEdge(edge.getLabel(), edgeRecords); } log.info( - String.format( - "write Edge succeed from:%s to:%s cons:%s", - edge.getFrom(), edge.getTo(), System.currentTimeMillis() - statr)); + "write Edge succeed from:{} to:{} cons:{}", + edge.getFrom(), + edge.getTo(), + System.currentTimeMillis() - start); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java b/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java index 0bde51d81..3935a0433 100644 --- a/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java +++ b/cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java @@ -66,4 +66,36 @@ public static boolean isArrayOrCollectionOfPrimitives(Object obj) { } return false; } + + public static boolean isBasicType(Object obj) { + if (obj == null) { + return false; + } + if (obj instanceof Object[]) { + for (Object element : (Object[]) obj) { + if (!isBasicType(element)) { + return false; + } + } + return true; + + } else if (obj instanceof Collection) { + for (Object element : (Collection) obj) { + if (!isBasicType(element)) { + return false; + } + } + return true; + } + + return obj.getClass().isPrimitive() + || obj instanceof Integer + || obj instanceof Double + || obj instanceof Float + || obj instanceof Long + || obj instanceof Byte + || obj instanceof Boolean + || obj instanceof Character + || obj instanceof CharSequence; + } } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java index 41279dc68..477b222e5 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/RetrievalSyncTask.java @@ -77,6 +77,10 @@ public SchedulerEnum.TaskStatus submit(TaskExecuteContext context) { context.addTraceLog("update index schema index_ids:%s", retrievals); List retrievalList = JSON.parseObject(retrievals, new TypeReference>() {}); Project project = projectService.queryById(job.getProjectId()); + if (project == null) { + context.addTraceLog("project not exist"); + return SchedulerEnum.TaskStatus.FINISH; + } for (Long id : retrievalList) { Retrieval retrieval = retrievalService.getById(id); context.addTraceLog("update index(%s) schema", retrieval.getName()); diff --git a/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java b/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java index f60999081..f7cd8054f 100644 --- a/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java +++ b/server/core/schema/service/src/main/java/com/antgroup/openspg/server/core/schema/service/alter/check/PropertyChecker.java @@ -65,7 +65,9 @@ public void check(BaseAdvancedType advancedType, SchemaCheckContext context) { } this.checkBasicInfo(spgTypeIdentifier, property, context); - this.checkBuiltInProperty(advancedType.getSpgTypeEnum(), property); + if (!advancedType.isUpdate()) { + this.checkBuiltInProperty(advancedType.getSpgTypeEnum(), property); + } this.checkConstraint(property); if (property.getLogicalRule() != null) { diff --git a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java index ef1a85b7b..8bdd24e23 100644 --- a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java +++ b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/mapper/ProjectDOMapper.java @@ -42,6 +42,12 @@ public interface ProjectDOMapper { void deleteFromKgBuilderJob(Long id); + void deleteFromKgSchedulerInstance(Long id); + + void deleteFromKgSchedulerJob(Long id); + + void deleteFromKgSchedulerTask(Long id); + void deleteFromKgResourcePermission(Long projectId); int insert(ProjectDO record); diff --git a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java index 81875669d..fc2740df7 100644 --- a/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java +++ b/server/infra/dao/src/main/java/com/antgroup/openspg/server/infra/dao/repository/common/ProjectRepositoryImpl.java @@ -85,6 +85,9 @@ public Integer deleteById(Long projectId) { projectDOMapper.deleteFromKgReasonTask(projectId); projectDOMapper.deleteFromKgReasonTutorial(projectId); projectDOMapper.deleteFromKgBuilderJob(projectId); + projectDOMapper.deleteFromKgSchedulerInstance(projectId); + projectDOMapper.deleteFromKgSchedulerJob(projectId); + projectDOMapper.deleteFromKgSchedulerTask(projectId); projectDOMapper.deleteFromKgResourcePermission(projectId); return projectDOMapper.deleteByPrimaryKey(projectId); } diff --git a/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml b/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml index ed04293c9..9a750e7fa 100644 --- a/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml +++ b/server/infra/dao/src/main/resources/mapper/ProjectDOMapper.xml @@ -138,6 +138,15 @@ DELETE FROM kg_builder_job WHERE project_id = #{id,jdbcType=BIGINT}; + + DELETE FROM kg_scheduler_instance WHERE project_id = #{id,jdbcType=BIGINT}; + + + DELETE FROM kg_scheduler_job WHERE project_id = #{id,jdbcType=BIGINT}; + + + DELETE FROM kg_scheduler_task WHERE project_id = #{id,jdbcType=BIGINT}; + DELETE FROM kg_resource_permission WHERE resource_id = #{id,jdbcType=BIGINT} and resource_tag = 'KNOWLEDGE_BASE';