From 0ad0f509ac116ab22cb9bd2fc99328bf5ba067b7 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sat, 31 Jan 2026 19:30:45 +0800 Subject: [PATCH 1/8] make refcount atomic --- src/aof.c | 2 +- src/cluster.c | 2 +- src/cluster_asm.c | 2 +- src/db.c | 24 +++++------ src/debug.c | 6 +-- src/defrag.c | 10 ++--- src/iothread.c | 6 --- src/keymeta.c | 64 +++++++++++++++------------ src/keymeta.h | 5 ++- src/lazyfree.c | 85 +----------------------------------- src/module.c | 8 ++-- src/networking.c | 107 +++++----------------------------------------- src/object.c | 98 +++++++++++++++++++++++++----------------- src/object.h | 68 ++++++++++++++++++++++++++--- src/rdb.c | 2 +- src/script_lua.c | 2 +- src/server.c | 11 ++--- src/server.h | 8 +--- src/slowlog.c | 2 +- src/t_string.c | 2 +- 20 files changed, 211 insertions(+), 303 deletions(-) diff --git a/src/aof.c b/src/aof.c index 3ace6701134..a5c3acc65f8 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2423,7 +2423,7 @@ int rewriteObject(rio *r, robj *key, robj *o, int dbid, long long expiretime) { } /* If modules metadata is available */ - if ((getModuleMetaBits(o->metabits)) && (keyMetaOnAof(r, key, o, dbid) == 0)) + if ((getModuleMetaBits(robj_get_metabits(o))) && (keyMetaOnAof(r, key, o, dbid) == 0)) return C_ERR; return C_OK; diff --git a/src/cluster.c b/src/cluster.c index d07c31c5900..0d9950c23ae 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -94,7 +94,7 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_chec rioInitWithBuffer(payload,sdsempty()); /* Save key metadata if present without (handles TTL separately via command args) */ - if (getModuleMetaBits(o->metabits)) + if (getModuleMetaBits(robj_get_metabits(o))) serverAssert(rdbSaveKeyMetadata(payload, key, o, dbid) != -1); serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObject(payload,o,key,dbid)); diff --git a/src/cluster_asm.c b/src/cluster_asm.c index a0904537753..c10b4458b4a 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -3439,7 +3439,7 @@ void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { debugDelay(asmManager->debug_active_trim_delay); /* The key needs to be converted from static to heap before deletion. */ - int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; + int static_key = robj_get_refcount(keyobj) == OBJ_STATIC_REFCOUNT; if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); dbDelete(db, keyobj); diff --git a/src/db.c b/src/db.c index 04d94184f7e..919f7aa68b3 100644 --- a/src/db.c +++ b/src/db.c @@ -59,7 +59,7 @@ void updateLFU(robj *val) { /* Update LRM when an object is modified. */ void updateLRM(robj *o) { - if (o->refcount == OBJ_SHARED_REFCOUNT) + if (robj_get_refcount(o) == OBJ_SHARED_REFCOUNT) return; if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) { o->lru = LRU_CLOCK(); @@ -591,10 +591,10 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link long long oldExpire = getExpire(db, key->ptr, old); /* All metadata will be kept if not `overwrite` for the new object */ - uint32_t newKeyMetaBits = old->metabits; + uint32_t newKeyMetaBits = robj_get_metabits(old); /* clear expire if not keepTTL or no old expire */ if ((!keepTTL) || (oldExpire == -1)) - newKeyMetaBits &= ~KEY_META_MASK_EXPIRE; + newKeyMetaBits &= ~KEY_META_MASK_EXPIRE; if (overwrite) { /* On overwrite, discard module metadata excluding expire if set */ @@ -604,7 +604,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link incrRefCount(old); /* Free related metadata. Ignore builtin metadata (currently only expire) */ - if (getModuleMetaBits(old->metabits)) { + if (getModuleMetaBits(robj_get_metabits(old))) { keyMetaOnUnlink(db, key, old); freeModuleMeta = 1; } @@ -622,8 +622,8 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link if (server.memory_tracking_enabled) oldsize = kvobjAllocSize(old); - if ((old->refcount == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && - (val->refcount == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) + if ((robj_get_refcount(old) == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && + (robj_get_refcount(val) == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) { /* Keep old object in the database. Just swap it's ptr, type and * encoding with the content of val. */ @@ -687,7 +687,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link } } - if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW && old->refcount == 1) { + if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW && robj_get_refcount(old) == 1) { /* In multi-threaded mode, the OBJ_ENCODING_RAW string object usually is * allocated in the IO thread, so we defer the free to the IO thread. * Besides, we never free a string object in BIO threads, so, even with @@ -853,7 +853,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { * need to incr to retain kv */ incrRefCount(kv); /* refcnt=1->2 */ /* Metadata hook: notify unlink for key metadata cleanup. */ - if (getModuleMetaBits(kv->metabits)) keyMetaOnUnlink(db, key, kv); + if (getModuleMetaBits(robj_get_metabits(kv))) keyMetaOnUnlink(db, key, kv); /* Tells the module that the key has been unlinked from the database. */ moduleNotifyKeyUnlink(key, kv, db->id, flags); /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ @@ -950,7 +950,7 @@ kvobj *dbUnshareStringValue(redisDb *db, robj *key, kvobj *kv) { * which can be used if we already have one, thus saving the dbFind call. */ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *o, dictEntryLink link) { serverAssert(o->type == OBJ_STRING); - if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { + if (robj_get_refcount(o) != 1 || o->encoding != OBJ_ENCODING_RAW) { robj *decoded = getDecodedObject(o); o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); @@ -2166,7 +2166,7 @@ void renameGenericCommand(client *c, int nx) { /* Prepare metadata for the renamed key */ KeyMetaSpec keymeta; keyMetaSpecInit(&keymeta); - if (o->metabits) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); + if (robj_get_metabits(o)) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); dbDelete(c->db,c->argv[1]); @@ -2381,7 +2381,7 @@ void copyCommand(client *c) { /* Prepare metadata for the new key */ KeyMetaSpec keymeta; keyMetaSpecInit(&keymeta); - if (o->metabits) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); + if (robj_get_metabits(o)) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); kvobj *kvCopy = dbAddInternal(dst, newkey, &newobj, NULL, &keymeta); @@ -2696,7 +2696,7 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"; /* The key needs to be converted from static to heap before deleted */ - int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; + int static_key = robj_get_refcount(keyobj) == OBJ_STATIC_REFCOUNT; if (static_key) { keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); } diff --git a/src/debug.c b/src/debug.c index c239bbaf0f6..fedd70bc26b 100644 --- a/src/debug.c +++ b/src/debug.c @@ -685,7 +685,7 @@ NULL "Value at:%p refcount:%d " "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", - (void*)kv, kv->refcount, + (void*)kv, robj_get_refcount(kv), strenc, rdbSavedObjectLen(kv, c->argv[2], c->db->id), kv->lru, estimateObjectIdleTime(kv)/1000, extra); } else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) { @@ -1250,14 +1250,14 @@ void _serverAssertPrintClientInfo(const client *c) { arg = buf; } serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)", - j, arg, c->argv[j]->refcount); + j, arg, robj_get_refcount(c->argv[j])); } } void serverLogObjectDebugInfo(const robj *o) { serverLog(LL_WARNING,"Object type: %u", o->type); serverLog(LL_WARNING,"Object encoding: %u", o->encoding); - serverLog(LL_WARNING,"Object refcount: %d", o->refcount); + serverLog(LL_WARNING,"Object refcount: %d", robj_get_refcount(o)); #if UNSAFE_CRASH_REPORT /* This code is now disabled. o->ptr may be unreliable to print. in some * cases a ziplist could have already been freed by realloc, but not yet diff --git a/src/defrag.c b/src/defrag.c index b058dfd01c5..765023e13bb 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -293,9 +293,9 @@ void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) { * reference count is not 1, in these cases, the caller must explicitly pass * in the reference count, otherwise defragmentation will not be performed. * Note that the caller is responsible for updating any other references to the robj. */ -robj *activeDefragStringObEx(robj* ob, int expected_refcount) { +robj *activeDefragStringObEx(robj* ob, unsigned int expected_refcount) { robj *ret = NULL; - if (ob->refcount!=expected_refcount) + if (robj_get_refcount(ob)!=expected_refcount) return NULL; /* try to defrag robj (only if not an EMBSTR type (handled below). */ @@ -1059,7 +1059,7 @@ robj *activeDefragKvobj(kvobj* kv, int without_free) { long offsetEmbstr = LONG_MIN; /* Don't defrag kvobj's with multiple references (refcount > 1) */ - if (kv->refcount != 1) + if (robj_get_refcount(kv) != 1) return NULL; /* Calculate offset for EMBSTR strings */ @@ -1131,7 +1131,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) { /* Only defrag strings with refcount==1 (String might be shared as dict * keys, e.g. pub/sub channels, and may be accessed by IO threads. Other * types are never used as dict keys) */ - if ((ob->refcount==1) && (ob->encoding == OBJ_ENCODING_RAW)) { + if ((robj_get_refcount(ob)==1) && (ob->encoding == OBJ_ENCODING_RAW)) { /* For RAW strings, defrag the separate SDS allocation */ sds newsds = activeDefragSds((sds)ob->ptr); if (newsds) ob->ptr = newsds; @@ -1254,7 +1254,7 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de, dictEntryLink dict *newclients, *clients = dictGetVal(de); /* Try to defrag the channel name. */ - serverAssert(channel->refcount == (int)dictSize(clients) + 1); + serverAssert(robj_get_refcount(channel) == dictSize(clients) + 1); newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); if (newchannel) { kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel); diff --git a/src/iothread.c b/src/iothread.c index 54a9cae1eb2..e91425ebe23 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -183,8 +183,6 @@ void keepClientInMainThread(client *c) { c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED; c->tid = IOTHREAD_MAIN_THREAD_ID; freeClientDeferredObjects(c, 1); /* Free deferred objects. */ - freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */ - tryUnlinkClientFromPendingRefReply(c, 0); /* Main thread starts to manage it. */ server.io_threads_clients_num[c->tid]++; } @@ -586,10 +584,6 @@ int processClientsFromIOThread(IOThread *t) { /* Let main thread to run it, set running thread id first. */ c->running_tid = IOTHREAD_MAIN_THREAD_ID; - /* Free objects queued by IO thread for deferred freeing. */ - freeClientIODeferredObjects(c, 0); - tryUnlinkClientFromPendingRefReply(c, 0); - /* If a read error occurs, handle it in the main thread first, since we * want to print logs about client information before freeing. */ if (isClientReadErrorFatal(c)) handleClientReadError(c); diff --git a/src/keymeta.c b/src/keymeta.c index 530ea049300..29c6ec5f3f2 100644 --- a/src/keymeta.c +++ b/src/keymeta.c @@ -178,13 +178,14 @@ void keyMetaOnCopy(kvobj *kv, robj *srcKey, robj *dstKey, int srcDbId, int dstDb KeyMetaSpec *keymeta) { uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) { + uint32_t kv_metabits = robj_get_metabits(kv); + if (kv_metabits & KEY_META_MASK_EXPIRE) { if (*pMeta != KM_EXPIRE_RESET_VALUE) keyMetaSpecAdd(keymeta, KEY_META_ID_EXPIRE, *pMeta); pMeta--; } - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -207,17 +208,18 @@ void keyMetaOnCopy(kvobj *kv, robj *srcKey, robj *dstKey, int srcDbId, int dstDb /* Prepare metadata spec for rename of `kv` */ void keyMetaOnRename(struct redisDb *db, kvobj *kv, robj *oldKey, robj *newKey, KeyMetaSpec *kms) { uint64_t *pMeta = ((uint64_t *)kv) - 1; + uint32_t kv_metabits = robj_get_metabits(kv); /* Handle builtin expire: add only if set and value != -1, but always advance * the pointer when the expire bit is set since the slot exists either way. */ - if (kv->metabits & KEY_META_MASK_EXPIRE) { + if (kv_metabits & KEY_META_MASK_EXPIRE) { if (*pMeta != KM_EXPIRE_RESET_VALUE) keyMetaSpecAdd(kms, KEY_META_ID_EXPIRE, *pMeta); pMeta--; /* skip expire slot */ } /* Process module metadata. Default on rename: keep if no callback. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -244,17 +246,18 @@ void keyMetaOnRename(struct redisDb *db, kvobj *kv, robj *oldKey, robj *newKey, /* Prepare metadata spec for move of `kv` from srcDbId to dstDbId */ void keyMetaOnMove(kvobj *kv, robj *key, int srcDbId, int dstDbId, KeyMetaSpec *kms) { uint64_t *pMeta = ((uint64_t *)kv) - 1; + uint32_t kv_metabits = robj_get_metabits(kv); /* Handle builtin expire: add only if set and value != -1, but always advance * the pointer when the expire bit is set since the slot exists either way. */ - if (kv->metabits & KEY_META_MASK_EXPIRE) { + if (kv_metabits & KEY_META_MASK_EXPIRE) { if (*pMeta != KM_EXPIRE_RESET_VALUE) keyMetaSpecAdd(kms, KEY_META_ID_EXPIRE, *pMeta); pMeta--; /* skip expire slot */ } /* Process module metadata. Default on move: keep if no callback. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -292,11 +295,12 @@ void keyMetaOnMove(kvobj *kv, robj *key, int srcDbId, int dstDbId, KeyMetaSpec * void keyMetaOnUnlink(redisDb *db, robj *key, kvobj *kv) { /* Skip builtin expire slot if present; no action for expire itself here. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + uint32_t kv_metabits = robj_get_metabits(kv); + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Iterate module metadata and invoke per-class unlink if provided. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; /* Build operation context for modules: from_key = key name, to_key = NULL. */ @@ -333,11 +337,12 @@ void keyMetaOnUnlink(redisDb *db, robj *key, kvobj *kv) { void keyMetaOnFree(kvobj *kv) { /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + uint32_t kv_metabits = robj_get_metabits(kv); + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Iterate module metadata and invoke per-class free if provided. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -558,14 +563,15 @@ int rdbLoadKeyMetadata(rio *rdb, int dbid, int numClasses, KeyMetaSpec *kms) { * Returns -1 on error, 0 on success. */ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) { + uint32_t kv_metabits = robj_get_metabits(kv); /* Check if there are any module metadata bits set */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return 0; /* No module metadata */ /* Skip builtin expire slot if present */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Create temporary buffer for payload (class data only, no headers) */ @@ -650,11 +656,12 @@ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) { int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) { /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + uint32_t kv_metabits = robj_get_metabits(kv); + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Iterate module metadata and invoke per-class aof_rewrite if provided */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return 1; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -686,17 +693,19 @@ int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) { /* Move entire metadata from old to new kvobj as is */ void keyMetaTransition(kvobj *kvOld, kvobj *kvNew) { + uint32_t kvOld_metabits = robj_get_metabits(kvOld); + uint32_t kvNew_metabits = robj_get_metabits(kvNew); /* Precondition: */ - debugServerAssert(kvOld->metabits>>KEY_META_ID_MODULE_FIRST); - + debugServerAssert(kvOld_metabits>>KEY_META_ID_MODULE_FIRST); + /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMetaOld = ((uint64_t *)kvOld) - 1; - if (kvOld->metabits & KEY_META_MASK_EXPIRE) pMetaOld--; + if (kvOld_metabits & KEY_META_MASK_EXPIRE) pMetaOld--; uint64_t *pMetaNew = ((uint64_t *)kvNew) - 1; - if (kvNew->metabits & KEY_META_MASK_EXPIRE) pMetaNew--; - - uint32_t mbitsOld = kvOld->metabits >> KEY_META_ID_MODULE_FIRST; - uint32_t mbitsNew = kvNew->metabits >> KEY_META_ID_MODULE_FIRST; + if (kvNew_metabits & KEY_META_MASK_EXPIRE) pMetaNew--; + + uint32_t mbitsOld = kvOld_metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbitsNew = kvNew_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbitsOld == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; do { @@ -799,7 +808,7 @@ kvobj *keyMetaSetMetadata(redisDb *db, kvobj *kv, KeyMetaClassId id, uint64_t me return NULL; /* If metadata already attached, just update it in place. */ - if (kv->metabits & (1u << id)) { + if (robj_get_metabits(kv) & (1u << id)) { *kvobjMetaRef(kv, id) = metadata; return kv; } @@ -838,7 +847,7 @@ kvobj *keyMetaSetMetadata(redisDb *db, kvobj *kv, KeyMetaClassId id, uint64_t me size_t oldsize = 0; if (server.memory_tracking_enabled) oldsize = kvobjAllocSize(kv); - kv = kvobjSet(key, kv, kv->metabits | (1u << id)); + kv = kvobjSet(key, kv, robj_get_metabits(kv) | (1u << id)); kvstoreDictSetAtLink(db->keys, slot, kv, &keyLink, 0); if (server.memory_tracking_enabled) updateSlotAllocSize(db, slot, kv, oldsize, kvobjAllocSize(kv)); @@ -866,7 +875,7 @@ int keyMetaGetMetadata(KeyMetaClassId kmcId, kvobj *kv, uint64_t *metadata) { if (keyMetaClass[kmcId].state != CLASS_STATE_INUSE) return 0; - if (!(kv->metabits & (1u << kmcId))) + if (!(robj_get_metabits(kv) & (1u << kmcId))) return 0; /* metadata not attached */ *metadata = *kvobjMetaRef(kv, kmcId); @@ -919,16 +928,17 @@ static void keyMetaSpecAddUnordered(KeyMetaSpec *keymeta, int metaid, uint64_t m /* Blindly reset modules metadata values to reset_value */ void keyMetaResetModuleValues(kvobj *kv) { + uint32_t kv_metabits = robj_get_metabits(kv); /* Precondition: only called for module metadata (bits 1-7) */ - debugServerAssert(kv->metabits & KEY_META_MASK_MODULES); + debugServerAssert(kv_metabits & KEY_META_MASK_MODULES); /* Skip expire slot (bit 0) if present, start directly at module metadata */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Process only module metadata bits (1-7) */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; int keyMetaId = KEY_META_ID_MODULE_FIRST; do { if (mbits & 1) diff --git a/src/keymeta.h b/src/keymeta.h index 43b4242313c..0a56f27a401 100644 --- a/src/keymeta.h +++ b/src/keymeta.h @@ -149,10 +149,11 @@ static inline uint32_t getModuleMetaBits(uint16_t metabits); /********** Inline functions **********/ static inline void keyMetaResetValues(kvobj *kv) { - if (unlikely(kv->metabits & KEY_META_MASK_MODULES)) + uint32_t metabits = robj_get_metabits(kv); + if (unlikely(metabits & KEY_META_MASK_MODULES)) keyMetaResetModuleValues(kv); /* Must be first meta (optimized) */ - if (kv->metabits & KEY_META_MASK_EXPIRE) + if (metabits & KEY_META_MASK_EXPIRE) ((uint64_t *)kv)[-1] = -1; } diff --git a/src/lazyfree.c b/src/lazyfree.c index 1960be1233f..90cdb1f2eee 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -187,7 +187,7 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * possible. This rarely happens, however sometimes the implementation * of parts of the Redis core may call incrRefCount() to protect * objects, and then call dbDelete(). */ - if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { + if (free_effort > LAZYFREE_THRESHOLD && robj_get_refcount(obj) == 1) { atomicIncr(lazyfree_objects,1); bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); } else { @@ -195,88 +195,6 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { } } -/* Duplicate client reply objects that reference database objects to avoid race - * conditions with bio threads during async flushdb. - * - * Since incrRefCount/decrRefCount are not thread-safe, and bio thread may - * free database objects while main thread/IO threads send client replies, we need to - * create independent copies of the string objects to avoid concurrent access. */ -static void protectClientReplyObjects(void) { - /* If there are no clients with pending ref replies, exit ASAP. */ - if (!listLength(server.clients_with_pending_ref_reply)) - return; - - /* Pause all IO threads to safely duplicate string objects. */ - int allpaused = 0; - if (server.io_threads_num > 1) { - serverAssert(pthread_equal(server.main_thread_id, pthread_self())); - allpaused = 1; - pauseAllIOThreads(); - } - - listNode *ln; - listIter li; - listRewind(server.clients_with_pending_ref_reply, &li); - while ((ln = listNext(&li)) != NULL) { - client *c = listNodeValue(ln); - - /* Process c->buf if it's encoded */ - if (c->buf_encoded && c->bufpos > 0) { - char *ptr = c->buf; - while (ptr < c->buf + c->bufpos) { - payloadHeader *header = (payloadHeader *)ptr; - ptr += sizeof(payloadHeader); - - if (header->payload_type == BULK_STR_REF) { - bulkStrRef *str_ref = (bulkStrRef *)ptr; - if (str_ref->obj != NULL) { - /* Duplicate the string object */ - robj *new_obj = dupStringObject(str_ref->obj); - decrRefCount(str_ref->obj); - str_ref->obj = new_obj; - } - } - ptr += header->payload_len; - } - } - - /* Process reply list */ - if (c->reply && listLength(c->reply)) { - listIter reply_li; - listNode *reply_ln; - listRewind(c->reply, &reply_li); - while ((reply_ln = listNext(&reply_li))) { - clientReplyBlock *block = listNodeValue(reply_ln); - if (block && block->buf_encoded) { - char *ptr = block->buf; - while (ptr < block->buf + block->used) { - payloadHeader *header = (payloadHeader *)ptr; - ptr += sizeof(payloadHeader); - - if (header->payload_type == BULK_STR_REF) { - bulkStrRef *str_ref = (bulkStrRef *)ptr; - if (str_ref->obj != NULL) { - /* Duplicate the string object */ - robj *new_obj = dupStringObject(str_ref->obj); - decrRefCount(str_ref->obj); - str_ref->obj = new_obj; - } - } - ptr += header->payload_len; - } - } - } - } - - /* Process references in IO deferred objects and remove client from - * pending ref list since all refs have been duplicated above. */ - freeClientIODeferredObjects(c, 0); - tryUnlinkClientFromPendingRefReply(c, 1); - } - - if (allpaused) resumeAllIOThreads(); -} - /* Empty a Redis DB asynchronously. What the function does actually is to * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ @@ -292,7 +210,6 @@ void emptyDbAsync(redisDb *db) { db->keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, flags); db->expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, slot_count_bits, flags); db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits); - protectClientReplyObjects(); /* Protect client reply objects before async free. */ emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires); } diff --git a/src/module.c b/src/module.c index 6ab92b62532..af1714656e3 100644 --- a/src/module.c +++ b/src/module.c @@ -946,7 +946,7 @@ void RedisModuleCommandDispatcher(client *c) { for (int i = 0; i < c->argc; i++) { /* Only do the work if the module took ownership of the object: * in that case the refcount is no longer 1. */ - if (c->argv[i]->refcount > 1) + if (robj_get_refcount(c->argv[i]) > 1) trimStringObjectIfNeeded(c->argv[i], 0); } } @@ -2854,7 +2854,7 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { * This API is not thread safe, access to these retained strings (if they originated * from a client command arguments) must be done with GIL locked. */ RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) { - if (str->refcount == OBJ_STATIC_REFCOUNT) { + if (robj_get_refcount(str) == OBJ_STATIC_REFCOUNT) { return RM_CreateStringFromString(ctx, str); } @@ -2968,7 +2968,7 @@ int RM_StringCompare(const RedisModuleString *a, const RedisModuleString *b) { /* Return the (possibly modified in encoding) input 'str' object if * the string is unshared, otherwise NULL is returned. */ RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) { - if (str->refcount != 1) { + if (robj_get_refcount(str) != 1) { serverLog(LL_WARNING, "Module attempted to use an in-place string modify operation " "with a string referenced multiple times. Please check the code " @@ -6610,7 +6610,7 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int argv[argc++] = createStringObject(cstr,strlen(cstr)); } else if (*p == 's') { robj *obj = va_arg(ap,void*); - if (obj->refcount == OBJ_STATIC_REFCOUNT) + if (robj_get_refcount(obj) == OBJ_STATIC_REFCOUNT) obj = createStringObject(obj->ptr,sdslen(obj->ptr)); else incrRefCount(obj); diff --git a/src/networking.c b/src/networking.c index 8dd6ac9861d..b806702679f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -176,9 +176,6 @@ client *createClient(connection *conn) { c->original_argv = NULL; c->deferred_objects = NULL; c->deferred_objects_num = 0; - c->io_deferred_objects = NULL; - c->io_deferred_objects_num = 0; - c->io_deferred_objects_size = 0; c->cmd = c->lastcmd = c->realcmd = c->lookedcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; @@ -519,15 +516,6 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { _addReplyPayloadToList(c, c->reply, s + reply_len, len - reply_len, PLAIN_REPLY); } -/* Check if the client's pending_ref_reply_node is currently linked in the list. - * A node is considered linked if it has neighbors (prev/next), or if it's the - * only node in the list (head points to it). */ -static inline int clientIsInPendingRefReplyList(client *c) { - return listNextNode(&c->pending_ref_reply_node) != NULL || - listPrevNode(&c->pending_ref_reply_node) != NULL || - listFirst(server.clients_with_pending_ref_reply) == &c->pending_ref_reply_node; -} - /* Increment reference to object and add pointer to object and * pointer to string itself to current reply buffer */ static void _addBulkStrRefToBufferOrList(client *c, robj *obj, size_t len) { @@ -556,11 +544,6 @@ static void _addBulkStrRefToBufferOrList(client *c, robj *obj, size_t len) { if (!_addBulkStrRefToBuffer(c, (void *)&str_ref, sizeof(str_ref))) { _addReplyPayloadToList(c, c->reply, (void *)&str_ref, sizeof(str_ref), BULK_STR_REF); } - - /* Track clients with pending referenced reply objects for async flushdb protection. */ - if (!clientIsInPendingRefReplyList(c)) { - listLinkNodeTail(server.clients_with_pending_ref_reply, &c->pending_ref_reply_node); - } } /* ----------------------------------------------------------------------------- @@ -1240,7 +1223,7 @@ static int isCopyAvoidPreferred(client *c, robj *obj, size_t len) { * to server.pending_push_messages when CLIENT_PUSHING is set. */ if (c->flags & CLIENT_PUSHING) return 0; - if (obj->encoding != OBJ_ENCODING_RAW || obj->refcount >= OBJ_FIRST_SPECIAL_REFCOUNT) return 0; + if (obj->encoding != OBJ_ENCODING_RAW || robj_get_refcount(obj) >= OBJ_FIRST_SPECIAL_REFCOUNT) return 0; /* Copy avoidance is preferred for any string size starting certain number of I/O threads */ if (server.io_threads_num >= COPY_AVOID_MIN_IO_THREADS) return 1; @@ -1711,48 +1694,6 @@ void freeClientDeferredObjects(client *c, int free_array) { } } -/* Queue an robj to be freed by the main thread when client returns from IO thread. - * This is used in IO thread write path to avoid refcount race conditions. */ -#define IO_DEFERRED_OBJECTS_INIT_SIZE 8 -void ioDeferFreeRobj(client *c, robj *obj) { - if (c->io_deferred_objects_num >= c->io_deferred_objects_size) { - int new_size = !c->io_deferred_objects_size ? - IO_DEFERRED_OBJECTS_INIT_SIZE : c->io_deferred_objects_size * 2; - c->io_deferred_objects = zrealloc(c->io_deferred_objects, new_size * sizeof(robj *)); - c->io_deferred_objects_size = new_size; - } - c->io_deferred_objects[c->io_deferred_objects_num++] = obj; -} - -/* Free all objects queued by IO thread for deferred freeing. - * Called by main thread when client returns from IO thread. - * If free_array is true then free the array itself as well. */ -void freeClientIODeferredObjects(client *c, int free_array) { - if (!c->conn) return; - - for (int i = 0; i < c->io_deferred_objects_num; i++) { - robj *obj = c->io_deferred_objects[i]; - decrRefCount(obj); - } - - if (!free_array) { - /* If the utilization rate is less than 1/4, reduce the size to 1/2 to avoid thrashing */ - if (c->io_deferred_objects_size > IO_DEFERRED_OBJECTS_INIT_SIZE && - c->io_deferred_objects_num * 4 < c->io_deferred_objects_size) - { - int new_size = c->io_deferred_objects_size / 2; - c->io_deferred_objects = zrealloc(c->io_deferred_objects, new_size * sizeof(robj *)); - c->io_deferred_objects_size = new_size; - } - c->io_deferred_objects_num = 0; - } else { - zfree(c->io_deferred_objects); - c->io_deferred_objects = NULL; - c->io_deferred_objects_num = 0; - c->io_deferred_objects_size = 0; - } -} - void freeClientOriginalArgv(client *c) { /* We didn't rewrite this client */ if (!c->original_argv) return; @@ -1903,19 +1844,6 @@ void unlinkClient(client *c) { if (c->flags & CLIENT_TRACKING) disableTracking(c); } -/* Remove client from the list of clients with pending referenced replies. - * This is called when the client has finished sending all pending replies, - * or when the client is being freed. - * - * If 'force' is true, the client is removed unconditionally. - * This should only be used when we are certain that the replies no longer - * contain any referenced robj. */ -void tryUnlinkClientFromPendingRefReply(client *c, int force) { - if (clientIsInPendingRefReplyList(c) && (force || !clientHasPendingReplies(c))) { - listUnlinkNode(server.clients_with_pending_ref_reply, &c->pending_ref_reply_node); - } -} - /* Clear the client state to resemble a newly connected client. */ void clearClientConnectionState(client *c) { listNode *ln; @@ -1999,8 +1927,7 @@ static void resetReusableQueryBuf(client *c) { /* Release references to string objects inside an encoded buffer. * If running in IO thread, defer the free to main thread via io_deferred_objects. */ -static void releaseBufReferences(client *c, char *buf, size_t bufpos) { - int in_io_thread = (c && c->running_tid != IOTHREAD_MAIN_THREAD_ID); +static void releaseBufReferences(char *buf, size_t bufpos) { char *ptr = buf; while (ptr < buf + bufpos) { payloadHeader *header = (payloadHeader *)ptr; @@ -2010,10 +1937,7 @@ static void releaseBufReferences(client *c, char *buf, size_t bufpos) { bulkStrRef *str_ref = (bulkStrRef *)ptr; /* Only release if not already released. */ if (str_ref->obj != NULL) { - if (in_io_thread) - ioDeferFreeRobj(c, str_ref->obj); - else - decrRefCount(str_ref->obj); + decrRefCount(str_ref->obj); str_ref->obj = NULL; } } else { @@ -2027,7 +1951,7 @@ static void releaseBufReferences(client *c, char *buf, size_t bufpos) { /* Release all references to string objects in all encoded buffers */ static void releaseAllBufReferences(client *c) { if (c->buf_encoded) { - releaseBufReferences(c, c->buf, c->bufpos); + releaseBufReferences(c->buf, c->bufpos); } listIter iter; @@ -2036,7 +1960,7 @@ static void releaseAllBufReferences(client *c) { while ((next = listNext(&iter))) { clientReplyBlock *o = (clientReplyBlock *)listNodeValue(next); if (o && o->buf_encoded) { - releaseBufReferences(c, o->buf, o->used); + releaseBufReferences(o->buf, o->used); } } } @@ -2143,8 +2067,6 @@ void freeClient(client *c) { freeReplicaReferencedReplBuffer(c); freeClientOriginalArgv(c); freeClientDeferredObjects(c, 1); - freeClientIODeferredObjects(c, 1); - tryUnlinkClientFromPendingRefReply(c, 1); if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); #ifdef LOG_REQ_RES @@ -2399,10 +2321,9 @@ static void processEncodedBufferForWrite(ReplyIOV *reply_iov, char *start_ptr, c /* Process sent data in the encoded buffer. * Returns pointer to the current payload header being processed, or NULL if all data is processed. * If running in IO thread, defer the free to main thread via io_deferred_objects. */ -static payloadHeader *processSentDataInEncodedBuffer(client *c, char *start_ptr, char *end_ptr, +static payloadHeader *processSentDataInEncodedBuffer(char *start_ptr, char *end_ptr, size_t *sentlen, ssize_t *remaining) { - int in_io_thread = (c && c->running_tid != IOTHREAD_MAIN_THREAD_ID); char *ptr = start_ptr; while (ptr < end_ptr && *remaining > 0) { payloadHeader *head = (payloadHeader *)ptr; @@ -2426,11 +2347,7 @@ static payloadHeader *processSentDataInEncodedBuffer(client *c, char *start_ptr, return head; } *remaining -= (writen_len - *sentlen); - if (in_io_thread) { - ioDeferFreeRobj(c, str_ref->obj); - } else { - decrRefCount(str_ref->obj); - } + decrRefCount(str_ref->obj); str_ref->obj = NULL; /* Mark as released to prevent double free */ *sentlen = 0; } @@ -2523,7 +2440,7 @@ static int _writevToClient(client *c, ssize_t *nwritten) { } else { /* For encoded buffers */ char *start_ptr = c->last_header ? (char *)c->last_header : c->buf; - c->last_header = processSentDataInEncodedBuffer(c, start_ptr, c->buf + c->bufpos, &c->sentlen, &remaining); + c->last_header = processSentDataInEncodedBuffer(start_ptr, c->buf + c->bufpos, &c->sentlen, &remaining); if (!c->last_header) { /* reach end */ c->bufpos = 0; c->buf_encoded = 0; @@ -2552,7 +2469,7 @@ static int _writevToClient(client *c, ssize_t *nwritten) { } else { /* Encoded reply block */ char *start_ptr = c->last_header ? (char *)c->last_header : o->buf; - c->last_header = processSentDataInEncodedBuffer(c, start_ptr, o->buf + o->used, &c->sentlen, &remaining); + c->last_header = processSentDataInEncodedBuffer(start_ptr, o->buf + o->used, &c->sentlen, &remaining); if (!c->last_header) { /* reach end */ /* Block fully consumed, remove it */ c->reply_bytes -= o->size; @@ -2744,10 +2661,6 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } - /* Remove client from pending referenced reply clients list. */ - if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) - tryUnlinkClientFromPendingRefReply(c, 1); - /* If replica client has sent all the replication data it knows about * we send it to main thread so it can pick up new repl data ASAP. * Note, that we keep it in IO thread in case we have a pending ACK read. */ @@ -5604,7 +5517,7 @@ static void reclaimPendingCommand(client *c, pendingCommand *pcmd) { * decrease the reference count to release our reference to it. */ for (int j = 0; j < pcmd->argc; j++) { robj *o = pcmd->argv[j]; - if (o && o->refcount > 1) { + if (o && robj_get_refcount(o) > 1) { decrRefCount(o); pcmd->argv[j] = NULL; } diff --git a/src/object.c b/src/object.c index d4915f2d675..d8f026b8033 100644 --- a/src/object.c +++ b/src/object.c @@ -25,7 +25,7 @@ /* Map a metadata ID (bit index) to its compacted slot number among set bits, * then return a pointer to that slot. Caller must ensure the ID bit is set. */ uint64_t *kvobjMetaRef(kvobj *kv, int metaId) { - uint32_t bits = kv->metabits; + uint32_t bits = robj_get_metabits(kv); /* Expiry is always the first metadata */ if (likely(metaId == 0)) return ((uint64_t *)kv) - 1; @@ -86,10 +86,8 @@ kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits) { kv->type = type; kv->encoding = OBJ_ENCODING_RAW; kv->ptr = ptr; - kv->refcount = 1; kv->lru = 0; - kv->iskvobj = 1; - kv->metabits = keyMetaBits; + kv->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 1, keyMetaBits); /* The memory after the struct where we embedded data. */ char *data = (void *)(kv + 1); @@ -109,15 +107,13 @@ robj *createObject(int type, void *ptr) { o->type = type; o->encoding = OBJ_ENCODING_RAW; o->ptr = ptr; - o->refcount = 1; o->lru = 0; - o->iskvobj = 0; - o->metabits = 0; + o->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 0, 0); return o; } void initObjectLRUOrLFU(robj *o) { - if (o->refcount == OBJ_SHARED_REFCOUNT) + if (robj_get_refcount(o) == OBJ_SHARED_REFCOUNT) return; /* Set the LRU to the current lruclock (seconds resolution), or * alternatively the LFU counter. */ @@ -141,8 +137,8 @@ void initObjectLRUOrLFU(robj *o) { * */ robj *makeObjectShared(robj *o) { - serverAssert(o->refcount == 1); - o->refcount = OBJ_SHARED_REFCOUNT; + serverAssert(robj_get_refcount(o) == 1); + robj_set_refcount(o, OBJ_SHARED_REFCOUNT); return o; } @@ -187,10 +183,8 @@ static kvobj *kvobjCreateEmbedString(const char *val_ptr, size_t val_len, o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; - o->refcount = 1; o->lru = 0; - o->metabits = keyMetaBits; - o->iskvobj = 1; + o->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 1, keyMetaBits); /* The memory after the struct where we embedded data. */ char *data = (char *)(o + 1); @@ -227,10 +221,8 @@ robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { robj *o = zmalloc_usable(sizeof(robj) + val_sds_size, &bufsize); o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; - o->refcount = 1; o->lru = 0; - o->metabits = 0; - o->iskvobj = 0; + o->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 0, 0); /* The memory after the struct where we embedded data. */ char *data = (char *)(o + 1); @@ -244,14 +236,14 @@ robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { sds kvobjGetKey(const kvobj *kv) { unsigned char *data = (void *)(kv + 1); - debugServerAssert(kv->iskvobj); + debugServerAssert(robj_get_iskvobj(kv)); uint8_t hdr_size = *(uint8_t *)data; data += 1 + hdr_size; return (sds)data; } long long kvobjGetExpire(const kvobj *kv) { - if (kv->metabits & KEY_META_MASK_EXPIRE) { + if (robj_get_metabits(kv) & KEY_META_MASK_EXPIRE) { return (long long) (*kvobjMetaRef((kvobj *)kv, KEY_META_ID_EXPIRE)); } else { return -1; @@ -262,13 +254,14 @@ long long kvobjGetExpire(const kvobj *kv) { * the old object's reference counter is decremented and possibly freed. Use the * returned object instead of 'val' after calling this function. */ kvobj *kvobjSetExpire(kvobj *kv, long long expire) { - /* If kv not expirable, then we need to realloc to add expire metadata */ - if (!(kv->metabits & KEY_META_MASK_EXPIRE)) { + /* If kv not expirable, then we need to realloc to add expire metadata */ + uint32_t metabits = robj_get_metabits(kv); + if (!(metabits & KEY_META_MASK_EXPIRE)) { /* Nothing to do if kv not expirable and expire is -1 */ if (expire == -1) return kv; - - kv = kvobjSet(kvobjGetKey(kv), kv, kv->metabits | KEY_META_MASK_EXPIRE); + + kv = kvobjSet(kvobjGetKey(kv), kv, metabits | KEY_META_MASK_EXPIRE); } /* kv is expirable. Update expire field. */ @@ -297,7 +290,7 @@ kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits) { } else { /* Create a new object with embedded key. Reuse ptr if possible. */ void *valptr; - if (val->refcount == 1) { + if (robj_get_refcount(val) == 1) { /* Reuse the ptr. There are no other references to val. */ valptr = val->ptr; val->ptr = NULL; @@ -321,7 +314,7 @@ kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits) { kv->lru = val->lru; /* Transfer module metadata from `val` to new `kv` (if `val` of type kvobj with metadata). */ - if (val->metabits & KEY_META_MASK_MODULES) + if (robj_get_metabits(val) & KEY_META_MASK_MODULES) keyMetaTransition((kvobj *) val, kv); decrRefCount(val); @@ -587,12 +580,23 @@ void freeStreamObject(robj *o) { } void incrRefCount(robj *o) { - if (o->refcount < OBJ_FIRST_SPECIAL_REFCOUNT - 1) { - o->refcount++; + uint32_t old_val, new_val; + atomicGet(o->flags_refcount, old_val); + unsigned int refcount = OBJ_GET_REFCOUNT(old_val); + + if (refcount < OBJ_FIRST_SPECIAL_REFCOUNT - 1) { + if (likely(refcount == 1)) { + /* Fast path, only hold by itself. */ + robj_incr_refcount(o); + } else { + do { + new_val = OBJ_INCR_REFCOUNT(old_val); + } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); + } } else { - if (o->refcount == OBJ_SHARED_REFCOUNT) { + if (refcount == OBJ_SHARED_REFCOUNT) { /* Nothing to do: this refcount is immutable. */ - } else if (o->refcount == OBJ_STATIC_REFCOUNT) { + } else if (refcount == OBJ_STATIC_REFCOUNT) { serverPanic("You tried to retain an object allocated in the stack"); } else { serverPanic("You tried to retain an object with maximum refcount"); @@ -601,22 +605,38 @@ void incrRefCount(robj *o) { } void decrRefCount(robj *o) { - if (o->refcount == OBJ_SHARED_REFCOUNT) - return; /* Nothing to do: this refcount is immutable. */ + uint32_t old_val, new_val; + + atomicGet(o->flags_refcount, old_val); + unsigned int refcount = OBJ_GET_REFCOUNT(old_val); - if (unlikely(o->refcount <= 0)) { + if (refcount == OBJ_SHARED_REFCOUNT) + return; /* Nothing to do: this refcount is immutable. */ + if (unlikely(refcount <= 0)) { serverPanic("illegal decrRefCount for object with: type %u, encoding %u, refcount %d", - o->type, o->encoding, o->refcount); + o->type, o->encoding, refcount); + } + + if (likely(refcount == 1)) { + /* Fast path, only hold by itself. */ + robj_set_refcount(o, 0); + refcount = 0; + } else { + do { + new_val = OBJ_DECR_REFCOUNT(old_val); + } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); + refcount = OBJ_GET_REFCOUNT(new_val); } - if (--(o->refcount) == 0) { + /* old_val now contains the value before decrement (CAS updates it on failure) */ + if (refcount == 0) { void *alloc = o; - - if (o->iskvobj) { + + if (robj_get_iskvobj(o)) { /* eval real allocation pointer */ alloc = kvobjGetAllocPtr(o); /* if kvobj has metadata attached. */ - if (getModuleMetaBits(o->metabits)) + if (getModuleMetaBits(robj_get_metabits(o))) keyMetaOnFree((kvobj *)o); } @@ -800,7 +820,7 @@ void dismissObject(robj *o, size_t size_hint) { /* Currently we use zmadvise_dontneed only when we use jemalloc with Linux. * so we avoid these pointless loops when they're not going to do anything. */ #if defined(USE_JEMALLOC) && defined(__linux__) - if (o->refcount != 1) return; + if (robj_get_refcount(o) != 1) return; switch(o->type) { case OBJ_STRING: dismissStringObject(o); break; case OBJ_LIST: dismissListObject(o, size_hint); break; @@ -876,7 +896,7 @@ robj *tryObjectEncodingEx(robj *o, int try_trim) { /* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis and may end in places where * they are not handled. We handle them only as values in the keyspace. */ - if (o->refcount > 1) return o; + if (robj_get_refcount(o) > 1) return o; /* Check if we can represent this string as a long integer. * Note that we are sure that a string larger than 20 chars is not @@ -1602,7 +1622,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) { if ((kv = kvobjCommandLookupOrReply(c, c->argv[2], shared.null[c->resp])) == NULL) return; - addReplyLongLong(c, kv->refcount); + addReplyLongLong(c, robj_get_refcount(kv)); } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) { if ((kv = kvobjCommandLookupOrReply(c, c->argv[2], shared.null[c->resp])) == NULL) return; diff --git a/src/object.h b/src/object.h index 1e761175d03..28f17a0e146 100644 --- a/src/object.h +++ b/src/object.h @@ -96,17 +96,55 @@ struct RedisModuleType; #define OBJ_STATIC_REFCOUNT ((1 << OBJ_REFCOUNT_BITS) - 2) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT +/* + * Bit layout of flags_refcount (32 bits): + * bits 0-22 : refcount (23 bits) + * bit 23 : iskvobj (1 bit) - 1 if this struct serves as a kvobj base + * bits 24-31 : metabits (8 bits) - Bitmap of metadata (+expiry) attached to this kvobj + */ +#define OBJ_REFCOUNT_SHIFT 0 +#define OBJ_ISKVOBJ_SHIFT OBJ_REFCOUNT_BITS /* bit 23 */ +#define OBJ_METABITS_SHIFT (OBJ_REFCOUNT_BITS + 1) /* bit 24 */ +#define OBJ_REFCOUNT_MASK ((1U << OBJ_REFCOUNT_BITS) - 1) /* 0x007FFFFF */ +#define OBJ_ISKVOBJ_MASK (1U << OBJ_ISKVOBJ_SHIFT) /* 0x00800000 */ +#define OBJ_METABITS_MASK (((1U << OBJ_NUM_KVMETA_BITS) - 1) << OBJ_METABITS_SHIFT) /* 0xFF000000 */ + +/* Refcount operations on flags_refcount value */ +#define OBJ_GET_REFCOUNT(atomic_val) ((atomic_val) & OBJ_REFCOUNT_MASK) +#define OBJ_SET_REFCOUNT(atomic_val, rc) \ + (((atomic_val) & ~OBJ_REFCOUNT_MASK) | ((rc) & OBJ_REFCOUNT_MASK)) +#define OBJ_INCR_REFCOUNT(atomic_val) \ + (((atomic_val) & ~OBJ_REFCOUNT_MASK) | ((OBJ_GET_REFCOUNT(atomic_val) + 1) & OBJ_REFCOUNT_MASK)) +#define OBJ_DECR_REFCOUNT(atomic_val) \ + (((atomic_val) & ~OBJ_REFCOUNT_MASK) | ((OBJ_GET_REFCOUNT(atomic_val) - 1) & OBJ_REFCOUNT_MASK)) + +#define OBJ_GET_ISKVOBJ(atomic_val) (((atomic_val) & OBJ_ISKVOBJ_MASK) >> OBJ_ISKVOBJ_SHIFT) +#define OBJ_GET_METABITS(atomic_val) (((atomic_val) & OBJ_METABITS_MASK) >> OBJ_METABITS_SHIFT) + +/* Build flags_refcount value from components */ +#define OBJ_BUILD_FLAGS_REFCOUNT(rc, iskvobj, mb) \ + (((rc) & OBJ_REFCOUNT_MASK) | \ + ((iskvobj) ? OBJ_ISKVOBJ_MASK : 0) | \ + (((mb) << OBJ_METABITS_SHIFT) & OBJ_METABITS_MASK)) + +/* Convenience macros to set fields on robj pointer (write) */ +#define robj_set_refcount(o, rc) do { \ + (o)->flags_refcount = OBJ_SET_REFCOUNT((o)->flags_refcount, (rc)); \ +} while(0) +#define robj_incr_refcount(o) do { \ + (o)->flags_refcount = OBJ_INCR_REFCOUNT((o)->flags_refcount); \ +} while(0) +#define robj_decr_refcount(o) do { \ + (o)->flags_refcount = OBJ_DECR_REFCOUNT((o)->flags_refcount); \ +} while(0) + struct redisObject { unsigned type:4; unsigned encoding:4; - unsigned refcount : OBJ_REFCOUNT_BITS; - unsigned iskvobj : 1; /* 1 if this struct serves as a kvobj base */ - - /* metabits and lru are Relevant only when iskvobj is set: */ - unsigned metabits :8; /* Bitmap of metadata (+expiry) attached to this kvobj */ unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ + redisAtomic uint32_t flags_refcount; void *ptr; }; @@ -116,6 +154,24 @@ typedef struct redisObject robj; /* kvobj: see header comment above for definition and memory layout. */ typedef struct redisObject kvobj; +/* Inline functions to access fields directly from robj pointer (read). + * These use atomic read for thread-safety. */ +static inline uint32_t robj_get_refcount(const robj *o) { + uint32_t tmp; + atomicGet((o)->flags_refcount, tmp); + return OBJ_GET_REFCOUNT(tmp); +} +static inline uint32_t robj_get_iskvobj(const robj *o) { + uint32_t tmp; + atomicGet((o)->flags_refcount, tmp); + return OBJ_GET_ISKVOBJ(tmp); +} +static inline uint32_t robj_get_metabits(const robj *o) { + uint32_t tmp; + atomicGet((o)->flags_refcount, tmp); + return OBJ_GET_METABITS(tmp); +} + kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits); kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits); kvobj *kvobjSetExpire(kvobj *kv, long long expire); @@ -187,7 +243,7 @@ void memoryCommand(struct client *c); static inline void *kvobjGetAllocPtr(const kvobj *kv) { /* Return the base allocation pointer (start of the metadata prefix). */ - uint32_t numMetaBytes = __builtin_popcount(kv->metabits) * sizeof(uint64_t); + uint32_t numMetaBytes = __builtin_popcount(robj_get_metabits(kv)) * sizeof(uint64_t); return (char *)kv - numMetaBytes; } diff --git a/src/rdb.c b/src/rdb.c index 45d213b9690..f0acc221d55 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1408,7 +1408,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in } /* if needed save key metadata */ - if (getModuleMetaBits(val->metabits)) { + if (getModuleMetaBits(robj_get_metabits(val))) { if (rdbSaveKeyMetadata(rdb, key, val, dbid) == -1) return -1; } diff --git a/src/script_lua.c b/src/script_lua.c index 2da14ae3dfa..605997a37af 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -858,7 +858,7 @@ void freeLuaRedisArgv(robj **argv, int argc, int argv_len) { * The object must be small, SDS-encoded, and with refcount = 1 * (we must be the only owner) for us to cache it. */ if (j < LUA_CMD_OBJCACHE_SIZE && - o->refcount == 1 && + robj_get_refcount(o) == 1 && (o->encoding == OBJ_ENCODING_RAW || o->encoding == OBJ_ENCODING_EMBSTR) && sdslen(o->ptr) <= LUA_CMD_OBJCACHE_MAX_LEN) diff --git a/src/server.c b/src/server.c index ddcc66c3932..32bd84d3ed1 100644 --- a/src/server.c +++ b/src/server.c @@ -475,11 +475,13 @@ int dictEncObjKeyCompare(dictCmpCache *cache, const void *key1, const void *key2 * good reasons, because it would incrRefCount() the object, which * is invalid. So we check to make sure dictFind() works with static * objects as well. */ - if (o1->refcount != OBJ_STATIC_REFCOUNT) o1 = getDecodedObject(o1); - if (o2->refcount != OBJ_STATIC_REFCOUNT) o2 = getDecodedObject(o2); + uint32_t refcount1 = robj_get_refcount(o1); + uint32_t refcount2 = robj_get_refcount(o2); + if (refcount1 != OBJ_STATIC_REFCOUNT) o1 = getDecodedObject(o1); + if (refcount2 != OBJ_STATIC_REFCOUNT) o2 = getDecodedObject(o2); cmp = dictSdsKeyCompare(cache,o1->ptr,o2->ptr); - if (o1->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o1); - if (o2->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o2); + if (refcount1 != OBJ_STATIC_REFCOUNT) decrRefCount(o1); + if (refcount2 != OBJ_STATIC_REFCOUNT) decrRefCount(o2); return cmp; } @@ -2893,7 +2895,6 @@ void initServer(void) { server.monitors = listCreate(); server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); - server.clients_with_pending_ref_reply = listCreate(); server.clients_timeout_table = raxNew(); server.replication_allowed = 1; server.slaveseldb = -1; /* Force to emit the first SELECT command. */ diff --git a/src/server.h b/src/server.h index 836216fb06d..2f522be7108 100644 --- a/src/server.h +++ b/src/server.h @@ -1065,11 +1065,10 @@ char *getObjectTypeName(robj*); * we'll update it when the structure is changed, to avoid bugs like * bug #85 introduced exactly in this way. */ #define initStaticStringObject(_var,_ptr) do { \ - _var.refcount = OBJ_STATIC_REFCOUNT; \ _var.type = OBJ_STRING; \ _var.encoding = OBJ_ENCODING_RAW; \ - _var.metabits = 0; \ - _var.iskvobj = 0; \ + _var.lru = 0; \ + _var.flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(OBJ_STATIC_REFCOUNT, 0, 0); \ _var.ptr = _ptr; \ } while(0) @@ -1983,7 +1982,6 @@ struct redisServer { list *clients_to_close; /* Clients to close asynchronously */ list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ - list *clients_with_pending_ref_reply; /* Clients with referenced reply objects. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* The client that triggered the command execution (External or AOF). */ client *executing_client; /* The client executing the current command (possibly script or module). */ @@ -3102,7 +3100,6 @@ void freeClientArgv(client *c); void freeClientPendingCommands(client *c, int num_pcmds_to_free); void tryDeferFreeClientObject(client *c, int type, void *ptr); void freeClientDeferredObjects(client *c, int free_array); -void freeClientIODeferredObjects(client *c, int free_array); void sendReplyToClient(connection *conn); void *addReplyDeferredLen(client *c); void setDeferredArrayLen(client *c, void *node, long length); @@ -3198,7 +3195,6 @@ int clientHasPendingReplies(client *c); int updateClientMemUsageAndBucket(client *c); void removeClientFromMemUsageBucket(client *c, int allow_eviction); void unlinkClient(client *c); -void tryUnlinkClientFromPendingRefReply(client *c, int force); int writeToClient(client *c, int handler_installed); void linkClient(client *c); void protectClient(client *c); diff --git a/src/slowlog.c b/src/slowlog.c index eaf88cc33fc..3146d02965b 100644 --- a/src/slowlog.c +++ b/src/slowlog.c @@ -52,7 +52,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur (unsigned long) sdslen(argv[j]->ptr) - SLOWLOG_ENTRY_MAX_STRING); se->argv[j] = createObject(OBJ_STRING,s); - } else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) { + } else if (robj_get_refcount(argv[j]) == OBJ_SHARED_REFCOUNT) { se->argv[j] = argv[j]; } else { /* Here we need to duplicate the string objects composing the diff --git a/src/t_string.c b/src/t_string.c index 608db4c2fcb..652c6aa85d0 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -815,7 +815,7 @@ void incrDecrCommand(client *c, long long incr) { } value += incr; - if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && + if (o && robj_get_refcount(o) == 1 && o->encoding == OBJ_ENCODING_INT && value >= LONG_MIN && value <= LONG_MAX) { new = o; From 6e26080c66d486743bda8566c1deb3203645ae1e Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 1 Feb 2026 11:48:53 +0800 Subject: [PATCH 2/8] Refine --- src/aof.c | 2 +- src/cluster.c | 2 +- src/db.c | 10 ++++---- src/keymeta.c | 26 +++++++++---------- src/keymeta.h | 2 +- src/object.c | 56 +++++++++++++++++++++++++--------------- src/object.h | 71 +++++++++++---------------------------------------- src/rdb.c | 2 +- src/server.h | 2 +- 9 files changed, 73 insertions(+), 100 deletions(-) diff --git a/src/aof.c b/src/aof.c index a5c3acc65f8..c8eaea4d57f 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2423,7 +2423,7 @@ int rewriteObject(rio *r, robj *key, robj *o, int dbid, long long expiretime) { } /* If modules metadata is available */ - if ((getModuleMetaBits(robj_get_metabits(o))) && (keyMetaOnAof(r, key, o, dbid) == 0)) + if ((getModuleMetaBits(o->flags.metabits)) && (keyMetaOnAof(r, key, o, dbid) == 0)) return C_ERR; return C_OK; diff --git a/src/cluster.c b/src/cluster.c index 0d9950c23ae..744ce11a919 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -94,7 +94,7 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_chec rioInitWithBuffer(payload,sdsempty()); /* Save key metadata if present without (handles TTL separately via command args) */ - if (getModuleMetaBits(robj_get_metabits(o))) + if (getModuleMetaBits(o->flags.metabits)) serverAssert(rdbSaveKeyMetadata(payload, key, o, dbid) != -1); serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObject(payload,o,key,dbid)); diff --git a/src/db.c b/src/db.c index 919f7aa68b3..4d0c1326d0d 100644 --- a/src/db.c +++ b/src/db.c @@ -591,7 +591,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link long long oldExpire = getExpire(db, key->ptr, old); /* All metadata will be kept if not `overwrite` for the new object */ - uint32_t newKeyMetaBits = robj_get_metabits(old); + uint32_t newKeyMetaBits = old->flags.metabits; /* clear expire if not keepTTL or no old expire */ if ((!keepTTL) || (oldExpire == -1)) newKeyMetaBits &= ~KEY_META_MASK_EXPIRE; @@ -604,7 +604,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link incrRefCount(old); /* Free related metadata. Ignore builtin metadata (currently only expire) */ - if (getModuleMetaBits(robj_get_metabits(old))) { + if (getModuleMetaBits(old->flags.metabits)) { keyMetaOnUnlink(db, key, old); freeModuleMeta = 1; } @@ -853,7 +853,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { * need to incr to retain kv */ incrRefCount(kv); /* refcnt=1->2 */ /* Metadata hook: notify unlink for key metadata cleanup. */ - if (getModuleMetaBits(robj_get_metabits(kv))) keyMetaOnUnlink(db, key, kv); + if (getModuleMetaBits(kv->flags.metabits)) keyMetaOnUnlink(db, key, kv); /* Tells the module that the key has been unlinked from the database. */ moduleNotifyKeyUnlink(key, kv, db->id, flags); /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ @@ -2166,7 +2166,7 @@ void renameGenericCommand(client *c, int nx) { /* Prepare metadata for the renamed key */ KeyMetaSpec keymeta; keyMetaSpecInit(&keymeta); - if (robj_get_metabits(o)) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); + if (o->flags.metabits) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); dbDelete(c->db,c->argv[1]); @@ -2381,7 +2381,7 @@ void copyCommand(client *c) { /* Prepare metadata for the new key */ KeyMetaSpec keymeta; keyMetaSpecInit(&keymeta); - if (robj_get_metabits(o)) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); + if (o->flags.metabits) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); kvobj *kvCopy = dbAddInternal(dst, newkey, &newobj, NULL, &keymeta); diff --git a/src/keymeta.c b/src/keymeta.c index 29c6ec5f3f2..4e37db91f0e 100644 --- a/src/keymeta.c +++ b/src/keymeta.c @@ -178,7 +178,7 @@ void keyMetaOnCopy(kvobj *kv, robj *srcKey, robj *dstKey, int srcDbId, int dstDb KeyMetaSpec *keymeta) { uint64_t *pMeta = ((uint64_t *)kv) - 1; - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; if (kv_metabits & KEY_META_MASK_EXPIRE) { if (*pMeta != KM_EXPIRE_RESET_VALUE) keyMetaSpecAdd(keymeta, KEY_META_ID_EXPIRE, *pMeta); @@ -208,7 +208,7 @@ void keyMetaOnCopy(kvobj *kv, robj *srcKey, robj *dstKey, int srcDbId, int dstDb /* Prepare metadata spec for rename of `kv` */ void keyMetaOnRename(struct redisDb *db, kvobj *kv, robj *oldKey, robj *newKey, KeyMetaSpec *kms) { uint64_t *pMeta = ((uint64_t *)kv) - 1; - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; /* Handle builtin expire: add only if set and value != -1, but always advance * the pointer when the expire bit is set since the slot exists either way. */ @@ -246,7 +246,7 @@ void keyMetaOnRename(struct redisDb *db, kvobj *kv, robj *oldKey, robj *newKey, /* Prepare metadata spec for move of `kv` from srcDbId to dstDbId */ void keyMetaOnMove(kvobj *kv, robj *key, int srcDbId, int dstDbId, KeyMetaSpec *kms) { uint64_t *pMeta = ((uint64_t *)kv) - 1; - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; /* Handle builtin expire: add only if set and value != -1, but always advance * the pointer when the expire bit is set since the slot exists either way. */ @@ -295,7 +295,7 @@ void keyMetaOnMove(kvobj *kv, robj *key, int srcDbId, int dstDbId, KeyMetaSpec * void keyMetaOnUnlink(redisDb *db, robj *key, kvobj *kv) { /* Skip builtin expire slot if present; no action for expire itself here. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; @@ -337,7 +337,7 @@ void keyMetaOnUnlink(redisDb *db, robj *key, kvobj *kv) { void keyMetaOnFree(kvobj *kv) { /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; @@ -563,7 +563,7 @@ int rdbLoadKeyMetadata(rio *rdb, int dbid, int numClasses, KeyMetaSpec *kms) { * Returns -1 on error, 0 on success. */ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) { - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; /* Check if there are any module metadata bits set */ uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; @@ -656,7 +656,7 @@ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) { int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) { /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; @@ -693,8 +693,8 @@ int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) { /* Move entire metadata from old to new kvobj as is */ void keyMetaTransition(kvobj *kvOld, kvobj *kvNew) { - uint32_t kvOld_metabits = robj_get_metabits(kvOld); - uint32_t kvNew_metabits = robj_get_metabits(kvNew); + uint32_t kvOld_metabits = kvOld->flags.metabits; + uint32_t kvNew_metabits = kvNew->flags.metabits; /* Precondition: */ debugServerAssert(kvOld_metabits>>KEY_META_ID_MODULE_FIRST); @@ -808,7 +808,7 @@ kvobj *keyMetaSetMetadata(redisDb *db, kvobj *kv, KeyMetaClassId id, uint64_t me return NULL; /* If metadata already attached, just update it in place. */ - if (robj_get_metabits(kv) & (1u << id)) { + if (kv->flags.metabits & (1u << id)) { *kvobjMetaRef(kv, id) = metadata; return kv; } @@ -847,7 +847,7 @@ kvobj *keyMetaSetMetadata(redisDb *db, kvobj *kv, KeyMetaClassId id, uint64_t me size_t oldsize = 0; if (server.memory_tracking_enabled) oldsize = kvobjAllocSize(kv); - kv = kvobjSet(key, kv, robj_get_metabits(kv) | (1u << id)); + kv = kvobjSet(key, kv, kv->flags.metabits | (1u << id)); kvstoreDictSetAtLink(db->keys, slot, kv, &keyLink, 0); if (server.memory_tracking_enabled) updateSlotAllocSize(db, slot, kv, oldsize, kvobjAllocSize(kv)); @@ -875,7 +875,7 @@ int keyMetaGetMetadata(KeyMetaClassId kmcId, kvobj *kv, uint64_t *metadata) { if (keyMetaClass[kmcId].state != CLASS_STATE_INUSE) return 0; - if (!(robj_get_metabits(kv) & (1u << kmcId))) + if (!(kv->flags.metabits & (1u << kmcId))) return 0; /* metadata not attached */ *metadata = *kvobjMetaRef(kv, kmcId); @@ -928,7 +928,7 @@ static void keyMetaSpecAddUnordered(KeyMetaSpec *keymeta, int metaid, uint64_t m /* Blindly reset modules metadata values to reset_value */ void keyMetaResetModuleValues(kvobj *kv) { - uint32_t kv_metabits = robj_get_metabits(kv); + uint32_t kv_metabits = kv->flags.metabits; /* Precondition: only called for module metadata (bits 1-7) */ debugServerAssert(kv_metabits & KEY_META_MASK_MODULES); diff --git a/src/keymeta.h b/src/keymeta.h index 0a56f27a401..cd606ea856e 100644 --- a/src/keymeta.h +++ b/src/keymeta.h @@ -149,7 +149,7 @@ static inline uint32_t getModuleMetaBits(uint16_t metabits); /********** Inline functions **********/ static inline void keyMetaResetValues(kvobj *kv) { - uint32_t metabits = robj_get_metabits(kv); + uint32_t metabits = kv->flags.metabits; if (unlikely(metabits & KEY_META_MASK_MODULES)) keyMetaResetModuleValues(kv); /* Must be first meta (optimized) */ diff --git a/src/object.c b/src/object.c index d8f026b8033..0c3d9382e05 100644 --- a/src/object.c +++ b/src/object.c @@ -25,7 +25,7 @@ /* Map a metadata ID (bit index) to its compacted slot number among set bits, * then return a pointer to that slot. Caller must ensure the ID bit is set. */ uint64_t *kvobjMetaRef(kvobj *kv, int metaId) { - uint32_t bits = robj_get_metabits(kv); + uint32_t bits = kv->flags.metabits; /* Expiry is always the first metadata */ if (likely(metaId == 0)) return ((uint64_t *)kv) - 1; @@ -87,7 +87,9 @@ kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits) { kv->encoding = OBJ_ENCODING_RAW; kv->ptr = ptr; kv->lru = 0; - kv->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 1, keyMetaBits); + kv->flags.iskvobj = 1; + kv->flags.metabits = keyMetaBits; + kv->flags.refcount = 1; /* The memory after the struct where we embedded data. */ char *data = (void *)(kv + 1); @@ -108,7 +110,9 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->ptr = ptr; o->lru = 0; - o->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 0, 0); + o->flags.iskvobj = 0; + o->flags.metabits = 0; + o->flags.refcount = 1; return o; } @@ -138,7 +142,7 @@ void initObjectLRUOrLFU(robj *o) { */ robj *makeObjectShared(robj *o) { serverAssert(robj_get_refcount(o) == 1); - robj_set_refcount(o, OBJ_SHARED_REFCOUNT); + o->flags.refcount = OBJ_SHARED_REFCOUNT; return o; } @@ -184,7 +188,9 @@ static kvobj *kvobjCreateEmbedString(const char *val_ptr, size_t val_len, o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->lru = 0; - o->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 1, keyMetaBits); + o->flags.refcount = 1; + o->flags.iskvobj = 1; + o->flags.metabits = keyMetaBits; /* The memory after the struct where we embedded data. */ char *data = (char *)(o + 1); @@ -222,7 +228,9 @@ robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->lru = 0; - o->flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(1, 0, 0); + o->flags.iskvobj = 0; + o->flags.metabits = 0; + o->flags.refcount = 1; /* The memory after the struct where we embedded data. */ char *data = (char *)(o + 1); @@ -236,14 +244,14 @@ robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { sds kvobjGetKey(const kvobj *kv) { unsigned char *data = (void *)(kv + 1); - debugServerAssert(robj_get_iskvobj(kv)); + debugServerAssert(kv->flags.iskvobj); uint8_t hdr_size = *(uint8_t *)data; data += 1 + hdr_size; return (sds)data; } long long kvobjGetExpire(const kvobj *kv) { - if (robj_get_metabits(kv) & KEY_META_MASK_EXPIRE) { + if (kv->flags.metabits & KEY_META_MASK_EXPIRE) { return (long long) (*kvobjMetaRef((kvobj *)kv, KEY_META_ID_EXPIRE)); } else { return -1; @@ -255,7 +263,7 @@ long long kvobjGetExpire(const kvobj *kv) { * returned object instead of 'val' after calling this function. */ kvobj *kvobjSetExpire(kvobj *kv, long long expire) { /* If kv not expirable, then we need to realloc to add expire metadata */ - uint32_t metabits = robj_get_metabits(kv); + uint32_t metabits = kv->flags.metabits; if (!(metabits & KEY_META_MASK_EXPIRE)) { /* Nothing to do if kv not expirable and expire is -1 */ if (expire == -1) @@ -314,7 +322,7 @@ kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits) { kv->lru = val->lru; /* Transfer module metadata from `val` to new `kv` (if `val` of type kvobj with metadata). */ - if (robj_get_metabits(val) & KEY_META_MASK_MODULES) + if (val->flags.metabits & KEY_META_MASK_MODULES) keyMetaTransition((kvobj *) val, kv); decrRefCount(val); @@ -580,17 +588,21 @@ void freeStreamObject(robj *o) { } void incrRefCount(robj *o) { - uint32_t old_val, new_val; + uint32_t old_val = 0, new_val; + atomicGet(o->flags_refcount, old_val); - unsigned int refcount = OBJ_GET_REFCOUNT(old_val); + struct robjFlags *flags = (struct robjFlags*)&old_val; + unsigned int refcount = flags->refcount; if (refcount < OBJ_FIRST_SPECIAL_REFCOUNT - 1) { if (likely(refcount == 1)) { /* Fast path, only hold by itself. */ - robj_incr_refcount(o); + o->flags.refcount++; } else { do { - new_val = OBJ_INCR_REFCOUNT(old_val); + new_val = old_val; + flags = (struct robjFlags*)&new_val; + flags->refcount++; } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); } } else { @@ -608,7 +620,8 @@ void decrRefCount(robj *o) { uint32_t old_val, new_val; atomicGet(o->flags_refcount, old_val); - unsigned int refcount = OBJ_GET_REFCOUNT(old_val); + struct robjFlags *flags = (struct robjFlags*)&old_val; + unsigned int refcount = flags->refcount; if (refcount == OBJ_SHARED_REFCOUNT) return; /* Nothing to do: this refcount is immutable. */ @@ -619,24 +632,25 @@ void decrRefCount(robj *o) { if (likely(refcount == 1)) { /* Fast path, only hold by itself. */ - robj_set_refcount(o, 0); - refcount = 0; + o->flags.refcount = refcount = 0; } else { do { - new_val = OBJ_DECR_REFCOUNT(old_val); + new_val = old_val; + flags = (struct robjFlags*)&new_val; + flags->refcount--; } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); - refcount = OBJ_GET_REFCOUNT(new_val); + refcount = flags->refcount; } /* old_val now contains the value before decrement (CAS updates it on failure) */ if (refcount == 0) { void *alloc = o; - if (robj_get_iskvobj(o)) { + if (o->flags.iskvobj) { /* eval real allocation pointer */ alloc = kvobjGetAllocPtr(o); /* if kvobj has metadata attached. */ - if (getModuleMetaBits(robj_get_metabits(o))) + if (getModuleMetaBits(o->flags.metabits)) keyMetaOnFree((kvobj *)o); } diff --git a/src/object.h b/src/object.h index 28f17a0e146..744f45a34c9 100644 --- a/src/object.h +++ b/src/object.h @@ -96,47 +96,12 @@ struct RedisModuleType; #define OBJ_STATIC_REFCOUNT ((1 << OBJ_REFCOUNT_BITS) - 2) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT -/* - * Bit layout of flags_refcount (32 bits): - * bits 0-22 : refcount (23 bits) - * bit 23 : iskvobj (1 bit) - 1 if this struct serves as a kvobj base - * bits 24-31 : metabits (8 bits) - Bitmap of metadata (+expiry) attached to this kvobj - */ -#define OBJ_REFCOUNT_SHIFT 0 -#define OBJ_ISKVOBJ_SHIFT OBJ_REFCOUNT_BITS /* bit 23 */ -#define OBJ_METABITS_SHIFT (OBJ_REFCOUNT_BITS + 1) /* bit 24 */ -#define OBJ_REFCOUNT_MASK ((1U << OBJ_REFCOUNT_BITS) - 1) /* 0x007FFFFF */ -#define OBJ_ISKVOBJ_MASK (1U << OBJ_ISKVOBJ_SHIFT) /* 0x00800000 */ -#define OBJ_METABITS_MASK (((1U << OBJ_NUM_KVMETA_BITS) - 1) << OBJ_METABITS_SHIFT) /* 0xFF000000 */ - -/* Refcount operations on flags_refcount value */ -#define OBJ_GET_REFCOUNT(atomic_val) ((atomic_val) & OBJ_REFCOUNT_MASK) -#define OBJ_SET_REFCOUNT(atomic_val, rc) \ - (((atomic_val) & ~OBJ_REFCOUNT_MASK) | ((rc) & OBJ_REFCOUNT_MASK)) -#define OBJ_INCR_REFCOUNT(atomic_val) \ - (((atomic_val) & ~OBJ_REFCOUNT_MASK) | ((OBJ_GET_REFCOUNT(atomic_val) + 1) & OBJ_REFCOUNT_MASK)) -#define OBJ_DECR_REFCOUNT(atomic_val) \ - (((atomic_val) & ~OBJ_REFCOUNT_MASK) | ((OBJ_GET_REFCOUNT(atomic_val) - 1) & OBJ_REFCOUNT_MASK)) - -#define OBJ_GET_ISKVOBJ(atomic_val) (((atomic_val) & OBJ_ISKVOBJ_MASK) >> OBJ_ISKVOBJ_SHIFT) -#define OBJ_GET_METABITS(atomic_val) (((atomic_val) & OBJ_METABITS_MASK) >> OBJ_METABITS_SHIFT) - -/* Build flags_refcount value from components */ -#define OBJ_BUILD_FLAGS_REFCOUNT(rc, iskvobj, mb) \ - (((rc) & OBJ_REFCOUNT_MASK) | \ - ((iskvobj) ? OBJ_ISKVOBJ_MASK : 0) | \ - (((mb) << OBJ_METABITS_SHIFT) & OBJ_METABITS_MASK)) - -/* Convenience macros to set fields on robj pointer (write) */ -#define robj_set_refcount(o, rc) do { \ - (o)->flags_refcount = OBJ_SET_REFCOUNT((o)->flags_refcount, (rc)); \ -} while(0) -#define robj_incr_refcount(o) do { \ - (o)->flags_refcount = OBJ_INCR_REFCOUNT((o)->flags_refcount); \ -} while(0) -#define robj_decr_refcount(o) do { \ - (o)->flags_refcount = OBJ_DECR_REFCOUNT((o)->flags_refcount); \ -} while(0) +struct robjFlags { + unsigned refcount : OBJ_REFCOUNT_BITS; + unsigned iskvobj : 1; /* 1 if this struct serves as a kvobj base */ + /* metabits and lru are Relevant only when iskvobj is set: */ + unsigned metabits :8; /* Bitmap of metadata (+expiry) attached to this kvobj */ +}; struct redisObject { unsigned type:4; @@ -144,7 +109,10 @@ struct redisObject { unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ - redisAtomic uint32_t flags_refcount; + union { + redisAtomic uint32_t flags_refcount; + struct robjFlags flags; + }; void *ptr; }; @@ -157,19 +125,10 @@ typedef struct redisObject kvobj; /* Inline functions to access fields directly from robj pointer (read). * These use atomic read for thread-safety. */ static inline uint32_t robj_get_refcount(const robj *o) { - uint32_t tmp; - atomicGet((o)->flags_refcount, tmp); - return OBJ_GET_REFCOUNT(tmp); -} -static inline uint32_t robj_get_iskvobj(const robj *o) { - uint32_t tmp; - atomicGet((o)->flags_refcount, tmp); - return OBJ_GET_ISKVOBJ(tmp); -} -static inline uint32_t robj_get_metabits(const robj *o) { - uint32_t tmp; - atomicGet((o)->flags_refcount, tmp); - return OBJ_GET_METABITS(tmp); + uint32_t tmp = 0; + atomicGet(o->flags_refcount, tmp); + struct robjFlags *flags = (struct robjFlags*)&tmp; + return flags->refcount; } kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits); @@ -243,7 +202,7 @@ void memoryCommand(struct client *c); static inline void *kvobjGetAllocPtr(const kvobj *kv) { /* Return the base allocation pointer (start of the metadata prefix). */ - uint32_t numMetaBytes = __builtin_popcount(robj_get_metabits(kv)) * sizeof(uint64_t); + uint32_t numMetaBytes = __builtin_popcount(kv->flags.metabits) * sizeof(uint64_t); return (char *)kv - numMetaBytes; } diff --git a/src/rdb.c b/src/rdb.c index f0acc221d55..1710c9e310d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1408,7 +1408,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in } /* if needed save key metadata */ - if (getModuleMetaBits(robj_get_metabits(val))) { + if (getModuleMetaBits(val->flags.metabits)) { if (rdbSaveKeyMetadata(rdb, key, val, dbid) == -1) return -1; } diff --git a/src/server.h b/src/server.h index 2f522be7108..afc5655281a 100644 --- a/src/server.h +++ b/src/server.h @@ -1068,7 +1068,7 @@ char *getObjectTypeName(robj*); _var.type = OBJ_STRING; \ _var.encoding = OBJ_ENCODING_RAW; \ _var.lru = 0; \ - _var.flags_refcount = OBJ_BUILD_FLAGS_REFCOUNT(OBJ_STATIC_REFCOUNT, 0, 0); \ + _var.flags_refcount = OBJ_STATIC_REFCOUNT; \ _var.ptr = _ptr; \ } while(0) From d9e5fdb67de7dc0794222c403030557c5787a95b Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 1 Feb 2026 12:22:41 +0800 Subject: [PATCH 3/8] Refine --- src/object.c | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/object.c b/src/object.c index 0c3d9382e05..6db4d20d56a 100644 --- a/src/object.c +++ b/src/object.c @@ -591,8 +591,7 @@ void incrRefCount(robj *o) { uint32_t old_val = 0, new_val; atomicGet(o->flags_refcount, old_val); - struct robjFlags *flags = (struct robjFlags*)&old_val; - unsigned int refcount = flags->refcount; + unsigned int refcount = ((struct robjFlags*)&old_val)->refcount; if (refcount < OBJ_FIRST_SPECIAL_REFCOUNT - 1) { if (likely(refcount == 1)) { @@ -601,8 +600,7 @@ void incrRefCount(robj *o) { } else { do { new_val = old_val; - flags = (struct robjFlags*)&new_val; - flags->refcount++; + ((struct robjFlags*)&new_val)->refcount++; } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); } } else { @@ -620,8 +618,7 @@ void decrRefCount(robj *o) { uint32_t old_val, new_val; atomicGet(o->flags_refcount, old_val); - struct robjFlags *flags = (struct robjFlags*)&old_val; - unsigned int refcount = flags->refcount; + unsigned int refcount = ((struct robjFlags*)&old_val)->refcount; if (refcount == OBJ_SHARED_REFCOUNT) return; /* Nothing to do: this refcount is immutable. */ @@ -636,10 +633,9 @@ void decrRefCount(robj *o) { } else { do { new_val = old_val; - flags = (struct robjFlags*)&new_val; - flags->refcount--; + ((struct robjFlags*)&new_val)->refcount--; } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); - refcount = flags->refcount; + refcount = ((struct robjFlags*)&new_val)->refcount; } /* old_val now contains the value before decrement (CAS updates it on failure) */ From bf873f91af78adf5e69b20ea73e5ac1e40b28ca9 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 1 Feb 2026 12:32:25 +0800 Subject: [PATCH 4/8] temp --- src/object.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/object.h b/src/object.h index 744f45a34c9..989165eb1f0 100644 --- a/src/object.h +++ b/src/object.h @@ -125,10 +125,11 @@ typedef struct redisObject kvobj; /* Inline functions to access fields directly from robj pointer (read). * These use atomic read for thread-safety. */ static inline uint32_t robj_get_refcount(const robj *o) { - uint32_t tmp = 0; - atomicGet(o->flags_refcount, tmp); - struct robjFlags *flags = (struct robjFlags*)&tmp; - return flags->refcount; + return o->flags.refcount; + // uint32_t tmp = 0; + // atomicGet(o->flags_refcount, tmp); + // struct robjFlags *flags = (struct robjFlags*)&tmp; + // return flags->refcount; } kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits); From 122e293cff6c2dfb32227daae5954bfd77c85e79 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 1 Feb 2026 12:54:59 +0800 Subject: [PATCH 5/8] temp --- src/object.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/object.h b/src/object.h index 989165eb1f0..90eb5d36b5b 100644 --- a/src/object.h +++ b/src/object.h @@ -125,11 +125,11 @@ typedef struct redisObject kvobj; /* Inline functions to access fields directly from robj pointer (read). * These use atomic read for thread-safety. */ static inline uint32_t robj_get_refcount(const robj *o) { - return o->flags.refcount; - // uint32_t tmp = 0; - // atomicGet(o->flags_refcount, tmp); - // struct robjFlags *flags = (struct robjFlags*)&tmp; - // return flags->refcount; + // return o->flags.refcount; + uint32_t tmp = 0; + atomicGet(o->flags_refcount, tmp); + struct robjFlags *flags = (struct robjFlags*)&tmp; + return flags->refcount; } kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits); From 6d25511839989374367a2309e51a643c6c0eb259 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 1 Feb 2026 13:52:45 +0800 Subject: [PATCH 6/8] temp --- src/object.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/object.h b/src/object.h index 90eb5d36b5b..989165eb1f0 100644 --- a/src/object.h +++ b/src/object.h @@ -125,11 +125,11 @@ typedef struct redisObject kvobj; /* Inline functions to access fields directly from robj pointer (read). * These use atomic read for thread-safety. */ static inline uint32_t robj_get_refcount(const robj *o) { - // return o->flags.refcount; - uint32_t tmp = 0; - atomicGet(o->flags_refcount, tmp); - struct robjFlags *flags = (struct robjFlags*)&tmp; - return flags->refcount; + return o->flags.refcount; + // uint32_t tmp = 0; + // atomicGet(o->flags_refcount, tmp); + // struct robjFlags *flags = (struct robjFlags*)&tmp; + // return flags->refcount; } kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits); From ba34224a64e5755045bfbbea64abef371b216b71 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 1 Feb 2026 13:58:38 +0800 Subject: [PATCH 7/8] temp --- src/object.c | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/object.c b/src/object.c index 6db4d20d56a..41a6f5c79fc 100644 --- a/src/object.c +++ b/src/object.c @@ -594,15 +594,17 @@ void incrRefCount(robj *o) { unsigned int refcount = ((struct robjFlags*)&old_val)->refcount; if (refcount < OBJ_FIRST_SPECIAL_REFCOUNT - 1) { - if (likely(refcount == 1)) { - /* Fast path, only hold by itself. */ - o->flags.refcount++; - } else { - do { - new_val = old_val; - ((struct robjFlags*)&new_val)->refcount++; - } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); - } + // if (likely(refcount == 1)) { + // /* Fast path, only hold by itself. */ + // o->flags.refcount++; + // } else { + // do { + // new_val = old_val; + // ((struct robjFlags*)&new_val)->refcount++; + // } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); + // } + + o->flags.refcount++; } else { if (refcount == OBJ_SHARED_REFCOUNT) { /* Nothing to do: this refcount is immutable. */ @@ -627,16 +629,19 @@ void decrRefCount(robj *o) { o->type, o->encoding, refcount); } - if (likely(refcount == 1)) { - /* Fast path, only hold by itself. */ - o->flags.refcount = refcount = 0; - } else { - do { - new_val = old_val; - ((struct robjFlags*)&new_val)->refcount--; - } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); - refcount = ((struct robjFlags*)&new_val)->refcount; - } + // if (likely(refcount == 1)) { + // /* Fast path, only hold by itself. */ + // o->flags.refcount = refcount = 0; + // } else { + // do { + // new_val = old_val; + // ((struct robjFlags*)&new_val)->refcount--; + // } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); + // refcount = ((struct robjFlags*)&new_val)->refcount; + // } + + o->flags.refcount--; + refcount = o->flags.refcount; /* old_val now contains the value before decrement (CAS updates it on failure) */ if (refcount == 0) { From 8cce9679a98352eab68c77f40df4b92ba7e42666 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 1 Feb 2026 14:36:56 +0800 Subject: [PATCH 8/8] temp --- src/cluster_asm.c | 2 +- src/db.c | 12 ++++++------ src/debug.c | 6 +++--- src/defrag.c | 8 ++++---- src/lazyfree.c | 2 +- src/module.c | 8 ++++---- src/networking.c | 4 ++-- src/object.c | 12 ++++++------ src/object.h | 10 ---------- src/script_lua.c | 2 +- src/server.c | 4 ++-- src/slowlog.c | 2 +- src/t_string.c | 2 +- 13 files changed, 32 insertions(+), 42 deletions(-) diff --git a/src/cluster_asm.c b/src/cluster_asm.c index c10b4458b4a..f114553c929 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -3439,7 +3439,7 @@ void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { debugDelay(asmManager->debug_active_trim_delay); /* The key needs to be converted from static to heap before deletion. */ - int static_key = robj_get_refcount(keyobj) == OBJ_STATIC_REFCOUNT; + int static_key = keyobj->flags.refcount == OBJ_STATIC_REFCOUNT; if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); dbDelete(db, keyobj); diff --git a/src/db.c b/src/db.c index 4d0c1326d0d..ecaf83712ae 100644 --- a/src/db.c +++ b/src/db.c @@ -59,7 +59,7 @@ void updateLFU(robj *val) { /* Update LRM when an object is modified. */ void updateLRM(robj *o) { - if (robj_get_refcount(o) == OBJ_SHARED_REFCOUNT) + if (o->flags.refcount == OBJ_SHARED_REFCOUNT) return; if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) { o->lru = LRU_CLOCK(); @@ -622,8 +622,8 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link if (server.memory_tracking_enabled) oldsize = kvobjAllocSize(old); - if ((robj_get_refcount(old) == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && - (robj_get_refcount(val) == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) + if ((old->flags.refcount == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && + (val->flags.refcount == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) { /* Keep old object in the database. Just swap it's ptr, type and * encoding with the content of val. */ @@ -687,7 +687,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link } } - if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW && robj_get_refcount(old) == 1) { + if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW && old->flags.refcount == 1) { /* In multi-threaded mode, the OBJ_ENCODING_RAW string object usually is * allocated in the IO thread, so we defer the free to the IO thread. * Besides, we never free a string object in BIO threads, so, even with @@ -950,7 +950,7 @@ kvobj *dbUnshareStringValue(redisDb *db, robj *key, kvobj *kv) { * which can be used if we already have one, thus saving the dbFind call. */ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *o, dictEntryLink link) { serverAssert(o->type == OBJ_STRING); - if (robj_get_refcount(o) != 1 || o->encoding != OBJ_ENCODING_RAW) { + if (o->flags.refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { robj *decoded = getDecodedObject(o); o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); @@ -2696,7 +2696,7 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"; /* The key needs to be converted from static to heap before deleted */ - int static_key = robj_get_refcount(keyobj) == OBJ_STATIC_REFCOUNT; + int static_key = keyobj->flags.refcount == OBJ_STATIC_REFCOUNT; if (static_key) { keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); } diff --git a/src/debug.c b/src/debug.c index fedd70bc26b..6fdfe634a3b 100644 --- a/src/debug.c +++ b/src/debug.c @@ -685,7 +685,7 @@ NULL "Value at:%p refcount:%d " "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", - (void*)kv, robj_get_refcount(kv), + (void*)kv, kv->flags.refcount, strenc, rdbSavedObjectLen(kv, c->argv[2], c->db->id), kv->lru, estimateObjectIdleTime(kv)/1000, extra); } else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) { @@ -1250,14 +1250,14 @@ void _serverAssertPrintClientInfo(const client *c) { arg = buf; } serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)", - j, arg, robj_get_refcount(c->argv[j])); + j, arg, c->argv[j]->flags.refcount); } } void serverLogObjectDebugInfo(const robj *o) { serverLog(LL_WARNING,"Object type: %u", o->type); serverLog(LL_WARNING,"Object encoding: %u", o->encoding); - serverLog(LL_WARNING,"Object refcount: %d", robj_get_refcount(o)); + serverLog(LL_WARNING,"Object refcount: %d", o->flags.refcount); #if UNSAFE_CRASH_REPORT /* This code is now disabled. o->ptr may be unreliable to print. in some * cases a ziplist could have already been freed by realloc, but not yet diff --git a/src/defrag.c b/src/defrag.c index 765023e13bb..72d29fca362 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -295,7 +295,7 @@ void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) { * Note that the caller is responsible for updating any other references to the robj. */ robj *activeDefragStringObEx(robj* ob, unsigned int expected_refcount) { robj *ret = NULL; - if (robj_get_refcount(ob)!=expected_refcount) + if (ob->flags.refcount!=expected_refcount) return NULL; /* try to defrag robj (only if not an EMBSTR type (handled below). */ @@ -1059,7 +1059,7 @@ robj *activeDefragKvobj(kvobj* kv, int without_free) { long offsetEmbstr = LONG_MIN; /* Don't defrag kvobj's with multiple references (refcount > 1) */ - if (robj_get_refcount(kv) != 1) + if (kv->flags.refcount != 1) return NULL; /* Calculate offset for EMBSTR strings */ @@ -1131,7 +1131,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) { /* Only defrag strings with refcount==1 (String might be shared as dict * keys, e.g. pub/sub channels, and may be accessed by IO threads. Other * types are never used as dict keys) */ - if ((robj_get_refcount(ob)==1) && (ob->encoding == OBJ_ENCODING_RAW)) { + if ((ob->flags.refcount==1) && (ob->encoding == OBJ_ENCODING_RAW)) { /* For RAW strings, defrag the separate SDS allocation */ sds newsds = activeDefragSds((sds)ob->ptr); if (newsds) ob->ptr = newsds; @@ -1254,7 +1254,7 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de, dictEntryLink dict *newclients, *clients = dictGetVal(de); /* Try to defrag the channel name. */ - serverAssert(robj_get_refcount(channel) == dictSize(clients) + 1); + serverAssert(channel->flags.refcount == dictSize(clients) + 1); newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); if (newchannel) { kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel); diff --git a/src/lazyfree.c b/src/lazyfree.c index 90cdb1f2eee..d616aead88b 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -187,7 +187,7 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * possible. This rarely happens, however sometimes the implementation * of parts of the Redis core may call incrRefCount() to protect * objects, and then call dbDelete(). */ - if (free_effort > LAZYFREE_THRESHOLD && robj_get_refcount(obj) == 1) { + if (free_effort > LAZYFREE_THRESHOLD && obj->flags.refcount == 1) { atomicIncr(lazyfree_objects,1); bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); } else { diff --git a/src/module.c b/src/module.c index af1714656e3..06e1e0e97a4 100644 --- a/src/module.c +++ b/src/module.c @@ -946,7 +946,7 @@ void RedisModuleCommandDispatcher(client *c) { for (int i = 0; i < c->argc; i++) { /* Only do the work if the module took ownership of the object: * in that case the refcount is no longer 1. */ - if (robj_get_refcount(c->argv[i]) > 1) + if (c->argv[i]->flags.refcount > 1) trimStringObjectIfNeeded(c->argv[i], 0); } } @@ -2854,7 +2854,7 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { * This API is not thread safe, access to these retained strings (if they originated * from a client command arguments) must be done with GIL locked. */ RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) { - if (robj_get_refcount(str) == OBJ_STATIC_REFCOUNT) { + if (str->flags.refcount == OBJ_STATIC_REFCOUNT) { return RM_CreateStringFromString(ctx, str); } @@ -2968,7 +2968,7 @@ int RM_StringCompare(const RedisModuleString *a, const RedisModuleString *b) { /* Return the (possibly modified in encoding) input 'str' object if * the string is unshared, otherwise NULL is returned. */ RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) { - if (robj_get_refcount(str) != 1) { + if (str->flags.refcount != 1) { serverLog(LL_WARNING, "Module attempted to use an in-place string modify operation " "with a string referenced multiple times. Please check the code " @@ -6610,7 +6610,7 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int argv[argc++] = createStringObject(cstr,strlen(cstr)); } else if (*p == 's') { robj *obj = va_arg(ap,void*); - if (robj_get_refcount(obj) == OBJ_STATIC_REFCOUNT) + if (obj->flags.refcount == OBJ_STATIC_REFCOUNT) obj = createStringObject(obj->ptr,sdslen(obj->ptr)); else incrRefCount(obj); diff --git a/src/networking.c b/src/networking.c index b806702679f..2832af687c1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1223,7 +1223,7 @@ static int isCopyAvoidPreferred(client *c, robj *obj, size_t len) { * to server.pending_push_messages when CLIENT_PUSHING is set. */ if (c->flags & CLIENT_PUSHING) return 0; - if (obj->encoding != OBJ_ENCODING_RAW || robj_get_refcount(obj) >= OBJ_FIRST_SPECIAL_REFCOUNT) return 0; + if (obj->encoding != OBJ_ENCODING_RAW || obj->flags.refcount >= OBJ_FIRST_SPECIAL_REFCOUNT) return 0; /* Copy avoidance is preferred for any string size starting certain number of I/O threads */ if (server.io_threads_num >= COPY_AVOID_MIN_IO_THREADS) return 1; @@ -5517,7 +5517,7 @@ static void reclaimPendingCommand(client *c, pendingCommand *pcmd) { * decrease the reference count to release our reference to it. */ for (int j = 0; j < pcmd->argc; j++) { robj *o = pcmd->argv[j]; - if (o && robj_get_refcount(o) > 1) { + if (o && o->flags.refcount > 1) { decrRefCount(o); pcmd->argv[j] = NULL; } diff --git a/src/object.c b/src/object.c index 41a6f5c79fc..ce822258c6e 100644 --- a/src/object.c +++ b/src/object.c @@ -117,7 +117,7 @@ robj *createObject(int type, void *ptr) { } void initObjectLRUOrLFU(robj *o) { - if (robj_get_refcount(o) == OBJ_SHARED_REFCOUNT) + if (o->flags.refcount == OBJ_SHARED_REFCOUNT) return; /* Set the LRU to the current lruclock (seconds resolution), or * alternatively the LFU counter. */ @@ -141,7 +141,7 @@ void initObjectLRUOrLFU(robj *o) { * */ robj *makeObjectShared(robj *o) { - serverAssert(robj_get_refcount(o) == 1); + serverAssert(o->flags.refcount == 1); o->flags.refcount = OBJ_SHARED_REFCOUNT; return o; } @@ -298,7 +298,7 @@ kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits) { } else { /* Create a new object with embedded key. Reuse ptr if possible. */ void *valptr; - if (robj_get_refcount(val) == 1) { + if (val->flags.refcount == 1) { /* Reuse the ptr. There are no other references to val. */ valptr = val->ptr; val->ptr = NULL; @@ -835,7 +835,7 @@ void dismissObject(robj *o, size_t size_hint) { /* Currently we use zmadvise_dontneed only when we use jemalloc with Linux. * so we avoid these pointless loops when they're not going to do anything. */ #if defined(USE_JEMALLOC) && defined(__linux__) - if (robj_get_refcount(o) != 1) return; + if (o->flags.refcount != 1) return; switch(o->type) { case OBJ_STRING: dismissStringObject(o); break; case OBJ_LIST: dismissListObject(o, size_hint); break; @@ -911,7 +911,7 @@ robj *tryObjectEncodingEx(robj *o, int try_trim) { /* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis and may end in places where * they are not handled. We handle them only as values in the keyspace. */ - if (robj_get_refcount(o) > 1) return o; + if (o->flags.refcount > 1) return o; /* Check if we can represent this string as a long integer. * Note that we are sure that a string larger than 20 chars is not @@ -1637,7 +1637,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) { if ((kv = kvobjCommandLookupOrReply(c, c->argv[2], shared.null[c->resp])) == NULL) return; - addReplyLongLong(c, robj_get_refcount(kv)); + addReplyLongLong(c, kv->flags.refcount); } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) { if ((kv = kvobjCommandLookupOrReply(c, c->argv[2], shared.null[c->resp])) == NULL) return; diff --git a/src/object.h b/src/object.h index 989165eb1f0..15e41fa49eb 100644 --- a/src/object.h +++ b/src/object.h @@ -122,16 +122,6 @@ typedef struct redisObject robj; /* kvobj: see header comment above for definition and memory layout. */ typedef struct redisObject kvobj; -/* Inline functions to access fields directly from robj pointer (read). - * These use atomic read for thread-safety. */ -static inline uint32_t robj_get_refcount(const robj *o) { - return o->flags.refcount; - // uint32_t tmp = 0; - // atomicGet(o->flags_refcount, tmp); - // struct robjFlags *flags = (struct robjFlags*)&tmp; - // return flags->refcount; -} - kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits); kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits); kvobj *kvobjSetExpire(kvobj *kv, long long expire); diff --git a/src/script_lua.c b/src/script_lua.c index 605997a37af..1b13bdb9070 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -858,7 +858,7 @@ void freeLuaRedisArgv(robj **argv, int argc, int argv_len) { * The object must be small, SDS-encoded, and with refcount = 1 * (we must be the only owner) for us to cache it. */ if (j < LUA_CMD_OBJCACHE_SIZE && - robj_get_refcount(o) == 1 && + o->flags.refcount == 1 && (o->encoding == OBJ_ENCODING_RAW || o->encoding == OBJ_ENCODING_EMBSTR) && sdslen(o->ptr) <= LUA_CMD_OBJCACHE_MAX_LEN) diff --git a/src/server.c b/src/server.c index 32bd84d3ed1..46ace4a3914 100644 --- a/src/server.c +++ b/src/server.c @@ -475,8 +475,8 @@ int dictEncObjKeyCompare(dictCmpCache *cache, const void *key1, const void *key2 * good reasons, because it would incrRefCount() the object, which * is invalid. So we check to make sure dictFind() works with static * objects as well. */ - uint32_t refcount1 = robj_get_refcount(o1); - uint32_t refcount2 = robj_get_refcount(o2); + uint32_t refcount1 = o1->flags.refcount; + uint32_t refcount2 = o2->flags.refcount; if (refcount1 != OBJ_STATIC_REFCOUNT) o1 = getDecodedObject(o1); if (refcount2 != OBJ_STATIC_REFCOUNT) o2 = getDecodedObject(o2); cmp = dictSdsKeyCompare(cache,o1->ptr,o2->ptr); diff --git a/src/slowlog.c b/src/slowlog.c index 3146d02965b..5f469adc50f 100644 --- a/src/slowlog.c +++ b/src/slowlog.c @@ -52,7 +52,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur (unsigned long) sdslen(argv[j]->ptr) - SLOWLOG_ENTRY_MAX_STRING); se->argv[j] = createObject(OBJ_STRING,s); - } else if (robj_get_refcount(argv[j]) == OBJ_SHARED_REFCOUNT) { + } else if (argv[j]->flags.refcount == OBJ_SHARED_REFCOUNT) { se->argv[j] = argv[j]; } else { /* Here we need to duplicate the string objects composing the diff --git a/src/t_string.c b/src/t_string.c index 652c6aa85d0..fe50bb7fce3 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -815,7 +815,7 @@ void incrDecrCommand(client *c, long long incr) { } value += incr; - if (o && robj_get_refcount(o) == 1 && o->encoding == OBJ_ENCODING_INT && + if (o && o->flags.refcount == 1 && o->encoding == OBJ_ENCODING_INT && value >= LONG_MIN && value <= LONG_MAX) { new = o;