Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions src/main/java/io/github/dibog/AwsCWEventDump.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private LogStream findLogStream(String aGroupName, String aStreamName) {
return null;
}

private void log(Collection<ILoggingEvent> aEvents) {
private void log(List<ILoggingEvent> logbackEvents) {
if(dateFormat!=null) {
dateHolder.setTime(System.currentTimeMillis());
String newStreamName = streamName + "-" + dateFormat.format(dateHolder);
Expand All @@ -174,9 +174,17 @@ else if (awsLogs==null) {
closeStream();
openStream(streamName);
}

// Need to sort events by timestamp prior to sending to CloudWatch
Collections.sort (logbackEvents, new Comparator<ILoggingEvent>() {
@Override
public int compare(ILoggingEvent a, ILoggingEvent b) {
return Long.compare (a.getTimeStamp(), b.getTimeStamp());
}
});

Collection<InputLogEvent> events = new ArrayList<>(aEvents.size());
for(ILoggingEvent event : aEvents) {
Collection<InputLogEvent> events = new ArrayList<>(logbackEvents.size());
for(ILoggingEvent event : logbackEvents) {
if(event.getLoggerContextVO()!=null) {
events.add(new InputLogEvent()
.withTimestamp(event.getTimeStamp())
Expand Down Expand Up @@ -205,23 +213,23 @@ public void queue(ILoggingEvent event) {
}

public void run() {
List<ILoggingEvent> collections = new LinkedList<ILoggingEvent>();
List<ILoggingEvent> logbackEvents = new LinkedList<>();
LoggerContextVO context = null;
while(!done) {

try {
int[] nbs = queue.drainTo(collections);
if(context==null && !collections.isEmpty()) {
context = collections.get(0).getLoggerContextVO();
int[] nbs = queue.drainTo(logbackEvents);
if(context==null && !logbackEvents.isEmpty()) {
context = logbackEvents.get(0).getLoggerContextVO();
}

int msgProcessed = nbs[0];
int msgSkipped = nbs[1];
if(context!=null && msgSkipped>0) {
collections.add(new SkippedEvent(msgSkipped, context));
logbackEvents.add(new SkippedEvent(msgSkipped, context));
}
log(collections);
collections.clear();
log(logbackEvents);
logbackEvents.clear();
}
catch(InterruptedException e) {
// ignoring
Expand Down
29 changes: 15 additions & 14 deletions src/main/java/io/github/dibog/AwsLogAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,19 @@ public void setCreateLogGroup(boolean createLogGroup) {
}

@Override
protected void append(ILoggingEvent event) {

AwsCWEventDump queue = dump;
if (dump != null) {
event.prepareForDeferredProcessing();
dump.queue(event);
}
protected synchronized void append(ILoggingEvent event) {

event.prepareForDeferredProcessing();
dump.queue(event);
}

@Override
public void start() {
dump = new AwsCWEventDump(this );
public synchronized void start() {
if (isStarted()) {
return;
}

dump = new AwsCWEventDump(this);

Thread t = new Thread(dump);
t.setDaemon(true);
Expand All @@ -89,12 +89,13 @@ public void start() {
}

@Override
public void stop() {
super.stop();
if(dump!=null) {
// flush it
dump.shutdown();
public synchronized void stop() {
if (!isStarted()) {
return;
}

super.stop();
dump.shutdown();
dump = null;
}
}