Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ozhera-log/log-agent-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<dependency>
<groupId>run.mone</groupId>
<artifactId>rpc</artifactId>
<version>1.6.4-jdk21-SNAPSHOT</version>
<version>1.6.5-jdk21-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
Expand Down Expand Up @@ -114,6 +114,10 @@ http://www.apache.org/licenses/LICENSE-2.0
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>run.mone</groupId>
<artifactId>rpc</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.ozhera.log.server.porcessor;

import com.google.common.util.concurrent.RateLimiter;
import com.xiaomi.data.push.rpc.common.CompressionUtil;
import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
import com.xiaomi.youpin.docean.Ioc;
Expand Down Expand Up @@ -48,30 +50,67 @@
public class AgentCollectProgressProcessor implements NettyRequestProcessor {

@Resource
DefaultLogProcessCollector processService;
private DefaultLogProcessCollector processService;

private static Version version = new Version();
private static final RateLimiter ERROR_LIMITER = RateLimiter.create(2);

private static final Version VERSION = new Version();

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
log.debug("received a message from the agent");
log.debug("Received a message from the agent, remote address: {}", getIp(ctx));

RemotingCommand response = RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CODE);
String body = new String(request.getBody(), StandardCharsets.UTF_8);
response.setBody((VERSION + Constant.SUCCESS_MESSAGE).getBytes());

if (request.getBody() == null || request.getBody().length == 0) {
return response;
}

if (processService == null && Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
processService = Ioc.ins().getBean(DefaultLogProcessCollector.class);
}

UpdateLogProcessCmd cmd = parseRequestBody(request.getBody(), ctx);
if (cmd == null) {
return response;
}

if (processService != null) {
processService.collectLogProcess(cmd);
}

return response;
}

/**
* try to parse the request body
*/
private UpdateLogProcessCmd parseRequestBody(byte[] bodyBytes, ChannelHandlerContext ctx) {
String bodyStr = null;

try {
UpdateLogProcessCmd cmd = GSON.fromJson(body, UpdateLogProcessCmd.class);
log.debug("a request from the client sent by the agent:{}", cmd.getIp());
if (null == processService && Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
processService = Ioc.ins().getBean(DefaultLogProcessCollector.class);
}
if (null != processService) {
processService.collectLogProcess(cmd);
bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, UpdateLogProcessCmd.class);
if (StringUtils.isBlank(cmd.getIp())) {
log.warn("Invalid agent request, ip={}, body={}", getIp(ctx), brief(bodyStr));
return null;
}
log.debug("Parsed request from agent: ip={}", cmd.getIp());
return cmd;
} catch (Exception ignored) {
}

try {
bodyStr = new String(CompressionUtil.decompress(bodyBytes), StandardCharsets.UTF_8);
UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, UpdateLogProcessCmd.class);
log.debug("Parsed decompressed request from agent: ip={}", cmd.getIp());
return cmd;
} catch (Exception e) {
log.error("processRequest error,ip:{},body:{}", getIp(ctx), body, e);
assert bodyStr != null;
log.error("processRequest error, ip={}, body={}", getIp(ctx), brief(bodyStr), e);
return null;
}
response.setBody(version.toString().getBytes());
response.setBody(Constant.SUCCESS_MESSAGE.getBytes());
return response;
}

@Override
Expand All @@ -90,4 +129,8 @@ private String getIp(ChannelHandlerContext ctx) {
}
return StringUtils.EMPTY;
}

