diff --git a/src/dispatch.c b/src/dispatch.c index 1571d5095..092aa65dd 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -110,6 +110,33 @@ _removeHeadMsg(natsDispatcher *d, natsMsg *msg) msg->next = NULL; } +// See https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-42.md#pinning +static inline natsStatus +_updateFetchPinID(jsFetch *fetch, natsStatus fetchStatus, natsMsg *msg) +{ + const char *val = NULL; + + // Clear the pinning ID if it mismatched, and continue fetching without it. + if (fetchStatus == NATS_PIN_ID_MISMATCH) + { + NATS_FREE(fetch->pinID); + fetch->pinID = NULL; + return NATS_OK; + } + + // If the message contains a "Nats-Pin-Id" header, use its value as the new pinID. + natsMsgHeader_Get(msg, jsConsumerPinIDHdr, &val); + if (!nats_IsStringEmpty(val)) + { + NATS_FREE(fetch->pinID); + fetch->pinID = NATS_STRDUP(val); + if (fetch->pinID == NULL) + return nats_setDefaultError(NATS_NO_MEMORY); + } + + return NATS_OK; +} + // Returns fetch status, sub/dispatch locks must be held. static inline natsStatus _preProcessUserMessage( @@ -129,8 +156,14 @@ _preProcessUserMessage( // Fetch-specific handling of synthetic and header-only messages if ((jsi != NULL) && (fetch != NULL)) + { fetchStatus = js_checkFetchedMsg(sub, msg, jsi->fetchID, true, userMsg); + natsStatus s = _updateFetchPinID(fetch, fetchStatus, msg); + if (s != NATS_OK) + return s; + } + // Is it another kind of synthetic message? 
*userMsg = *userMsg && (msg->subject[0] != '\0'); diff --git a/src/js.c b/src/js.c index 41256933b..7b3d39880 100644 --- a/src/js.c +++ b/src/js.c @@ -1235,6 +1235,7 @@ _destroyFetch(jsFetch *fetch) if (fetch->expiresTimer != NULL) natsTimer_Destroy(fetch->expiresTimer); + NATS_FREE(fetch->pinID); NATS_FREE(fetch); } @@ -1816,6 +1817,17 @@ js_checkFetchedMsg(natsSubscription *sub, natsMsg *msg, uint64_t fetchID, bool c if (strncmp(val, HDR_STATUS_NO_RESP_503, HDR_STATUS_LEN) == 0) return NATS_NO_RESPONDERS; + // Pull consumer pin ID mismatch + if (strncmp(val, HDR_STATUS_PIN_ID_MISMATCH, HDR_STATUS_LEN) == 0) + return NATS_PIN_ID_MISMATCH; + + if (strncmp(val, HDR_STATUS_BAD_REQUEST, HDR_STATUS_LEN) == 0) + { + // This is a bad request, so we return the error. + natsMsgHeader_Get(msg, DESCRIPTION_HDR, &desc); + return nats_setError(NATS_INVALID_ARG, "%s", (desc == NULL ? "error checking pull subscribe message" : desc)); + } + natsMsgHeader_Get(msg, DESCRIPTION_HDR, &desc); return nats_setError(NATS_ERR, "%s", (desc == NULL ? 
"error checking pull subscribe message" : desc)); } @@ -1842,6 +1854,22 @@ _publishPullRequest(natsConnection *nc, const char *subj, const char *rply, s = nats_marshalLong(buf, true, "idle_heartbeat", req->Heartbeat); if ((s == NATS_OK) && req->NoWait) s = natsBuf_Append(buf, ",\"no_wait\":true", -1); + if ((s == NATS_OK) && !nats_IsStringEmpty(req->Group)) + { + s = natsBuf_Append(buf, ",\"group\":\"", -1); + IFOK(s, natsBuf_Append(buf, req->Group, -1)); + IFOK(s, natsBuf_AppendByte(buf, '"')); + } + if ((s == NATS_OK) && (req->MinPending > 0)) + s = nats_marshalLong(buf, true, "min_pending", req->MinPending); + if ((s == NATS_OK) && (req->MinAckPending > 0)) + s = nats_marshalLong(buf, true, "min_ack_pending", req->MinAckPending); + if ((s == NATS_OK) && !nats_IsStringEmpty(req->ID)) + { + s = natsBuf_Append(buf, ",\"id\":\"", -1); + IFOK(s, natsBuf_Append(buf, req->ID, -1)); + IFOK(s, natsBuf_AppendByte(buf, '"')); + } IFOK(s, natsBuf_AppendByte(buf, '}')); // Sent the request to get more messages. 
@@ -2929,6 +2957,10 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch) req.Expires = (fetch->opts.Timeout - (now - fetch->startTimeMillis)) * 1000 * 1000; // ns, go time.Duration req.NoWait = fetch->opts.NoWait; req.Heartbeat = fetch->opts.Heartbeat * 1000 * 1000; // ns, go time.Duration + req.Group = fetch->opts.Group; + req.MinPending = fetch->opts.MinPending; + req.MinAckPending = fetch->opts.MinAckPending; + req.ID = fetch->pinID; size_t replySubjectSize = 1 + strlen(sub->subject) + 20; char *replySubject = NATS_MALLOC(replySubjectSize); @@ -3032,6 +3064,32 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject, return nats_setError(NATS_INVALID_ARG, "%s", "Can not use MaxBytes and KeepAhead together"); if (jsOpts->PullSubscribeAsync.NoWait) return nats_setError(NATS_INVALID_ARG, "%s", "Can not use NoWait with KeepAhead together"); + + // TODO: this validation should really be done against the consumerinfo + // once it's obtained, but it's hidden deep in _subscribe. This would + // only execute if the user's intent is to create a new consumer as part + // of the call. 
+ if ((opts != NULL) && (opts->Config.PriorityGroupsLen != 0)) + { + if (nats_IsStringEmpty(jsOpts->PullSubscribeAsync.Group)) + return nats_setError(NATS_INVALID_ARG, "%s", "Group is required for a priority group consumer"); + + bool valid = false; + for (int i = 0; i < opts->Config.PriorityGroupsLen; i++) + { + if (strcmp(opts->Config.PriorityGroups[i], jsOpts->PullSubscribeAsync.Group) != 0) + continue; + valid = true; + break; + } + if (!valid) + return nats_setError(NATS_INVALID_ARG, "%s", "Group is not part of the priority group consumer"); + } + else + { + if (!nats_IsStringEmpty(jsOpts->PullSubscribeAsync.Group)) + return nats_setError(NATS_INVALID_ARG, "%s", "Group is not supported for a non-priority group consumer"); + } } if (errCode != NULL) diff --git a/src/js.h b/src/js.h index d5f5d88fc..06f6ce920 100644 --- a/src/js.h +++ b/src/js.h @@ -40,6 +40,7 @@ extern const int64_t jsDefaultRequestWait; #define jsExpectedLastSubjSeqHdr "Nats-Expected-Last-Subject-Sequence" #define jsExpectedLastMsgIdHdr "Nats-Expected-Last-Msg-Id" #define jsConsumerStalledHdr "Nats-Consumer-Stalled" +#define jsConsumerPinIDHdr "Nats-Pin-Id" #define jsErrStreamNameRequired "stream name is required" #define jsErrConsumerNameRequired "consumer name is required" @@ -99,6 +100,9 @@ extern const int64_t jsDefaultRequestWait; #define jsReplayOriginalStr "original" #define jsReplayInstantStr "instant" +#define jsPriorityPolicyPinnedClientStr "pinned_client" +#define jsPriorityPolicyOverflowStr "overflow" + #define jsAckPrefix "$JS.ACK." #define jsAckPrefixLen (8) @@ -147,6 +151,9 @@ extern const int64_t jsDefaultRequestWait; // jsApiDeleteConsumerT is used to delete consumers. #define jsApiConsumerDeleteT "%.*s.CONSUMER.DELETE.%s.%s" +// jsApiConsumerUnpinT is used to unpin a consumer. +#define jsApiConsumerUnpinT "%.*s.CONSUMER.UNPIN.%s.%s" + // jsApiStreams can lookup a stream by subject. 
#define jsApiStreams "%.*s.STREAM.NAMES"
diff --git a/src/jsm.c b/src/jsm.c
index 112d990f7..4b27f2f19 100644
--- a/src/jsm.c
+++ b/src/jsm.c
@@ -2940,24 +2940,20 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
         IFOK(s, natsBuf_AppendByte(buf, '"'));
     }
     if ((s == NATS_OK) && (cfg->FilterSubjectsLen > 0))
