diff --git a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java index 297f67eb..20319273 100644 --- a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java +++ b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java @@ -20,6 +20,7 @@ import com.alibaba.nacos.api.config.annotation.NacosValue; import com.google.gson.Gson; +import org.apache.commons.collections.CollectionUtils; import org.apache.ozhera.app.api.message.HeraAppInfoModifyMessage; import org.apache.ozhera.app.api.message.HeraAppModifyType; import org.apache.ozhera.app.api.model.HeraAppBaseInfoModel; @@ -30,7 +31,6 @@ import org.apache.ozhera.app.service.impl.HeraAppBaseInfoService; import org.apache.ozhera.app.service.mq.model.HeraAppMessage; import lombok.extern.slf4j.Slf4j; -import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; @@ -47,6 +47,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -79,10 +80,13 @@ public class RocketMqHeraAppConsumer { @NacosValue(value = "${rocketmq.sk}", autoRefreshed = true) private String sk; - //默认为空,根据需要配置 + //stop mq consumer @NacosValue(value = "${stop.mq:false}", autoRefreshed = true) private Boolean stopMq; + @NacosValue(value = "${stop.mq.message.plat.code:}", autoRefreshed = true) + private String mqStopPlatCode; + private DefaultMQPushConsumer heraAppMQPushConsumer; @Autowired @@ -157,6 +161,35 @@ private void consumeMessage(MessageExt message) { log.info("RocketMqHeraAppConsumer# consumeMessage convert heraAppMessage : {}", heraAppMessage.toString()); HeraAppBaseInfo changeHeraApp = heraAppMessage.baseInfo(); + + + if(stopMq){ + + log.info("Mq consumer stop ..."); + if(StringUtils.isBlank(mqStopPlatCode)){ + log.info("mqStopPlatCode is blank, stop all mq message access db!"); + return; + } + + + List platCodes = Arrays.stream(mqStopPlatCode.split(",")) + .map(String::trim) + .map(Integer::parseInt) + .collect(Collectors.toList()); + + if(CollectionUtils.isEmpty(platCodes)){ + log.info("platCodes is empty, stop all mq message access db!"); + return; + } + + if(platCodes.contains(changeHeraApp.getPlatformType())){ + log.info("stop mq access db by plat code : " + changeHeraApp.getPlatformType() + " ; config plat codes " + platCodes); + return; + } + + } + + HeraAppBaseInfo origHeraApp = matchExistHeraApp(heraAppMessage.baseInfo()); if (heraAppMessage.getDelete() != null && heraAppMessage.getDelete().intValue() == 1) { @@ -210,17 +243,14 @@ private HeraAppBaseInfo matchExistHeraApp(HeraAppBaseInfo heraAppBaseInfo){ return query.get(0); } - private void saveOrUpdateHeraAppRole(List members, String appId, Integer platFormType) { + private void saveOrUpdateHeraAppRole(List membersP, String appId, Integer platFormType) { - if(stopMq){ - log.info("Mq consumer stop ..."); + log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole appId:{},platFormType:{},members:{}", appId, platFormType, membersP); + if (CollectionUtils.isEmpty(membersP)) { return; } - log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole appId:{},platFormType:{},members:{}", appId, platFormType, members); - if (CollectionUtils.isEmpty(members)) { - return; - } + List members = membersP.stream().distinct().collect(Collectors.toList()); HeraAppRole role = new HeraAppRole(); role.setRole(0);