From b398e7ebf993c007ce88f46afbb3ec845bf84513 Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Sun, 30 May 2021 16:39:20 -0400 Subject: [PATCH 1/3] Move `WRITEV_BUFFER_N_IOV` to src/ctr_logging.c The `WRITEV_BUFFER_N_IOV` constant is not part of the `ctr_logging` interface, and is only used internal to `src/ctr_logging.c`. --- src/ctr_logging.c | 2 ++ src/utils.h | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ctr_logging.c b/src/ctr_logging.c index c3fd5d25..aab8faaf 100644 --- a/src/ctr_logging.c +++ b/src/ctr_logging.c @@ -55,6 +55,8 @@ static char *container_name = NULL; static char *container_tag = NULL; static size_t container_tag_len; +#define WRITEV_BUFFER_N_IOV 128 + typedef struct { int iovcnt; struct iovec iov[WRITEV_BUFFER_N_IOV]; diff --git a/src/utils.h b/src/utils.h index 5d5caf34..4a660962 100644 --- a/src/utils.h +++ b/src/utils.h @@ -221,8 +221,6 @@ static inline void hashtable_free_cleanup(GHashTable **tbl) #define _cleanup_hashtable_ _cleanup_(hashtable_free_cleanup) -#define WRITEV_BUFFER_N_IOV 128 - ssize_t write_all(int fd, const void *buf, size_t count); #endif /* !defined(UTILS_H) */ From 648870d5847cb57d8a1439f4a22189a3d7299dd8 Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Sun, 30 May 2021 22:35:58 -0400 Subject: [PATCH 2/3] Match `read(2)` buffer size to pipe size Address a number of conditions flagged in issue #262. The general thrust of this commit is to move closer to a one-to-one, `read(2)` system call to `writev(2)` system call ratio by creating a memory buffer for reading sized to match the stdio pipes. The commit also allocates a maximum number of I/O vectors sized to the read buffer size, calculating 2 I/O vectors per line, defaulting to a target line size of 25 bytes (defined as the constant, `AVG_LOG_LINE_TARGET`). The commit also now allocates the I/O vectors and read buffer memory using `mmap()` so that we don't cross page boundaries for I/O operations. As a result, the commit removes the `STDIO_BUF_SIZE` constance, using `fcntl(F_GETPIPE_SZ)` calls to size all buffers for reading from pipes. The `stdpipe_t` value is now passed to `write_to_remote_consoles()` so that we do not have to play games with pre-/post- byte allocations to buffers. --- cmd/conmon-config/conmon-config.go | 5 +- src/config.h | 2 - src/conmon.c | 39 +++++-- src/conn_sock.c | 14 ++- src/conn_sock.h | 3 +- src/ctr_logging.c | 163 ++++++++++++++++++----------- src/ctr_logging.h | 13 ++- src/ctr_stdio.c | 16 +-- src/globals.c | 3 + src/globals.h | 3 + 10 files changed, 168 insertions(+), 93 deletions(-) diff --git a/cmd/conmon-config/conmon-config.go b/cmd/conmon-config/conmon-config.go index fdb9ba89..446f1179 100644 --- a/cmd/conmon-config/conmon-config.go +++ b/cmd/conmon-config/conmon-config.go @@ -9,12 +9,10 @@ import ( ) func main() { - output := ` -#if !defined(CONFIG_H) + output := `#if !defined(CONFIG_H) #define CONFIG_H #define BUF_SIZE %d -#define STDIO_BUF_SIZE %d #define CONN_SOCK_BUF_SIZE %d #define DEFAULT_SOCKET_PATH "%s" #define WIN_RESIZE_EVENT %d @@ -26,7 +24,6 @@ func main() { if err := ioutil.WriteFile("config.h", []byte(fmt.Sprintf( output, config.BufSize, - config.BufSize, config.ConnSockBufSize, config.ContainerAttachSocketDir, config.WinResizeEvent, diff --git a/src/config.h b/src/config.h index 4277e5cc..e0e2363f 100644 --- a/src/config.h +++ b/src/config.h @@ -1,9 +1,7 @@ - #if !defined(CONFIG_H) #define CONFIG_H #define BUF_SIZE 8192 -#define STDIO_BUF_SIZE 8192 #define CONN_SOCK_BUF_SIZE 32768 #define DEFAULT_SOCKET_PATH "/var/run/crio" #define WIN_RESIZE_EVENT 1 diff --git a/src/conmon.c b/src/conmon.c index c349d6c2..ce29e9da 100644 --- a/src/conmon.c +++ b/src/conmon.c @@ -28,7 +28,6 @@ int main(int argc, char *argv[]) { setlocale(LC_ALL, ""); _cleanup_gerror_ GError *err = NULL; - char buf[BUF_SIZE]; int num_read; _cleanup_close_ int dev_null_r_cleanup = -1; _cleanup_close_ int dev_null_w_cleanup = -1; @@ -48,12 +47,18 @@ int main(int argc, char *argv[]) /* Catch SIGTERM and call exit(). This causes the atexit functions to be called. */ signal(SIGTERM, handle_signal); + char *start_pipe_fd_buf = NULL; + int start_pipe_fd_size = 0; int start_pipe_fd = get_pipe_fd_from_env("_OCI_STARTPIPE"); if (start_pipe_fd > 0) { + start_pipe_fd_size = fcntl(start_pipe_fd, F_GETPIPE_SZ); + if (start_pipe_fd_size < 0) + pexit("start-pipe size determination failed"); + start_pipe_fd_buf = alloca(start_pipe_fd_size); /* Block for an initial write to the start pipe before - spawning any childred or exiting, to ensure the + spawning any children or exiting, to ensure the parent can put us in the right cgroup. */ - num_read = read(start_pipe_fd, buf, BUF_SIZE); + num_read = read(start_pipe_fd, start_pipe_fd_buf, start_pipe_fd_size); if (num_read < 0) { pexit("start-pipe read failed"); } @@ -156,6 +161,9 @@ int main(int argc, char *argv[]) pexit("Failed to create !terminal stdin pipe"); mainfd_stdin = fds[1]; + mainfd_stdin_size = fcntl(mainfd_stdin, F_GETPIPE_SZ); + if (mainfd_stdin < 0) + pexit("main stdin pipe size determination failed"); workerfd_stdin = fds[0]; if (g_unix_set_fd_nonblocking(mainfd_stdin, TRUE, NULL) == FALSE) @@ -166,6 +174,9 @@ int main(int argc, char *argv[]) pexit("Failed to create !terminal stdout pipe"); mainfd_stdout = fds[0]; + mainfd_stdout_size = fcntl(mainfd_stdout, F_GETPIPE_SZ); + if (mainfd_stdout_size < 0) + pexit("main stdout pipe size determination failed"); workerfd_stdout = fds[1]; /* now that we've set mainfd_stdout, we can register the ctrl_winsz_cb @@ -181,6 +192,12 @@ int main(int argc, char *argv[]) pexit("Failed to create stderr pipe"); mainfd_stderr = fds[0]; + mainfd_stderr_size = fcntl(mainfd_stderr, F_GETPIPE_SZ); + if (mainfd_stderr_size < 0) + pexit("main stderr pipe size determination failed"); + if ((mainfd_stdout >= 0) && (mainfd_stderr_size != mainfd_stdout_size)) { + nwarn("main stderr and stdout pipe sizes don't match"); + } workerfd_stderr = fds[1]; GPtrArray *runtime_argv = configure_runtime_args(csname); @@ -266,11 +283,11 @@ int main(int argc, char *argv[]) if (opt_attach) { if (start_pipe_fd > 0) { ndebug("exec with attach is waiting for start message from parent"); - num_read = read(start_pipe_fd, buf, BUF_SIZE); - ndebug("exec with attach got start message from parent"); + num_read = read(start_pipe_fd, start_pipe_fd_buf, start_pipe_fd_size); if (num_read < 0) { _pexit("start-pipe read failed"); } + ndebug("exec with attach got start message from parent"); close(start_pipe_fd); } } @@ -347,16 +364,17 @@ int main(int argc, char *argv[]) * Read from container stderr for any error and send it to parent * We send -1 as pid to signal to parent that create container has failed. */ - num_read = read(mainfd_stderr, buf, BUF_SIZE - 1); + char *mainfd_stderr_buf = (char *)alloca(start_pipe_fd_size + 1); + num_read = read(mainfd_stderr, mainfd_stderr_buf, mainfd_stderr_size); if (num_read > 0) { - buf[num_read] = '\0'; - nwarnf("runtime stderr: %s", buf); + mainfd_stderr_buf[num_read] = '\0'; + nwarnf("runtime stderr: %s", mainfd_stderr_buf); if (sync_pipe_fd > 0) { int to_report = -1; if (opt_exec && container_status > 0) { to_report = -1 * container_status; } - write_sync_fd(sync_pipe_fd, to_report, buf); + write_sync_fd(sync_pipe_fd, to_report, mainfd_stderr_buf); } } nexitf("Failed to create container: exit status %d", get_exit_status(runtime_status)); @@ -387,6 +405,9 @@ int main(int argc, char *argv[]) setup_oom_handling(container_pid); + // Setup the buffers used by the stdio call-back. + writev_buffer_init(mainfd_stderr_size); + if (mainfd_stdout >= 0) { g_unix_fd_add(mainfd_stdout, G_IO_IN, stdio_cb, GINT_TO_POINTER(STDOUT_PIPE)); } diff --git a/src/conn_sock.c b/src/conn_sock.c index 75951a84..574b9f64 100644 --- a/src/conn_sock.c +++ b/src/conn_sock.c @@ -247,7 +247,7 @@ void schedule_main_stdin_write() schedule_local_sock_write(&local_mainfd_stdin); } -void write_back_to_remote_consoles(char *buf, int len) +void write_back_to_remote_consoles(stdpipe_t pipe, char *buf, int len) { if (local_mainfd_stdin.readers == NULL) return; @@ -255,9 +255,15 @@ void write_back_to_remote_consoles(char *buf, int len) for (int i = local_mainfd_stdin.readers->len; i > 0; i--) { struct remote_sock_s *remote_sock = g_ptr_array_index(local_mainfd_stdin.readers, i - 1); - if (remote_sock->writable && write_all(remote_sock->fd, buf, len) < 0) { - nwarn("Failed to write to remote console socket"); - remote_sock_shutdown(remote_sock, SHUT_WR); + if (remote_sock->writable) { + if (write_all(remote_sock->fd, &pipe, 1) < 0) { + nwarn("Failed to write to remote console socket"); + remote_sock_shutdown(remote_sock, SHUT_WR); + } + if (write_all(remote_sock->fd, buf, len) < 0) { + nwarn("Failed to write to remote console socket"); + remote_sock_shutdown(remote_sock, SHUT_WR); + } } } } diff --git a/src/conn_sock.h b/src/conn_sock.h index 796c4a71..f4e88c9f 100644 --- a/src/conn_sock.h +++ b/src/conn_sock.h @@ -3,6 +3,7 @@ #include /* gboolean */ #include "config.h" /* CONN_SOCK_BUF_SIZE */ +#include "utils.h" /* stdpipe_t */ #define SOCK_TYPE_CONSOLE 1 #define SOCK_TYPE_NOTIFY 2 @@ -51,7 +52,7 @@ char *setup_console_socket(void); char *setup_attach_socket(void); void setup_notify_socket(char *); void schedule_main_stdin_write(); -void write_back_to_remote_consoles(char *buf, int len); +void write_back_to_remote_consoles(stdpipe_t pipe, char *buf, int len); void close_all_readers(); #endif // CONN_SOCK_H diff --git a/src/ctr_logging.c b/src/ctr_logging.c index aab8faaf..5641dc49 100644 --- a/src/ctr_logging.c +++ b/src/ctr_logging.c @@ -2,6 +2,8 @@ #include "ctr_logging.h" #include "cli.h" #include +#include +#include // if the systemd development files were found, we can log to systemd #ifdef USE_JOURNALD @@ -18,8 +20,8 @@ static inline int sd_journal_sendv(G_GNUC_UNUSED const struct iovec *iov, G_GNUC #endif -/* strlen("1997-03-25T13:20:42.999999999+01:00 stdout ") + 1 */ -#define TSBUFLEN 44 +/* strlen("1997-03-25T13:20:42.999999999+01:00 stdout F ") + 1 */ +#define TSBUFLEN 46 /* Different types of container logging */ static gboolean use_journald_logging = FALSE; @@ -55,20 +57,13 @@ static char *container_name = NULL; static char *container_tag = NULL; static size_t container_tag_len; -#define WRITEV_BUFFER_N_IOV 128 - -typedef struct { - int iovcnt; - struct iovec iov[WRITEV_BUFFER_N_IOV]; -} writev_buffer_t; - static void parse_log_path(char *log_config); static const char *stdpipe_name(stdpipe_t pipe); -static int write_journald(int pipe, char *buf, ssize_t num_read); -static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen); +static int write_journald(stdpipe_t pipe, ssize_t num_read); +static int write_k8s_log(stdpipe_t pipe, ssize_t buflen); static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen); -static ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len); -static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf); +static ssize_t writev_buffer_append_segment(writev_buffer_t *buf, int fd, const void *data, ssize_t len); +static ssize_t writev_buffer_flush(writev_buffer_t *buf, int fd); static int set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename); static void reopen_k8s_file(void); @@ -188,13 +183,13 @@ static void parse_log_path(char *log_config) } /* write container output to all logs the user defined */ -bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) +bool write_to_logs(stdpipe_t pipe, ssize_t num_read) { - if (use_k8s_logging && write_k8s_log(pipe, buf, num_read) < 0) { + if (use_k8s_logging && write_k8s_log(pipe, num_read) < 0) { nwarn("write_k8s_log failed"); return G_SOURCE_CONTINUE; } - if (use_journald_logging && write_journald(pipe, buf, num_read) < 0) { + if (use_journald_logging && write_journald(pipe, num_read) < 0) { nwarn("write_journald failed"); return G_SOURCE_CONTINUE; } @@ -205,8 +200,10 @@ bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) /* write to systemd journal. If the pipe is stdout, write with notice priority, * otherwise, write with error priority */ -int write_journald(int pipe, char *buf, ssize_t buflen) +static int write_journald(stdpipe_t pipe, ssize_t buflen) { + char *buf = writev_buffer.buf; + /* When using writev_buffer_append_segment, we should never approach the number of * entries necessary to flush the buffer. Therefore, the fd passed in is for /dev/null */ @@ -225,8 +222,6 @@ int write_journald(int pipe, char *buf, ssize_t buflen) ptrdiff_t line_len = 0; while (buflen > 0) { - writev_buffer_t bufv = {0}; - bool partial = get_line_len(&line_len, buf, buflen); /* sd_journal_* doesn't have an option to specify the number of bytes to write in the message, and instead writes the * entire string. Copying every line doesn't make very much sense, so instead we do this tmp_line_end @@ -236,34 +231,33 @@ int write_journald(int pipe, char *buf, ssize_t buflen) buf[line_len] = '\0'; _cleanup_free_ char *message = g_strdup_printf("MESSAGE=%s", buf); - if (writev_buffer_append_segment(dev_null, &bufv, message, line_len + MESSAGE_EQ_LEN) < 0) + if (writev_buffer_append_segment(&writev_buffer, dev_null, message, line_len + MESSAGE_EQ_LEN) < 0) return -1; /* Restore state of the buffer */ buf[line_len] = tmp_line_end; - - if (writev_buffer_append_segment(dev_null, &bufv, container_id_full, cuuid_len + CID_FULL_EQ_LEN) < 0) + if (writev_buffer_append_segment(&writev_buffer, dev_null, container_id_full, cuuid_len + CID_FULL_EQ_LEN) < 0) return -1; - if (writev_buffer_append_segment(dev_null, &bufv, message_priority, PRIORITY_EQ_LEN) < 0) + if (writev_buffer_append_segment(&writev_buffer, dev_null, message_priority, PRIORITY_EQ_LEN) < 0) return -1; - if (writev_buffer_append_segment(dev_null, &bufv, container_id, TRUNC_ID_LEN + CID_EQ_LEN) < 0) + if (writev_buffer_append_segment(&writev_buffer, dev_null, container_id, TRUNC_ID_LEN + CID_EQ_LEN) < 0) return -1; - if (container_tag && writev_buffer_append_segment(dev_null, &bufv, container_tag, container_tag_len) < 0) + if (container_tag && writev_buffer_append_segment(&writev_buffer, dev_null, container_tag, container_tag_len) < 0) return -1; /* only print the name if we have a name to print */ - if (name && writev_buffer_append_segment(dev_null, &bufv, container_name, name_len + NAME_EQ_LEN) < 0) + if (name && writev_buffer_append_segment(&writev_buffer, dev_null, container_name, name_len + NAME_EQ_LEN) < 0) return -1; /* per docker journald logging format, CONTAINER_PARTIAL_MESSAGE is set to true if it's partial, but otherwise not set. */ - if (partial && writev_buffer_append_segment(dev_null, &bufv, "CONTAINER_PARTIAL_MESSAGE=true", PARTIAL_MESSAGE_EQ_LEN) < 0) + if (partial && writev_buffer_append_segment(&writev_buffer, dev_null, "CONTAINER_PARTIAL_MESSAGE=true", PARTIAL_MESSAGE_EQ_LEN) < 0) return -1; - int err = sd_journal_sendv(bufv.iov, bufv.iovcnt); + int err = sd_journal_sendv(writev_buffer.iov, writev_buffer.iovcnt); if (err < 0) { pwarn(strerror(err)); return err; @@ -281,16 +275,15 @@ int write_journald(int pipe, char *buf, ssize_t buflen) * line in buf, and will partially write the final line of the log if buf is * not terminated by a newline. */ -static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen) +static int write_k8s_log(stdpipe_t pipe, ssize_t buflen) { - writev_buffer_t bufv = {0}; + char *buf = writev_buffer.buf; static int64_t bytes_written = 0; int64_t bytes_to_be_written = 0; /* - * Use the same timestamp for every line of the log in this buffer. - * There is no practical difference in the output since write(2) is - * fast. + * Use the same timestamp for every line of the log in this buffer, as + * every log in this buffer was read from the pipe at the same time. */ char tsbuf[TSBUFLEN]; if (set_k8s_timestamp(tsbuf, sizeof tsbuf, stdpipe_name(pipe))) @@ -304,9 +297,10 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen) /* This is line_len bytes + TSBUFLEN - 1 + 2 (- 1 is for ignoring \0). */ bytes_to_be_written = line_len + TSBUFLEN + 1; - /* If partial, then we add a \n */ + /* If partial, then we add a \n, and change the default 'F' to a 'P'. */ if (partial) { bytes_to_be_written += 1; + tsbuf[TSBUFLEN - 3] = 'P'; } /* @@ -317,45 +311,32 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen) if ((log_size_max > 0) && (bytes_written + bytes_to_be_written) > log_size_max) { bytes_written = 0; - if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) { + if (writev_buffer_flush(&writev_buffer, k8s_log_fd) < 0) { nwarn("failed to flush buffer to log"); /* * We are going to reopen the file anyway, in case of * errors discard all we have in the buffer. */ - bufv.iovcnt = 0; + writev_buffer.iovcnt = 0; } reopen_k8s_file(); } /* Output the timestamp */ - if (writev_buffer_append_segment(k8s_log_fd, &bufv, tsbuf, TSBUFLEN - 1) < 0) { + if (writev_buffer_append_segment(&writev_buffer, k8s_log_fd, tsbuf, TSBUFLEN - 1) < 0) { nwarn("failed to write (timestamp, stream) to log"); goto next; } - /* Output log tag for partial or newline */ - if (partial) { - if (writev_buffer_append_segment(k8s_log_fd, &bufv, "P ", 2) < 0) { - nwarn("failed to write partial log tag"); - goto next; - } - } else { - if (writev_buffer_append_segment(k8s_log_fd, &bufv, "F ", 2) < 0) { - nwarn("failed to write end log tag"); - goto next; - } - } - /* Output the actual contents. */ - if (writev_buffer_append_segment(k8s_log_fd, &bufv, buf, line_len) < 0) { + if (writev_buffer_append_segment(&writev_buffer, k8s_log_fd, buf, line_len) < 0) { nwarn("failed to write buffer to log"); goto next; } /* Output a newline for partial */ if (partial) { - if (writev_buffer_append_segment(k8s_log_fd, &bufv, "\n", 1) < 0) { + if (writev_buffer_append_segment(&writev_buffer, k8s_log_fd, "\n", 1) < 0) { nwarn("failed to write newline to log"); goto next; } @@ -368,7 +349,7 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen) buflen -= line_len; } - if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) { + if (writev_buffer_flush(&writev_buffer, k8s_log_fd) < 0) { nwarn("failed to flush buffer to log"); } @@ -391,7 +372,69 @@ static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen) } -static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf) +/* + * writev_buffer "class", of sorts. + * + * The writev_buffer_t global contains the fields of the object, and the + * writev_buffer_*() functions are the methods that act on that object. + * + * In this case, the object is a singleton, the global variable, writev_buffer. + */ + +/* Logging buffer that describes the mmap'd memory used for read and writing. */ +writev_buffer_t writev_buffer = {0}; + +/* + * We size the I/O vectors to handle an average log line length, including the + * new line character, of AVG_LOG_LINE_TARGET bytes in order to allocate enough + * I/O vectors to only require one writev() system call per read() system call + * from a pipe. If the average line length is less than AVG_LOG_LINE_TARGET, + * then we'll end up using potentially many more writev() system calls to write + * out the entire buffer read from a pipe. + */ +#define AVG_LOG_LINE_TARGET (float)25.0 + + +void writev_buffer_init(int pipe_size) +{ + // Allocate a buffer that matches the size of a pipe, along with the + // requisite I/O vectors, optimized for log lines >= AVG_LOG_LINE_TARGET + // bytes for the size of the buffer given. + + // WARNING - This means that any buffer processed with average log + // lines below AVG_LOG_LINE_TARGET bytes will result in multiple writev() + // system calls per buffer read. E.g., at a pipe size of 64 KB, for an + // average of 16 byte lines (4,096 per 64 KB buffer), it would require + // 8,192 I/O vectors (32 pages). + + // It takes 2 I/O vectors per new-line (timestamp + full_partial, and + // the actual log line). We divide the pipe_size by AVG_LOG_LINE_TARGET, + // taking the ceiling() so that we have an I/O vector for the remainder, + // and add one if the last buffer does not contain a new-line character. + // We calculate the total size of the I/O vectors in bytes, and then round + // up to the nearest page boundary. The pipe size (in bytes, but always + // rounded to the nearest page) is then added to that so we can allocate + // both structures in one set of anonymous mapped pages. + + int target_lines_cnt = (int)ceilf((float)(pipe_size / AVG_LOG_LINE_TARGET)) + 1; + if (target_lines_cnt < 0) + nexitf("Logic bomb! target # of lines per pipe is negative! pipe_size = %d, target_lines_cnt = %d", pipe_size, target_lines_cnt); + + writev_buffer.iovcnt_max = target_lines_cnt; + writev_buffer.buf_len = pipe_size; + + unsigned int iovectors_bytes = sizeof(struct iovec) * (unsigned int)target_lines_cnt; + int iovectors_bytes_page_aligned = (int)ceilf(iovectors_bytes / (float)getpagesize()) * getpagesize(); + char *memory = mmap(NULL, iovectors_bytes_page_aligned + pipe_size, PROT_READ|PROT_WRITE, MAP_ANONYMOUS, -1, 0); + if (memory == NULL) + nexitf("mmap() failed for I/O vectors and buffer"); + + writev_buffer.iov = (struct iovec *)memory; + writev_buffer.buf = &memory[iovectors_bytes_page_aligned]; +} + + +static ssize_t writev_buffer_flush(writev_buffer_t *buf, int fd) { size_t count = 0; int iovcnt = buf->iovcnt; @@ -427,12 +470,12 @@ static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf) } -ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len) +ssize_t writev_buffer_append_segment(writev_buffer_t *buf, int fd, const void *data, ssize_t len) { if (data == NULL) return 1; - if (buf->iovcnt == WRITEV_BUFFER_N_IOV && writev_buffer_flush(fd, buf) < 0) + if (buf->iovcnt == buf->iovcnt_max && writev_buffer_flush(buf, fd) < 0) return -1; if (len > 0) { @@ -449,13 +492,13 @@ static const char *stdpipe_name(stdpipe_t pipe) { switch (pipe) { case STDIN_PIPE: - return "stdin"; + return "stdin "; case STDOUT_PIPE: return "stdout"; case STDERR_PIPE: return "stderr"; default: - return "NONE"; + return "NONE "; } } @@ -492,7 +535,7 @@ static int set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename) off = -off; } - int len = snprintf(buf, buflen, "%d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %s ", current_tm.tm_year + 1900, + int len = snprintf(buf, buflen, "%d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %s F ", current_tm.tm_year + 1900, current_tm.tm_mon + 1, current_tm.tm_mday, current_tm.tm_hour, current_tm.tm_min, current_tm.tm_sec, ts.tv_nsec, off_sign, off / 3600, (off % 3600) / 60, pipename); @@ -501,12 +544,14 @@ static int set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename) return err; } + /* reopen all log files */ void reopen_log_files(void) { reopen_k8s_file(); } + /* reopen the k8s log file fd. */ static void reopen_k8s_file(void) { diff --git a/src/ctr_logging.h b/src/ctr_logging.h index 1b63cd70..8925358b 100644 --- a/src/ctr_logging.h +++ b/src/ctr_logging.h @@ -5,8 +5,19 @@ #include "utils.h" /* stdpipe_t */ #include /* bool */ +typedef struct { + ssize_t iovcnt_max; + ssize_t iovcnt; + struct iovec *iov; + char *buf; + ssize_t buf_len; +} writev_buffer_t; + +extern writev_buffer_t writev_buffer; + +void writev_buffer_init(int pipe_size); void reopen_log_files(void); -bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read); +bool write_to_logs(stdpipe_t pipe, ssize_t num_read); void configure_log_drivers(gchar **log_drivers, int64_t log_size_max_, char *cuuid_, char *name_, char *tag); void sync_logs(void); diff --git a/src/ctr_stdio.c b/src/ctr_stdio.c index 7ec4fbac..80616289 100644 --- a/src/ctr_stdio.c +++ b/src/ctr_stdio.c @@ -100,18 +100,12 @@ void drain_stdio() static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof) { - /* We use two extra bytes. One at the start, which we don't read into, instead - we use that for marking the pipe when we write to the attached socket. - One at the end to guarantee a null-terminated buffer for journald logging*/ - - char real_buf[STDIO_BUF_SIZE + 2]; - char *buf = real_buf + 1; ssize_t num_read = 0; if (eof) *eof = false; - num_read = read(fd, buf, STDIO_BUF_SIZE); + num_read = read(fd, writev_buffer.buf, writev_buffer.buf_len); if (num_read == 0) { if (eof) *eof = true; @@ -120,15 +114,11 @@ static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof) nwarnf("stdio_input read failed %s", strerror(errno)); return false; } else { - // Always null terminate the buffer, just in case. - buf[num_read] = '\0'; - - bool written = write_to_logs(pipe, buf, num_read); + bool written = write_to_logs(pipe, num_read); if (!written) return written; - real_buf[0] = pipe; - write_back_to_remote_consoles(real_buf, num_read + 1); + write_back_to_remote_consoles(pipe, writev_buffer.buf, num_read); return true; } } diff --git a/src/globals.c b/src/globals.c index a5eeba30..11b8f9bc 100644 --- a/src/globals.c +++ b/src/globals.c @@ -4,8 +4,11 @@ int runtime_status = -1; int container_status = -1; int mainfd_stdin = -1; +int mainfd_stdin_size = 0; int mainfd_stdout = -1; +int mainfd_stdout_size = 0; int mainfd_stderr = -1; +int mainfd_stderr_size = 0; int attach_socket_fd = -1; int console_socket_fd = -1; diff --git a/src/globals.h b/src/globals.h index a21ceea6..b2fb0844 100644 --- a/src/globals.h +++ b/src/globals.h @@ -9,8 +9,11 @@ extern int runtime_status; extern int container_status; extern int mainfd_stdin; +extern int mainfd_stdin_size; extern int mainfd_stdout; +extern int mainfd_stdout_size; extern int mainfd_stderr; +extern int mainfd_stderr_size; extern int attach_socket_fd; extern int console_socket_fd; From 6f39877f0326d5a3ceaf7aac21aec150328d51e5 Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Mon, 31 May 2021 00:31:36 -0400 Subject: [PATCH 3/3] See if we can improve journald handling We don't bother allocating I/O vectors for the journald case, and we make it much more efficient by using a pre-allocated stack variable using `memcpy` only, instead of `strdup()`. --- src/ctr_logging.c | 110 ++++++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/src/ctr_logging.c b/src/ctr_logging.c index 5641dc49..2ea18df1 100644 --- a/src/ctr_logging.c +++ b/src/ctr_logging.c @@ -54,8 +54,9 @@ static size_t name_len = 0; static char *container_id_full = NULL; static char *container_id = NULL; static char *container_name = NULL; +static size_t container_name_len = 0; static char *container_tag = NULL; -static size_t container_tag_len; +static size_t container_tag_len = 0; static void parse_log_path(char *log_config); static const char *stdpipe_name(stdpipe_t pipe); @@ -123,6 +124,7 @@ void configure_log_drivers(gchar **log_drivers, int64_t log_size_max_, char *cuu /* save the length so we don't have to compute every sd_journal_* call */ name_len = strlen(name); container_name = g_strdup_printf("CONTAINER_NAME=%s", name); + container_name_len = strlen(container_name); } } } @@ -204,65 +206,68 @@ static int write_journald(stdpipe_t pipe, ssize_t buflen) { char *buf = writev_buffer.buf; - /* When using writev_buffer_append_segment, we should never approach the number of - * entries necessary to flush the buffer. Therefore, the fd passed in is for /dev/null + /* Since we know the priority values for the journal (6 being log info and + * 3 being log err) we can set it statically here. This will also save on + * runtime, at the expense of needing to be changed if this convention is + * changed. */ - _cleanup_close_ int dev_null = open("/dev/null", O_WRONLY | O_CLOEXEC); - if (dev_null < 0) - pexit("Failed to open /dev/null"); - - /* Since we know the priority values for the journal (6 being log info and 3 being log err - * we can set it statically here. This will also save on runtime, at the expense of needing - * to be changed if this convention is changed. - */ - const char *message_priority = "PRIORITY=6"; + char *message_priority = "PRIORITY=6"; if (pipe == STDERR_PIPE) message_priority = "PRIORITY=3"; ptrdiff_t line_len = 0; - while (buflen > 0) { - bool partial = get_line_len(&line_len, buf, buflen); - /* sd_journal_* doesn't have an option to specify the number of bytes to write in the message, and instead writes the - * entire string. Copying every line doesn't make very much sense, so instead we do this tmp_line_end - * hack to emulate separate strings. - */ - char tmp_line_end = buf[line_len]; - buf[line_len] = '\0'; - - _cleanup_free_ char *message = g_strdup_printf("MESSAGE=%s", buf); - if (writev_buffer_append_segment(&writev_buffer, dev_null, message, line_len + MESSAGE_EQ_LEN) < 0) - return -1; - - /* Restore state of the buffer */ - buf[line_len] = tmp_line_end; - - if (writev_buffer_append_segment(&writev_buffer, dev_null, container_id_full, cuuid_len + CID_FULL_EQ_LEN) < 0) - return -1; - - if (writev_buffer_append_segment(&writev_buffer, dev_null, message_priority, PRIORITY_EQ_LEN) < 0) - return -1; + /* + * Writing to journald requires one `sd_journal_sendv()` call per line in + * the buffer received from the pipe. + */ + struct iovec sendv_vecs[7] = { + { (char *)0, 0 }, // filled in as we go + { container_id_full, cuuid_len + CID_FULL_EQ_LEN }, + { message_priority, PRIORITY_EQ_LEN }, + { container_id, TRUNC_ID_LEN + CID_EQ_LEN }, + }; + ssize_t sendv_vecs_len = 4; + if (container_tag) { + sendv_vecs[sendv_vecs_len].iov_base = (void *)container_tag; + sendv_vecs[sendv_vecs_len].iov_len = container_tag_len; + sendv_vecs_len += 1; + } + if (name) { + sendv_vecs[sendv_vecs_len].iov_base = (void *)container_name; + sendv_vecs[sendv_vecs_len].iov_len = container_name_len; + sendv_vecs_len += 1; + } - if (writev_buffer_append_segment(&writev_buffer, dev_null, container_id, TRUNC_ID_LEN + CID_EQ_LEN) < 0) - return -1; + char *msg_buf = (char *)alloca(buflen + MESSAGE_EQ_LEN); + memcpy(msg_buf, "MESSAGE=", MESSAGE_EQ_LEN); - if (container_tag && writev_buffer_append_segment(&writev_buffer, dev_null, container_tag, container_tag_len) < 0) - return -1; + while (buflen > 0) { + bool partial = get_line_len(&line_len, buf, buflen); - /* only print the name if we have a name to print */ - if (name && writev_buffer_append_segment(&writev_buffer, dev_null, container_name, name_len + NAME_EQ_LEN) < 0) - return -1; + // Fill in the message + memcpy(msg_buf + MESSAGE_EQ_LEN, buf, line_len); + sendv_vecs[0].iov_base = (void *)msg_buf; + sendv_vecs[0].iov_len = buflen + MESSAGE_EQ_LEN; /* per docker journald logging format, CONTAINER_PARTIAL_MESSAGE is set to true if it's partial, but otherwise not set. */ - if (partial && writev_buffer_append_segment(&writev_buffer, dev_null, "CONTAINER_PARTIAL_MESSAGE=true", PARTIAL_MESSAGE_EQ_LEN) < 0) - return -1; + if (partial) { + sendv_vecs[sendv_vecs_len].iov_base = "CONTAINER_PARTIAL_MESSAGE=true"; + sendv_vecs[sendv_vecs_len].iov_len = PARTIAL_MESSAGE_EQ_LEN; + sendv_vecs_len += 1; + } - int err = sd_journal_sendv(writev_buffer.iov, writev_buffer.iovcnt); + int err = sd_journal_sendv(sendv_vecs, sendv_vecs_len); if (err < 0) { pwarn(strerror(err)); return err; } + if (partial) { + // We don't have to do this, because we'll only get a partial line at the end. + sendv_vecs_len -= 1; + } + buf += line_len; buflen -= line_len; } @@ -420,17 +425,26 @@ void writev_buffer_init(int pipe_size) if (target_lines_cnt < 0) nexitf("Logic bomb! target # of lines per pipe is negative! pipe_size = %d, target_lines_cnt = %d", pipe_size, target_lines_cnt); - writev_buffer.iovcnt_max = target_lines_cnt; - writev_buffer.buf_len = pipe_size; + int iovectors_bytes_page_aligned; + if (use_k8s_logging) { + unsigned int iovectors_bytes = sizeof(struct iovec) * (unsigned int)target_lines_cnt; + iovectors_bytes_page_aligned = (int)ceilf(iovectors_bytes / (float)getpagesize()) * getpagesize(); + } + else { + iovectors_bytes_page_aligned = 0; + } - unsigned int iovectors_bytes = sizeof(struct iovec) * (unsigned int)target_lines_cnt; - int iovectors_bytes_page_aligned = (int)ceilf(iovectors_bytes / (float)getpagesize()) * getpagesize(); char *memory = mmap(NULL, iovectors_bytes_page_aligned + pipe_size, PROT_READ|PROT_WRITE, MAP_ANONYMOUS, -1, 0); if (memory == NULL) nexitf("mmap() failed for I/O vectors and buffer"); - writev_buffer.iov = (struct iovec *)memory; + if (use_k8s_logging) { + writev_buffer.iov = (struct iovec *)memory; + writev_buffer.iovcnt_max = target_lines_cnt; + } + writev_buffer.buf = &memory[iovectors_bytes_page_aligned]; + writev_buffer.buf_len = pipe_size; }