Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions src/conn_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static char *setup_socket(int *fd, const char *path);
setup_attach_socket() is responsible for setting the correct remote FD and pushing it onto the queue.
*/
static struct local_sock_s local_mainfd_stdin = {&mainfd_stdin, true, NULL, "container stdin", NULL};
struct remote_sock_s remote_attach_sock = {
static struct remote_sock_s remote_attach_sock = {
SOCK_TYPE_CONSOLE, /* sock_type */
-1, /* fd */
&local_mainfd_stdin, /* dest */
Expand Down Expand Up @@ -349,27 +349,42 @@ char *socket_parent_dir(gboolean use_full_attach_path, size_t desired_len)
return base_path;
}


void schedule_main_stdin_write()
{
schedule_local_sock_write(&local_mainfd_stdin);
}

void write_back_to_remote_consoles(char *buf, int len)
struct write_console_sock_user_data {
stdpipe_t pipe;
char *buf;
size_t buflen;
};

static void write_console_sock(gpointer data, G_GNUC_UNUSED gpointer user_data)
{
if (local_mainfd_stdin.readers == NULL)
struct remote_sock_s *remote_sock = (struct remote_sock_s *)data;
if (!remote_sock->writable)
return;

for (int i = local_mainfd_stdin.readers->len; i > 0; i--) {
struct remote_sock_s *remote_sock = g_ptr_array_index(local_mainfd_stdin.readers, i - 1);

if (remote_sock->writable && write_all(remote_sock->fd, buf, len) < 0) {
nwarn("Failed to write to remote console socket");
remote_sock_shutdown(remote_sock, SHUT_WR);
}
struct write_console_sock_user_data *wcsud = (struct write_console_sock_user_data *)user_data;
struct iovec iov[2] = {{&wcsud->pipe, 1}, {wcsud->buf, wcsud->buflen}};
writev_iov_t wviov = {2, 2, iov};
if (writev_all(remote_sock->fd, &wviov) < 0) {
nwarn("Failed to write to remote console socket");
remote_sock_shutdown(remote_sock, SHUT_WR);
}
}

void write_back_to_remote_consoles(stdpipe_t pipe, char *buf, size_t buflen)
{
if (local_mainfd_stdin.readers == NULL)
return;
GPtrArray *readers_copy = g_ptr_array_copy(local_mainfd_stdin.readers, NULL, NULL);
g_ptr_array_set_free_func(readers_copy, NULL);
struct write_console_sock_user_data wcsud = {pipe, buf, buflen};
g_ptr_array_foreach(readers_copy, write_console_sock, &wcsud);
g_ptr_array_free(readers_copy, true);
}

/* Internal */
static gboolean attach_cb(int fd, G_GNUC_UNUSED GIOCondition condition, gpointer user_data)
{
Expand Down
3 changes: 2 additions & 1 deletion src/conn_sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <glib.h> /* gboolean */
#include "config.h" /* CONN_SOCK_BUF_SIZE */
#include "utils.h" /* stdpipe_t */

#define SOCK_TYPE_CONSOLE 1
#define SOCK_TYPE_NOTIFY 2
Expand Down Expand Up @@ -52,7 +53,7 @@ char *setup_seccomp_socket(const char *socket);
char *setup_attach_socket(void);
void setup_notify_socket(char *);
void schedule_main_stdin_write();
void write_back_to_remote_consoles(char *buf, int len);
void write_back_to_remote_consoles(stdpipe_t pipe, char *buf, size_t buflen);
void close_all_readers();

#endif // CONN_SOCK_H
82 changes: 12 additions & 70 deletions src/ctr_logging.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,13 @@ static size_t syslog_identifier_len;

#define WRITEV_BUFFER_N_IOV 128

typedef struct {
int iovcnt;
struct iovec iov[WRITEV_BUFFER_N_IOV];
} writev_buffer_t;

static void parse_log_path(char *log_config);
static const char *stdpipe_name(stdpipe_t pipe);
static int write_journald(int pipe, char *buf, ssize_t num_read);
static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen);
static ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len);
static ssize_t writev_buffer_append_segment_no_flush(writev_buffer_t *buf, const void *data, ssize_t len);
static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf);
static ssize_t writev_buffer_append_segment(int fd, writev_iov_t *buf, const void *data, ssize_t len);
static ssize_t writev_buffer_append_segment_no_flush(writev_iov_t *buf, const void *data, ssize_t len);
static void set_k8s_timestamp(char *buf, ssize_t buflen, const char *pipename);
static void reopen_k8s_file(void);
static int parse_priority_prefix(const char *buf, ssize_t buflen, int *priority, const char **message_start);
Expand Down Expand Up @@ -386,7 +380,8 @@ static int write_journald(int pipe, char *buf, ssize_t buflen)
ptrdiff_t line_len = 0;