-    {
-        int i;
-
-        s = natsBuf_Append(buf, ",\"filter_subjects\":[", -1);
-        for (i = 0; (s == NATS_OK) && (i < cfg->FilterSubjectsLen); i++)
-        {
-            if (i > 0)
-                s = natsBuf_AppendByte(buf, ',');
-            IFOK(s, natsBuf_AppendByte(buf, '"'));
-            IFOK(s, natsBuf_Append(buf, cfg->FilterSubjects[i], -1));
-            IFOK(s, natsBuf_AppendByte(buf, '"'));
-        }
-
-        IFOK(s, natsBuf_AppendByte(buf, ']'));
-    }
+        s = nats_marshalStringArray(buf, true, "filter_subjects", cfg->FilterSubjects, cfg->FilterSubjectsLen);
     IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
     if ((s == NATS_OK) && (cfg->PauseUntil > 0))
         s = _marshalTimeUTC(buf, true, "pause_until", cfg->PauseUntil);
+    if ((s == NATS_OK) && !nats_IsStringEmpty(cfg->PriorityPolicy))
+    {
+        s = natsBuf_Append(buf, ",\"priority_policy\":\"", -1);
+        IFOK(s, natsBuf_Append(buf, cfg->PriorityPolicy, -1));
+        IFOK(s, natsBuf_AppendByte(buf, '"'));
+    }
+    if ((s == NATS_OK) && (cfg->PinnedTTL > 0))
+        s = nats_marshalLong(buf, true, "priority_timeout", cfg->PinnedTTL);
+    if ((s == NATS_OK) && (cfg->PriorityGroups != NULL) && (cfg->PriorityGroupsLen > 0))
+        s = nats_marshalStringArray(buf, true, "priority_groups", cfg->PriorityGroups, cfg->PriorityGroupsLen);
     IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
     if ((s == NATS_OK) && (cfg->RateLimit > 0))
         s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
@@ -3030,10 +3026,14 @@ js_destroyConsumerConfig(jsConsumerConfig *cc)
     NATS_FREE((char*) cc->FilterSubject);
     for (i = 0; i < cc->FilterSubjectsLen; i++)
         NATS_FREE((char *)cc->FilterSubjects[i]);
-    nats_freeMetadata(&(cc->Metadata));
     NATS_FREE((char *)cc->FilterSubjects);
