From e13d6cc21d8b1cc3658f8a841564ec54a1e15a9c Mon Sep 17 00:00:00 2001 From: Philip Taffet Date: Tue, 3 Mar 2026 04:27:19 +0000 Subject: [PATCH] snapct: switch to tcp-based ping --- src/discof/restore/fd_snapct_tile.c | 23 +- .../restore/fd_snapct_tile.seccomppolicy | 34 ++- .../generated/fd_snapct_tile_seccomp.h | 172 +++++++------ src/discof/restore/utils/fd_ssping.c | 240 +++++++++++------- src/discof/restore/utils/fd_ssping.h | 15 +- 5 files changed, 284 insertions(+), 200 deletions(-) diff --git a/src/discof/restore/fd_snapct_tile.c b/src/discof/restore/fd_snapct_tile.c index d3990127cc8..9d5db3e110b 100644 --- a/src/discof/restore/fd_snapct_tile.c +++ b/src/discof/restore/fd_snapct_tile.c @@ -351,9 +351,10 @@ static ulong rlimit_file_cnt( fd_topo_t const * topo FD_PARAM_UNUSED, fd_topo_tile_t const * tile ) { ulong cnt = 1UL + /* stderr */ - 1UL; /* logfile */ + 1UL + /* logfile */ + 1UL; if( download_enabled( tile ) ) { - cnt += 1UL + /* ssping socket */ + cnt += FD_SSPING_FD_CNT + /* ssping socket */ 2UL + /* dirfd + full snapshot download temp fd */ tile->snapct.sources.servers_cnt; /* http resolver peer full sockets */ if( tile->snapct.incremental_snapshots ) { @@ -375,8 +376,18 @@ populate_allowed_seccomp( fd_topo_t const * topo, FD_SCRATCH_ALLOC_INIT( l, scratch ); fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) ); - int ping_fd = download_enabled( tile ) ? fd_ssping_get_sockfd( ctx->ssping ) : -1; - populate_sock_filter_policy_fd_snapct_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)ping_fd ); + int min_ping_fd = INT_MAX; + int max_ping_fd = -1; + int ping_fds[ 256 ]; + if( download_enabled( tile ) ) { + ulong fd_cnt = fd_ssping_get_sockfds( ctx->ssping, ping_fds, 256UL ); + for( ulong i=0UL; ilocal_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)min_ping_fd, (uint)max_ping_fd ); return sock_filter_policy_fd_snapct_tile_instr_cnt; } @@ -385,7 +396,7 @@ populate_allowed_fds( fd_topo_t const * topo, fd_topo_tile_t const * tile, ulong out_fds_cnt, int * out_fds ) { - if( FD_UNLIKELY( out_fds_cnt<6UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt )); + if( FD_UNLIKELY( out_fds_cnt<255UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt )); ulong out_cnt = 0; out_fds[ out_cnt++ ] = 2UL; /* stderr */ @@ -400,7 +411,7 @@ populate_allowed_fds( fd_topo_t const * topo, if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.dir_fd; if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd; if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd; - if( FD_LIKELY( download_enabled( tile ) ) ) out_fds[ out_cnt++ ] = fd_ssping_get_sockfd( ctx->ssping ); + if( FD_LIKELY( download_enabled( tile ) ) ) out_cnt += fd_ssping_get_sockfds( ctx->ssping, out_fds+out_cnt, out_fds_cnt-out_cnt ); return out_cnt; } diff --git a/src/discof/restore/fd_snapct_tile.seccomppolicy b/src/discof/restore/fd_snapct_tile.seccomppolicy index da533007bf5..befedea9813 100644 --- a/src/discof/restore/fd_snapct_tile.seccomppolicy +++ b/src/discof/restore/fd_snapct_tile.seccomppolicy @@ -1,6 +1,6 @@ # logfile_fd: It can be disabled by configuration, but typically tiles # will open a log file on boot and write all messages there. -uint logfile_fd, uint dir_fd, uint out_full_fd, uint out_inc_fd, uint ping_fd +uint logfile_fd, uint dir_fd, uint out_full_fd, uint out_inc_fd, uint min_ping_fd, uint max_ping_fd # logging: all log messages are written to a file and/or pipe # @@ -31,13 +31,14 @@ lseek: (or (eq (arg 0) out_full_fd) # OpenSSL calls read to receive bytes from a socket. We will restrict # this being called on the stderr, the log file descriptor, the snapshot # directory, the full and incremental snapshot files, and the ping file -# descriptor. +# descriptors. read: (not (or (eq (arg 0) 2) (eq (arg 0) logfile_fd) (eq (arg 0) dir_fd) (eq (arg 0) out_full_fd) (eq (arg 0) out_inc_fd) - (eq (arg 0) ping_fd))) + (and (>= (arg 0) min_ping_fd) + (<= (arg 0) max_ping_fd)))) # logging: 'WARNING' and above fsync the logfile to disk immediately # @@ -51,7 +52,8 @@ socket: (and (eq (arg 0) "AF_INET") (eq (arg 2) 0)) # snapshot: need to be able to connect to a peer to download a snapshot -# from and for snapshot slot resolution. +# from and for snapshot slot resolution. We also call connect on all +# the ping file descriptors. # # arg 0 is the file descriptor of the socket to connect to. We will # restrict this being called on any of the snapshot file descriptors, @@ -60,8 +62,7 @@ connect: (not (or (eq (arg 0) 2) (eq (arg 0) logfile_fd) (eq (arg 0) dir_fd) (eq (arg 0) out_full_fd) - (eq (arg 0) out_inc_fd) - (eq (arg 0) ping_fd))) + (eq (arg 0) out_inc_fd))) # snapshot: need to close sockets that were opened when calls to connect # fail. @@ -74,28 +75,31 @@ close: (not (or (eq (arg 0) 2) (eq (arg 0) dir_fd) (eq (arg 0) out_full_fd) (eq (arg 0) out_inc_fd) - (eq (arg 0) ping_fd))) + (and (>= (arg 0) min_ping_fd) + (<= (arg 0) max_ping_fd)))) # snapshot: we need to be able to poll existing connections that we want # to download snapshots and resolve snapshots from ppoll +# snapshot: we need to be able to poll the ping connections to see if +# any peers have responded +poll + # snapshot: send HTTP requests to endpoints for snapshot downloading. # OpenSSL also uses sendto to write to the socket during the lifetime # of an HTTPS connection. # # arg 0 is the file descriptor that we would like to send a request to. # We will restrict this being called on any of the snapshot file -# descriptors, STDERR or the logfile. +# descriptors, ping file descriptors, STDERR or the logfile. sendto: (not (or (eq (arg 0) 2) (eq (arg 0) logfile_fd) (eq (arg 0) dir_fd) (eq (arg 0) out_full_fd) (eq (arg 0) out_inc_fd) - (eq (arg 0) ping_fd))) - -sendmmsg: (and (eq (arg 0) ping_fd) - (eq (arg 3) 0)) + (and (>= (arg 0) min_ping_fd) + (<= (arg 0) max_ping_fd)))) # snapshot: we need to be able to receive responses for any requests # that were sent out for snapshot download or slot resolution. @@ -115,13 +119,15 @@ recvfrom: (not (or (eq (arg 0) 2) # # arg 0 is the file descriptor that corresponds to the socket that we # want to connect to. We know that we don't want to connect to the -# file descriptors used for logging, stderr, or any of the snapshot fds. +# file descriptors used for logging, stderr, ping or any of the snapshot +# fds. The ping file descriptors set this option in privileged_init. setsockopt: (and (not (or (eq (arg 0) 2) (eq (arg 0) logfile_fd) (eq (arg 0) dir_fd) (eq (arg 0) out_full_fd) (eq (arg 0) out_inc_fd) - (eq (arg 0) ping_fd))) + (and (>= (arg 0) min_ping_fd) + (<= (arg 0) max_ping_fd)))) (eq (arg 1) IPPROTO_TCP) (eq (arg 2) TCP_NODELAY)) diff --git a/src/discof/restore/generated/fd_snapct_tile_seccomp.h b/src/discof/restore/generated/fd_snapct_tile_seccomp.h index d6ea08950cd..d9c3ce5e119 100644 --- a/src/discof/restore/generated/fd_snapct_tile_seccomp.h +++ b/src/discof/restore/generated/fd_snapct_tile_seccomp.h @@ -24,14 +24,14 @@ #else # error "Target architecture is unsupported by seccomp." #endif -static const unsigned int sock_filter_policy_fd_snapct_tile_instr_cnt = 131; +static const unsigned int sock_filter_policy_fd_snapct_tile_instr_cnt = 133; -static void populate_sock_filter_policy_fd_snapct_tile( ulong out_cnt, struct sock_filter * out, uint logfile_fd, uint dir_fd, uint out_full_fd, uint out_inc_fd, uint ping_fd ) { - FD_TEST( out_cnt >= 131 ); - struct sock_filter filter[131] = { +static void populate_sock_filter_policy_fd_snapct_tile( ulong out_cnt, struct sock_filter * out, uint logfile_fd, uint dir_fd, uint out_full_fd, uint out_inc_fd, uint min_ping_fd, uint max_ping_fd ) { + FD_TEST( out_cnt >= 133 ); + struct sock_filter filter[133] = { /* Check: Jump to RET_KILL_PROCESS if the script's arch != the runtime arch */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, ( offsetof( struct seccomp_data, arch ) ) ), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ARCH_NR, 0, /* RET_KILL_PROCESS */ 127 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ARCH_NR, 0, /* RET_KILL_PROCESS */ 129 ), /* loading syscall number in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, ( offsetof( struct seccomp_data, nr ) ) ), /* allow write based on expression */ @@ -43,89 +43,93 @@ static void populate_sock_filter_policy_fd_snapct_tile( ulong out_cnt, struct so /* allow read based on expression */ BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_read, /* check_read */ 30, 0 ), /* allow fsync based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_fsync, /* check_fsync */ 41, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_fsync, /* check_fsync */ 43, 0 ), /* allow socket based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_socket, /* check_socket */ 42, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_socket, /* check_socket */ 44, 0 ), /* allow connect based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_connect, /* check_connect */ 47, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_connect, /* check_connect */ 49, 0 ), /* allow close based on expression */ BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_close, /* check_close */ 58, 0 ), /* simply allow ppoll */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_ppoll, /* RET_ALLOW */ 118, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_ppoll, /* RET_ALLOW */ 120, 0 ), + /* simply allow poll */ + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_poll, /* RET_ALLOW */ 119, 0 ), /* allow sendto based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_sendto, /* check_sendto */ 68, 0 ), - /* allow sendmmsg based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_sendmmsg, /* check_sendmmsg */ 79, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_sendto, /* check_sendto */ 69, 0 ), /* allow recvfrom based on expression */ BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_recvfrom, /* check_recvfrom */ 82, 0 ), /* allow setsockopt based on expression */ BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_setsockopt, /* check_setsockopt */ 91, 0 ), /* allow renameat based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_renameat, /* check_renameat */ 106, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_renameat, /* check_renameat */ 108, 0 ), /* simply allow getpid */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_getpid, /* RET_ALLOW */ 112, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_getpid, /* RET_ALLOW */ 114, 0 ), /* simply allow getrandom */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_getrandom, /* RET_ALLOW */ 111, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_getrandom, /* RET_ALLOW */ 113, 0 ), /* allow exit based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_exit, /* check_exit */ 107, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_exit, /* check_exit */ 109, 0 ), /* none of the syscalls matched */ - { BPF_JMP | BPF_JA, 0, 0, /* RET_KILL_PROCESS */ 108 }, + { BPF_JMP | BPF_JA, 0, 0, /* RET_KILL_PROCESS */ 110 }, // check_write: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_ALLOW */ 107, /* lbl_1 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_ALLOW */ 109, /* lbl_1 */ 0 ), // lbl_1: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 105, /* lbl_2 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 107, /* lbl_2 */ 0 ), // lbl_2: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_ALLOW */ 103, /* lbl_3 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_ALLOW */ 105, /* lbl_3 */ 0 ), // lbl_3: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_ALLOW */ 101, /* RET_KILL_PROCESS */ 100 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_ALLOW */ 103, /* RET_KILL_PROCESS */ 102 ), // check_ftruncate: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_ALLOW */ 99, /* lbl_4 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_ALLOW */ 101, /* lbl_4 */ 0 ), // lbl_4: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_ALLOW */ 97, /* RET_KILL_PROCESS */ 96 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_ALLOW */ 99, /* RET_KILL_PROCESS */ 98 ), // check_lseek: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_ALLOW */ 95, /* lbl_5 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_ALLOW */ 97, /* lbl_5 */ 0 ), // lbl_5: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_ALLOW */ 93, /* RET_KILL_PROCESS */ 92 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_ALLOW */ 95, /* RET_KILL_PROCESS */ 94 ), // check_read: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 90, /* lbl_6 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 92, /* lbl_6 */ 0 ), // lbl_6: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 88, /* lbl_7 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 90, /* lbl_7 */ 0 ), // lbl_7: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 86, /* lbl_8 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 88, /* lbl_8 */ 0 ), // lbl_8: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 84, /* lbl_9 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 86, /* lbl_9 */ 0 ), // lbl_9: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 82, /* lbl_10 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 84, /* lbl_10 */ 0 ), // lbl_10: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ping_fd, /* RET_KILL_PROCESS */ 80, /* RET_ALLOW */ 81 ), + BPF_JUMP( BPF_JMP | BPF_JGE | BPF_K, min_ping_fd, /* lbl_11 */ 0, /* RET_ALLOW */ 83 ), +// lbl_11: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JGT | BPF_K, max_ping_fd, /* RET_ALLOW */ 81, /* RET_KILL_PROCESS */ 80 ), // check_fsync: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), @@ -133,152 +137,152 @@ static void populate_sock_filter_policy_fd_snapct_tile( ulong out_cnt, struct so // check_socket: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, AF_INET, /* lbl_11 */ 0, /* RET_KILL_PROCESS */ 76 ), -// lbl_11: + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, AF_INET, /* lbl_12 */ 0, /* RET_KILL_PROCESS */ 76 ), +// lbl_12: /* load syscall argument 1 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[1])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SOCK_STREAM|SOCK_NONBLOCK, /* lbl_12 */ 0, /* RET_KILL_PROCESS */ 74 ), -// lbl_12: + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SOCK_STREAM|SOCK_NONBLOCK, /* lbl_13 */ 0, /* RET_KILL_PROCESS */ 74 ), +// lbl_13: /* load syscall argument 2 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[2])), BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 0, /* RET_ALLOW */ 73, /* RET_KILL_PROCESS */ 72 ), // check_connect: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 70, /* lbl_13 */ 0 ), -// lbl_13: - /* load syscall argument 0 in accumulator */ - BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 68, /* lbl_14 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 70, /* lbl_14 */ 0 ), // lbl_14: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 66, /* lbl_15 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 68, /* lbl_15 */ 0 ), // lbl_15: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 64, /* lbl_16 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 66, /* lbl_16 */ 0 ), // lbl_16: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 62, /* lbl_17 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 64, /* lbl_17 */ 0 ), // lbl_17: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ping_fd, /* RET_KILL_PROCESS */ 60, /* RET_ALLOW */ 61 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 62, /* RET_ALLOW */ 63 ), // check_close: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 58, /* lbl_18 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 60, /* lbl_18 */ 0 ), // lbl_18: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 56, /* lbl_19 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 58, /* lbl_19 */ 0 ), // lbl_19: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 54, /* lbl_20 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 56, /* lbl_20 */ 0 ), // lbl_20: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 52, /* lbl_21 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 54, /* lbl_21 */ 0 ), // lbl_21: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 50, /* lbl_22 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 52, /* lbl_22 */ 0 ), // lbl_22: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ping_fd, /* RET_KILL_PROCESS */ 48, /* RET_ALLOW */ 49 ), -// check_sendto: + BPF_JUMP( BPF_JMP | BPF_JGE | BPF_K, min_ping_fd, /* lbl_23 */ 0, /* RET_ALLOW */ 51 ), +// lbl_23: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 46, /* lbl_23 */ 0 ), -// lbl_23: + BPF_JUMP( BPF_JMP | BPF_JGT | BPF_K, max_ping_fd, /* RET_ALLOW */ 49, /* RET_KILL_PROCESS */ 48 ), +// check_sendto: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 44, /* lbl_24 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 46, /* lbl_24 */ 0 ), // lbl_24: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 42, /* lbl_25 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 44, /* lbl_25 */ 0 ), // lbl_25: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 40, /* lbl_26 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 42, /* lbl_26 */ 0 ), // lbl_26: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 38, /* lbl_27 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 40, /* lbl_27 */ 0 ), // lbl_27: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ping_fd, /* RET_KILL_PROCESS */ 36, /* RET_ALLOW */ 37 ), -// check_sendmmsg: - /* load syscall argument 0 in accumulator */ - BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ping_fd, /* lbl_28 */ 0, /* RET_KILL_PROCESS */ 34 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 38, /* lbl_28 */ 0 ), // lbl_28: - /* load syscall argument 3 in accumulator */ - BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[3])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 0, /* RET_ALLOW */ 33, /* RET_KILL_PROCESS */ 32 ), -// check_recvfrom: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 30, /* lbl_29 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JGE | BPF_K, min_ping_fd, /* lbl_29 */ 0, /* RET_ALLOW */ 37 ), // lbl_29: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 28, /* lbl_30 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JGT | BPF_K, max_ping_fd, /* RET_ALLOW */ 35, /* RET_KILL_PROCESS */ 34 ), +// check_recvfrom: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 32, /* lbl_30 */ 0 ), // lbl_30: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 26, /* lbl_31 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 30, /* lbl_31 */ 0 ), // lbl_31: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 24, /* lbl_32 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 28, /* lbl_32 */ 0 ), // lbl_32: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 22, /* RET_ALLOW */ 23 ), -// check_setsockopt: + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 26, /* lbl_33 */ 0 ), +// lbl_33: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 20, /* lbl_34 */ 0 ), -// lbl_34: + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 24, /* RET_ALLOW */ 25 ), +// check_setsockopt: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 18, /* lbl_35 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_KILL_PROCESS */ 22, /* lbl_35 */ 0 ), // lbl_35: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 16, /* lbl_36 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_KILL_PROCESS */ 20, /* lbl_36 */ 0 ), // lbl_36: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 14, /* lbl_37 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_KILL_PROCESS */ 18, /* lbl_37 */ 0 ), // lbl_37: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 12, /* lbl_38 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_full_fd, /* RET_KILL_PROCESS */ 16, /* lbl_38 */ 0 ), // lbl_38: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ping_fd, /* RET_KILL_PROCESS */ 10, /* lbl_33 */ 0 ), -// lbl_33: + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, out_inc_fd, /* RET_KILL_PROCESS */ 14, /* lbl_39 */ 0 ), +// lbl_39: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JGE | BPF_K, min_ping_fd, /* lbl_40 */ 0, /* lbl_34 */ 2 ), +// lbl_40: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JGT | BPF_K, max_ping_fd, /* lbl_34 */ 0, /* RET_KILL_PROCESS */ 10 ), +// lbl_34: /* load syscall argument 1 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[1])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, IPPROTO_TCP, /* lbl_39 */ 0, /* RET_KILL_PROCESS */ 8 ), -// lbl_39: + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, IPPROTO_TCP, /* lbl_41 */ 0, /* RET_KILL_PROCESS */ 8 ), +// lbl_41: /* load syscall argument 2 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[2])), BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, TCP_NODELAY, /* RET_ALLOW */ 7, /* RET_KILL_PROCESS */ 6 ), // check_renameat: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* lbl_40 */ 0, /* RET_KILL_PROCESS */ 4 ), -// lbl_40: + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* lbl_42 */ 0, /* RET_KILL_PROCESS */ 4 ), +// lbl_42: /* load syscall argument 2 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[2])), BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, dir_fd, /* RET_ALLOW */ 3, /* RET_KILL_PROCESS */ 2 ), diff --git a/src/discof/restore/utils/fd_ssping.c b/src/discof/restore/utils/fd_ssping.c index 79a9a88980b..20477bd307b 100644 --- a/src/discof/restore/utils/fd_ssping.c +++ b/src/discof/restore/utils/fd_ssping.c @@ -10,7 +10,8 @@ #include #include #include -#include +#include +#include #define PEER_STATE_UNPINGED 0 #define PEER_STATE_PINGED 1 @@ -24,16 +25,6 @@ #define PING_BURST_MAX (16UL) /* Limit how many pings we can burst at once. */ -/* FIXME: This code uses fd_ip4_port_t as the key for peers, but it - should really just use uint (IPv4 address) as port has no meaning - for ICMP pings. Making this change however requires some significant - changes in snapct as we are also effectively storing peer invalidation - state in this data structure. The number of distinct peers with - the same IP address but different ports will be low, so this is fine - for now. */ - -/* FIXME: Properly set and track sequence numbers for repeated pings. */ - struct fd_ssping_peer { ulong refcnt; fd_ip4_port_t addr; @@ -55,6 +46,7 @@ struct fd_ssping_peer { int state; ulong latency_nanos; long deadline_nanos; + ulong used_fd_idx; }; typedef struct fd_ssping_peer fd_ssping_peer_t; @@ -92,25 +84,22 @@ struct fd_ssping_private { deadline_list_t * refreshing; deadline_list_t * invalid; - int sockfd; - fd_ssping_on_ping_fn_t on_ping_cb; void * cb_arg; ulong magic; /* ==FD_SSPING_MAGIC */ -}; -/* We attach the UDP port number associated with the peer to each ping - echo request, which must be reflected back to us in the echo reply. - This is used to look up the correct peer, which is keyed on both - IP address and UDP port. The ICMP echo protocol has no concept - of UDP port which is why we must do this manually. */ - -struct __attribute__((packed)) ssping_pkt { - struct icmphdr icmp; - ushort port; + /* Invariant: The pool elements with an associated file descriptor are + exactly those that are PINGED or REFRESHING. */ + ulong used_fd_cnt; + struct pollfd used_fds[ FD_SSPING_FD_CNT ]; /* indexed [0, used_fd_cnt) */ + int idle_fds[ FD_SSPING_FD_CNT ]; /* indexed [0, FD_SSPING_FD_CNT-used_fd_cnt) */ + /* ping_to_pool[ i ]==x means that used_fds[ i ].fd is in use for + pinging the peer in pool[ x ]. */ + ulong ping_to_pool[ FD_SSPING_FD_CNT ]; /* indexed [0, used_fd_cnt) */ }; + FD_FN_CONST ulong fd_ssping_align( void ) { return fd_ulong_max( alignof(fd_ssping_t), @@ -174,12 +163,21 @@ fd_ssping_new( void * shmem, ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) ); ssping->invalid = deadline_list_join( deadline_list_new( _invalid ) ); - /* Note: This uses an obscure feature of Linux called ICMP datagram - sockets or unprivileged ping sockets. Normally one would have to - use SOCK_RAW sockets, but with this special feature any user can - send & receive ICMP echo packets. */ - ssping->sockfd = socket( AF_INET, SOCK_DGRAM|SOCK_NONBLOCK, IPPROTO_ICMP ); - if( FD_UNLIKELY( -1==ssping->sockfd ) ) FD_LOG_ERR(( "socket(SOCK_DGRAM,IPPROTO_ICMP) failed (%i-%s)", errno, fd_io_strerror( errno ) )); + for( ulong i=0UL; iidle_fds[ i ] = fd; + + ssping->used_fds[ i ].fd = -1; + ssping->used_fds[ i ].events = POLLOUT|POLLRDHUP|POLLPRI; + ssping->used_fds[ i ].revents = 0; + } + + ssping->used_fd_cnt = 0UL; ssping->on_ping_cb = on_ping_cb; ssping->cb_arg = cb_arg; @@ -213,11 +211,6 @@ fd_ssping_join( void * shping ) { return ssping; } -int -fd_ssping_get_sockfd( fd_ssping_t const * ssping ) { - return ssping->sockfd; -} - void fd_ssping_add( fd_ssping_t * ssping, fd_ip4_port_t addr ) { @@ -230,6 +223,7 @@ fd_ssping_add( fd_ssping_t * ssping, peer->state = PEER_STATE_UNPINGED; peer->addr = addr; peer->latency_nanos = ULONG_MAX; + peer->used_fd_idx = ULONG_MAX; peer_map_ele_insert( ssping->map, peer, ssping->pool ); deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool ); } @@ -295,34 +289,86 @@ fd_ssping_invalidate( fd_ssping_t * ssping, deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool ); } +static void +remove_fdesc_idx( fd_ssping_t * ssping, + ulong fdesc_idx ) { + FD_TEST( fdesc_idxused_fd_cnt ); + ulong pool_idx = ssping->ping_to_pool[ fdesc_idx ]; + + int fdesc = ssping->used_fds[ fdesc_idx ].fd; + /* Abort the connection attempt or close the connection by connecting + to AF_UNSPEC. */ + struct sockaddr_in addr[1] = {{ + .sin_family = AF_UNSPEC, + .sin_addr = { .s_addr = 0U }, + .sin_port = 0 + }}; + if( FD_UNLIKELY( connect( fdesc, addr, sizeof(addr) ) ) ) FD_LOG_ERR(( "connect(AF_UNSPEC) failed (%d-%s)", errno, fd_io_strerror( errno ) )); + + /* Mark that the pool element no longer has an associated index. */ + ssping->pool[ pool_idx ].used_fd_idx = ULONG_MAX; + + /* Now swap the last used_fd into this position, updating all the + relevant bookkeeping info. */ + ulong last = ssping->used_fd_cnt-1UL; + if( FD_LIKELY( fdesc_idx!=last ) ) { + ssping->used_fds[ fdesc_idx ] = ssping->used_fds[ last ]; + ulong last_pool_idx = ssping->ping_to_pool[ fdesc_idx ] = ssping->ping_to_pool[ last ]; + ssping->pool[ last_pool_idx ].used_fd_idx = fdesc_idx; + } + + ssping->idle_fds[ FD_SSPING_FD_CNT - ssping->used_fd_cnt ] = fdesc; + ssping->used_fd_cnt--; +} + static inline void -recv_pings( fd_ssping_t * ssping ) { - for( ulong i=0UL; isockfd, &pkt, sizeof(pkt), 0, fd_type_pun( &addr ), &alen ); - if( FD_UNLIKELY( result!=sizeof(pkt) || alen!=sizeof(addr) || pkt.icmp.type!=ICMP_ECHOREPLY ) ) break; - - fd_ip4_port_t key = { - .addr = addr.sin_addr.s_addr, - .port = pkt.port - }; - fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &key, NULL, ssping->pool ); - if( FD_UNLIKELY( peer==NULL || ( peer->state!=PEER_STATE_PINGED && peer->state!=PEER_STATE_REFRESHING ) ) ) continue; - - long now = fd_log_wallclock(); - - deadline_list_ele_remove( peer->state==PEER_STATE_PINGED ? ssping->pinged : ssping->refreshing, peer, ssping->pool ); - peer->latency_nanos = (ulong)fd_long_max( now - (peer->deadline_nanos - PEER_DEADLINE_NANOS_PING), 1L ); - peer->state = PEER_STATE_VALID; - peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID; - deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool ); - - FD_LOG_INFO(( "pinged " FD_IP4_ADDR_FMT ":%hu in %lu nanos", - FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ), peer->latency_nanos )); - ssping->on_ping_cb( ssping->cb_arg, peer->addr, peer->latency_nanos ); +recv_pings( fd_ssping_t * ssping, + fd_sspeer_selector_t * selector) { + int pollv = poll( ssping->used_fds, ssping->used_fd_cnt, 0 ); + if( FD_UNLIKELY( pollv<0 ) ) { + FD_LOG_WARNING(( "poll(used_fds,%lu,0) failed (%d-%s)", ssping->used_fd_cnt, errno, fd_io_strerror( errno ) )); + return; + } + long now = fd_log_wallclock(); + ulong processed = 0UL; + ulong processed_idx[ PING_BURST_MAX ]; + for( ulong i=0UL; iused_fd_cnt; i++ ) { + if( FD_UNLIKELY( processed >= fd_ulong_min( (ulong)pollv, PING_BURST_MAX ) ) ) break; + if( FD_UNLIKELY( ssping->used_fds[ i ].revents ) ) { + ulong pool_idx = ssping->ping_to_pool[ i ]; + fd_ssping_peer_t * peer = ssping->pool+pool_idx; + + FD_TEST( peer->state==PEER_STATE_PINGED || peer->state==PEER_STATE_REFRESHING ); + + + deadline_list_ele_remove( peer->state==PEER_STATE_PINGED ? ssping->pinged : ssping->refreshing, peer, ssping->pool ); + int is_err = ssping->used_fds[ i ].revents & (POLLRDHUP|POLLERR|POLLHUP); + if( FD_LIKELY( !is_err ) ) { + peer->latency_nanos = (ulong)fd_long_max( now - (peer->deadline_nanos - PEER_DEADLINE_NANOS_PING), 1L ); + peer->state = PEER_STATE_VALID; + peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID; + deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool ); + + FD_LOG_INFO(( "pinged " FD_IP4_ADDR_FMT ":%hu in %lu nanos", + FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ), peer->latency_nanos )); + ssping->on_ping_cb( ssping->cb_arg, peer->addr, peer->latency_nanos ); + } else { + /* This is pretty unlikely, but the host could respond with an + RST packet I suppose. */ + peer->state = PEER_STATE_INVALID; + peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID; + deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool ); + fd_sspeer_selector_remove( selector, peer->addr ); + } + processed_idx[ processed ] = i; + processed++; + } } + /* Now we need to call remove_fdesc_idx on the processed ones in + reverse order (largest to smallest) so that we don't trip on + ourself as we shuffle the array. */ + while( processed ) remove_fdesc_idx( ssping, processed_idx[ --processed ] ); } static uint @@ -330,48 +376,38 @@ send_pings( fd_ssping_t * ssping, deadline_list_t * list, long until ) { uint msg_cnt = 0U; - struct ssping_pkt pkts [ PING_BURST_MAX ]; - struct iovec iovs [ PING_BURST_MAX ]; - struct sockaddr_in addrs [ PING_BURST_MAX ]; - struct mmsghdr msgs [ PING_BURST_MAX ]; for( deadline_list_iter_t iter = deadline_list_iter_fwd_init( list, ssping->pool ); - msg_cntpool ); + msg_cntused_fd_cntpool ); iter = deadline_list_iter_fwd_next( iter, list, ssping->pool ) ) { - fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, deadline_list_iter_idx( iter, list, ssping->pool ) ); + ulong peer_idx = deadline_list_iter_idx( iter, list, ssping->pool ); + fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, peer_idx ); if( peer->deadline_nanos>until ) break; - pkts[ msg_cnt ] = (struct ssping_pkt){ - .icmp = { .type = ICMP_ECHO }, - .port = peer->addr.port - }; - iovs[ msg_cnt ] = (struct iovec){ - .iov_base = pkts + msg_cnt, - .iov_len = sizeof(struct ssping_pkt) - }; - addrs[ msg_cnt ] = (struct sockaddr_in){ + int fdesc = ssping->idle_fds[ FD_SSPING_FD_CNT-ssping->used_fd_cnt-1UL ]; + + struct sockaddr_in addr[1] = {{ .sin_family = AF_INET, - .sin_addr = { .s_addr = peer->addr.addr } - }; - msgs[ msg_cnt ].msg_hdr = (struct msghdr){ - .msg_name = addrs + msg_cnt, - .msg_namelen = sizeof(struct sockaddr_in), - .msg_iov = iovs + msg_cnt, - .msg_iovlen = 1, - }; - msgs[ msg_cnt ].msg_len = 0; + .sin_addr = { .s_addr = peer->addr.addr }, + .sin_port = peer->addr.port + }}; + + if( FD_UNLIKELY( connect( fdesc, addr, sizeof(addr) ) && errno!=EINPROGRESS ) ) { + FD_LOG_WARNING(( "connect(" FD_IP4_ADDR_FMT ":%hu) failed (%d-%s)", FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ), errno, fd_io_strerror( errno ) )); + /* Nothing to do. It will get "reaped" later. */ + } + + ssping->used_fds [ ssping->used_fd_cnt ].fd = fdesc; + ssping->ping_to_pool[ ssping->used_fd_cnt ] = peer_idx; + peer->used_fd_idx = ssping->used_fd_cnt; + ssping->used_fd_cnt++; msg_cnt++; } if( msg_cnt==0U ) return 0U; - int result = sendmmsg( ssping->sockfd, msgs, msg_cnt, 0 ); - if( FD_UNLIKELY( -1==result ) ) { - if( errno!=EAGAIN && errno!=EINTR ) FD_LOG_WARNING(( "sendmmsg(%u) failed (%i-%s)", msg_cnt, errno, fd_io_strerror( errno ) )); - return 0U; - } - FD_TEST( result>=0 && result<=(int)PING_BURST_MAX ); - return (uint)result; + return (uint)msg_cnt; } + void fd_ssping_advance( fd_ssping_t * ssping, long now, @@ -391,6 +427,8 @@ fd_ssping_advance( fd_ssping_t * ssping, deadline_list_ele_pop_head( ssping->pinged, ssping->pool ); + remove_fdesc_idx( ssping, peer->used_fd_idx ); + peer->state = PEER_STATE_INVALID; peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID; deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool ); @@ -412,6 +450,8 @@ fd_ssping_advance( fd_ssping_t * ssping, deadline_list_ele_pop_head( ssping->refreshing, ssping->pool ); + remove_fdesc_idx( ssping, peer->used_fd_idx ); + peer->state = PEER_STATE_INVALID; peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID; deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool ); @@ -429,5 +469,21 @@ fd_ssping_advance( fd_ssping_t * ssping, deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool ); } - recv_pings( ssping ); + recv_pings( ssping, selector ); +} + +ulong +fd_ssping_get_sockfds( fd_ssping_t const * ssping, + int * fds, + ulong fd_cnt ) { + ulong ret_cnt = fd_ulong_min( fd_cnt, FD_SSPING_FD_CNT ); + for( ulong i=0UL; iused_fd_cnt ); i++ ) { + fds[ i ] = ssping->used_fds[ i ].fd; + } + fds += fd_ulong_min( ret_cnt, ssping->used_fd_cnt ); + ret_cnt -= fd_ulong_min( ret_cnt, ssping->used_fd_cnt ); + for( ulong j=0UL; jused_fd_cnt ); j++ ) { + fds[ j ] = ssping->idle_fds[ j ]; + } + return fd_ulong_min( fd_cnt, FD_SSPING_FD_CNT ); } diff --git a/src/discof/restore/utils/fd_ssping.h b/src/discof/restore/utils/fd_ssping.h index ff742854654..67e49668965 100644 --- a/src/discof/restore/utils/fd_ssping.h +++ b/src/discof/restore/utils/fd_ssping.h @@ -21,6 +21,10 @@ struct fd_sspeer_selector_private; typedef struct fd_sspeer_selector_private fd_sspeer_selector_t; +#define FD_SSPING_FD_CNT (249UL) /* Limit to how many pings can be + inflight. Chosen so that it doesn't + overflow the tile limit. */ + #define FD_SSPING_MAGIC (0xF17EDA2CE55A1A60) /* FIREDANCE SSPING V0 */ struct fd_ssping_private; @@ -91,10 +95,13 @@ fd_ssping_advance( fd_ssping_t * ssping, long now, fd_sspeer_selector_t * selector); -/* Return the ping socket file descriptor */ - -int -fd_ssping_get_sockfd( fd_ssping_t const * ssping ); +/* Write the socket file descriptors that this ping tracker might use to + fds[i] for i in [0, fd_cnt). Returns how many file descriptors were + actually written. */ +ulong +fd_ssping_get_sockfds( fd_ssping_t const * ssping, + int * fds, + ulong fd_cnt ); FD_PROTOTYPES_END