while (buflen > 0 || *partial_buf_len > 0) {
writev_buffer_t bufv = {0};
struct iovec vecs[WRITEV_BUFFER_N_IOV];
writev_iov_t bufv = {0, WRITEV_BUFFER_N_IOV, vecs};

bool partial = buflen == 0 || get_line_len(&line_len, buf, buflen);

Expand Down Expand Up @@ -489,7 +484,8 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen)
static bool stdout_has_partial = false;
static bool stderr_has_partial = false;

writev_buffer_t bufv = {0};
struct iovec vecs[WRITEV_BUFFER_N_IOV];
writev_iov_t bufv = {0, WRITEV_BUFFER_N_IOV, vecs};
int64_t bytes_to_be_written = 0;

bool *has_partial = (pipe == STDOUT_PIPE) ? &stdout_has_partial : &stderr_has_partial;
Expand Down Expand Up @@ -550,7 +546,7 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen)
* a timestamp.
*/
if ((log_size_max > 0) && (k8s_bytes_written + bytes_to_be_written) > log_size_max) {
if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) {
if (writev_all(k8s_log_fd, &bufv) < 0) {
nwarn("failed to flush buffer to log");
}
reopen_k8s_file();
Expand Down Expand Up @@ -600,7 +596,7 @@ static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen)
buflen -= line_len;
}

if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) {
if (writev_all(k8s_log_fd, &bufv) < 0) {
nwarn("failed to flush buffer to log");
}

Expand All @@ -622,66 +618,12 @@ static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen)
return partial;
}