+
nats_freeMetadata(&(cc->Metadata)); NATS_FREE((char *)cc->SampleFrequency); NATS_FREE(cc->BackOff); + NATS_FREE((char *)cc->PriorityPolicy); + for (i = 0; i < cc->PriorityGroupsLen; i++) + NATS_FREE((char *)cc->PriorityGroups[i]); + NATS_FREE((char *)cc->PriorityGroups); NATS_FREE(cc); } @@ -3156,6 +3156,10 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi IFOK(s, nats_JSONGetLong(cjson, "num_replicas", &(cc->Replicas))); IFOK(s, nats_JSONGetBool(cjson, "mem_storage", &(cc->MemoryStorage))); IFOK(s, nats_unmarshalMetadata(cjson, "metadata", &(cc->Metadata))); + IFOK(s, nats_JSONGetTime(cjson, "pause_until", &(cc->PauseUntil))); + IFOK(s, nats_JSONGetStr(cjson, "priority_policy", (char**) &(cc->PriorityPolicy))); + IFOK(s, nats_JSONGetLong(cjson, "priority_timeout", &(cc->PinnedTTL))); + IFOK(s, nats_JSONGetArrayStr(cjson, "priority_groups", (char ***)&(cc->PriorityGroups), &(cc->PriorityGroupsLen))); } if (s == NATS_OK) @@ -3187,6 +3191,8 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci) { natsStatus s = NATS_OK; jsConsumerInfo *ci = NULL; + nats_JSON **priorityGroups = NULL; + int priorityGroupsLen = 0; ci = (jsConsumerInfo*) NATS_CALLOC(1, sizeof(jsConsumerInfo)); if (ci == NULL) @@ -3206,6 +3212,26 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci) IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound))); IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused))); IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining))); + + IFOK(s, nats_JSONGetArrayObject(json, "priority_groups", &priorityGroups, &priorityGroupsLen)); + if ((s == NATS_OK) && (priorityGroups != NULL)) + { + ci->PriorityGroups = (jsPriorityGroupState*) NATS_CALLOC(priorityGroupsLen, sizeof(jsPriorityGroupState)); + if (ci->PriorityGroups == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + ci->PriorityGroupsLen = priorityGroupsLen; + + for (int i=0; (s == NATS_OK) && 
(i < priorityGroupsLen); i++)
+        {
+            IFOK(s, nats_JSONGetStr(priorityGroups[i], "group", (char**) &(ci->PriorityGroups[i].Group)));
+            IFOK(s, nats_JSONGetStr(priorityGroups[i], "pinned_client_id", (char**) &(ci->PriorityGroups[i].PinnedClientID)));
+            IFOK(s, nats_JSONGetTime(priorityGroups[i], "pinned_ts", &(ci->PriorityGroups[i].PinnedTS)));
+        }
+        // Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
+        NATS_FREE(priorityGroups);
+    }
+
     if (s == NATS_OK)
         *new_ci = ci;
     else
@@ -3602,6 +3628,60 @@ js_PauseConsumer(jsConsumerPauseResponse **new_cpr, jsCtx *js,
     return NATS_UPDATE_ERR_STACK(s);
 }
 
+natsStatus
+js_UnpinConsumer(jsCtx *js, const char *stream, const char *consumer, const char *group,
+                 jsOptions *opts, jsErrCode *errCode)
+{
+    natsStatus s = NATS_OK;
+    char *subj = NULL;
+    bool freePfx = false;
+    natsConnection *nc = NULL;
+    natsMsg *resp = NULL;
+    bool success = false;
+    jsOptions o;
+    char jsonBuf[64];
+
+    if (errCode != NULL)
+        *errCode = 0;
+
+    if (js == NULL)
+        return nats_setDefaultError(NATS_INVALID_ARG);
+
+    s = _checkStreamName(stream);
+    IFOK(s, js_checkConsName(consumer, false));
+    IFOK(s, nats_validateLimitedTerm("group", group));
+    if (s != NATS_OK)
+        return NATS_UPDATE_ERR_STACK(s);
+
+    s = js_setOpts(&nc, &freePfx, js, opts, &o);
+    if (s == NATS_OK)
+    {
+        if (nats_asprintf(&subj, jsApiConsumerUnpinT,
+                          js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
+                          stream, consumer) < 0 )
+        {
+            s = nats_setDefaultError(NATS_NO_MEMORY);
+        }
+        if (freePfx)
+            NATS_FREE((char*) o.Prefix);
+    }
+
+    snprintf(jsonBuf, sizeof(jsonBuf), "{\"group\":\"%s\"}", group);
+
+    // Send the request
+    IFOK_JSR(s, natsConnection_RequestString(&resp, nc, subj, jsonBuf, o.Wait));
+
+    // If we got a response, check for error and success result.
+    IFOK(s, _unmarshalSuccessResp(&success, resp, errCode));
+    if ((s == NATS_OK) && !success)
+        s = nats_setError(NATS_ERR, "failed to unpin group '%s' at consumer '%s'", group, consumer);
+
+    NATS_FREE(subj);
+    natsMsg_Destroy(resp);
+
+    return NATS_UPDATE_ERR_STACK(s);
+}
+
 natsStatus
 jsConsumerConfig_Init(jsConsumerConfig *cc)
 {
@@ -3625,6 +3705,14 @@ jsConsumerInfo_Destroy(jsConsumerInfo *ci)
     NATS_FREE(ci->Name);
     js_destroyConsumerConfig(ci->Config);
     _destroyClusterInfo(ci->Cluster);
+
+    // Destroy any priority groups
+    for (int i = 0; i < ci->PriorityGroupsLen; i++)
+    {
+        NATS_FREE(ci->PriorityGroups[i].Group);
+        NATS_FREE(ci->PriorityGroups[i].PinnedClientID);
+    }
+    NATS_FREE(ci->PriorityGroups);
     NATS_FREE(ci);
 }
 
diff --git a/src/msg.h b/src/msg.h
index 6ff2477fa..3e512788f 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -24,9 +24,11 @@
 #define STATUS_HDR "Status"
 #define DESCRIPTION_HDR "Description"
 #define HDR_STATUS_NO_RESP_503 "503"
+#define HDR_STATUS_BAD_REQUEST "400"
 #define HDR_STATUS_NOT_FOUND_404 "404"
 #define HDR_STATUS_TIMEOUT_408 "408"
 #define HDR_STATUS_MAX_BYTES_409 "409"
+#define HDR_STATUS_PIN_ID_MISMATCH "423"
 #define HDR_STATUS_CTRL_100 "100"
 #define HDR_STATUS_LEN (3)
diff --git a/src/nats.h b/src/nats.h
index 75fbc8634..440f2eec3 100644
--- a/src/nats.h
+++ b/src/nats.h
@@ -899,7 +899,25 @@ typedef struct jsConsumerConfig
 
     // Configuration options introduced in 2.11
 
-    int64_t PauseUntil; ///< Suspends the consumer until this deadline, represented as number of nanoseconds since epoch.
+    /// \brief Suspends the consumer until this deadline, represented as
+    /// number of nanoseconds since epoch. Requires nats-server v2.11.0 or
+    /// later.
+    int64_t PauseUntil;
+
+    /// \brief Represents the priority policy the consumer is set to. Must be
+    /// "pinned_client" or "overflow". Requires nats-server v2.11.0 or
+    /// later.
+ const char *PriorityPolicy; + + /// \brief PinnedTTL represents the time after which the client will be + /// unpinned if no new pull requests are sent. Used with + /// PriorityPolicyPinned. Expressed in nanoseconds. Requires nats-server + /// v2.11.0 or later. + int64_t PinnedTTL; + + /// \brief The list of priority groups this consumer supports. + const char **PriorityGroups; + int PriorityGroupsLen; } jsConsumerConfig; /** @@ -1025,6 +1043,17 @@ typedef struct jsSequenceInfo } jsSequenceInfo; +/** + * Describes the configuration of priority groups in a pull consumer. + */ + + typedef struct jsPriorityGroupState + { + char *Group; + char *PinnedClientID; + int64_t PinnedTS; /// milliseconds since the epoch + } jsPriorityGroupState; + /** * Configuration and current state for this consumer. * @@ -1047,6 +1076,8 @@ typedef struct jsConsumerInfo bool PushBound; bool Paused; int64_t PauseRemaining; ///< Remaining time in nanoseconds. + jsPriorityGroupState *PriorityGroups; ///< Priority groups for the (pull) consumer. + int PriorityGroupsLen; ///< Number of priority groups. } jsConsumerInfo; /** @@ -1223,6 +1254,13 @@ typedef struct jsFetchRequest bool NoWait; ///< Will not wait if the request cannot be completed int64_t Heartbeat; ///< Have server sends heartbeats to help detect communication failures + // The use of these fields require nats-server v2.11.0 or later + int64_t MinPending; + int64_t MinAckPending; + const char *ID; ///< the "pinned" ID of this subscription, if any; to use with the + /// "pinned_client" priority policy + const char *Group; + } jsFetchRequest; /** \brief Callback used to indicate that the work of js_PullSubscribeAsync is @@ -1290,6 +1328,26 @@ typedef struct jsOptionsPullSubscribeAsync /// milliseconds) to help detect communication failures. int64_t Heartbeat; + /// @brief The name of consumer priority group. 
+    ///
+    /// @note The use of this option requires nats-server v2.11.0 or later,
+    /// and a consumer with priority groups enabled and configured.
+    const char *Group;
+
+    /// @brief When specified, this subscription will only receive messages
+    /// when the consumer has at least this many pending messages.
+    ///
+    /// @note The use of this option requires nats-server v2.11.0 or later,
+    /// and a consumer with priority groups enabled and configured.
+    int64_t MinPending;
+
+    /// @brief When specified, this Pull request will only receive messages
+    /// when the consumer has at least this many ack pending messages.
+    ///
+    /// @note The use of this option requires nats-server v2.11.0 or later,
+    /// and a consumer with priority groups enabled and configured.
+    int64_t MinAckPending;
+
     /// @brief When using the automatic Fetch flow control (default
     /// NextHandler), this is the number of messages to ask for in a
     /// single request.
@@ -6777,6 +6835,28 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject,
 NATS_EXTERN natsStatus
 natsSubscription_FetchRequest(natsMsgList *list, natsSubscription *sub, jsFetchRequest *request);
 
+/** \brief Un-pins a consumer priority group from a specific subscription.
+ *
+ * Un-pins a consumer priority group from a specific subscription. This applies
+ * exclusively to js_PullSubscribeAsync subscriptions, and causes the consumer
+ * (on the server) to un-pin the currently pinned subscription in the priority
+ * group, and choose a new one. See
+ * https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-42.md#pinning
+ * for more details.
+ *
+ * @param js the pointer to the #jsCtx context.
+ * @param stream the name of the stream.
+ * @param consumer the name of the consumer.
+ * @param group the name of the group.
+ * @param opts the pointer to the #jsOptions object, possibly `NULL`.
+ * @param errCode the location where to store the JetStream specific error code,
+ * or `NULL` if not needed.
+ */ + +NATS_EXTERN natsStatus +js_UnpinConsumer(jsCtx *js, const char *stream, const char *consumer, const char *group, + jsOptions *opts, jsErrCode *errCode); + /** \brief Returns the jsConsumerInfo associated with this subscription. * * Returns the #jsConsumerInfo associated with this subscription. diff --git a/src/natsp.h b/src/natsp.h index b3b753c60..fe7fe8d57 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -393,6 +393,9 @@ typedef struct __jsFetch // Timer for the fetch expiration. We leverage the existing jsi->hbTimer for // checking missed heartbeats. natsTimer *expiresTimer; + + // Pin ID if pinned by the server + char *pinID; } jsFetch; typedef struct __jsSub diff --git a/src/status.h b/src/status.h index 032298896..1ffc65988 100644 --- a/src/status.h +++ b/src/status.h @@ -133,6 +133,9 @@ typedef enum NATS_LIMIT_REACHED, ///< Attempt to receive messages than allowed by the byte limit, for /// instance in js_PullSubscribeAsync(). + NATS_PIN_ID_MISMATCH, ///< Pin ID sent in the request does not match the currently pinned + /// consumer subscriber ID on the server. 
+
 } natsStatus;
 
 typedef enum {
diff --git a/src/util.c b/src/util.c
index 233f914b8..0884df224 100644
--- a/src/util.c
+++ b/src/util.c
@@ -2447,6 +2447,49 @@ bool nats_IsSubjectValid(const char *subject, bool wcAllowed)
     return true;
 }
 
+// Bitmap of allowed ASCII characters (0–127): bit (c % 8) of byte (c / 8).
+// Allowed: A–Z, a–z, 0–9, '-', '_', '/', '='
+static const uint8_t _limited_term_allowed_bitmap[16] = {
+    0x00, // 0x00–0x07
+    0x00, // 0x08–0x0F
+    0x00, // 0x10–0x17
+    0x00, // 0x18–0x1F
+    0x00, // 0x20–0x27
+    0xA0, // 0x28–0x2F: '-' (0x2D), '/' (0x2F)
+    0xFF, // 0x30–0x37: '0'–'7'
+    0x23, // 0x38–0x3F: '8', '9', '=' (0x3D)
+    0xFE, // 0x40–0x47: 'A'–'G'
+    0xFF, // 0x48–0x4F: 'H'–'O'
+    0xFF, // 0x50–0x57: 'P'–'W'
+    0x87, // 0x58–0x5F: 'X', 'Y', 'Z', '_' (0x5F)
+    0xFE, // 0x60–0x67: '`' (not allowed), 'a'–'g'
+    0xFF, // 0x68–0x6F: 'h'–'o'
+    0xFF, // 0x70–0x77: 'p'–'w'
+    0x07  // 0x78–0x7F: 'x', 'y', 'z'
+};
+
+// Validates limited_term, see
+// https://github.com/nats-io/nats-architecture-and-design/blob/8693d2263e137e23748e3df583fd58247e4e48b7/adr/ADR-6.md?plain=1#L48
+natsStatus
+nats_validateLimitedTerm(const char *name, const char *term)
+{
+    int i = 0;
+
+    if (nats_IsStringEmpty(term))
+        return nats_setError(NATS_INVALID_ARG, "%s must not be empty", name);
+
+    for (; term[i] != '\0'; ++i) {
+        if (i >= 16)
+            return nats_setError(NATS_INVALID_ARG, "%s must not be longer than 16 characters", name);
+
+        unsigned char c = (unsigned char)term[i];
+        if ((c >= 128) || !(_limited_term_allowed_bitmap[c / 8] & (1 << (c % 8))))
+            return nats_setError(NATS_INVALID_ARG, "%s must contain only [A-Za-z0-9-_/=]", name);
+    }
+    return NATS_OK;
+}
+
 natsStatus
 nats_marshalMetadata(natsBuffer *buf, bool comma, const char *fieldName, natsMetadata md)
 {
@@ -2581,54 +2624,58 @@ nats_freeMetadata(natsMetadata *md)
 natsStatus nats_formatStringArray(char **out, const char **strings, int count)
 {
     natsStatus s = NATS_OK;
-    natsBuffer buf;
-    int len = 0;
-    int i;
+    natsBuffer buf = { 0 };
-
len++; // For the '[' - for (i = 0; i < count; i++) + s = natsBuf_Init(&buf, 128); + IFOK(s, nats_marshalStringArray(&buf, false, NULL, strings, count)); + IFOK(s, natsBuf_AppendByte(&buf, '\0')); + if (s != NATS_OK) { - len += 2; // For the quotes - if (i > 0) - len++; // For the ',' - if (strings[i] == NULL) - len += (int)strlen("(null)"); - else - len += (int)strlen(strings[i]); + natsBuf_Cleanup(&buf); + return s; } - len++; // For the ']' - len++; // For the '\0' - s = natsBuf_Init(&buf, len); + *out = natsBuf_Data(&buf); + return NATS_OK; +} + +// note: does not JSON-escape the strings +natsStatus +nats_marshalStringArray(natsBuffer *buf, bool comma, const char *fieldName, const char **values, int len) +{ + natsStatus s = NATS_OK; + int i; + const char *sep = (comma ? "," : ""); + + if (!nats_IsStringEmpty(fieldName)) + { + s = natsBuf_Append(buf, sep, -1); + IFOK(s, natsBuf_AppendByte(buf, '"')); + IFOK(s, natsBuf_Append(buf, fieldName, -1)); + IFOK(s, natsBuf_AppendByte(buf, '"')); + IFOK(s, natsBuf_AppendByte(buf, ':')); + } - natsBuf_AppendByte(&buf, '['); - for (i = 0; (s == NATS_OK) && (i < count); i++) + natsBuf_AppendByte(buf, '['); + for (i = 0; (s == NATS_OK) && (i < len); i++) { if (i > 0) { - IFOK(s, natsBuf_AppendByte(&buf, ',')); + IFOK(s, natsBuf_AppendByte(buf, ',')); } - IFOK(s, natsBuf_AppendByte(&buf, '"')); - if (strings[i] == NULL) + IFOK(s, natsBuf_AppendByte(buf, '"')); + if (values[i] == NULL) { - IFOK(s, natsBuf_Append(&buf, "(null)", -1)); + IFOK(s, natsBuf_Append(buf, "(null)", -1)); } else { - IFOK(s, natsBuf_Append(&buf, strings[i], -1)); + IFOK(s, natsBuf_Append(buf, values[i], -1)); } - IFOK(s, natsBuf_AppendByte(&buf, '"')); - } - - IFOK(s, natsBuf_AppendByte(&buf, ']')); - IFOK(s, natsBuf_AppendByte(&buf, '\0')); - - if (s != NATS_OK) - { - natsBuf_Cleanup(&buf); - return s; + IFOK(s, natsBuf_AppendByte(buf, '"')); } - *out = natsBuf_Data(&buf); + IFOK(s, natsBuf_AppendByte(buf, ']')); return NATS_OK; } + diff --git 
a/src/util.h b/src/util.h index 5f26fdce2..d7bd0d467 100644 --- a/src/util.h +++ b/src/util.h @@ -272,4 +272,10 @@ nats_parseTime(char *str, int64_t *timeUTC); natsStatus nats_formatStringArray(char **out, const char **strings, int count); +natsStatus +nats_marshalStringArray(natsBuffer *buf, bool comma, const char *fieldName, const char **values, int len); + +natsStatus +nats_validateLimitedTerm(const char *name, const char *term); + #endif /* UTIL_H_ */ diff --git a/test/list_test.txt b/test/list_test.txt index 22a9f8eb8..2d2614d12 100644 --- a/test/list_test.txt +++ b/test/list_test.txt @@ -110,7 +110,10 @@ _test(JetStreamSubscribePull_Reconnect) _test(JetStreamSubscribePull) _test(JetStreamSubscribePullAsync_Disconnect) _test(JetStreamSubscribePullAsync_MissedHB) +_test(JetStreamSubscribePullAsync_Overflow) +_test(JetStreamSubscribePullAsync_Pinned) _test(JetStreamSubscribePullAsync_Reconnect) +_test(JetStreamSubscribePullAsync_Unpin) _test(JetStreamSubscribePullAsync_Unsubscribe) _test(JetStreamSubscribePullAsync) _test(JetStreamSubscribeSync) diff --git a/test/test.c b/test/test.c index eeea6780f..a2d6202ea 100644 --- a/test/test.c +++ b/test/test.c @@ -29113,7 +29113,7 @@ void test_JetStreamSubscribePull(void) fr.Expires = NATS_SECONDS_TO_NANOS(1); fr.Heartbeat = NATS_SECONDS_TO_NANOS(10); s = natsSubscription_FetchRequest(&list, sub, &fr); - testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), "too large") != NULL)); + testCond((s == NATS_INVALID_ARG) && (strstr(nats_GetLastError(NULL), "too large") != NULL)); nats_clearLastError(); test("Check idle hearbeat: "); @@ -29279,7 +29279,7 @@ _testBatchCompleted(struct threadArg *args, natsSubscription *sub, natsStatus ex if (!args->closed) printf("FAILED: onComplete has not been called\n"); if (args->status != expectedStatus) - printf("FAILED: status: %d, expected: %d\n", args->status, expectedStatus); + printf("FAILED: status: %d (%s), expected: %d\n", args->status, nats_GetLastError(NULL), 
expectedStatus); if (orFewer) { if (args->sum > expectedMsgs) @@ -29370,8 +29370,6 @@ void test_JetStream_GH823(void) _destroyDefaultThreadArgs(&args); } - - void test_JetStreamSubscribePullAsync(void) { natsStatus s; @@ -29747,7 +29745,7 @@ void test_JetStreamSubscribePullAsync_MissedHB(void) jsOpts.PullSubscribeAsync.Heartbeat = 200; s = js_PullSubscribeAsync(&sub, js, "foo", "dur", _recvPullAsync, &args, &jsOpts, NULL, &jerr); - testCond((s == NATS_OK) && _testBatchCompleted(&args, sub, NATS_ERR, 0, false)); + testCond((s == NATS_OK) && _testBatchCompleted(&args, sub, NATS_INVALID_ARG, 0, false)); test("Check the error to be 'heartbeat value too large': "); natsMutex_Lock(args.m); @@ -30008,6 +30006,362 @@ void test_JetStreamSubscribePullAsync_Disconnect(void) natsOptions_Destroy(opts); } +void test_JetStreamSubscribePullAsync_Pinned(void) +{ + natsStatus s; + natsSubscription *pinned = NULL, *unpinned = NULL, *subInError = NULL; + jsErrCode jerr = 0; + jsStreamConfig sc; + jsConsumerConfig cc; + jsOptions oPinned, oUnpinned, oError; + jsSubOptions so; + struct threadArg argsPinned, argsUnpinned, argsError; + const char *groups[] = {"A"}; + + const int firstBatch = 1000; + const int secondBatch = 100; + + JS_SETUP(2, 11, 0); + + s = _createDefaultThreadArgsForCbTests(&argsPinned); + IFOK(s, _createDefaultThreadArgsForCbTests(&argsUnpinned)); + IFOK(s, _createDefaultThreadArgsForCbTests(&argsError)); + if (s != NATS_OK) + FAIL("Unable to setup test"); + + // The default state of both args is OK. 
+ // + // .control = 0; // don't ack, will be auto-ack + // .status = NATS_OK; // batch exit status will be here + // .msgReceived = false; + // .closed = false; + // .sum = 0; + + test("Create the test stream: "); + jsStreamConfig_Init(&sc); + sc.Name = "TEST"; + sc.Subjects = (const char *[1]){"foo"}; + sc.SubjectsLen = 1; + s = js_AddStream(NULL, js, &sc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Create the test consumer configured with 'pinned_client': "); + jsConsumerConfig_Init(&cc); + cc.Durable = "pinned"; + cc.PriorityPolicy = jsPriorityPolicyPinnedClientStr; + cc.PinnedTTL = NATS_SECONDS_TO_NANOS(1); + cc.PriorityGroups = groups; + cc.PriorityGroupsLen = 1; + s = js_AddConsumer(NULL, js, "TEST", &cc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + testf("Publish %d messages: ", firstBatch); + for (int i = 0; (s == NATS_OK) && (i < firstBatch); i++) + s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Initialize shared options: "); + jsSubOptions_Init(&so); + so.Stream = "TEST"; + so.Consumer = "pinned"; + testCond(true); + + // See the relevant comment in js_PullSubscribeAsync - we do not currently + // validate upfront, so the error comes as a callback/fetch status. 
+ test("A sub without group returns an error: "); + jsOptions_Init(&oError); + oError.PullSubscribeAsync.CompleteHandler = _completePullAsync; + oError.PullSubscribeAsync.CompleteHandlerClosure = &argsError; + oError.PullSubscribeAsync.Group = NULL; + s = js_PullSubscribeAsync(&subInError, js, "foo", "pinned", _recvPullAsync, &argsError, &oError, &so, &jerr); + testCond((s == NATS_OK) && _testBatchCompleted(&argsError, subInError, NATS_INVALID_ARG, 0, false)); + + test("Create pull subscription that will get pinned: "); + jsOptions_Init(&oPinned); + oPinned.PullSubscribeAsync.CompleteHandler = _completePullAsync; + oPinned.PullSubscribeAsync.CompleteHandlerClosure = &argsPinned; + oPinned.PullSubscribeAsync.Group = "A"; + oPinned.PullSubscribeAsync.MaxMessages = firstBatch; + s = js_PullSubscribeAsync(&pinned, js, "foo", "pinned", _recvPullAsync, &argsPinned, &oPinned, &so, &jerr); + testCond((s == NATS_OK) && (pinned != NULL) && (jerr == 0)); + + test("Create a second pull subscription that will not get pinned: "); + jsOptions_Init(&oUnpinned); + oUnpinned.PullSubscribeAsync.CompleteHandler = _completePullAsync; + oUnpinned.PullSubscribeAsync.CompleteHandlerClosure = &argsUnpinned; + oUnpinned.PullSubscribeAsync.Group = "A"; + oUnpinned.PullSubscribeAsync.MaxMessages = secondBatch; // the second batch + s = js_PullSubscribeAsync(&unpinned, js, "foo", "pinned", _recvPullAsync, &argsUnpinned, &oUnpinned, &so, &jerr); + testCond((s == NATS_OK) && (unpinned != NULL) && (jerr == 0)); + + testf("Receive %d messages on the pinned sub (will auto-unsubscribe): ", firstBatch); + testCond(_testBatchCompleted(&argsPinned, pinned, NATS_MAX_DELIVERED_MSGS, firstBatch, false)); + + test("Make sure unpinned sub received nothing: "); + natsMutex_Lock(argsUnpinned.m); + int sum = argsUnpinned.sum; + natsMutex_Unlock(argsUnpinned.m); + testCond(sum == 0); + + testf("Publish %d more messages: ", secondBatch); + for (int i = 0; (s == NATS_OK) && (i < secondBatch); i++) + s = 
js_Publish(NULL, js, "foo", "goodbye", 7, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Receive 100 messages on the unpinned sub: "); + testCond(_testBatchCompleted(&argsUnpinned, unpinned, NATS_MAX_DELIVERED_MSGS, secondBatch, false)); + + natsSubscription_Destroy(pinned); + natsSubscription_Destroy(unpinned); + natsSubscription_Destroy(subInError); + JS_TEARDOWN; + _destroyDefaultThreadArgs(&argsPinned); + _destroyDefaultThreadArgs(&argsUnpinned); + _destroyDefaultThreadArgs(&argsError); +} + +void test_JetStreamSubscribePullAsync_Unpin(void) +{ + natsStatus s; + natsSubscription *pinned = NULL, *unpinned = NULL; + jsErrCode jerr = 0; + jsStreamConfig sc; + jsConsumerConfig cc; + jsOptions oPinned, oUnpinned; + jsSubOptions so; + struct threadArg argsPinned, argsUnpinned; + const char *groups[] = {"A"}; + + JS_SETUP(2, 11, 0); + + s = _createDefaultThreadArgsForCbTests(&argsPinned); + IFOK(s, _createDefaultThreadArgsForCbTests(&argsUnpinned)); + if (s != NATS_OK) + FAIL("Unable to setup test"); + + // The default state of both args is OK. 
+ // + // .control = 0; // don't ack, will be auto-ack + // .status = NATS_OK; // batch exit status will be here + // .msgReceived = false; + // .closed = false; + // .sum = 0; + + test("Create the test stream: "); + jsStreamConfig_Init(&sc); + sc.Name = "TEST"; + sc.Subjects = (const char *[1]){"foo"}; + sc.SubjectsLen = 1; + s = js_AddStream(NULL, js, &sc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Create the test consumer configured with 'pinned_client': "); + jsConsumerConfig_Init(&cc); + cc.Durable = "pinned"; + cc.PriorityPolicy = jsPriorityPolicyPinnedClientStr; + cc.PinnedTTL = NATS_SECONDS_TO_NANOS(1); + cc.PriorityGroups = groups; + cc.PriorityGroupsLen = 1; + s = js_AddConsumer(NULL, js, "TEST", &cc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Publish 100 messages: "); + for (int i = 0; (s == NATS_OK) && (i < 100); i++) + s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Initialize shared options: "); + jsSubOptions_Init(&so); + so.Stream = "TEST"; + so.Consumer = "pinned"; + testCond(true); + + test("Create pull subscription that will get pinned: "); + jsOptions_Init(&oPinned); + oPinned.PullSubscribeAsync.CompleteHandler = _completePullAsync; + oPinned.PullSubscribeAsync.CompleteHandlerClosure = &argsPinned; + oPinned.PullSubscribeAsync.Group = "A"; + oPinned.PullSubscribeAsync.FetchSize = 5; // will slow it down, so we can easily unpin while still fetching + s = js_PullSubscribeAsync(&pinned, js, "foo", "pinned", _recvPullAsync, &argsPinned, &oPinned, &so, &jerr); + testCond((s == NATS_OK) && (pinned != NULL) && (jerr == 0)); + + test("Create a second pull subscription that will not get pinned: "); + jsOptions_Init(&oUnpinned); + oUnpinned.PullSubscribeAsync.CompleteHandler = _completePullAsync; + oUnpinned.PullSubscribeAsync.CompleteHandlerClosure = &argsUnpinned; + oUnpinned.PullSubscribeAsync.Group = "A"; + s = 
js_PullSubscribeAsync(&unpinned, js, "foo", "pinned", _recvPullAsync, &argsUnpinned, &oUnpinned, &so, &jerr); + testCond((s == NATS_OK) && (unpinned != NULL) && (jerr == 0)); + + nats_Sleep(100); // let the pinned sub get a few messages + + test("Ensure that the pinned sub received some, and the unpinned none: "); + natsMutex_Lock(argsPinned.m); + int pinnedSum = argsPinned.sum; + natsMutex_Unlock(argsPinned.m); + natsMutex_Lock(argsUnpinned.m); + int unpinnedSum = argsUnpinned.sum; + natsMutex_Unlock(argsUnpinned.m); + testCond((pinnedSum > 0) && (unpinnedSum == 0)); + + test("Unpin the pinned sub: "); + s = js_UnpinConsumer(js, "TEST", "pinned", "A", NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Publish more messages in case we already drained everything: "); + for (int i = 0; (s == NATS_OK) && (i < 100); i++) + s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Ensure that pinning changed, and all messages are delivered: "); + nats_Sleep(500); // should be enough even on windows!!! 
+ natsMutex_Lock(argsPinned.m); + int previouslyPinnedSum = argsPinned.sum; + natsMutex_Unlock(argsPinned.m); + natsMutex_Lock(argsUnpinned.m); + int newPinnedSum = argsUnpinned.sum; + natsMutex_Unlock(argsUnpinned.m); + testCond((previouslyPinnedSum >= pinnedSum) // the previously pinned may still receive a few messages that are queued for processing + && (newPinnedSum > 0) // "new" messages should go to the other sub + && ((previouslyPinnedSum + newPinnedSum) == 200)); // between the 2, they get all messages + + natsSubscription_Destroy(pinned); + natsSubscription_Destroy(unpinned); + JS_TEARDOWN; + _destroyDefaultThreadArgs(&argsPinned); + _destroyDefaultThreadArgs(&argsUnpinned); +} + +void test_JetStreamSubscribePullAsync_Overflow(void) +{ + natsStatus s; + natsSubscription *sub = NULL; + jsErrCode jerr = 0; + jsStreamConfig sc; + jsConsumerConfig cc; + jsOptions o; + jsSubOptions so; + struct threadArg args; + const char *groups[] = {"A"}; + + JS_SETUP(2, 11, 0); + + s = _createDefaultThreadArgsForCbTests(&args); + if (s != NATS_OK) + FAIL("Unable to setup test"); + + // The default state of args is OK. 
+ // + // .control = 0; // don't ack, will be auto-ack + // .status = NATS_OK; // batch exit status will be here + // .msgReceived = false; + // .closed = false; + // .sum = 0; + + test("Create the test stream: "); + jsStreamConfig_Init(&sc); + sc.Name = "TEST"; + sc.Subjects = (const char *[1]){"foo"}; + sc.SubjectsLen = 1; + s = js_AddStream(NULL, js, &sc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Create the test consumer configured with 'overflow': "); + jsConsumerConfig_Init(&cc); + cc.Durable = "overflow"; + cc.PriorityPolicy = jsPriorityPolicyOverflowStr; + cc.PriorityGroups = groups; + cc.PriorityGroupsLen = 1; + s = js_AddConsumer(NULL, js, "TEST", &cc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Publish 100 messages: "); + for (int i = 0; (s == NATS_OK) && (i < 100); i++) + s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Initialize shared options: "); + jsSubOptions_Init(&so); + so.Stream = "TEST"; + so.Consumer = "overflow"; + testCond(true); + + test("Create pull async subscription with MinPending=110, greater than 100 published: "); + jsOptions_Init(&o); + o.PullSubscribeAsync.CompleteHandler = _completePullAsync; + o.PullSubscribeAsync.CompleteHandlerClosure = &args; + o.PullSubscribeAsync.Group = "A"; + o.PullSubscribeAsync.MinPending = 110; + o.PullSubscribeAsync.MaxMessages = 91; // we will only get 91, after 200 messages are published + s = js_PullSubscribeAsync(&sub, js, "foo", "overflow", _recvPullAsync, &args, &o, &so, &jerr); + testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); + + test("Not getting messages yet, not enough pending: "); + nats_Sleep(100); + natsMutex_Lock(args.m); + int sum = args.sum; + natsMutex_Unlock(args.m); + testCond(sum == 0); + + test("Publish 100 more messages: "); + for (int i = 0; (s == NATS_OK) && (i < 100); i++) + s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr); + testCond((s == NATS_OK) 
&& (jerr == 0)); + + test("Get 91 messages, until we drop below min pending of 110: "); + testCond(_testBatchCompleted(&args, sub, NATS_MAX_DELIVERED_MSGS, 91, false)); + natsSubscription_Destroy(sub); + sub = NULL; + + test("Subscribe again, this time with MinAckPending of 5: "); + natsMutex_Lock(args.m); + args.sum = 0; + args.closed = false; + args.msgReceived = false; + args.status = NATS_OK; // batch exit status will be here + natsMutex_Unlock(args.m); + jsOptions_Init(&o); + o.PullSubscribeAsync.CompleteHandler = _completePullAsync; + o.PullSubscribeAsync.CompleteHandlerClosure = &args; + o.PullSubscribeAsync.Group = "A"; + o.PullSubscribeAsync.MinAckPending = 5; + o.PullSubscribeAsync.MaxMessages = 99; // 200 - 91 - 10 + s = js_PullSubscribeAsync(&sub, js, "foo", "overflow", _recvPullAsync, &args, &o, &so, &jerr); + testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); + + test("Not getting messages yet since there no unacknowledged yet: "); + nats_Sleep(100); + natsMutex_Lock(args.m); + sum = args.sum; + natsMutex_Unlock(args.m); + testCond(sum == 0); + + test("Receive but not ACK 10 messages with a separate subscription: "); + so.ManualAck = true; + struct threadArg args2; + natsSubscription *sub2 = NULL; + s = _createDefaultThreadArgsForCbTests(&args2); + if (s == NATS_OK) + { + jsOptions_Init(&o); + o.PullSubscribeAsync.CompleteHandler = _completePullAsync; + o.PullSubscribeAsync.CompleteHandlerClosure = &args2; + o.PullSubscribeAsync.Group = "A"; + o.PullSubscribeAsync.MaxMessages = 10; + s = js_PullSubscribeAsync(&sub2, js, "foo", "overflow", _recvPullAsync, &args2, &o, &so, &jerr); + } + testCond((s == NATS_OK) && (sub2 != NULL) && (jerr == 0) && _testBatchCompleted(&args2, sub2, NATS_MAX_DELIVERED_MSGS, 10, false)); + natsSubscription_Destroy(sub2); + _destroyDefaultThreadArgs(&args2); + + test("Get the remaining 99 messages (200 - 91 - 10): "); + testCond(_testBatchCompleted(&args, sub, NATS_MAX_DELIVERED_MSGS, 99, false)); + + 
natsSubscription_Destroy(sub); + JS_TEARDOWN; + _destroyDefaultThreadArgs(&args); +} + void test_JetStreamSubscribeHeadersOnly(void) { natsStatus s;