Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.ozhera.app.api.message.HeraAppInfoModifyMessage;
import org.apache.ozhera.app.api.message.HeraAppModifyType;
import org.apache.ozhera.app.api.model.HeraAppBaseInfoModel;
Expand All @@ -30,7 +31,6 @@
import org.apache.ozhera.app.service.impl.HeraAppBaseInfoService;
import org.apache.ozhera.app.service.mq.model.HeraAppMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
Expand All @@ -47,6 +47,7 @@
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -79,10 +80,13 @@ public class RocketMqHeraAppConsumer {
@NacosValue(value = "${rocketmq.sk}", autoRefreshed = true)
private String sk;

//默认为空,根据需要配置
//stop mq consumer
@NacosValue(value = "${stop.mq:false}", autoRefreshed = true)
private Boolean stopMq;

@NacosValue(value = "${stop.mq.message.plat.code:}", autoRefreshed = true)
private String mqStopPlatCode;

private DefaultMQPushConsumer heraAppMQPushConsumer;

@Autowired
Expand Down Expand Up @@ -157,6 +161,35 @@ private void consumeMessage(MessageExt message) {
log.info("RocketMqHeraAppConsumer# consumeMessage convert heraAppMessage : {}", heraAppMessage.toString());

HeraAppBaseInfo changeHeraApp = heraAppMessage.baseInfo();


if(stopMq){

log.info("Mq consumer stop ...");
if(StringUtils.isBlank(mqStopPlatCode)){
log.info("mqStopPlatCode is blank, stop all mq message access db!");
return;
}


List<Integer> platCodes = Arrays.stream(mqStopPlatCode.split(","))
.map(String::trim)
.map(Integer::parseInt)
.collect(Collectors.toList());

if(CollectionUtils.isEmpty(platCodes)){
log.info("platCodes is empty, stop all mq message access db!");
return;
}

if(platCodes.contains(changeHeraApp.getPlatformType())){
log.info("stop mq access db by plat code : " + changeHeraApp.getPlatformType() + " ; config plat codes " + platCodes);
return;
}

}


HeraAppBaseInfo origHeraApp = matchExistHeraApp(heraAppMessage.baseInfo());

if (heraAppMessage.getDelete() != null && heraAppMessage.getDelete().intValue() == 1) {
Expand Down Expand Up @@ -210,17 +243,14 @@ private HeraAppBaseInfo matchExistHeraApp(HeraAppBaseInfo heraAppBaseInfo){
return query.get(0);
}

private void saveOrUpdateHeraAppRole(List<String> members, String appId, Integer platFormType) {
private void saveOrUpdateHeraAppRole(List<String> membersP, String appId, Integer platFormType) {

if(stopMq){
log.info("Mq consumer stop ...");
log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole appId:{},platFormType:{},members:{}", appId, platFormType, membersP);
if (CollectionUtils.isEmpty(membersP)) {
return;
}

log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole appId:{},platFormType:{},members:{}", appId, platFormType, members);
if (CollectionUtils.isEmpty(members)) {
return;
}
List<String> members = membersP.stream().distinct().collect(Collectors.toList());

HeraAppRole role = new HeraAppRole();
role.setRole(0);
Expand Down
Loading