From 180826906e0fd32454c9c8bce4ea0fc657bb5ff5 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Wed, 19 Nov 2025 10:51:46 +0800 Subject: [PATCH 1/5] fix: solve the blocking problem caused by serverless startup --- ozhera-log/log-agent/pom.xml | 2 +- .../org/apache/ozhera/log/agent/rpc/task/PingTask.java | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml index 5b62ad15..b2145662 100644 --- a/ozhera-log/log-agent/pom.xml +++ b/ozhera-log/log-agent/pom.xml @@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0 4.0.0 log-agent - 2.2.12-SNAPSHOT + 2.2.13-SNAPSHOT 21 diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java index 9af684b4..504496af 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java @@ -23,10 +23,10 @@ import com.xiaomi.data.push.rpc.netty.ResponseFuture; import com.xiaomi.data.push.rpc.protocol.RemotingCommand; import com.xiaomi.data.push.task.Task; +import lombok.extern.slf4j.Slf4j; import org.apache.ozhera.log.api.model.meta.AppLogMeta; import org.apache.ozhera.log.api.model.vo.PingReq; import org.apache.ozhera.log.utils.NetUtil; -import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -66,7 +66,10 @@ public PingTask(RpcClient client) { req.setBody(GSON.toJson(ping).getBytes()); client.sendMessage(service, req, PingTask::handleResponse); } - RpcClient.startLatch.countDown(); + //when there is no server, the agent will be blocked when it starts. + if (!client.getServerList().get().isEmpty()) { + RpcClient.startLatch.countDown(); + } } catch (Exception ex) { log.error("ping error:{}", ex.getMessage()); } From 50d803bcaf14181ff9ff50d4ffa20c824b08c9d1 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Tue, 9 Dec 2025 15:22:05 +0800 Subject: [PATCH 2/5] fix: fix pipeline execution logic error --- .../apache/ozhera/log/agent/channel/ChannelServiceImpl.java | 2 +- .../log/agent/channel/WildcardChannelServiceImpl.java | 2 +- .../apache/ozhera/log/agent/channel/pipeline/Pipeline.java | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java index 73ce0461..7ea9bc5a 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java @@ -479,7 +479,7 @@ private void lastLineRemainSendSchedule(String patternCode) { private void wrapDataToSend(String lineMsg, AtomicReference readResult, String pattern, String patternCode, long ct) { RequestContext requestContext = RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build(); - if (!pipeline.invoke(requestContext)) { + if (pipeline.invoke(requestContext)) { return; } LineMessage lineMessage = createLineMessage(lineMsg, readResult, pattern, patternCode, ct); diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java index f4c17222..5260d50f 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java @@ -446,7 +446,7 @@ private void scheduleLastLineSender(MLog mLog, AtomicReference readR private void wrapDataToSend(String lineMsg, AtomicReference readResult, String patternCode, long ct) { RequestContext requestContext = RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build(); - if (!pipeline.invoke(requestContext)) { + if (pipeline.invoke(requestContext)) { return; } String filePathName = readResult.get().getFilePathName(); diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java index 580c852f..c529397f 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java @@ -54,10 +54,10 @@ public boolean invoke(RequestContext ctx) { continue; } boolean shouldContinue = valve.invoke(ctx); - if (!shouldContinue) { - return false; + if (shouldContinue) { + return true; } } - return true; + return false; } } From 09f4e9e02b7510e09e3278acb6d7b60e3ef42c97 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Wed, 24 Dec 2025 10:11:14 +0800 Subject: [PATCH 3/5] feat: support dynamic configuration management --- ozhera-log/log-agent-server/pom.xml | 6 +- .../AgentCollectProgressProcessor.java | 73 +++++++++++++++---- .../service/DefaultPublishConfigService.java | 58 ++++++++++----- ozhera-log/log-agent/pom.xml | 2 +- .../agent/bootstrap/MiLogAgentBootstrap.java | 25 +++++-- .../agent/channel/ChannelServiceFactory.java | 3 +- .../log/agent/config/AgentConfigManager.java | 73 +++++++++++++++++++ .../ozhera/log/agent/config/ConfigCenter.java | 18 +++++ .../agent/config/ConfigChangeListener.java | 13 ++++ .../agent/config/nacos/NacosConfigCenter.java | 51 +++++++++++++ .../extension/nacos/NacosConfigUtil.java | 4 + .../src/main/resources/config.properties | 1 + 12 files changed, 284 insertions(+), 43 deletions(-) create mode 100644 ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java create mode 100644 ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java create mode 100644 ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java create mode 100644 ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java diff --git a/ozhera-log/log-agent-server/pom.xml b/ozhera-log/log-agent-server/pom.xml index 5c1f9796..531ac649 100644 --- a/ozhera-log/log-agent-server/pom.xml +++ b/ozhera-log/log-agent-server/pom.xml @@ -66,7 +66,7 @@ http://www.apache.org/licenses/LICENSE-2.0 run.mone rpc - 1.6.4-jdk21-SNAPSHOT + 1.6.5-jdk21-SNAPSHOT ch.qos.logback @@ -114,6 +114,10 @@ http://www.apache.org/licenses/LICENSE-2.0 org.slf4j slf4j-api + + run.mone + rpc + diff --git a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java index a1e33526..b9c91f5f 100644 --- a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java +++ b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java @@ -18,6 +18,8 @@ */ package org.apache.ozhera.log.server.porcessor; +import com.google.common.util.concurrent.RateLimiter; +import com.xiaomi.data.push.rpc.common.CompressionUtil; import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor; import com.xiaomi.data.push.rpc.protocol.RemotingCommand; import com.xiaomi.youpin.docean.Ioc; @@ -48,30 +50,67 @@ public class AgentCollectProgressProcessor implements NettyRequestProcessor { @Resource - DefaultLogProcessCollector processService; + private DefaultLogProcessCollector processService; - private static Version version = new Version(); + private static final RateLimiter ERROR_LIMITER = RateLimiter.create(2); + + private static final Version VERSION = new Version(); @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { - log.debug("received a message from the agent"); + log.debug("Received a message from the agent, remote address: {}", getIp(ctx)); + RemotingCommand response = RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CODE); - String body = new String(request.getBody(), StandardCharsets.UTF_8); + response.setBody((VERSION + Constant.SUCCESS_MESSAGE).getBytes()); + + if (request.getBody() == null || request.getBody().length == 0) { + return response; + } + + if (processService == null && Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) { + processService = Ioc.ins().getBean(DefaultLogProcessCollector.class); + } + + UpdateLogProcessCmd cmd = parseRequestBody(request.getBody(), ctx); + if (cmd == null) { + return response; + } + + if (processService != null) { + processService.collectLogProcess(cmd); + } + + return response; + } + + /** + * try to parse the request body + */ + private UpdateLogProcessCmd parseRequestBody(byte[] bodyBytes, ChannelHandlerContext ctx) { + String bodyStr = null; + try { - UpdateLogProcessCmd cmd = GSON.fromJson(body, UpdateLogProcessCmd.class); - log.debug("a request from the client sent by the agent:{}", cmd.getIp()); - if (null == processService && Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) { - processService = Ioc.ins().getBean(DefaultLogProcessCollector.class); - } - if (null != processService) { - processService.collectLogProcess(cmd); + bodyStr = new String(bodyBytes, StandardCharsets.UTF_8); + UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, UpdateLogProcessCmd.class); + if (StringUtils.isBlank(cmd.getIp())) { + log.warn("Invalid agent request, ip={}, body={}", getIp(ctx), brief(bodyStr)); + return null; } + log.debug("Parsed request from agent: ip={}", cmd.getIp()); + return cmd; + } catch (Exception ignored) { + } + + try { + bodyStr = new String(CompressionUtil.decompress(bodyBytes), StandardCharsets.UTF_8); + UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, UpdateLogProcessCmd.class); + log.debug("Parsed decompressed request from agent: ip={}", cmd.getIp()); + return cmd; } catch (Exception e) { - log.error("processRequest error,ip:{},body:{}", getIp(ctx), body, e); + assert bodyStr != null; + log.error("processRequest error, ip={}, body={}", getIp(ctx), brief(bodyStr), e); + return null; } - response.setBody(version.toString().getBytes()); - response.setBody(Constant.SUCCESS_MESSAGE.getBytes()); - return response; } @Override @@ -90,4 +129,8 @@ private String getIp(ChannelHandlerContext ctx) { } return StringUtils.EMPTY; } + + private String brief(String body) { + return body.length() > 200 ? body.substring(0, 200) + "..." : body; + } } diff --git a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java index ff97e622..dfd80ae1 100644 --- a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java +++ b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java @@ -35,9 +35,10 @@ import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static org.apache.ozhera.log.common.Constant.GSON; import static org.apache.ozhera.log.common.Constant.SYMBOL_COLON; @@ -60,6 +61,19 @@ public class DefaultPublishConfigService implements PublishConfigService { private volatile boolean configCompressValue = false; + + private static final ExecutorService SEND_CONFIG_EXECUTOR; + + static { + SEND_CONFIG_EXECUTOR = Executors.newThreadPerTaskExecutor( + Thread.ofVirtual() + .name("send-config-vt-", 0) + .uncaughtExceptionHandler((t, e) -> + log.error("send config uncaught exception", e)) + .factory() + ); + } + public void init() { String raw = System.getenv(CONFIG_COMPRESS_KEY); if (StringUtils.isBlank(raw)) { @@ -80,17 +94,25 @@ public void init() { * dubbo interface, the timeout period cannot be too long * * @param agentIp - * @param logCollectMeta + * @param meta */ @Override - public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) { + public void sengConfigToAgent(String agentIp, LogCollectMeta meta) { + if (StringUtils.isBlank(agentIp) || meta == null) { + return; + } + doSendConfig(agentIp, meta); +// SEND_CONFIG_EXECUTOR.execute(() -> ); + } + + private void doSendConfig(String agentIp, LogCollectMeta meta) { int count = 1; - while (count < 4) { + while (count < 3) { Map logAgentMap = getAgentChannelMap(); - String agentCurrentIp = queryCurrentDockerAgentIP(agentIp, logAgentMap); + String agentCurrentIp = getCorrectDockerAgentIP(agentIp, logAgentMap); if (logAgentMap.containsKey(agentCurrentIp)) { - String sendStr = GSON.toJson(logCollectMeta); - if (CollectionUtils.isNotEmpty(logCollectMeta.getAppLogMetaList())) { + String sendStr = GSON.toJson(meta); + if (CollectionUtils.isNotEmpty(meta.getAppLogMetaList())) { RemotingCommand req = RemotingCommand.createRequestCommand(LogCmd.LOG_REQ); req.setBody(sendStr.getBytes()); @@ -103,17 +125,17 @@ public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) { RemotingCommand res = rpcServer.sendMessage(logAgentMap.get(agentCurrentIp), req, 10000); started.stop(); String response = new String(res.getBody()); - log.info("The configuration is sent successfully---->{},duration:{}s,agentIp:{}", response, started.elapsed().getSeconds(), agentCurrentIp); + log.info("The configuration is send successfully---->{},duration:{}s,agentIp:{}", response, started.elapsed().getSeconds(), agentCurrentIp); if (Objects.equals(response, "ok")) { break; } } } else { - log.info("The current agent IP is not connected,ip:{},configuration data:{}", agentIp, GSON.toJson(logCollectMeta)); + log.info("The current agent IP is not connected,ip:{},configuration data:{}", agentIp, GSON.toJson(meta)); } - //Retry policy - Retry 4 times, sleep 500 ms each time + //Retry policy - Retry 4 times, sleep 200 ms each time try { - TimeUnit.MILLISECONDS.sleep(500L); + TimeUnit.MILLISECONDS.sleep(200L); } catch (final InterruptedException ignored) { } count++; @@ -124,12 +146,10 @@ public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) { public List getAllAgentList() { List remoteAddress = Lists.newArrayList(); List ipAddress = Lists.newArrayList(); - AgentContext.ins().map.entrySet().forEach(agentChannelEntry -> { - String key = agentChannelEntry.getKey(); - remoteAddress.add(key); - ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON)); - } - ); + AgentContext.ins().map.forEach((key, value) -> { + remoteAddress.add(key); + ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON)); + }); if (COUNT_INCR.getAndIncrement() % 200 == 0) { log.info("The set of remote addresses of the connected agent machine is:{}", GSON.toJson(remoteAddress)); } @@ -142,13 +162,13 @@ private Map getAgentChannelMap() { return logAgentMap; } - private String queryCurrentDockerAgentIP(String agentIp, Map logAgentMap) { + private String getCorrectDockerAgentIP(String agentIp, Map logAgentMap) { if (Objects.equals(agentIp, NetUtil.getLocalIp())) { //for Docker handles the agent on the current machine final String tempIp = agentIp; List ipList = getAgentChannelMap().keySet() .stream().filter(ip -> ip.startsWith("172")) - .collect(Collectors.toList()); + .toList(); Optional optionalS = ipList.stream() .filter(ip -> Objects.equals(logAgentMap.get(ip).getIp(), tempIp)) .findFirst(); diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml index d8bb355c..d0362c05 100644 --- a/ozhera-log/log-agent/pom.xml +++ b/ozhera-log/log-agent/pom.xml @@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0 4.0.0 log-agent - 2.2.15-SNAPSHOT + 2.2.16-SNAPSHOT 21 diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java index d68c096b..b38e35c8 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java @@ -22,15 +22,16 @@ import com.google.common.collect.Maps; import com.xiaomi.data.push.bo.ClientInfo; import com.xiaomi.data.push.rpc.RpcClient; +import com.xiaomi.youpin.docean.Aop; +import com.xiaomi.youpin.docean.Ioc; +import lombok.extern.slf4j.Slf4j; import org.apache.ozhera.log.agent.common.Version; +import org.apache.ozhera.log.agent.config.AgentConfigManager; +import org.apache.ozhera.log.agent.config.ConfigCenter; +import org.apache.ozhera.log.agent.config.nacos.NacosConfigCenter; import org.apache.ozhera.log.agent.rpc.task.PingTask; import org.apache.ozhera.log.common.Config; import org.apache.ozhera.log.utils.NetUtil; -import com.xiaomi.youpin.docean.Aop; -import com.xiaomi.youpin.docean.Ioc; -import lombok.extern.slf4j.Slf4j; - -import java.io.IOException; import static org.apache.ozhera.log.utils.ConfigUtils.getConfigValue; import static org.apache.ozhera.log.utils.ConfigUtils.getDataHashKey; @@ -42,7 +43,7 @@ @Slf4j public class MiLogAgentBootstrap { - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws Exception { String nacosAddr = getConfigValue("nacosAddr"); String serviceName = getConfigValue("serviceName"); log.info("nacosAddr:{},serviceName:{},version:{}", nacosAddr, serviceName, new Version()); @@ -63,10 +64,22 @@ public static void main(String[] args) throws IOException { client.waitStarted(); log.info("create rpc client finish"); Aop.ins().init(Maps.newLinkedHashMap()); + bootstrapAgentConfig(Ioc.ins()); Ioc.ins().putBean(client).init("org.apache.ozhera.log.agent", "com.xiaomi.youpin.docean"); //Because the client life cycle is advanced, the processor needs to be re-registered here client.registerProcessor(); System.in.read(); } + private static void bootstrapAgentConfig(Ioc ioc) throws Exception { + ConfigCenter agentConfigCenter = + new NacosConfigCenter(Config.ins().get("config.address", "")); + + AgentConfigManager agentConfigManager = + new AgentConfigManager(agentConfigCenter); + + ioc.putBean(agentConfigManager); + } + + } diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java index ab9ac759..fafac094 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService; import org.apache.ozhera.log.agent.channel.pipeline.Pipeline; +import org.apache.ozhera.log.agent.config.AgentConfigManager; import org.apache.ozhera.log.agent.export.MsgExporter; import org.apache.ozhera.log.agent.filter.FilterChain; import org.apache.ozhera.log.agent.input.Input; @@ -57,7 +58,7 @@ public class ChannelServiceFactory { public ChannelServiceFactory(AgentMemoryService agentMemoryService, String memoryBasePath) { this.agentMemoryService = agentMemoryService; this.memoryBasePath = memoryBasePath; - String specialFileSuffix = getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY); + String specialFileSuffix = AgentConfigManager.get(DEFAULT_SPECIAL_FILE_SUFFIX_KEY, getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY)); if (StringUtils.isNotBlank(specialFileSuffix)) { multiSpecialFileSuffix = Lists.newArrayList(specialFileSuffix.split(SYMBOL_COMMA)); } diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java new file mode 100644 index 00000000..e48f4b5c --- /dev/null +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java @@ -0,0 +1,73 @@ +package org.apache.ozhera.log.agent.config; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; + +import java.util.Properties; + +/** + * @author wtt + * @date 2025/12/23 14:13 + * @version 1.0 + */ +@Slf4j +public class AgentConfigManager { + + private static final String AGENT_CONFIG_DATA_ID = "org.apache.ozhera.log.agent.config"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final Properties CACHE = new Properties(); + + private final ConfigCenter configCenter; + + public AgentConfigManager(ConfigCenter configCenter) { + this.configCenter = configCenter; + init(); + } + + private void init() { + try { + String configContent = configCenter.getConfig(AGENT_CONFIG_DATA_ID); + refresh(configContent); + + configCenter.addListener(AGENT_CONFIG_DATA_ID, this::refresh); + } catch (Exception e) { + log.error("[AgentConfig] init failed", e); + } + } + + private synchronized void refresh(String configContent) { + if (configContent == null || configContent.isEmpty()) { + log.warn("[AgentConfig] empty config, skip refresh"); + return; + } + + try { + Properties newConfig = OBJECT_MAPPER.readValue( + configContent, + new TypeReference() { + } + ); + + CACHE.clear(); + CACHE.putAll(newConfig); + + log.info("[AgentConfig] config refreshed, size={}, content={}", + CACHE.size(), newConfig); + + } catch (Exception e) { + log.error("[AgentConfig] refresh cache failed, content={}", configContent, e); + } + } + + public static String get(String key) { + return CACHE.getProperty(key); + } + + public static String get(String key, String defaultValue) { + return CACHE.getProperty(key, defaultValue); + } + +} diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java new file mode 100644 index 00000000..d9c3086a --- /dev/null +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java @@ -0,0 +1,18 @@ +package org.apache.ozhera.log.agent.config; + +/** + * @author wtt + * @date 2025/12/23 14:59 + * @version 1.0 + */ +public interface ConfigCenter { + /** + * get the configuration content (json/yaml/properties are all fine, the Agent does not care) + */ + String getConfig(String dataId) throws Exception; + + /** + * listen for configuration changes + */ + void addListener(String dataId, ConfigChangeListener listener); +} diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java new file mode 100644 index 00000000..086e415c --- /dev/null +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java @@ -0,0 +1,13 @@ +package org.apache.ozhera.log.agent.config; + +/** + * @author wtt + * @date 2025/12/23 14:59 + * @version 1.0 + */ +@FunctionalInterface +public interface ConfigChangeListener { + + void onChange(String newConfig); + +} diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java new file mode 100644 index 00000000..05301409 --- /dev/null +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java @@ -0,0 +1,51 @@ +package org.apache.ozhera.log.agent.config.nacos; + +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import lombok.extern.slf4j.Slf4j; +import org.apache.ozhera.log.agent.config.ConfigCenter; +import org.apache.ozhera.log.agent.config.ConfigChangeListener; +import org.apache.ozhera.log.agent.extension.nacos.NacosConfigUtil; + +import java.util.concurrent.Executor; + +import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID; + +/** + * @author wtt + * @date 2025/12/23 15:03 + * @version 1.0 + */ +@Slf4j +public class NacosConfigCenter implements ConfigCenter { + + private final ConfigService configService; + + public NacosConfigCenter(String serverAddr) throws Exception { + this.configService = new NacosConfigUtil(serverAddr).getConfigService(); + } + + @Override + public String getConfig(String dataId) throws Exception { + return configService.getConfig(dataId, DEFAULT_GROUP_ID, 3000); + } + + @Override + public void addListener(String dataId, ConfigChangeListener listener) { + try { + configService.addListener(dataId, DEFAULT_GROUP_ID, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configInfo) { + listener.onChange(configInfo); + } + }); + } catch (Exception e) { + log.error("[NacosConfigCenter] add listener failed", e); + } + } +} diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java index 36220cf2..f033e6d4 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.api.config.ConfigFactory; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; +import lombok.Getter; import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID; import static org.apache.ozhera.log.common.Constant.DEFAULT_TIME_OUT_MS; @@ -31,7 +32,9 @@ * @description * @date 2025/6/10 16:52 */ +@Getter public class NacosConfigUtil { + private final ConfigService configService; public NacosConfigUtil(String nacosAddr) throws NacosException { @@ -41,4 +44,5 @@ public NacosConfigUtil(String nacosAddr) throws NacosException { public String getConfig(String dataId) throws NacosException { return configService.getConfig(dataId, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS); } + } diff --git a/ozhera-log/log-agent/src/main/resources/config.properties b/ozhera-log/log-agent/src/main/resources/config.properties index a269eac6..7c9570b3 100644 --- a/ozhera-log/log-agent/src/main/resources/config.properties +++ b/ozhera-log/log-agent/src/main/resources/config.properties @@ -17,6 +17,7 @@ app_max_index=30 nacosAddr=${nacosAddr} serviceName=${serviceName} +config.address=${nacosAddr} log.path=${log.path} # agent\u91C7\u96C6\u8FDB\u5EA6\u5B58\u50A8\u8DEF\u5F84 From e899da4a42008c99026dba9125e96f144f69b044 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Wed, 24 Dec 2025 10:33:07 +0800 Subject: [PATCH 4/5] chore(license): Add Apache license header to configuration class --- .../log/agent/config/AgentConfigManager.java | 18 ++++++++++ .../ozhera/log/agent/config/ConfigCenter.java | 18 ++++++++++ .../agent/config/ConfigChangeListener.java | 18 ++++++++++ .../agent/config/nacos/NacosConfigCenter.java | 18 ++++++++++ ozhera-monitor/pom.xml | 36 +++++++++---------- ozhera-operator/pom.xml | 36 +++++++++---------- 6 files changed, 108 insertions(+), 36 deletions(-) diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java index e48f4b5c..05af14e4 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.ozhera.log.agent.config; import com.fasterxml.jackson.core.type.TypeReference; diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java index d9c3086a..7f990802 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.ozhera.log.agent.config; /** diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java index 086e415c..bb0d2ea0 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.ozhera.log.agent.config; /** diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java index 05301409..afc60d21 100644 --- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java +++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.ozhera.log.agent.config.nacos; import com.alibaba.nacos.api.config.ConfigService; diff --git a/ozhera-monitor/pom.xml b/ozhera-monitor/pom.xml index 831fbfe3..7c045b38 100644 --- a/ozhera-monitor/pom.xml +++ b/ozhera-monitor/pom.xml @@ -164,24 +164,24 @@ - - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - sign-artifacts - deploy - - sign - - - gpg - C962B99B87C5B41E79940E4BE72ADFA74291C3C0 - - - - + + + + + + + + + + + + + + + + + + diff --git a/ozhera-operator/pom.xml b/ozhera-operator/pom.xml index 4c672a01..d761f944 100644 --- a/ozhera-operator/pom.xml +++ b/ozhera-operator/pom.xml @@ -122,24 +122,24 @@ http://www.apache.org/licenses/LICENSE-2.0 - - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - sign-artifacts - deploy - - sign - - - gpg - C962B99B87C5B41E79940E4BE72ADFA74291C3C0 - - - - + + + + + + + + + + + + + + + + + + From 58c951702e391c2444a784bd76f304a553115c3e Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Wed, 24 Dec 2025 10:35:45 +0800 Subject: [PATCH 5/5] chore(build): update --- ozhera-monitor/pom.xml | 36 ++++++++++++++++++------------------ ozhera-operator/pom.xml | 36 ++++++++++++++++++------------------ 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/ozhera-monitor/pom.xml b/ozhera-monitor/pom.xml index 7c045b38..831fbfe3 100644 --- a/ozhera-monitor/pom.xml +++ b/ozhera-monitor/pom.xml @@ -164,24 +164,24 @@ - - - - - - - - - - - - - - - - - - + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + deploy + + sign + + + gpg + C962B99B87C5B41E79940E4BE72ADFA74291C3C0 + + + + diff --git a/ozhera-operator/pom.xml b/ozhera-operator/pom.xml index d761f944..4c672a01 100644 --- a/ozhera-operator/pom.xml +++ b/ozhera-operator/pom.xml @@ -122,24 +122,24 @@ http://www.apache.org/licenses/LICENSE-2.0 - - - - - - - - - - - - - - - - - - + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + deploy + + sign + + + gpg + C962B99B87C5B41E79940E4BE72ADFA74291C3C0 + + + +