Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions doc/nn_tcp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ NN_TCP_NODELAY::
delaying of TCP acknowledgments. Using this option improves latency at
the expense of throughput. Type of this option is int. Default value is 0.

NN_TCP_QUICKACK::
This option, when set to 1, requests immediate sending of TCP acknowledgment.
Unlike NN_TCP_NODELAY, this is a one-shot option that is automatically
cleared by the kernel after each receive operation. This option should be
called after nn_recv() to force an immediate ACK for the received data.
This is a Linux-specific option. Type of this option is int.


EXAMPLE
-------
Expand Down
39 changes: 39 additions & 0 deletions src/core/sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "../protocol.h"
#include "../transport.h"
#include "../tcp.h"

#include "sock.h"
#include "global.h"
Expand Down Expand Up @@ -296,6 +297,44 @@ static int nn_sock_setopt_inner (struct nn_sock *self, int level,

/* Transport-specific options. */
if (level < NN_SOL_SOCKET) {
/* Special handling for TCP_QUICKACK - apply to all active endpoints.

TCP_QUICKACK must be set on active connections (after recv), so we
need to iterate through all endpoints and apply it to their active
connections.

Architecture:
- Socket has multiple endpoints (one per nn_bind/nn_connect call)
- Each bind endpoint (btcp) can have multiple accepted connections (atcp)
- Each connect endpoint (ctcp) has one outgoing connection

This iteration hits all endpoints, and each endpoint's setopt will
in turn apply the option to all its active connections. */
if (level == NN_TCP && option == NN_TCP_QUICKACK) {
struct nn_list_item *it;
struct nn_ep *ep;
int rc;
int applied = 0;

/* Iterate through all endpoints (btcp/ctcp instances) */
for (it = nn_list_begin (&self->eps);
it != nn_list_end (&self->eps);
it = nn_list_next (&self->eps, it)) {
ep = nn_cont (it, struct nn_ep, item);

/* Only apply to TCP endpoints that have a setopt operation.
This will call nn_btcp_setopt or nn_ctcp_setopt. */
if (ep->ops.setopt != NULL) {
rc = ep->ops.setopt (ep->tran, option, optval, optvallen);
if (rc == 0)
applied = 1;
}
}

/* Return success if at least one endpoint accepted it */
return applied ? 0 : -ENOPROTOOPT;
}

optset = nn_sock_optset (self, level);
if (!optset)
return -ENOPROTOOPT;
Expand Down
1 change: 1 addition & 0 deletions src/core/symbol.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ static const struct nn_symbol_properties sym_value_names [] = {
NN_SYM(NN_REQ_RESEND_IVL, TRANSPORT_OPTION, INT, MILLISECONDS),
NN_SYM(NN_SURVEYOR_DEADLINE, TRANSPORT_OPTION, INT, MILLISECONDS),
NN_SYM(NN_TCP_NODELAY, TRANSPORT_OPTION, INT, BOOLEAN),
NN_SYM(NN_TCP_QUICKACK, TRANSPORT_OPTION, INT, BOOLEAN),
NN_SYM(NN_WS_MSG_TYPE, TRANSPORT_OPTION, INT, NONE),

NN_SYM(NN_DONTWAIT, FLAG, NONE, NONE),
Expand Down
1 change: 1 addition & 0 deletions src/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C" {
#define NN_TCP -3

#define NN_TCP_NODELAY 1
#define NN_TCP_QUICKACK 2

#ifdef __cplusplus
}
Expand Down
3 changes: 3 additions & 0 deletions src/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ struct nn_ep_ops {

/* Deallocate the endpoint object. It will already have been stopped. */
void (*destroy) (void *);

/* Set a socket option on an active endpoint. Can be NULL if not supported. */
int (*setopt) (void *, int option, const void *optval, size_t optvallen);
};

/* Set up an ep for use by a transport. The final opaque argument is passed
Expand Down
38 changes: 38 additions & 0 deletions src/transports/tcp/atcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,44 @@ void nn_atcp_stop (struct nn_atcp *self)
nn_fsm_stop (&self->fsm);
}

int nn_atcp_setopt (struct nn_atcp *self, int option, const void *optval,
size_t optvallen)
{
int val;
int rc;

/* This is called for each accepted TCP connection (atcp) to apply
socket options on the active, established connection.

atcp represents one accepted client connection within a btcp listener. */

/* Only handle TCP_QUICKACK for now. */
if (option != NN_TCP_QUICKACK)
return -ENOPROTOOPT;

/* Only apply if we're in ACTIVE state with a valid stcp connection. */
if (self->state != NN_ATCP_STATE_ACTIVE)
return -ENOPROTOOPT;

/* TCP_QUICKACK is always an int. */
if (optvallen != sizeof (int))
return -EINVAL;
val = *(const int*) optval;

/* Apply TCP_QUICKACK directly to the underlying OS socket.

We can't use nn_usock_setsockopt() because it has an assertion that
only allows setting options on sockets in STARTING or ACCEPTED state.
Since this connection is already ACTIVE (established and exchanging data),
we access self->usock.s directly to call the OS setsockopt(). */
rc = setsockopt (self->usock.s, IPPROTO_TCP, TCP_QUICKACK,
&val, sizeof (val));
if (rc != 0)
return -errno;

return 0;
}

static void nn_atcp_shutdown (struct nn_fsm *self, int src, int type,
NN_UNUSED void *srcptr)
{
Expand Down
2 changes: 2 additions & 0 deletions src/transports/tcp/atcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ void nn_atcp_term (struct nn_atcp *self);
int nn_atcp_isidle (struct nn_atcp *self);
void nn_atcp_start (struct nn_atcp *self, struct nn_usock *listener);
void nn_atcp_stop (struct nn_atcp *self);
int nn_atcp_setopt (struct nn_atcp *self, int option, const void *optval,
size_t optvallen);

#endif

32 changes: 31 additions & 1 deletion src/transports/tcp/btcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ struct nn_btcp {
/* nn_ep virtual interface implementation. */
static void nn_btcp_stop (void *);
static void nn_btcp_destroy (void *);
static int nn_btcp_setopt (void *, int option, const void *optval, size_t optvallen);
const struct nn_ep_ops nn_btcp_ep_ops = {
nn_btcp_stop,
nn_btcp_destroy
nn_btcp_destroy,
nn_btcp_setopt
};

/* Private functions. */
Expand Down Expand Up @@ -188,6 +190,34 @@ static void nn_btcp_destroy (void *self)
nn_free (btcp);
}

static int nn_btcp_setopt (void *self, int option, const void *optval,
size_t optvallen)
{
struct nn_btcp *btcp = self;
struct nn_list_item *it;
struct nn_atcp *atcp;
int rc;
int applied = 0;

/* btcp is a bind endpoint (listener) that can have multiple accepted
connections. Each accepted connection is an atcp instance.

This is the second level of iteration:
- sock.c iterates through endpoints (including this btcp)
- This function iterates through all atcp connections within this btcp
- Each atcp applies the option to its underlying socket */
for (it = nn_list_begin (&btcp->atcps);
it != nn_list_end (&btcp->atcps);
it = nn_list_next (&btcp->atcps, it)) {
atcp = nn_cont (it, struct nn_atcp, item);
rc = nn_atcp_setopt (atcp, option, optval, optvallen);
if (rc == 0)
applied = 1;
}

return applied ? 0 : -ENOPROTOOPT;
}

static void nn_btcp_shutdown (struct nn_fsm *self, int src, int type,
void *srcptr)
{
Expand Down
42 changes: 41 additions & 1 deletion src/transports/tcp/ctcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ struct nn_ctcp {
/* nn_ep virtual interface implementation. */
static void nn_ctcp_stop (void *);
static void nn_ctcp_destroy (void *);
static int nn_ctcp_setopt (void *, int option, const void *optval, size_t optvallen);
const struct nn_ep_ops nn_ctcp_ep_ops = {
nn_ctcp_stop,
nn_ctcp_destroy
nn_ctcp_destroy,
nn_ctcp_setopt
};

/* Private functions. */
Expand Down Expand Up @@ -222,6 +224,44 @@ static void nn_ctcp_destroy (void *self)
nn_free (ctcp);
}

static int nn_ctcp_setopt (void *self, int option, const void *optval,
size_t optvallen)
{
struct nn_ctcp *ctcp = self;
int val;
int rc;

/* ctcp is a connect endpoint (outgoing connection).
Unlike btcp which can have multiple atcp connections, ctcp represents
a single outgoing connection, so we apply the option directly. */

/* Only handle TCP_QUICKACK for now. */
if (option != NN_TCP_QUICKACK)
return -ENOPROTOOPT;

/* Only apply if we're in ACTIVE state with a valid stcp connection. */
if (ctcp->state != NN_CTCP_STATE_ACTIVE)
return -ENOPROTOOPT;

/* TCP_QUICKACK is always an int. */
if (optvallen != sizeof (int))
return -EINVAL;
val = *(const int*) optval;

/* Apply TCP_QUICKACK directly to the underlying OS socket.

We can't use nn_usock_setsockopt() because it has an assertion that
only allows setting options on sockets in STARTING or ACCEPTED state.
Since this connection is already ACTIVE (established and exchanging data),
we access ctcp->usock.s directly to call the OS setsockopt(). */
rc = setsockopt (ctcp->usock.s, IPPROTO_TCP, TCP_QUICKACK,
&val, sizeof (val));
if (rc != 0)
return -errno;

return 0;
}

static void nn_ctcp_shutdown (struct nn_fsm *self, int src, int type,
NN_UNUSED void *srcptr)
{
Expand Down
23 changes: 23 additions & 0 deletions tests/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,5 +220,28 @@ int main (int argc, const char *argv[])
nn_sleep (100);
test_close (sc);

/* Test TCP_QUICKACK socket option. */
sb = test_socket (AF_SP, NN_PAIR);
test_bind (sb, socket_address);
sc = test_socket (AF_SP, NN_PAIR);
test_connect (sc, socket_address);
nn_sleep (100);

/* Send and receive a message. */
test_send (sc, "HELLO");
test_recv (sb, "HELLO");

/* Set TCP_QUICKACK after recv - should succeed on active connection. */
opt = 1;
rc = nn_setsockopt (sb, NN_TCP, NN_TCP_QUICKACK, &opt, sizeof (opt));
errno_assert (rc == 0);

/* Try setting it on the sender side too. */
rc = nn_setsockopt (sc, NN_TCP, NN_TCP_QUICKACK, &opt, sizeof (opt));
errno_assert (rc == 0);

test_close (sb);
test_close (sc);

return 0;
}