private String brief(String body) {
return body.length() > 200 ? body.substring(0, 200) + "..." : body;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.ozhera.log.common.Constant.GSON;
import static org.apache.ozhera.log.common.Constant.SYMBOL_COLON;
Expand All @@ -60,6 +61,19 @@ public class DefaultPublishConfigService implements PublishConfigService {

private volatile boolean configCompressValue = false;


private static final ExecutorService SEND_CONFIG_EXECUTOR;

static {
SEND_CONFIG_EXECUTOR = Executors.newThreadPerTaskExecutor(
Thread.ofVirtual()
.name("send-config-vt-", 0)
.uncaughtExceptionHandler((t, e) ->
log.error("send config uncaught exception", e))
.factory()
);
}

public void init() {
String raw = System.getenv(CONFIG_COMPRESS_KEY);
if (StringUtils.isBlank(raw)) {
Expand All @@ -80,17 +94,25 @@ public void init() {
* dubbo interface, the timeout period cannot be too long
*
* @param agentIp
* @param logCollectMeta
* @param meta
*/
@Override
public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) {
public void sengConfigToAgent(String agentIp, LogCollectMeta meta) {
if (StringUtils.isBlank(agentIp) || meta == null) {
return;
}
doSendConfig(agentIp, meta);
// SEND_CONFIG_EXECUTOR.execute(() -> );
}

private void doSendConfig(String agentIp, LogCollectMeta meta) {
int count = 1;
while (count < 4) {
while (count < 3) {
Map<String, AgentChannel> logAgentMap = getAgentChannelMap();
String agentCurrentIp = queryCurrentDockerAgentIP(agentIp, logAgentMap);
String agentCurrentIp = getCorrectDockerAgentIP(agentIp, logAgentMap);
if (logAgentMap.containsKey(agentCurrentIp)) {
String sendStr = GSON.toJson(logCollectMeta);
if (CollectionUtils.isNotEmpty(logCollectMeta.getAppLogMetaList())) {
String sendStr = GSON.toJson(meta);
if (CollectionUtils.isNotEmpty(meta.getAppLogMetaList())) {
RemotingCommand req = RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
req.setBody(sendStr.getBytes());

Expand All @@ -103,17 +125,17 @@ public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) {
RemotingCommand res = rpcServer.sendMessage(logAgentMap.get(agentCurrentIp), req, 10000);
started.stop();
String response = new String(res.getBody());
log.info("The configuration is sent successfully---->{},duration:{}s,agentIp:{}", response, started.elapsed().getSeconds(), agentCurrentIp);
log.info("The configuration is send successfully---->{},duration:{}s,agentIp:{}", response, started.elapsed().getSeconds(), agentCurrentIp);
if (Objects.equals(response, "ok")) {
break;
}
}
} else {
log.info("The current agent IP is not connected,ip:{},configuration data:{}", agentIp, GSON.toJson(logCollectMeta));
log.info("The current agent IP is not connected,ip:{},configuration data:{}", agentIp, GSON.toJson(meta));
}
//Retry policy - Retry 4 times, sleep 500 ms each time
//Retry policy - Retry 4 times, sleep 200 ms each time
try {
TimeUnit.MILLISECONDS.sleep(500L);
TimeUnit.MILLISECONDS.sleep(200L);
} catch (final InterruptedException ignored) {
}
count++;
Expand All @@ -124,12 +146,10 @@ public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) {
public List<String> getAllAgentList() {
List<String> remoteAddress = Lists.newArrayList();
List<String> ipAddress = Lists.newArrayList();
AgentContext.ins().map.entrySet().forEach(agentChannelEntry -> {
String key = agentChannelEntry.getKey();
remoteAddress.add(key);
ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
}
);
AgentContext.ins().map.forEach((key, value) -> {
remoteAddress.add(key);
ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
});
if (COUNT_INCR.getAndIncrement() % 200 == 0) {
log.info("The set of remote addresses of the connected agent machine is:{}", GSON.toJson(remoteAddress));
}
Expand All @@ -142,13 +162,13 @@ private Map<String, AgentChannel> getAgentChannelMap() {
return logAgentMap;
}

private String queryCurrentDockerAgentIP(String agentIp, Map<String, AgentChannel> logAgentMap) {
private String getCorrectDockerAgentIP(String agentIp, Map<String, AgentChannel> logAgentMap) {
if (Objects.equals(agentIp, NetUtil.getLocalIp())) {
//for Docker handles the agent on the current machine
final String tempIp = agentIp;
List<String> ipList = getAgentChannelMap().keySet()
.stream().filter(ip -> ip.startsWith("172"))
.collect(Collectors.toList());
.toList();
Optional<String> optionalS = ipList.stream()
.filter(ip -> Objects.equals(logAgentMap.get(ip).getIp(), tempIp))
.findFirst();
Expand Down
2 changes: 1 addition & 1 deletion ozhera-log/log-agent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>

<artifactId>log-agent</artifactId>
<version>2.2.15-SNAPSHOT</version>
<version>2.2.16-SNAPSHOT</version>

<properties>
<maven.compiler.source>21</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
import com.google.common.collect.Maps;
import com.xiaomi.data.push.bo.ClientInfo;
import com.xiaomi.data.push.rpc.RpcClient;
import com.xiaomi.youpin.docean.Aop;
import com.xiaomi.youpin.docean.Ioc;
import lombok.extern.slf4j.Slf4j;
import org.apache.ozhera.log.agent.common.Version;
import org.apache.ozhera.log.agent.config.AgentConfigManager;
import org.apache.ozhera.log.agent.config.ConfigCenter;
import org.apache.ozhera.log.agent.config.nacos.NacosConfigCenter;
import org.apache.ozhera.log.agent.rpc.task.PingTask;
import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.utils.NetUtil;
import com.xiaomi.youpin.docean.Aop;
import com.xiaomi.youpin.docean.Ioc;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

import static org.apache.ozhera.log.utils.ConfigUtils.getConfigValue;
import static org.apache.ozhera.log.utils.ConfigUtils.getDataHashKey;
Expand All @@ -42,7 +43,7 @@
@Slf4j
public class MiLogAgentBootstrap {

public static void main(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
String nacosAddr = getConfigValue("nacosAddr");
String serviceName = getConfigValue("serviceName");
log.info("nacosAddr:{},serviceName:{},version:{}", nacosAddr, serviceName, new Version());
Expand All @@ -63,10 +64,22 @@ public static void main(String[] args) throws IOException {
client.waitStarted();
log.info("create rpc client finish");
Aop.ins().init(Maps.newLinkedHashMap());
bootstrapAgentConfig(Ioc.ins());
Ioc.ins().putBean(client).init("org.apache.ozhera.log.agent", "com.xiaomi.youpin.docean");
//Because the client life cycle is advanced, the processor needs to be re-registered here
client.registerProcessor();
System.in.read();
}

private static void bootstrapAgentConfig(Ioc ioc) throws Exception {
ConfigCenter agentConfigCenter =
new NacosConfigCenter(Config.ins().get("config.address", ""));

AgentConfigManager agentConfigManager =
new AgentConfigManager(agentConfigCenter);

ioc.putBean(agentConfigManager);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
import org.apache.ozhera.log.agent.channel.pipeline.Pipeline;
import org.apache.ozhera.log.agent.config.AgentConfigManager;
import org.apache.ozhera.log.agent.export.MsgExporter;
import org.apache.ozhera.log.agent.filter.FilterChain;
import org.apache.ozhera.log.agent.input.Input;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class ChannelServiceFactory {
public ChannelServiceFactory(AgentMemoryService agentMemoryService, String memoryBasePath) {
this.agentMemoryService = agentMemoryService;
this.memoryBasePath = memoryBasePath;
String specialFileSuffix = getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY);
String specialFileSuffix = AgentConfigManager.get(DEFAULT_SPECIAL_FILE_SUFFIX_KEY, getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY));
if (StringUtils.isNotBlank(specialFileSuffix)) {
multiSpecialFileSuffix = Lists.newArrayList(specialFileSuffix.split(SYMBOL_COMMA));
}
Expand Down
Loading
Loading