From 54e03225b03641960a24c017f1820ffbd7f245a8 Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Mon, 29 Sep 2025 19:56:19 -0400 Subject: [PATCH] Remove need for extra bytes for remote consoles We use `writev()` for remote consoles to remove the need for extra bytes surrounding the data to be written. We really don't want to "leak" the protocol requirement for remote consoles to the rest of the code base. To hide that, and avoid two `write_all()` calls in a row, we promote `writev_buffer_flush()` to a `utils` function called `writev_all()` to leverage I/O vectors in one call (otherwise memory copies would need to occur for remote sockets; using `writev()` avoids that). Further, we use the `g_ptr_array_foreach()` function instead of iterating over the array ourselves to avoid breaking the `GPtrArray` encapsulation. Signed-off-by: Peter Portante --- src/conn_sock.c | 39 +++++++++++++++------- src/conn_sock.h | 3 +- src/ctr_logging.c | 82 +++++++---------------------------------------- src/ctr_stdio.c | 13 ++------ src/utils.c | 56 ++++++++++++++++++++++++++++++-- src/utils.h | 8 ++++- 6 files changed, 104 insertions(+), 97 deletions(-) diff --git a/src/conn_sock.c b/src/conn_sock.c index c072ea71..bdcc8577 100644 --- a/src/conn_sock.c +++ b/src/conn_sock.c @@ -39,7 +39,7 @@ static char *setup_socket(int *fd, const char *path); setup_attach_socket() is responsible for setting the correct remote FD and pushing it onto the queue. 
*/ static struct local_sock_s local_mainfd_stdin = {&mainfd_stdin, true, NULL, "container stdin", NULL}; -struct remote_sock_s remote_attach_sock = { +static struct remote_sock_s remote_attach_sock = { SOCK_TYPE_CONSOLE, /* sock_type */ -1, /* fd */ &local_mainfd_stdin, /* dest */ @@ -349,27 +349,42 @@ char *socket_parent_dir(gboolean use_full_attach_path, size_t desired_len) return base_path; } - void schedule_main_stdin_write() { schedule_local_sock_write(&local_mainfd_stdin); } -void write_back_to_remote_consoles(char *buf, int len) +struct write_console_sock_user_data { + stdpipe_t pipe; + char *buf; + size_t buflen; +}; + +static void write_console_sock(gpointer data, G_GNUC_UNUSED gpointer user_data) { - if (local_mainfd_stdin.readers == NULL) + struct remote_sock_s *remote_sock = (struct remote_sock_s *)data; + if (!remote_sock->writable) return; - - for (int i = local_mainfd_stdin.readers->len; i > 0; i--) { - struct remote_sock_s *remote_sock = g_ptr_array_index(local_mainfd_stdin.readers, i - 1); - - if (remote_sock->writable && write_all(remote_sock->fd, buf, len) < 0) { - nwarn("Failed to write to remote console socket"); - remote_sock_shutdown(remote_sock, SHUT_WR); - } + struct write_console_sock_user_data *wcsud = (struct write_console_sock_user_data *)user_data; + struct iovec iov[2] = {{&wcsud->pipe, 1}, {wcsud->buf, wcsud->buflen}}; + writev_iov_t wviov = {2, 2, iov}; + if (writev_all(remote_sock->fd, &wviov) < 0) { + nwarn("Failed to write to remote console socket"); + remote_sock_shutdown(remote_sock, SHUT_WR); } } +void write_back_to_remote_consoles(stdpipe_t pipe, char *buf, size_t buflen) +{ + if (local_mainfd_stdin.readers == NULL) + return; + GPtrArray *readers_copy = g_ptr_array_copy(local_mainfd_stdin.readers, NULL, NULL); + g_ptr_array_set_free_func(readers_copy, NULL); + struct write_console_sock_user_data wcsud = {pipe, buf, buflen}; + g_ptr_array_foreach(readers_copy, write_console_sock, &wcsud); + g_ptr_array_free(readers_copy, 
true); +} + /* Internal */ static gboolean attach_cb(int fd, G_GNUC_UNUSED GIOCondition condition, gpointer user_data) { diff --git a/src/conn_sock.h b/src/conn_sock.h index 3571cc43..b322f8eb 100644 --- a/src/conn_sock.h +++ b/src/conn_sock.h @@ -3,6 +3,7 @@ #include <glib.h> /* gboolean */ #include "config.h" /* CONN_SOCK_BUF_SIZE */ +#include "utils.h" /* stdpipe_t */ #define SOCK_TYPE_CONSOLE 1 #define SOCK_TYPE_NOTIFY 2 @@ -52,7 +53,7 @@ char *setup_seccomp_socket(const char *socket); char *setup_attach_socket(void); void setup_notify_socket(char *); void schedule_main_stdin_write(); -void write_back_to_remote_consoles(char *buf, int len); +void write_back_to_remote_consoles(stdpipe_t pipe, char *buf, size_t buflen); void close_all_readers(); #endif // CONN_SOCK_H diff --git a/src/ctr_logging.c b/src/ctr_logging.c index 797b469f..730e7691 100644 --- a/src/ctr_logging.c +++ b/src/ctr_logging.c @@ -79,19 +79,13 @@ static size_t syslog_identifier_len; #define WRITEV_BUFFER_N_IOV 128 -typedef struct { - int iovcnt; - struct iovec iov[WRITEV_BUFFER_N_IOV]; -} writev_buffer_t; - static void parse_log_path(char *log_config); static const char *stdpipe_name(stdpipe_t pipe); static int write_journald(int pipe, char *buf, ssize_t num_read); static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen); static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen); -static ssize_t writev_buffer_append_segment(int fd, writev_iov_t *buf, const void *data, ssize_t len); -static ssize_t writev_buffer_append_segment_no_flush(writev_iov_t *buf, const void *data, ssize_t len); -static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf); +static ssize_t writev_buffer_append_segment(int fd, writev_iov_t *buf, const void *data, ssize_t len); +static ssize_t writev_buffer_append_segment_no_flush(writev_iov_t *buf, const void *data, ssize_t len); static void set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename); static void 
reopen_k8s_file(void); static int parse_priority_prefix(const char *buf, ssize_t buflen, int *priority, const char **message_start); @@ -386,7 +380,8 @@ static int write_journald(int pipe, char *buf, ssize_t buflen) ptrdiff_t line_len = 0; while (buflen > 0 || *partial_buf_len > 0) { - writev_buffer_t bufv = {0}; + struct iovec vecs[WRITEV_BUFFER_N_IOV]; + writev_iov_t bufv = {0, WRITEV_BUFFER_N_IOV, vecs}; bool partial = buflen == 0 || get_line_len(&line_len, buf, buflen); @@ -489,7 +484,8 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen) static bool stdout_has_partial = false; static bool stderr_has_partial = false; - writev_buffer_t bufv = {0}; + struct iovec vecs[WRITEV_BUFFER_N_IOV]; + writev_iov_t bufv = {0, WRITEV_BUFFER_N_IOV, vecs}; int64_t bytes_to_be_written = 0; bool *has_partial = (pipe == STDOUT_PIPE) ? &stdout_has_partial : &stderr_has_partial; @@ -550,7 +546,7 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen) * a timestamp. */ if ((log_size_max > 0) && (k8s_bytes_written + bytes_to_be_written) > log_size_max) { - if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) { + if (writev_all(k8s_log_fd, &bufv) < 0) { nwarn("failed to flush buffer to log"); } reopen_k8s_file(); @@ -600,7 +596,7 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen) buflen -= line_len; } - if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) { + if (writev_all(k8s_log_fd, &bufv) < 0) { nwarn("failed to flush buffer to log"); } @@ -622,66 +618,12 @@ static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen) return partial; } - -static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf) -{ - ssize_t count = 0; - int iovcnt = buf->iovcnt; - struct iovec *iov = buf->iov; - - /* - * By definition, flushing the buffers will either be entirely successful, or will fail at some point - * along the way. 
There is no facility to attempt to retry a writev() system call outside of an EINTR - * errno. Therefore, no matter the outcome, always reset the writev_buffer_t data structure. - */ - buf->iovcnt = 0; - - while (iovcnt > 0) { - ssize_t res; - do { - res = writev(fd, iov, iovcnt); - } while (res == -1 && errno == EINTR); - - if (res <= 0) { - /* - * Any unflushed data is lost (this would be a good place to add a counter for how many times - * this occurs and another count for how much data is lost). - * - * Note that if writev() returns a 0, this logic considers it an error. - */ - return -1; - } - - count += res; - - while (res > 0) { - size_t iov_len = iov->iov_len; - size_t from_this = MIN((size_t)res, iov_len); - res -= from_this; - iov_len -= from_this; - - if (iov_len == 0) { - iov++; - iovcnt--; - /* continue, res still > 0 */ - } else { - iov->iov_len = iov_len; - iov->iov_base += from_this; - /* break, res is 0 */ - } - } - } - - return count; -} - - -ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len) +ssize_t writev_buffer_append_segment(int fd, writev_iov_t *buf, const void *data, ssize_t len) { if (data == NULL) return 1; - if (buf->iovcnt == WRITEV_BUFFER_N_IOV && writev_buffer_flush(fd, buf) < 0) + if (buf->iovcnt == buf->max_iovcnt && writev_all(fd, buf) < 0) return -1; if (len > 0) { @@ -693,12 +635,12 @@ ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *d return 1; } -ssize_t writev_buffer_append_segment_no_flush(writev_buffer_t *buf, const void *data, ssize_t len) +ssize_t writev_buffer_append_segment_no_flush(writev_iov_t *buf, const void *data, ssize_t len) { if (data == NULL) return 1; - if (buf->iovcnt == WRITEV_BUFFER_N_IOV) + if (buf->iovcnt == buf->max_iovcnt) return -1; if (len > 0) { diff --git a/src/ctr_stdio.c b/src/ctr_stdio.c index 5d7eb5e7..bb82737f 100644 --- a/src/ctr_stdio.c +++ b/src/ctr_stdio.c @@ -112,12 +112,7 @@ static void drain_log_buffers(stdpipe_t 
pipe) static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof) { - /* We use two extra bytes. One at the start, which we don't read into, instead - we use that for marking the pipe when we write to the attached socket. - One at the end to guarantee a null-terminated buffer for journald logging*/ - - char real_buf[STDIO_BUF_SIZE + 2]; - char *buf = real_buf + 1; + char buf[STDIO_BUF_SIZE]; ssize_t num_read = 0; if (eof) @@ -143,15 +138,11 @@ static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof) nwarnf("stdio_input read failed: %m"); return false; } else { - // Always null terminate the buffer, just in case. - buf[num_read] = '\0'; - bool written = write_to_logs(pipe, buf, num_read); if (!written) return false; - real_buf[0] = pipe; - write_back_to_remote_consoles(real_buf, num_read + 1); + write_back_to_remote_consoles(pipe, buf, num_read); return true; } } diff --git a/src/utils.c b/src/utils.c index 1fb5ab0b..b535e29d 100644 --- a/src/utils.c +++ b/src/utils.c @@ -62,9 +62,9 @@ static void get_signal_descriptor_mask(sigset_t *set) sigprocmask(SIG_BLOCK, set, NULL); } -ssize_t write_all(int fd, const void *buf, size_t count) +ssize_t write_all(int fd, const void *buf, size_t buflen) { - size_t remaining = count; + size_t remaining = buflen; const char *p = buf; ssize_t res; @@ -80,6 +80,58 @@ ssize_t write_all(int fd, const void *buf, size_t count) p += res; } + return buflen; +} + +ssize_t writev_all(int fd, writev_iov_t *buf) +{ + ssize_t count = 0; + size_t iovcnt = buf->iovcnt; + struct iovec *iov = buf->iov; + + /* + * By definition, flushing the buffers will either be entirely successful, or will fail at some point + * along the way. There is no facility to attempt to retry a writev() system call outside of a retryable + * errno. Therefore, no matter the outcome, always reset the given writev_iov_t data structure. 
+ */ + buf->iovcnt = 0; + + while (iovcnt > 0) { + ssize_t res; + do { + res = writev(fd, iov, iovcnt); + } while (res == -1 && retryable_error(errno)); + + if (res <= 0) { + /* + * Any unflushed data is lost (this would be a good place to add a counter for how many times + * this occurs and another count for how much data is lost). + * + * Note that if writev() returns a 0, this logic considers it an error. + */ + return -1; + } + + count += res; + + while (res > 0) { + size_t iov_len = iov->iov_len; + size_t from_this = MIN((size_t)res, iov_len); + res -= from_this; + iov_len -= from_this; + + if (iov_len == 0) { + iov++; + iovcnt--; + /* continue, res still > 0 */ + } else { + iov->iov_len = iov_len; + iov->iov_base += from_this; + /* break, res is 0 */ + } + } + } + return count; } diff --git a/src/utils.h b/src/utils.h index 00acb9c4..fced4f85 100644 --- a/src/utils.h +++ b/src/utils.h @@ -255,7 +255,13 @@ static inline void hashtable_free_cleanup(GHashTable **tbl) #define _cleanup_gerror_ _cleanup_(gerror_free_cleanup) -ssize_t write_all(int fd, const void *buf, size_t count); +ssize_t write_all(int fd, const void *buf, size_t buflen); +typedef struct { + size_t iovcnt; + size_t max_iovcnt; + struct iovec *iov; +} writev_iov_t; +ssize_t writev_all(int fd, writev_iov_t *iov); int set_subreaper(gboolean enabled);