From 29a7d8663b6176c944defc2a93702008755163ad Mon Sep 17 00:00:00 2001 From: Jindrich Novy Date: Mon, 1 Dec 2025 12:28:33 +0100 Subject: [PATCH] Replace pthread-based logging buffer with signal masking for thread safety This commit replaces the pthread mutex-based synchronization in the async logging buffer with signal masking, eliminating the pthread dependency while maintaining thread safety. Fixes: #38 Signed-off-by: Jindrich Novy --- Makefile | 2 +- nix/derivation.nix | 2 +- src/cli.c | 9 + src/conmon.c | 3 + src/ctr_exit.c | 9 + src/ctr_logging.c | 7 + src/ctr_logging_buffer.c | 482 ++++++++++++++++++++++++++ src/ctr_logging_buffer.h | 54 +++ src/ctr_stdio.c | 5 +- test/10-async-logging.bats | 75 ++++ test/11-async-logging-edge-cases.bats | 85 +++++ 11 files changed, 729 insertions(+), 4 deletions(-) create mode 100644 src/ctr_logging_buffer.c create mode 100644 src/ctr_logging_buffer.h create mode 100644 test/10-async-logging.bats create mode 100644 test/11-async-logging-edge-cases.bats diff --git a/Makefile b/Makefile index 4ccf7f91..c00001bf 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ LIBEXECDIR ?= ${PREFIX}/libexec PKG_CONFIG ?= pkg-config HEADERS := $(wildcard src/*.h) -OBJS := src/conmon.o src/cmsg.o src/ctr_logging.o src/utils.o src/cli.o src/globals.o src/cgroup.o src/conn_sock.o src/oom.o src/ctrl.o src/ctr_stdio.o src/parent_pipe_fd.o src/ctr_exit.o src/runtime_args.o src/close_fds.o src/seccomp_notify.o +OBJS := src/conmon.o src/cmsg.o src/ctr_logging.o src/ctr_logging_buffer.o src/utils.o src/cli.o src/globals.o src/cgroup.o src/conn_sock.o src/oom.o src/ctrl.o src/ctr_stdio.o src/parent_pipe_fd.o src/ctr_exit.o src/runtime_args.o src/close_fds.o src/seccomp_notify.o MAKEFILE_PATH := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) diff --git a/nix/derivation.nix b/nix/derivation.nix index 029a7432..7123dd9a 100644 --- a/nix/derivation.nix +++ b/nix/derivation.nix @@ -24,7 +24,7 @@ with pkgs; stdenv.mkDerivation rec { # Static builds will use PKG_CONFIG_PATH approach instead ]; prePatch = '' - export CFLAGS='-static -pthread' + export CFLAGS='-static' export LDFLAGS='-s -w -static-libgcc -static' export EXTRA_LDFLAGS='-s -w -linkmode external -extldflags "-static -lm"' ${lib.optionalString (!enableSystemd) "export DISABLE_SYSTEMD=1"} diff --git a/src/cli.c b/src/cli.c index de1ddabe..24b54d58 100644 --- a/src/cli.c +++ b/src/cli.c @@ -1,6 +1,7 @@ #include "cli.h" #include "globals.h" #include "ctr_logging.h" +#include "ctr_logging_buffer.h" #include "config.h" #include "utils.h" @@ -224,6 +225,14 @@ void process_cli() configure_log_drivers(opt_log_path, opt_log_size_max, opt_log_global_size_max, opt_cid, opt_name, opt_log_tag, opt_log_labels); + /* Initialize async logging for improved log performance */ + if (!init_async_logging()) { + nwarn("Failed to initialize async logging, falling back to direct logging"); + } else { + /* Register cleanup handler for async logging */ + atexit(shutdown_async_logging); + } + /* Warn if --no-container-partial-message is used without journald logging */ if (opt_no_container_partial_message && !logging_is_journald_enabled()) { nwarnf("--no-container-partial-message has no effect without journald log driver"); diff --git a/src/conmon.c b/src/conmon.c index d0bb2965..0eaf56e5 100644 --- a/src/conmon.c +++ b/src/conmon.c @@ -14,6 +14,7 @@ #include "conn_sock.h" #include "ctrl.h" #include "ctr_stdio.h" +#include "ctr_logging_buffer.h" #include "config.h" #include "parent_pipe_fd.h" #include "ctr_exit.h" @@ -418,6 +419,8 @@ int main(int argc, char *argv[]) if (mainfd_stderr >= 0) { g_unix_fd_add(mainfd_stderr, G_IO_IN, stdio_cb, GINT_TO_POINTER(STDERR_PIPE)); } + /* Setup log buffer timer for async flushing */ + setup_log_timer_in_main_loop(); if (opt_timeout > 0) { g_timeout_add_seconds(opt_timeout, timeout_cb, NULL); diff --git a/src/ctr_exit.c b/src/ctr_exit.c index eaad82ee..2179ae46 100644 --- a/src/ctr_exit.c +++ b/src/ctr_exit.c @@ -6,6 +6,7 @@ #include "parent_pipe_fd.h" #include "globals.h" #include "ctr_logging.h" +#include "ctr_logging_buffer.h" #include "close_fds.h" #include "oom.h" @@ -129,6 +130,8 @@ void runtime_exit_cb(G_GNUC_UNUSED GPid pid, int status, G_GNUC_UNUSED gpointer { runtime_status = status; create_pid = -1; + /* Flush logs before runtime exit to ensure CRI-O sees all logs */ + flush_log_buffer(); g_main_loop_quit(main_loop); } @@ -137,6 +140,10 @@ void container_exit_cb(G_GNUC_UNUSED GPid pid, int status, G_GNUC_UNUSED gpointe if (get_exit_status(status) != 0) { ninfof("container %d exited with status %d", pid, get_exit_status(status)); } + + /* Force flush async log buffer before container exit to ensure CRI-O sees all logs */ + flush_log_buffer(); + container_status = status; container_pid = -1; /* In the case of a quickly exiting exec command, the container exit callback @@ -149,6 +156,8 @@ void container_exit_cb(G_GNUC_UNUSED GPid pid, int status, G_GNUC_UNUSED gpointe return; } + /* Final flush before main loop quits */ + flush_log_buffer(); g_main_loop_quit(main_loop); } diff --git a/src/ctr_logging.c b/src/ctr_logging.c index 56c83b6c..75158441 100644 --- a/src/ctr_logging.c +++ b/src/ctr_logging.c @@ -1,5 +1,6 @@ #define _GNU_SOURCE #include "ctr_logging.h" +#include "ctr_logging_buffer.h" #include "cli.h" #include "config.h" #include @@ -761,6 +762,9 @@ static void set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename) /* Force closing any open FD. */ void close_logging_fds(void) { + /* Shutdown async logging system */ + shutdown_async_logging(); + if (k8s_log_fd >= 0) close(k8s_log_fd); k8s_log_fd = -1; @@ -1243,6 +1247,9 @@ static void reopen_k8s_file(void) void sync_logs(void) { + /* Flush any buffered logs before syncing */ + flush_log_buffer(); + /* Sync the logs to disk */ if (k8s_log_fd > 0) if (fsync(k8s_log_fd) < 0) diff --git a/src/ctr_logging_buffer.c b/src/ctr_logging_buffer.c new file mode 100644 index 00000000..4b5cc13c --- /dev/null +++ b/src/ctr_logging_buffer.c @@ -0,0 +1,482 @@ +#define _GNU_SOURCE +#include "ctr_logging_buffer.h" +#include "ctr_logging.h" +#include "utils.h" +#include +#include +#include +#include +#include +#include +#include +#include + +/* Global buffer manager */ +static _Atomic(async_log_buffer_t *) g_log_buffer = NULL; +static sigset_t critical_signals; +static bool signal_mask_initialized = false; +static _Atomic(guint) timer_source_id = 0; + +/* Forward declarations */ +static bool add_entry_to_buffer_locked(stdpipe_t pipe, char *buf, ssize_t size); +static bool should_flush_buffer_locked(void); +static size_t prepare_buffer_flush_locked(log_entry_t *dest_buffer); +static void perform_flush_io(log_entry_t *entries, size_t count); +static void flush_buffer_contents_locked(void); +static int enter_critical_section(sigset_t *old_mask); +static void exit_critical_section(const sigset_t *old_mask); +static void setup_flush_timer(void); +static void cleanup_flush_timer(void); + +/* Initialize the signal mask for critical sections */ +static void init_signal_mask(void) +{ + if (signal_mask_initialized) + return; + + sigemptyset(&critical_signals); + /* Block all signals that could corrupt buffer state, but keep critical sections minimal */ + sigaddset(&critical_signals, SIGTERM); + sigaddset(&critical_signals, SIGINT); + sigaddset(&critical_signals, SIGCHLD); + sigaddset(&critical_signals, SIGPIPE); + sigaddset(&critical_signals, SIGALRM); + sigaddset(&critical_signals, SIGUSR1); + sigaddset(&critical_signals, SIGUSR2); + signal_mask_initialized = true; +} + +/* Enter critical section by blocking signals */ +static int enter_critical_section(sigset_t *old_mask) +{ + /* Safety check: signal mask must be initialized before use */ + if (!signal_mask_initialized) { + nwarnf("Signal mask not initialized - this is a programming error"); + return -1; + } + + if (sigprocmask(SIG_BLOCK, &critical_signals, old_mask) != 0) { + nwarnf("Failed to block signals: %m"); + return -1; + } + return 0; +} + +/* Exit critical section by restoring signal mask */ +static void exit_critical_section(const sigset_t *old_mask) +{ + if (sigprocmask(SIG_SETMASK, old_mask, NULL) != 0) { + nwarnf("Failed to restore signal mask: %m"); + } +} + +bool init_async_logging(void) +{ + /* Initialize signal mask once at startup */ + init_signal_mask(); + + sigset_t old_mask; + if (enter_critical_section(&old_mask) != 0) { + return false; + } + + /* Check if already initialized */ + async_log_buffer_t *current = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (current != NULL) { + exit_critical_section(&old_mask); + nwarn("Async logging already initialized"); + return true; + } + + /* Allocate buffer */ + async_log_buffer_t *new_buffer = g_malloc0(sizeof(async_log_buffer_t)); + if (!new_buffer) { + exit_critical_section(&old_mask); + nwarn("Failed to allocate log buffer"); + return false; + } + + /* Initialize simple fields */ + new_buffer->count = 0; + new_buffer->total_size = 0; + new_buffer->entries_written = 0; + new_buffer->entries_dropped = 0; + atomic_init(&new_buffer->initialized, false); + atomic_init(&new_buffer->shutdown_requested, false); + + /* Mark as initialized */ + atomic_store_explicit(&new_buffer->initialized, true, memory_order_release); + + /* Publish the buffer with release semantics */ + atomic_store_explicit(&g_log_buffer, new_buffer, memory_order_release); + + exit_critical_section(&old_mask); + + /* Setup timer outside critical section to avoid deadlock */ + setup_flush_timer(); + + ninfo("Async logging initialized successfully"); + return true; +} + +void shutdown_async_logging(void) +{ + sigset_t old_mask; + if (enter_critical_section(&old_mask) != 0) { + return; + } + + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (!buffer) { + exit_critical_section(&old_mask); + return; + } + + /* Mark shutdown to prevent new operations */ + atomic_store_explicit(&buffer->shutdown_requested, true, memory_order_release); + + /* Clean up timer first (outside critical section to avoid deadlock) */ + exit_critical_section(&old_mask); + cleanup_flush_timer(); + + /* Re-enter critical section for final operations */ + if (enter_critical_section(&old_mask) != 0) { + return; + } + + buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (!buffer) { + exit_critical_section(&old_mask); + return; + } + + /* Perform final flush */ + flush_buffer_contents_locked(); + + /* Get final statistics before cleanup */ + size_t written = buffer->entries_written; + size_t dropped = buffer->entries_dropped; + + /* Clear the global pointer with release semantics */ + atomic_store_explicit(&g_log_buffer, NULL, memory_order_release); + + exit_critical_section(&old_mask); + + ninfof("Async logging shutdown. Entries written: %zu, dropped: %zu", written, dropped); + free(buffer); +} + +bool write_to_logs_buffered(stdpipe_t pipe, char *buf, ssize_t num_read) +{ + /* Validate input parameters first */ + if (!buf || num_read < 0) { + return false; + } + + sigset_t old_mask; + if (enter_critical_section(&old_mask) != 0) { + /* Failed to enter critical section, fallback to direct logging */ + return write_to_logs(pipe, buf, num_read); + } + + /* Handle empty writes explicitly - these are valid drain operations */ + if (num_read == 0) { + exit_critical_section(&old_mask); + return write_to_logs(pipe, buf, num_read); + } + + /* Check buffer state atomically in critical section */ + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + bool initialized = buffer ? atomic_load_explicit(&buffer->initialized, memory_order_acquire) : false; + bool shutdown = buffer ? atomic_load_explicit(&buffer->shutdown_requested, memory_order_acquire) : false; + + if (!buffer || !initialized || shutdown) { + exit_critical_section(&old_mask); + return write_to_logs(pipe, buf, num_read); + } + + /* Try to add to buffer */ + bool success = add_entry_to_buffer_locked(pipe, buf, num_read); + + if (!success) { + /* Buffer full - flush and try again */ + flush_buffer_contents_locked(); + success = add_entry_to_buffer_locked(pipe, buf, num_read); + } + + if (!success) { + /* Still can't fit - drop entry and fallback to direct write */ + buffer->entries_dropped++; + exit_critical_section(&old_mask); + nwarnf("Log buffer full, dropping entry and writing directly"); + return write_to_logs(pipe, buf, num_read); + } + + /* Check if we should flush immediately */ + bool should_flush = should_flush_buffer_locked(); + if (should_flush) { + flush_buffer_contents_locked(); + } + + exit_critical_section(&old_mask); + return true; +} + +void flush_log_buffer(void) +{ + size_t flush_count = 0; + log_entry_t *temp_entries = NULL; + + /* Critical section: only copy buffer contents, don't do I/O */ + sigset_t old_mask; + if (enter_critical_section(&old_mask) != 0) { + return; + } + + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + bool initialized = buffer ? atomic_load_explicit(&buffer->initialized, memory_order_acquire) : false; + bool shutdown = buffer ? atomic_load_explicit(&buffer->shutdown_requested, memory_order_acquire) : false; + + if (buffer && initialized && !shutdown && buffer->count > 0) { + temp_entries = buffer->flush_buffer; /* Use pre-allocated buffer */ + flush_count = prepare_buffer_flush_locked(temp_entries); + } + + exit_critical_section(&old_mask); + + /* Now do I/O outside critical section using the helper */ + if (flush_count > 0) { + perform_flush_io(temp_entries, flush_count); + } +} + +/* Prepare buffer for flush - must be called within critical section */ +static size_t prepare_buffer_flush_locked(log_entry_t *dest_buffer) +{ + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (!buffer || buffer->count == 0) { + return 0; + } + + /* Copy buffer contents to destination */ + size_t flush_count = buffer->count; + memcpy(dest_buffer, buffer->entries, flush_count * sizeof(log_entry_t)); + + /* Clear buffer immediately */ + buffer->count = 0; + buffer->total_size = 0; + + return flush_count; +} + +/* Perform I/O for flushed entries - should be called OUTSIDE critical section */ +static void perform_flush_io(log_entry_t *entries, size_t count) +{ + if (!entries || count == 0) { + return; + } + + size_t successful_writes = 0; + for (size_t i = 0; i < count; i++) { + log_entry_t *entry = &entries[i]; + if (entry->size > 0) { + if (write_to_logs(entry->pipe, entry->data, entry->size)) { + successful_writes++; + } + } + } + + /* Update statistics in a single critical section */ + if (successful_writes > 0) { + sigset_t old_mask; + if (enter_critical_section(&old_mask) == 0) { + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (buffer) { + buffer->entries_written += successful_writes; + } + exit_critical_section(&old_mask); + } + } +} + +/* Flush buffer contents when already in critical section + * This function temporarily exits the critical section to perform I/O, + * then re-enters to maintain the caller's expectation of holding the lock. + */ +static void flush_buffer_contents_locked(void) +{ + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (!buffer || buffer->count == 0) { + return; + } + + /* Prepare flush while in critical section */ + log_entry_t *temp_entries = buffer->flush_buffer; + size_t flush_count = prepare_buffer_flush_locked(temp_entries); + + if (flush_count == 0) { + return; + } + + /* Temporarily exit critical section for I/O */ + sigset_t old_mask; + if (sigprocmask(SIG_UNBLOCK, &critical_signals, &old_mask) == 0) { + /* Perform I/O outside critical section - perform_flush_io handles its own + * critical section for statistics updates */ + size_t successful_writes = 0; + for (size_t i = 0; i < flush_count; i++) { + log_entry_t *entry = &temp_entries[i]; + if (entry->size > 0) { + if (write_to_logs(entry->pipe, entry->data, entry->size)) { + successful_writes++; + } + } + } + + /* Re-enter critical section before returning to maintain caller's expectation */ + sigprocmask(SIG_SETMASK, &old_mask, NULL); + + /* Update statistics now that we're back in critical section */ + buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (buffer) { + buffer->entries_written += successful_writes; + } + } +} + +static bool add_entry_to_buffer_locked(stdpipe_t pipe, char *buf, ssize_t size) +{ + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (!buffer) { + return false; + } + + /* Buffer state is guaranteed stable in critical section */ + if (buffer->count >= MAX_LOG_ENTRIES) { + return false; + } + + /* Validate size constraints - empty writes should not reach here */ + if (size < 0) { + nwarnf("Invalid negative size %zd passed to add_entry_to_buffer_locked", size); + return false; + } + + if (size == 0) { + nwarn("Empty write reached add_entry_to_buffer_locked - this should not happen"); + return false; + } + + /* Check simple buffer size first for performance */ + if ((size_t)size > BUFFER_SIZE) { + nwarn("Log entry too large, cannot fit in buffer"); + return false; + } + + /* Check if adding this would exceed buffer capacity */ + if (buffer->total_size + (size_t)size > BUFFER_SIZE) { + return false; /* Would exceed buffer size */ + } + + /* Check for overflow (size is already positive here) */ + if (buffer->total_size > SIZE_MAX - (size_t)size) { + return false; /* Would overflow */ + } + + ssize_t actual_size = size; + if (size >= STDIO_BUF_SIZE) { + nwarn("Log entry too large, truncating to fit entry buffer"); + actual_size = ENTRY_TRUNCATION_SIZE; + } + + /* Add entry to buffer */ + log_entry_t *entry = &buffer->entries[buffer->count]; + entry->pipe = pipe; + entry->size = actual_size; + + /* Copy data safely with strict bounds checking */ + memcpy(entry->data, buf, (size_t)actual_size); + + /* Always null-terminate for safety - fix off-by-one */ + if (actual_size < STDIO_BUF_SIZE - 1) { + entry->data[actual_size] = '\0'; + } else { + entry->data[STDIO_BUF_SIZE - 1] = '\0'; + } + + /* Update counters */ + buffer->total_size += (size_t)actual_size; + buffer->count++; + + return true; +} + +static bool should_flush_buffer_locked(void) +{ + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + if (!buffer) { + return false; + } + + /* Flush if buffer is getting full (50% threshold) */ + return (buffer->count >= FLUSH_THRESHOLD_ENTRIES) || (buffer->total_size >= FLUSH_THRESHOLD_BYTES); +} + +gboolean log_timer_cb(gpointer user_data) +{ + (void)user_data; /* unused */ + + /* Check if buffer is still valid and not shutting down */ + sigset_t old_mask; + if (enter_critical_section(&old_mask) != 0) { + return G_SOURCE_REMOVE; /* Can't acquire lock, stop timer */ + } + + async_log_buffer_t *buffer = atomic_load_explicit(&g_log_buffer, memory_order_acquire); + bool initialized = buffer ? atomic_load_explicit(&buffer->initialized, memory_order_acquire) : false; + bool shutdown = buffer ? atomic_load_explicit(&buffer->shutdown_requested, memory_order_acquire) : false; + + bool should_continue = (buffer && initialized && !shutdown); + + if (!should_continue) { + exit_critical_section(&old_mask); + return G_SOURCE_REMOVE; /* Stop the timer */ + } + + /* Flush the log buffer while in critical section */ + flush_buffer_contents_locked(); + + exit_critical_section(&old_mask); + return G_SOURCE_CONTINUE; +} + +/* Timer management functions */ +static void setup_flush_timer(void) +{ + /* Use GLib timeout instead of timerfd for simplicity */ + guint current_id = atomic_load_explicit(&timer_source_id, memory_order_acquire); + if (current_id == 0) { + guint new_id = g_timeout_add(FLUSH_INTERVAL_MS, log_timer_cb, NULL); + if (new_id == 0) { + nwarn("Failed to add log timer to main loop, async logging will not flush periodically"); + } else { + atomic_store_explicit(&timer_source_id, new_id, memory_order_release); + ninfo("Log timer successfully integrated with main loop"); + } + } +} + +static void cleanup_flush_timer(void) +{ + guint current_id = atomic_load_explicit(&timer_source_id, memory_order_acquire); + if (current_id != 0) { + g_source_remove(current_id); + atomic_store_explicit(&timer_source_id, 0, memory_order_release); + } +} + +void setup_log_timer_in_main_loop(void) +{ + /* Timer is now setup automatically during initialization */ + ninfo("Log timer is handled automatically during async logging initialization"); +} \ No newline at end of file diff --git a/src/ctr_logging_buffer.h b/src/ctr_logging_buffer.h new file mode 100644 index 00000000..66ebaef7 --- /dev/null +++ b/src/ctr_logging_buffer.h @@ -0,0 +1,54 @@ +#pragma once +#if !defined(CTR_LOGGING_BUFFER_H) +#define CTR_LOGGING_BUFFER_H + +#include "utils.h" +#include "config.h" +#include +#include +#include + +/* Signal-safe async I/O buffering for log writes to prevent fsync() blocking */ + +#define BUFFER_SIZE (STDIO_BUF_SIZE * 4) /* 32KB buffer */ +#define MAX_LOG_ENTRIES 512 /* Max entries per buffer */ +#define FLUSH_INTERVAL_MS 50 /* Flush every 50ms for better CRI-O compatibility */ +#define FLUSH_THRESHOLD_ENTRIES (MAX_LOG_ENTRIES / 2) /* Flush at 256 entries (50% full) */ +#define FLUSH_THRESHOLD_BYTES (BUFFER_SIZE / 2) /* Flush at 16KB (50% full) */ +#define ENTRY_TRUNCATION_SIZE (STDIO_BUF_SIZE - 1) /* Max size before truncation: 8191 bytes */ + +/* Structure for a single log entry */ +typedef struct { + stdpipe_t pipe; + char data[STDIO_BUF_SIZE]; + ssize_t size; +} log_entry_t; + +/* Simple async log buffer - protected by signal masking */ +typedef struct { + log_entry_t entries[MAX_LOG_ENTRIES]; + log_entry_t flush_buffer[MAX_LOG_ENTRIES]; /* Pre-allocated buffer for async-signal-safe flushing */ + size_t count; + size_t total_size; + atomic_bool initialized; + atomic_bool shutdown_requested; + + /* Statistics */ + size_t entries_written; + size_t entries_dropped; +} async_log_buffer_t; + +/* Global buffer instance (declared in .c file) */ + +/* Public API */ +bool init_async_logging(void); +void shutdown_async_logging(void); +bool write_to_logs_buffered(stdpipe_t pipe, char *buf, ssize_t num_read); +void flush_log_buffer(void); + +/* GLib event loop integration */ +#include +gboolean log_timer_cb(gpointer user_data); +void setup_log_timer_in_main_loop(void); + +#endif /* !defined(CTR_LOGGING_BUFFER_H) */ \ No newline at end of file diff --git a/src/ctr_stdio.c b/src/ctr_stdio.c index 5d7eb5e7..090f7083 100644 --- a/src/ctr_stdio.c +++ b/src/ctr_stdio.c @@ -4,6 +4,7 @@ #include "conn_sock.h" #include "utils.h" #include "ctr_logging.h" +#include "ctr_logging_buffer.h" #include "cli.h" #include @@ -107,7 +108,7 @@ static void drain_log_buffers(stdpipe_t pipe) /* We pass a single byte buffer because write_to_logs expects that there is one byte of capacity beyond the buflen that we specify */ char buf[1]; - write_to_logs(pipe, buf, 0); + write_to_logs_buffered(pipe, buf, 0); } static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof) @@ -146,7 +147,7 @@ static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof) // Always null terminate the buffer, just in case. buf[num_read] = '\0'; - bool written = write_to_logs(pipe, buf, num_read); + bool written = write_to_logs_buffered(pipe, buf, num_read); if (!written) return false; diff --git a/test/10-async-logging.bats b/test/10-async-logging.bats new file mode 100644 index 00000000..222089c5 --- /dev/null +++ b/test/10-async-logging.bats @@ -0,0 +1,75 @@ +#!/usr/bin/env bats + +load test_helper + +setup() { + check_conmon_binary + setup_test_env +} + +teardown() { + cleanup_test_env +} + +@test "async logging: signal safety during logging" { + # Test that async logging with signal masking continues after signal delivery + run_conmon --cid "$CTR_ID" --cuuid "$CTR_ID" --runtime "$VALID_PATH" --log-path "$LOG_PATH" & + local conmon_pid=$! + + # Give it a moment to start + sleep 0.1 + + # Send a harmless signal that could interrupt pthread mutexes + kill -USR1 "$conmon_pid" 2>/dev/null || true + + # Wait for completion + wait "$conmon_pid" 2>/dev/null || true + + # Log file should still be created despite signal interruption + [ -f "$LOG_PATH" ] +} + +@test "async logging: signal masking prevents corruption during concurrent signals" { + # Test that multiple signals don't corrupt the async logging buffer + run_conmon --cid "$CTR_ID" --cuuid "$CTR_ID" --runtime "$VALID_PATH" --log-path "$LOG_PATH" & + local conmon_pid=$! + + # Give it a moment to start + sleep 0.1 + + # Send multiple signals in quick succession + for sig in USR1 USR2 TERM; do + kill -"$sig" "$conmon_pid" 2>/dev/null || true + sleep 0.01 + done + + # Wait for completion + wait "$conmon_pid" 2>/dev/null || true + + # Log file should still be created and not corrupted + [ -f "$LOG_PATH" ] +} + +@test "async logging: performance under concurrent signal load" { + # Test that signal masking doesn't significantly impact performance + local start_time=$(date +%s%N) + + for i in {1..3}; do + local test_log="$TEST_TMPDIR/perf_$i.log" + run_conmon --cid "${CTR_ID}_$i" --cuuid "${CTR_ID}_$i" --runtime "$VALID_PATH" --log-path "$test_log" & + local pid=$! + + # Send a signal during execution + sleep 0.05 + kill -USR1 "$pid" 2>/dev/null || true + + wait "$pid" 2>/dev/null || true + assert_file_exists "$test_log" + done + + local end_time=$(date +%s%N) + local duration=$((end_time - start_time)) + + # Should complete reasonably quickly (under 5 seconds) + [ $((duration / 1000000000)) -lt 5 ] +} diff --git a/test/11-async-logging-edge-cases.bats b/test/11-async-logging-edge-cases.bats new file mode 100644 index 00000000..3920bd3b --- /dev/null +++ b/test/11-async-logging-edge-cases.bats @@ -0,0 +1,85 @@ +#!/usr/bin/env bats + +load test_helper + +setup() { + check_conmon_binary + setup_test_env +} + +teardown() { + cleanup_test_env +} + +@test "async logging edge case: disabled logging drivers work with signal masking" { + # Test that signal masking doesn't interfere with disabled logging + cd "$TEST_TMPDIR" + run_conmon --cid "$CTR_ID" --cuuid "$CTR_ID" --runtime "$VALID_PATH" --log-path "null:" & + local conmon_pid=$! + + # Send signal during execution + sleep 0.1 + kill -USR1 "$conmon_pid" 2>/dev/null || true + wait "$conmon_pid" 2>/dev/null || true + + assert_success + [ ! -f "null" ] +} + +@test "async logging edge case: signal masking during rapid container lifecycle" { + # Test that signal masking handles rapid container creation/destruction + for i in {1..5}; do + local rapid_cid="${CTR_ID}_rapid_$i" + local rapid_log="$TEST_TMPDIR/rapid_$i.log" + + run_conmon --cid "$rapid_cid" --cuuid "$rapid_cid" --runtime "$VALID_PATH" --log-path "$rapid_log" & + local pid=$! + + # Send signal during rapid lifecycle + sleep 0.05 + kill -USR2 "$pid" 2>/dev/null || true + wait "$pid" 2>/dev/null || true + + assert_success + [ -f "$rapid_log" ] + done +} + +@test "async logging edge case: signal masking with buffer flush timing" { + # Test that signal masking doesn't interfere with async buffer flushing + local large_log="$TEST_TMPDIR/large.log" + + run_conmon --cid "$CTR_ID" --cuuid "$CTR_ID" --runtime "$VALID_PATH" --log-path "$large_log" & + local conmon_pid=$! + + # Send signals at different intervals to test flush timing + for delay in 0.02 0.05 0.1; do + sleep "$delay" + kill -USR1 "$conmon_pid" 2>/dev/null || true + done + + wait "$conmon_pid" 2>/dev/null || true + assert_success + [ -f "$large_log" ] +} + +@test "async logging edge case: signal masking prevents race conditions in cleanup" { + # Test that signal masking prevents race conditions during cleanup + local cleanup_log="$TEST_TMPDIR/cleanup.log" + + run_conmon --cid "$CTR_ID" --cuuid "$CTR_ID" --runtime "$VALID_PATH" --log-path "$cleanup_log" & + local conmon_pid=$! + + # Give it time to initialize + sleep 0.1 + + # Send TERM signal to trigger cleanup while sending other signals + kill -USR1 "$conmon_pid" 2>/dev/null || true + kill -TERM "$conmon_pid" 2>/dev/null || true + kill -USR2 "$conmon_pid" 2>/dev/null || true + + wait "$conmon_pid" 2>/dev/null || true + + # Log file should be properly created and closed + [ -f "$cleanup_log" ] +}