static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf)
{
ssize_t count = 0;
int iovcnt = buf->iovcnt;
struct iovec *iov = buf->iov;

/*
* By definition, flushing the buffers will either be entirely successful, or will fail at some point
* along the way. There is no facility to attempt to retry a writev() system call outside of an EINTR
* errno. Therefore, no matter the outcome, always reset the writev_buffer_t data structure.
*/
buf->iovcnt = 0;

while (iovcnt > 0) {
ssize_t res;
do {
res = writev(fd, iov, iovcnt);
} while (res == -1 && errno == EINTR);

if (res <= 0) {
/*
* Any unflushed data is lost (this would be a good place to add a counter for how many times
* this occurs and another count for how much data is lost).
*
* Note that if writev() returns a 0, this logic considers it an error.
*/
return -1;
}

count += res;

while (res > 0) {
size_t iov_len = iov->iov_len;
size_t from_this = MIN((size_t)res, iov_len);
res -= from_this;
iov_len -= from_this;

if (iov_len == 0) {
iov++;
iovcnt--;
/* continue, res still > 0 */
} else {
iov->iov_len = iov_len;
iov->iov_base += from_this;
/* break, res is 0 */
}
}
}

return count;
}


ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len)
ssize_t writev_buffer_append_segment(int fd, writev_iov_t *buf, const void *data, ssize_t len)
{
if (data == NULL)
return 1;

if (buf->iovcnt == WRITEV_BUFFER_N_IOV && writev_buffer_flush(fd, buf) < 0)
if (buf->iovcnt == buf->max_iovcnt && writev_all(fd, buf) < 0)
return -1;

if (len > 0) {
Expand All @@ -693,12 +635,12 @@ ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *d
return 1;
}

ssize_t writev_buffer_append_segment_no_flush(writev_buffer_t *buf, const void *data, ssize_t len)
ssize_t writev_buffer_append_segment_no_flush(writev_iov_t *buf, const void *data, ssize_t len)
{
if (data == NULL)
return 1;

if (buf->iovcnt == WRITEV_BUFFER_N_IOV)
if (buf->iovcnt == buf->max_iovcnt)
return -1;

if (len > 0) {
Expand Down
13 changes: 2 additions & 11 deletions src/ctr_stdio.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,7 @@ static void drain_log_buffers(stdpipe_t pipe)

static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof)
{
/* We use two extra bytes. One at the start, which we don't read into, instead
we use that for marking the pipe when we write to the attached socket.
One at the end to guarantee a null-terminated buffer for journald logging*/

char real_buf[STDIO_BUF_SIZE + 2];
char *buf = real_buf + 1;
char buf[STDIO_BUF_SIZE];
ssize_t num_read = 0;

if (eof)
Expand All @@ -143,15 +138,11 @@ static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof)
nwarnf("stdio_input read failed: %m");
return false;
} else {
// Always null terminate the buffer, just in case.
buf[num_read] = '\0';

bool written = write_to_logs(pipe, buf, num_read);
if (!written)
return false;

real_buf[0] = pipe;
write_back_to_remote_consoles(real_buf, num_read + 1);
write_back_to_remote_consoles(pipe, buf, num_read);
return true;
}
}
Expand Down
56 changes: 54 additions & 2 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ static void get_signal_descriptor_mask(sigset_t *set)
sigprocmask(SIG_BLOCK, set, NULL);
}

ssize_t write_all(int fd, const void *buf, size_t count)
ssize_t write_all(int fd, const void *buf, size_t buflen)
{
size_t remaining = count;
size_t remaining = buflen;
const char *p = buf;
ssize_t res;

Expand All @@ -80,6 +80,58 @@ ssize_t write_all(int fd, const void *buf, size_t count)
p += res;
}

return buflen;
}

ssize_t writev_all(int fd, writev_iov_t *buf)
{
ssize_t count = 0;
size_t iovcnt = buf->iovcnt;
struct iovec *iov = buf->iov;

/*
* By definition, flushing the buffers will either be entirely successful, or will fail at some point
* along the way. There is no facility to attempt to retry a writev() system call outside of an retryable
* errno. Therefore, no matter the outcome, always reset the given writev_iov_t data structure.
*/
buf->iovcnt = 0;

while (iovcnt > 0) {
ssize_t res;
do {
res = writev(fd, iov, iovcnt);
} while (res == -1 && retryable_error(errno));

if (res <= 0) {
/*
* Any unflushed data is lost (this would be a good place to add a counter for how many times
* this occurs and another count for how much data is lost).
*
* Note that if writev() returns a 0, this logic considers it an error.
*/
return -1;
}

count += res;

while (res > 0) {
size_t iov_len = iov->iov_len;
size_t from_this = MIN((size_t)res, iov_len);
res -= from_this;
iov_len -= from_this;

if (iov_len == 0) {
iov++;
iovcnt--;
/* continue, res still > 0 */
} else {
iov->iov_len = iov_len;
iov->iov_base += from_this;
/* break, res is 0 */
}
}
}

return count;
}

Expand Down
8 changes: 7 additions & 1 deletion src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,13 @@ static inline void hashtable_free_cleanup(GHashTable **tbl)
#define _cleanup_gerror_ _cleanup_(gerror_free_cleanup)


ssize_t write_all(int fd, const void *buf, size_t count);
ssize_t write_all(int fd, const void *buf, size_t buflen);
typedef struct {
size_t iovcnt;
size_t max_iovcnt;
struct iovec *iov;
} writev_iov_t;
ssize_t writev_all(int fd, writev_iov_t *iov);

int set_subreaper(gboolean enabled);

Expand Down
Loading