diff --git a/ozhera-log/log-agent-server/pom.xml b/ozhera-log/log-agent-server/pom.xml
index 5c1f9796c..531ac6497 100644
--- a/ozhera-log/log-agent-server/pom.xml
+++ b/ozhera-log/log-agent-server/pom.xml
@@ -66,7 +66,7 @@ http://www.apache.org/licenses/LICENSE-2.0
run.mone
rpc
- 1.6.4-jdk21-SNAPSHOT
+ 1.6.5-jdk21-SNAPSHOT
ch.qos.logback
@@ -114,6 +114,10 @@ http://www.apache.org/licenses/LICENSE-2.0
org.slf4j
slf4j-api
+
+ run.mone
+ rpc
+
diff --git a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
index a1e33526e..b9c91f5f9 100644
--- a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
+++ b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
@@ -18,6 +18,8 @@
*/
package org.apache.ozhera.log.server.porcessor;
+import com.google.common.util.concurrent.RateLimiter;
+import com.xiaomi.data.push.rpc.common.CompressionUtil;
import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
import com.xiaomi.youpin.docean.Ioc;
@@ -48,30 +50,67 @@
public class AgentCollectProgressProcessor implements NettyRequestProcessor {
@Resource
- DefaultLogProcessCollector processService;
+ private DefaultLogProcessCollector processService;
- private static Version version = new Version();
+ private static final RateLimiter ERROR_LIMITER = RateLimiter.create(2);
+
+ private static final Version VERSION = new Version();
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
- log.debug("received a message from the agent");
+ log.debug("Received a message from the agent, remote address: {}", getIp(ctx));
+
RemotingCommand response = RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CODE);
- String body = new String(request.getBody(), StandardCharsets.UTF_8);
+ response.setBody((VERSION + Constant.SUCCESS_MESSAGE).getBytes());
+
+ if (request.getBody() == null || request.getBody().length == 0) {
+ return response;
+ }
+
+ if (processService == null && Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
+ processService = Ioc.ins().getBean(DefaultLogProcessCollector.class);
+ }
+
+ UpdateLogProcessCmd cmd = parseRequestBody(request.getBody(), ctx);
+ if (cmd == null) {
+ return response;
+ }
+
+ if (processService != null) {
+ processService.collectLogProcess(cmd);
+ }
+
+ return response;
+ }
+
+ /**
+ * try to parse the request body
+ */
+ private UpdateLogProcessCmd parseRequestBody(byte[] bodyBytes, ChannelHandlerContext ctx) {
+ String bodyStr = null;
+
try {
- UpdateLogProcessCmd cmd = GSON.fromJson(body, UpdateLogProcessCmd.class);
- log.debug("a request from the client sent by the agent:{}", cmd.getIp());
- if (null == processService && Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
- processService = Ioc.ins().getBean(DefaultLogProcessCollector.class);
- }
- if (null != processService) {
- processService.collectLogProcess(cmd);
+ bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
+ UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, UpdateLogProcessCmd.class);
+ if (StringUtils.isBlank(cmd.getIp())) {
+ log.warn("Invalid agent request, ip={}, body={}", getIp(ctx), brief(bodyStr));
+ return null;
}
+ log.debug("Parsed request from agent: ip={}", cmd.getIp());
+ return cmd;
+ } catch (Exception ignored) {
+ }
+
+ try {
+ bodyStr = new String(CompressionUtil.decompress(bodyBytes), StandardCharsets.UTF_8);
+ UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, UpdateLogProcessCmd.class);
+ log.debug("Parsed decompressed request from agent: ip={}", cmd.getIp());
+ return cmd;
} catch (Exception e) {
- log.error("processRequest error,ip:{},body:{}", getIp(ctx), body, e);
+ assert bodyStr != null;
+ log.error("processRequest error, ip={}, body={}", getIp(ctx), brief(bodyStr), e);
+ return null;
}
- response.setBody(version.toString().getBytes());
- response.setBody(Constant.SUCCESS_MESSAGE.getBytes());
- return response;
}
@Override
@@ -90,4 +129,8 @@ private String getIp(ChannelHandlerContext ctx) {
}
return StringUtils.EMPTY;
}
+
+ private String brief(String body) {
+ return body.length() > 200 ? body.substring(0, 200) + "..." : body;
+ }
}
diff --git a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
index ff97e6224..dfd80ae10 100644
--- a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
+++ b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
@@ -35,9 +35,10 @@
import javax.annotation.Resource;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import static org.apache.ozhera.log.common.Constant.GSON;
import static org.apache.ozhera.log.common.Constant.SYMBOL_COLON;
@@ -60,6 +61,19 @@ public class DefaultPublishConfigService implements PublishConfigService {
private volatile boolean configCompressValue = false;
+
+ private static final ExecutorService SEND_CONFIG_EXECUTOR;
+
+ static {
+ SEND_CONFIG_EXECUTOR = Executors.newThreadPerTaskExecutor(
+ Thread.ofVirtual()
+ .name("send-config-vt-", 0)
+ .uncaughtExceptionHandler((t, e) ->
+ log.error("send config uncaught exception", e))
+ .factory()
+ );
+ }
+
public void init() {
String raw = System.getenv(CONFIG_COMPRESS_KEY);
if (StringUtils.isBlank(raw)) {
@@ -80,17 +94,25 @@ public void init() {
* dubbo interface, the timeout period cannot be too long
*
* @param agentIp
- * @param logCollectMeta
+ * @param meta
*/
@Override
- public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) {
+ public void sengConfigToAgent(String agentIp, LogCollectMeta meta) {
+ if (StringUtils.isBlank(agentIp) || meta == null) {
+ return;
+ }
+ doSendConfig(agentIp, meta);
+// SEND_CONFIG_EXECUTOR.execute(() -> );
+ }
+
+ private void doSendConfig(String agentIp, LogCollectMeta meta) {
int count = 1;
- while (count < 4) {
+ while (count < 3) {
Map logAgentMap = getAgentChannelMap();
- String agentCurrentIp = queryCurrentDockerAgentIP(agentIp, logAgentMap);
+ String agentCurrentIp = getCorrectDockerAgentIP(agentIp, logAgentMap);
if (logAgentMap.containsKey(agentCurrentIp)) {
- String sendStr = GSON.toJson(logCollectMeta);
- if (CollectionUtils.isNotEmpty(logCollectMeta.getAppLogMetaList())) {
+ String sendStr = GSON.toJson(meta);
+ if (CollectionUtils.isNotEmpty(meta.getAppLogMetaList())) {
RemotingCommand req = RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
req.setBody(sendStr.getBytes());
@@ -103,17 +125,17 @@ public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) {
RemotingCommand res = rpcServer.sendMessage(logAgentMap.get(agentCurrentIp), req, 10000);
started.stop();
String response = new String(res.getBody());
- log.info("The configuration is sent successfully---->{},duration:{}s,agentIp:{}", response, started.elapsed().getSeconds(), agentCurrentIp);
+ log.info("The configuration is send successfully---->{},duration:{}s,agentIp:{}", response, started.elapsed().getSeconds(), agentCurrentIp);
if (Objects.equals(response, "ok")) {
break;
}
}
} else {
- log.info("The current agent IP is not connected,ip:{},configuration data:{}", agentIp, GSON.toJson(logCollectMeta));
+ log.info("The current agent IP is not connected,ip:{},configuration data:{}", agentIp, GSON.toJson(meta));
}
- //Retry policy - Retry 4 times, sleep 500 ms each time
+ //Retry policy - Retry 4 times, sleep 200 ms each time
try {
- TimeUnit.MILLISECONDS.sleep(500L);
+ TimeUnit.MILLISECONDS.sleep(200L);
} catch (final InterruptedException ignored) {
}
count++;
@@ -124,12 +146,10 @@ public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) {
public List getAllAgentList() {
List remoteAddress = Lists.newArrayList();
List ipAddress = Lists.newArrayList();
- AgentContext.ins().map.entrySet().forEach(agentChannelEntry -> {
- String key = agentChannelEntry.getKey();
- remoteAddress.add(key);
- ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
- }
- );
+ AgentContext.ins().map.forEach((key, value) -> {
+ remoteAddress.add(key);
+ ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
+ });
if (COUNT_INCR.getAndIncrement() % 200 == 0) {
log.info("The set of remote addresses of the connected agent machine is:{}", GSON.toJson(remoteAddress));
}
@@ -142,13 +162,13 @@ private Map getAgentChannelMap() {
return logAgentMap;
}
- private String queryCurrentDockerAgentIP(String agentIp, Map logAgentMap) {
+ private String getCorrectDockerAgentIP(String agentIp, Map logAgentMap) {
if (Objects.equals(agentIp, NetUtil.getLocalIp())) {
//for Docker handles the agent on the current machine
final String tempIp = agentIp;
List ipList = getAgentChannelMap().keySet()
.stream().filter(ip -> ip.startsWith("172"))
- .collect(Collectors.toList());
+ .toList();
Optional optionalS = ipList.stream()
.filter(ip -> Objects.equals(logAgentMap.get(ip).getIp(), tempIp))
.findFirst();
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index d8bb355ce..d0362c058 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
4.0.0
log-agent
- 2.2.15-SNAPSHOT
+ 2.2.16-SNAPSHOT
21
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java
index d68c096bf..b38e35c8e 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java
@@ -22,15 +22,16 @@
import com.google.common.collect.Maps;
import com.xiaomi.data.push.bo.ClientInfo;
import com.xiaomi.data.push.rpc.RpcClient;
+import com.xiaomi.youpin.docean.Aop;
+import com.xiaomi.youpin.docean.Ioc;
+import lombok.extern.slf4j.Slf4j;
import org.apache.ozhera.log.agent.common.Version;
+import org.apache.ozhera.log.agent.config.AgentConfigManager;
+import org.apache.ozhera.log.agent.config.ConfigCenter;
+import org.apache.ozhera.log.agent.config.nacos.NacosConfigCenter;
import org.apache.ozhera.log.agent.rpc.task.PingTask;
import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.Aop;
-import com.xiaomi.youpin.docean.Ioc;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
import static org.apache.ozhera.log.utils.ConfigUtils.getConfigValue;
import static org.apache.ozhera.log.utils.ConfigUtils.getDataHashKey;
@@ -42,7 +43,7 @@
@Slf4j
public class MiLogAgentBootstrap {
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
String nacosAddr = getConfigValue("nacosAddr");
String serviceName = getConfigValue("serviceName");
log.info("nacosAddr:{},serviceName:{},version:{}", nacosAddr, serviceName, new Version());
@@ -63,10 +64,22 @@ public static void main(String[] args) throws IOException {
client.waitStarted();
log.info("create rpc client finish");
Aop.ins().init(Maps.newLinkedHashMap());
+ bootstrapAgentConfig(Ioc.ins());
Ioc.ins().putBean(client).init("org.apache.ozhera.log.agent", "com.xiaomi.youpin.docean");
//Because the client life cycle is advanced, the processor needs to be re-registered here
client.registerProcessor();
System.in.read();
}
+ private static void bootstrapAgentConfig(Ioc ioc) throws Exception {
+ ConfigCenter agentConfigCenter =
+ new NacosConfigCenter(Config.ins().get("config.address", ""));
+
+ AgentConfigManager agentConfigManager =
+ new AgentConfigManager(agentConfigCenter);
+
+ ioc.putBean(agentConfigManager);
+ }
+
+
}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
index ab9ac759c..fafac0941 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -24,6 +24,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
import org.apache.ozhera.log.agent.channel.pipeline.Pipeline;
+import org.apache.ozhera.log.agent.config.AgentConfigManager;
import org.apache.ozhera.log.agent.export.MsgExporter;
import org.apache.ozhera.log.agent.filter.FilterChain;
import org.apache.ozhera.log.agent.input.Input;
@@ -57,7 +58,7 @@ public class ChannelServiceFactory {
public ChannelServiceFactory(AgentMemoryService agentMemoryService, String memoryBasePath) {
this.agentMemoryService = agentMemoryService;
this.memoryBasePath = memoryBasePath;
- String specialFileSuffix = getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY);
+ String specialFileSuffix = AgentConfigManager.get(DEFAULT_SPECIAL_FILE_SUFFIX_KEY, getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY));
if (StringUtils.isNotBlank(specialFileSuffix)) {
multiSpecialFileSuffix = Lists.newArrayList(specialFileSuffix.split(SYMBOL_COMMA));
}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java
new file mode 100644
index 000000000..05af14e49
--- /dev/null
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.config;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Properties;
+
+/**
+ * @author wtt
+ * @date 2025/12/23 14:13
+ * @version 1.0
+ */
+@Slf4j
+public class AgentConfigManager {
+
+ private static final String AGENT_CONFIG_DATA_ID = "org.apache.ozhera.log.agent.config";
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final Properties CACHE = new Properties();
+
+ private final ConfigCenter configCenter;
+
+ public AgentConfigManager(ConfigCenter configCenter) {
+ this.configCenter = configCenter;
+ init();
+ }
+
+ private void init() {
+ try {
+ String configContent = configCenter.getConfig(AGENT_CONFIG_DATA_ID);
+ refresh(configContent);
+
+ configCenter.addListener(AGENT_CONFIG_DATA_ID, this::refresh);
+ } catch (Exception e) {
+ log.error("[AgentConfig] init failed", e);
+ }
+ }
+
+ private synchronized void refresh(String configContent) {
+ if (configContent == null || configContent.isEmpty()) {
+ log.warn("[AgentConfig] empty config, skip refresh");
+ return;
+ }
+
+ try {
+ Properties newConfig = OBJECT_MAPPER.readValue(
+ configContent,
+ new TypeReference() {
+ }
+ );
+
+ CACHE.clear();
+ CACHE.putAll(newConfig);
+
+ log.info("[AgentConfig] config refreshed, size={}, content={}",
+ CACHE.size(), newConfig);
+
+ } catch (Exception e) {
+ log.error("[AgentConfig] refresh cache failed, content={}", configContent, e);
+ }
+ }
+
+ public static String get(String key) {
+ return CACHE.getProperty(key);
+ }
+
+ public static String get(String key, String defaultValue) {
+ return CACHE.getProperty(key, defaultValue);
+ }
+
+}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java
new file mode 100644
index 000000000..7f990802b
--- /dev/null
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.config;
+
+/**
+ * @author wtt
+ * @date 2025/12/23 14:59
+ * @version 1.0
+ */
+public interface ConfigCenter {
+ /**
+ * get the configuration content (json/yaml/properties are all fine, the Agent does not care)
+ */
+ String getConfig(String dataId) throws Exception;
+
+ /**
+ * listen for configuration changes
+ */
+ void addListener(String dataId, ConfigChangeListener listener);
+}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java
new file mode 100644
index 000000000..bb0d2ea0f
--- /dev/null
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.config;
+
+/**
+ * @author wtt
+ * @date 2025/12/23 14:59
+ * @version 1.0
+ */
+@FunctionalInterface
+public interface ConfigChangeListener {
+
+ void onChange(String newConfig);
+
+}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java
new file mode 100644
index 000000000..afc60d21a
--- /dev/null
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.config.nacos;
+
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.agent.config.ConfigCenter;
+import org.apache.ozhera.log.agent.config.ConfigChangeListener;
+import org.apache.ozhera.log.agent.extension.nacos.NacosConfigUtil;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
+
+/**
+ * @author wtt
+ * @date 2025/12/23 15:03
+ * @version 1.0
+ */
+@Slf4j
+public class NacosConfigCenter implements ConfigCenter {
+
+ private final ConfigService configService;
+
+ public NacosConfigCenter(String serverAddr) throws Exception {
+ this.configService = new NacosConfigUtil(serverAddr).getConfigService();
+ }
+
+ @Override
+ public String getConfig(String dataId) throws Exception {
+ return configService.getConfig(dataId, DEFAULT_GROUP_ID, 3000);
+ }
+
+ @Override
+ public void addListener(String dataId, ConfigChangeListener listener) {
+ try {
+ configService.addListener(dataId, DEFAULT_GROUP_ID, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configInfo) {
+ listener.onChange(configInfo);
+ }
+ });
+ } catch (Exception e) {
+ log.error("[NacosConfigCenter] add listener failed", e);
+ }
+ }
+}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
index 36220cf28..f033e6d49 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
@@ -21,6 +21,7 @@
import com.alibaba.nacos.api.config.ConfigFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
+import lombok.Getter;
import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
import static org.apache.ozhera.log.common.Constant.DEFAULT_TIME_OUT_MS;
@@ -31,7 +32,9 @@
* @description
* @date 2025/6/10 16:52
*/
+@Getter
public class NacosConfigUtil {
+
private final ConfigService configService;
public NacosConfigUtil(String nacosAddr) throws NacosException {
@@ -41,4 +44,5 @@ public NacosConfigUtil(String nacosAddr) throws NacosException {
public String getConfig(String dataId) throws NacosException {
return configService.getConfig(dataId, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
}
+
}
diff --git a/ozhera-log/log-agent/src/main/resources/config.properties b/ozhera-log/log-agent/src/main/resources/config.properties
index a269eac69..7c9570b3a 100644
--- a/ozhera-log/log-agent/src/main/resources/config.properties
+++ b/ozhera-log/log-agent/src/main/resources/config.properties
@@ -17,6 +17,7 @@ app_max_index=30
nacosAddr=${nacosAddr}
serviceName=${serviceName}
+config.address=${nacosAddr}
log.path=${log.path}
# agent\u91C7\u96C6\u8FDB\u5EA6\u5B58\u50A8\u8DEF\u5F84