Skip to content
Open

wip #122

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,33 @@
msg->next = NULL;
}

// See https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-42.md#pinning
static inline natsStatus
_updateFetchPinID(jsFetch *fetch, natsStatus fetchStatus, natsMsg *msg)
{
const char *val = NULL;

// Clear the pinning ID if it mismatched, and continue fetching without it.
if (fetchStatus == NATS_PIN_ID_MISMATCH)
{
NATS_FREE(fetch->pinID);
fetch->pinID = NULL;
return NATS_OK;
}

// If the message contains a "Nats-Pin-Id" header, use its value as the new pinID.
natsMsgHeader_Get(msg, jsConsumerPinIDHdr, &val);
if (!nats_IsStringEmpty(val))
{
NATS_FREE(fetch->pinID);
fetch->pinID = NATS_STRDUP(val);
if (fetch->pinID == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 134 in src/dispatch.c

View check run for this annotation

Codecov / codecov/patch

src/dispatch.c#L134

Added line #L134 was not covered by tests
}

return NATS_OK;
}

// Returns fetch status, sub/dispatch locks must be held.
static inline natsStatus
_preProcessUserMessage(
Expand All @@ -129,8 +156,14 @@

// Fetch-specific handling of synthetic and header-only messages
if ((jsi != NULL) && (fetch != NULL))
{
fetchStatus = js_checkFetchedMsg(sub, msg, jsi->fetchID, true, userMsg);

natsStatus s = _updateFetchPinID(fetch, fetchStatus, msg);
if (s != NATS_OK)
return s;
}

// Is it another kind of synthetic message?
*userMsg = *userMsg && (msg->subject[0] != '\0');

Expand Down
58 changes: 58 additions & 0 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,7 @@
if (fetch->expiresTimer != NULL)
natsTimer_Destroy(fetch->expiresTimer);

NATS_FREE(fetch->pinID);
NATS_FREE(fetch);
}

Expand Down Expand Up @@ -1816,6 +1817,17 @@
if (strncmp(val, HDR_STATUS_NO_RESP_503, HDR_STATUS_LEN) == 0)
return NATS_NO_RESPONDERS;

// Pull consumer pin ID mismatch
if (strncmp(val, HDR_STATUS_PIN_ID_MISMATCH, HDR_STATUS_LEN) == 0)
return NATS_PIN_ID_MISMATCH;

if (strncmp(val, HDR_STATUS_BAD_REQUEST, HDR_STATUS_LEN) == 0)
{
// This is a bad request, so we return the error.
natsMsgHeader_Get(msg, DESCRIPTION_HDR, &desc);
return nats_setError(NATS_INVALID_ARG, "%s", (desc == NULL ? "error checking pull subscribe message" : desc));
}

natsMsgHeader_Get(msg, DESCRIPTION_HDR, &desc);
return nats_setError(NATS_ERR, "%s", (desc == NULL ? "error checking pull subscribe message" : desc));
}
Expand All @@ -1842,6 +1854,22 @@
s = nats_marshalLong(buf, true, "idle_heartbeat", req->Heartbeat);
if ((s == NATS_OK) && req->NoWait)
s = natsBuf_Append(buf, ",\"no_wait\":true", -1);
if ((s == NATS_OK) && !nats_IsStringEmpty(req->Group))
{
s = natsBuf_Append(buf, ",\"group\":\"", -1);
IFOK(s, natsBuf_Append(buf, req->Group, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
if ((s == NATS_OK) && (req->MinPending > 0))
s = nats_marshalLong(buf, true, "min_pending", req->MinPending);
if ((s == NATS_OK) && (req->MinAckPending > 0))
s = nats_marshalLong(buf, true, "min_ack_pending", req->MinAckPending);
if ((s == NATS_OK) && !nats_IsStringEmpty(req->ID))
{
s = natsBuf_Append(buf, ",\"id\":\"", -1);
IFOK(s, natsBuf_Append(buf, req->ID, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
IFOK(s, natsBuf_AppendByte(buf, '}'));

// Sent the request to get more messages.
Expand Down Expand Up @@ -2929,6 +2957,10 @@
req.Expires = (fetch->opts.Timeout - (now - fetch->startTimeMillis)) * 1000 * 1000; // ns, go time.Duration
req.NoWait = fetch->opts.NoWait;
req.Heartbeat = fetch->opts.Heartbeat * 1000 * 1000; // ns, go time.Duration
req.Group = fetch->opts.Group;
req.MinPending = fetch->opts.MinPending;
req.MinAckPending = fetch->opts.MinAckPending;
req.ID = fetch->pinID;

size_t replySubjectSize = 1 + strlen(sub->subject) + 20;
char *replySubject = NATS_MALLOC(replySubjectSize);
Expand Down Expand Up @@ -3032,6 +3064,32 @@
return nats_setError(NATS_INVALID_ARG, "%s", "Can not use MaxBytes and KeepAhead together");
if (jsOpts->PullSubscribeAsync.NoWait)
return nats_setError(NATS_INVALID_ARG, "%s", "Can not use NoWait with KeepAhead together");

// TODO: this validation should really be done against the consumerinfo
// once it's obtained, but it's hidden deep in _subscribe. This would
// only execute if the user's intent is to create a new consumer as part
// of the call.
if ((opts != NULL) && (opts->Config.PriorityGroupsLen != 0))
{
if (nats_IsStringEmpty(jsOpts->PullSubscribeAsync.Group))
return nats_setError(NATS_INVALID_ARG, "%s", "Group is required for a priority group consumer");

Check warning on line 3075 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3075

Added line #L3075 was not covered by tests

bool valid = false;

Check warning on line 3077 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3077

Added line #L3077 was not covered by tests
for (int i = 0; i < opts->Config.PriorityGroupsLen; i++)
{
if (strcmp(opts->Config.PriorityGroups[i], jsOpts->PullSubscribeAsync.Group) != 0)
continue;

Check warning on line 3081 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3081

Added line #L3081 was not covered by tests
valid = true;
break;
}
if (!valid)
return nats_setError(NATS_INVALID_ARG, "%s", "Group is not part of the priority group consumer");

Check warning on line 3086 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3086

Added line #L3086 was not covered by tests
}
else
{
if (!nats_IsStringEmpty(jsOpts->PullSubscribeAsync.Group))
return nats_setError(NATS_INVALID_ARG, "%s", "Group is not supported for a non-priority group consumer");

Check warning on line 3091 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3091

Added line #L3091 was not covered by tests
}
}

if (errCode != NULL)
Expand Down
7 changes: 7 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern const int64_t jsDefaultRequestWait;
#define jsExpectedLastSubjSeqHdr "Nats-Expected-Last-Subject-Sequence"
#define jsExpectedLastMsgIdHdr "Nats-Expected-Last-Msg-Id"
#define jsConsumerStalledHdr "Nats-Consumer-Stalled"
#define jsConsumerPinIDHdr "Nats-Pin-Id"

#define jsErrStreamNameRequired "stream name is required"
#define jsErrConsumerNameRequired "consumer name is required"
Expand Down Expand Up @@ -99,6 +100,9 @@ extern const int64_t jsDefaultRequestWait;
#define jsReplayOriginalStr "original"
#define jsReplayInstantStr "instant"

#define jsPriorityPolicyPinnedClientStr "pinned_client"
#define jsPriorityPolicyOverflowStr "overflow"

#define jsAckPrefix "$JS.ACK."
#define jsAckPrefixLen (8)

Expand Down Expand Up @@ -147,6 +151,9 @@ extern const int64_t jsDefaultRequestWait;
// jsApiDeleteConsumerT is used to delete consumers.
#define jsApiConsumerDeleteT "%.*s.CONSUMER.DELETE.%s.%s"

// jsApiConsumerUnpinT is used to unpin a consumer.
#define jsApiConsumerUnpinT "%.*s.CONSUMER.UNPIN.%s.%s"

// jsApiStreams can lookup a stream by subject.
#define jsApiStreams "%.*s.STREAM.NAMES"

Expand Down
120 changes: 104 additions & 16 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2940,24 +2940,20 @@
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
if ((s == NATS_OK) && (cfg->FilterSubjectsLen > 0))
{
int i;

s = natsBuf_Append(buf, ",\"filter_subjects\":[", -1);
for (i = 0; (s == NATS_OK) && (i < cfg->FilterSubjectsLen); i++)
{
if (i > 0)
s = natsBuf_AppendByte(buf, ',');
IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, cfg->FilterSubjects[i], -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}

IFOK(s, natsBuf_AppendByte(buf, ']'));
}
nats_marshalStringArray(buf, true, "filter_subjects", cfg->FilterSubjects, cfg->FilterSubjectsLen);
IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
if ((s == NATS_OK) && (cfg->PauseUntil > 0))
s = _marshalTimeUTC(buf, true, "pause_until", cfg->PauseUntil);
if ((s == NATS_OK) && !nats_IsStringEmpty(cfg->PriorityPolicy))
{
s = natsBuf_Append(buf, ",\"priority_policy\":\"", -1);
IFOK(s, natsBuf_Append(buf, cfg->PriorityPolicy, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
if ((s == NATS_OK) && (cfg->PinnedTTL > 0))
s = nats_marshalLong(buf, true, "priority_timeout", cfg->PinnedTTL);
if ((s == NATS_OK) && (cfg->PriorityGroups != NULL) && (cfg->PriorityGroupsLen > 0))
nats_marshalStringArray(buf, true, "priority_groups", cfg->PriorityGroups, cfg->PriorityGroupsLen);
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
Expand Down Expand Up @@ -3030,10 +3026,14 @@
NATS_FREE((char*) cc->FilterSubject);
for (i = 0; i < cc->FilterSubjectsLen; i++)
NATS_FREE((char *)cc->FilterSubjects[i]);
nats_freeMetadata(&(cc->Metadata));
NATS_FREE((char *)cc->FilterSubjects);
nats_freeMetadata(&(cc->Metadata));
NATS_FREE((char *)cc->SampleFrequency);
NATS_FREE(cc->BackOff);
NATS_FREE((char *)cc->PriorityPolicy);
for (i = 0; i < cc->PriorityGroupsLen; i++)
NATS_FREE((char *)cc->PriorityGroups[i]);
NATS_FREE((char *)cc->PriorityGroups);
NATS_FREE(cc);
}

Expand Down Expand Up @@ -3156,6 +3156,10 @@
IFOK(s, nats_JSONGetLong(cjson, "num_replicas", &(cc->Replicas)));
IFOK(s, nats_JSONGetBool(cjson, "mem_storage", &(cc->MemoryStorage)));
IFOK(s, nats_unmarshalMetadata(cjson, "metadata", &(cc->Metadata)));
IFOK(s, nats_JSONGetTime(cjson, "pause_until", &(cc->PauseUntil)));
IFOK(s, nats_JSONGetStr(cjson, "priority_policy", (char**) &(cc->PriorityPolicy)));
IFOK(s, nats_JSONGetLong(cjson, "priority_timeout", &(cc->PinnedTTL)));
IFOK(s, nats_JSONGetArrayStr(cjson, "priority_groups", (char ***)&(cc->PriorityGroups), &(cc->PriorityGroupsLen)));
}

if (s == NATS_OK)
Expand Down Expand Up @@ -3187,6 +3191,8 @@
{
natsStatus s = NATS_OK;
jsConsumerInfo *ci = NULL;
nats_JSON **priorityGroups = NULL;
int priorityGroupsLen = 0;

ci = (jsConsumerInfo*) NATS_CALLOC(1, sizeof(jsConsumerInfo));
if (ci == NULL)
Expand All @@ -3206,6 +3212,26 @@
IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound)));
IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining)));

IFOK(s, nats_JSONGetArrayObject(json, "priority_groups", &priorityGroups, &priorityGroupsLen));
if ((s == NATS_OK) && (priorityGroups != NULL))
{
ci->PriorityGroups = (jsPriorityGroupState*) NATS_CALLOC(priorityGroupsLen, sizeof(jsPriorityGroupState));
if (ci->PriorityGroups == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 3221 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3221

Added line #L3221 was not covered by tests
else
ci->PriorityGroupsLen = priorityGroupsLen;

for (int i=0; (s == NATS_OK) && (i<priorityGroupsLen); i++)
{
s = nats_JSONGetStr(priorityGroups[i], "group", (char**) &(ci->PriorityGroups[i].Group));
IFOK(s, nats_JSONGetStr(priorityGroups[i], "pinned_client_id", (char**) &(ci->PriorityGroups[i].PinnedClientID)));
IFOK(s, nats_JSONGetTime(priorityGroups[i], "pinned_ts", &(ci->PriorityGroups[i].PinnedTS)));
}
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(priorityGroups);
}

if (s == NATS_OK)
*new_ci = ci;
else
Expand Down Expand Up @@ -3602,6 +3628,60 @@
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_UnpinConsumer(jsCtx *js, const char *stream, const char *consumer, const char *group,
jsOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
char *subj = NULL;
bool freePfx = false;
natsConnection *nc = NULL;
natsMsg *resp = NULL;
bool success = false;
jsOptions o;
char jsonBuf[64];

if (errCode != NULL)
*errCode = 0;

if (js == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

Check warning on line 3648 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3648

Added line #L3648 was not covered by tests

s = _checkStreamName(stream);
IFOK(s, js_checkConsName(consumer, false));
IFOK(s, nats_validateLimitedTerm("group", group));
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

Check warning on line 3654 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3654

Added line #L3654 was not covered by tests

s = js_setOpts(&nc, &freePfx, js, opts, &o);
if (s == NATS_OK)
{
if (nats_asprintf(&subj, jsApiConsumerUnpinT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, consumer) < 0 )
{
s = nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 3663 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3663

Added line #L3663 was not covered by tests
}
if (freePfx)
NATS_FREE((char*) o.Prefix);

Check warning on line 3666 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3666

Added line #L3666 was not covered by tests
}

snprintf(jsonBuf, sizeof(jsonBuf), "{\"group\":\"%s\"}", group);

// Send the request
IFOK_JSR(s, natsConnection_RequestString(&resp, nc, subj, jsonBuf, o.Wait));

Check warning on line 3672 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3672

Added line #L3672 was not covered by tests

// If we got a response, check for error and success result.
IFOK(s, _unmarshalSuccessResp(&success, resp, errCode));

Check warning on line 3675 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3675

Added line #L3675 was not covered by tests
if ((s == NATS_OK) && !success)
s = nats_setError(s, "failed to unpin group '%s' at consumer '%s'", group, consumer);

NATS_FREE(subj);
natsMsg_Destroy(resp);

return NATS_UPDATE_ERR_STACK(s);

Check warning on line 3682 in src/jsm.c

View check run for this annotation

Codecov / codecov/patch

src/jsm.c#L3682

Added line #L3682 was not covered by tests
}

natsStatus
jsConsumerConfig_Init(jsConsumerConfig *cc)
{
Expand All @@ -3625,6 +3705,14 @@
NATS_FREE(ci->Name);
js_destroyConsumerConfig(ci->Config);
_destroyClusterInfo(ci->Cluster);

// Destroy any priority groups
for (int i = 0; i < ci->PriorityGroupsLen; i++)
{
NATS_FREE(ci->PriorityGroups[i].Group);
NATS_FREE(ci->PriorityGroups[i].PinnedClientID);
}
NATS_FREE(ci->PriorityGroups);
NATS_FREE(ci);
}

Expand Down
2 changes: 2 additions & 0 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
#define STATUS_HDR "Status"
#define DESCRIPTION_HDR "Description"
#define HDR_STATUS_NO_RESP_503 "503"
#define HDR_STATUS_BAD_REQUEST "400"
#define HDR_STATUS_NOT_FOUND_404 "404"
#define HDR_STATUS_TIMEOUT_408 "408"
#define HDR_STATUS_MAX_BYTES_409 "409"
#define HDR_STATUS_PIN_ID_MISMATCH "423"
#define HDR_STATUS_CTRL_100 "100"
#define HDR_STATUS_LEN (3)

Expand Down
Loading
Loading