diff --git a/src/aof.c b/src/aof.c index 3ace6701134..c8eaea4d57f 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2423,7 +2423,7 @@ int rewriteObject(rio *r, robj *key, robj *o, int dbid, long long expiretime) { } /* If modules metadata is available */ - if ((getModuleMetaBits(o->metabits)) && (keyMetaOnAof(r, key, o, dbid) == 0)) + if ((getModuleMetaBits(o->flags.metabits)) && (keyMetaOnAof(r, key, o, dbid) == 0)) return C_ERR; return C_OK; diff --git a/src/cluster.c b/src/cluster.c index d07c31c5900..744ce11a919 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -94,7 +94,7 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_chec rioInitWithBuffer(payload,sdsempty()); /* Save key metadata if present without (handles TTL separately via command args) */ - if (getModuleMetaBits(o->metabits)) + if (getModuleMetaBits(o->flags.metabits)) serverAssert(rdbSaveKeyMetadata(payload, key, o, dbid) != -1); serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObject(payload,o,key,dbid)); diff --git a/src/cluster_asm.c b/src/cluster_asm.c index a0904537753..f114553c929 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -3439,7 +3439,7 @@ void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { debugDelay(asmManager->debug_active_trim_delay); /* The key needs to be converted from static to heap before deletion. */ - int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; + int static_key = keyobj->flags.refcount == OBJ_STATIC_REFCOUNT; if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); dbDelete(db, keyobj); diff --git a/src/db.c b/src/db.c index 04d94184f7e..ecaf83712ae 100644 --- a/src/db.c +++ b/src/db.c @@ -59,7 +59,7 @@ void updateLFU(robj *val) { /* Update LRM when an object is modified. */ void updateLRM(robj *o) { - if (o->refcount == OBJ_SHARED_REFCOUNT) + if (o->flags.refcount == OBJ_SHARED_REFCOUNT) return; if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) { o->lru = LRU_CLOCK(); @@ -591,10 +591,10 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link long long oldExpire = getExpire(db, key->ptr, old); /* All metadata will be kept if not `overwrite` for the new object */ - uint32_t newKeyMetaBits = old->metabits; + uint32_t newKeyMetaBits = old->flags.metabits; /* clear expire if not keepTTL or no old expire */ if ((!keepTTL) || (oldExpire == -1)) - newKeyMetaBits &= ~KEY_META_MASK_EXPIRE; + newKeyMetaBits &= ~KEY_META_MASK_EXPIRE; if (overwrite) { /* On overwrite, discard module metadata excluding expire if set */ @@ -604,7 +604,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link incrRefCount(old); /* Free related metadata. Ignore builtin metadata (currently only expire) */ - if (getModuleMetaBits(old->metabits)) { + if (getModuleMetaBits(old->flags.metabits)) { keyMetaOnUnlink(db, key, old); freeModuleMeta = 1; } @@ -622,8 +622,8 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link if (server.memory_tracking_enabled) oldsize = kvobjAllocSize(old); - if ((old->refcount == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && - (val->refcount == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) + if ((old->flags.refcount == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && + (val->flags.refcount == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) { /* Keep old object in the database. Just swap it's ptr, type and * encoding with the content of val. */ @@ -687,7 +687,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link } } - if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW && old->refcount == 1) { + if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW && old->flags.refcount == 1) { /* In multi-threaded mode, the OBJ_ENCODING_RAW string object usually is * allocated in the IO thread, so we defer the free to the IO thread. * Besides, we never free a string object in BIO threads, so, even with @@ -853,7 +853,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { * need to incr to retain kv */ incrRefCount(kv); /* refcnt=1->2 */ /* Metadata hook: notify unlink for key metadata cleanup. */ - if (getModuleMetaBits(kv->metabits)) keyMetaOnUnlink(db, key, kv); + if (getModuleMetaBits(kv->flags.metabits)) keyMetaOnUnlink(db, key, kv); /* Tells the module that the key has been unlinked from the database. */ moduleNotifyKeyUnlink(key, kv, db->id, flags); /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ @@ -950,7 +950,7 @@ kvobj *dbUnshareStringValue(redisDb *db, robj *key, kvobj *kv) { * which can be used if we already have one, thus saving the dbFind call. */ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *o, dictEntryLink link) { serverAssert(o->type == OBJ_STRING); - if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { + if (o->flags.refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { robj *decoded = getDecodedObject(o); o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); @@ -2166,7 +2166,7 @@ void renameGenericCommand(client *c, int nx) { /* Prepare metadata for the renamed key */ KeyMetaSpec keymeta; keyMetaSpecInit(&keymeta); - if (o->metabits) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); + if (o->flags.metabits) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); dbDelete(c->db,c->argv[1]); @@ -2381,7 +2381,7 @@ void copyCommand(client *c) { /* Prepare metadata for the new key */ KeyMetaSpec keymeta; keyMetaSpecInit(&keymeta); - if (o->metabits) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); + if (o->flags.metabits) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); kvobj *kvCopy = dbAddInternal(dst, newkey, &newobj, NULL, &keymeta); @@ -2696,7 +2696,7 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"; /* The key needs to be converted from static to heap before deleted */ - int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; + int static_key = keyobj->flags.refcount == OBJ_STATIC_REFCOUNT; if (static_key) { keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); } diff --git a/src/debug.c b/src/debug.c index c239bbaf0f6..6fdfe634a3b 100644 --- a/src/debug.c +++ b/src/debug.c @@ -685,7 +685,7 @@ NULL "Value at:%p refcount:%d " "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", - (void*)kv, kv->refcount, + (void*)kv, kv->flags.refcount, strenc, rdbSavedObjectLen(kv, c->argv[2], c->db->id), kv->lru, estimateObjectIdleTime(kv)/1000, extra); } else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) { @@ -1250,14 +1250,14 @@ void _serverAssertPrintClientInfo(const client *c) { arg = buf; } serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)", - j, arg, c->argv[j]->refcount); + j, arg, c->argv[j]->flags.refcount); } } void serverLogObjectDebugInfo(const robj *o) { serverLog(LL_WARNING,"Object type: %u", o->type); serverLog(LL_WARNING,"Object encoding: %u", o->encoding); - serverLog(LL_WARNING,"Object refcount: %d", o->refcount); + serverLog(LL_WARNING,"Object refcount: %d", o->flags.refcount); #if UNSAFE_CRASH_REPORT /* This code is now disabled. o->ptr may be unreliable to print. in some * cases a ziplist could have already been freed by realloc, but not yet diff --git a/src/defrag.c b/src/defrag.c index b058dfd01c5..72d29fca362 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -293,9 +293,9 @@ void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) { * reference count is not 1, in these cases, the caller must explicitly pass * in the reference count, otherwise defragmentation will not be performed. * Note that the caller is responsible for updating any other references to the robj. */ -robj *activeDefragStringObEx(robj* ob, int expected_refcount) { +robj *activeDefragStringObEx(robj* ob, unsigned int expected_refcount) { robj *ret = NULL; - if (ob->refcount!=expected_refcount) + if (ob->flags.refcount!=expected_refcount) return NULL; /* try to defrag robj (only if not an EMBSTR type (handled below). */ @@ -1059,7 +1059,7 @@ robj *activeDefragKvobj(kvobj* kv, int without_free) { long offsetEmbstr = LONG_MIN; /* Don't defrag kvobj's with multiple references (refcount > 1) */ - if (kv->refcount != 1) + if (kv->flags.refcount != 1) return NULL; /* Calculate offset for EMBSTR strings */ @@ -1131,7 +1131,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) { /* Only defrag strings with refcount==1 (String might be shared as dict * keys, e.g. pub/sub channels, and may be accessed by IO threads. Other * types are never used as dict keys) */ - if ((ob->refcount==1) && (ob->encoding == OBJ_ENCODING_RAW)) { + if ((ob->flags.refcount==1) && (ob->encoding == OBJ_ENCODING_RAW)) { /* For RAW strings, defrag the separate SDS allocation */ sds newsds = activeDefragSds((sds)ob->ptr); if (newsds) ob->ptr = newsds; @@ -1254,7 +1254,7 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de, dictEntryLink dict *newclients, *clients = dictGetVal(de); /* Try to defrag the channel name. */ - serverAssert(channel->refcount == (int)dictSize(clients) + 1); + serverAssert(channel->flags.refcount == dictSize(clients) + 1); newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); if (newchannel) { kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel); diff --git a/src/iothread.c b/src/iothread.c index 54a9cae1eb2..e91425ebe23 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -183,8 +183,6 @@ void keepClientInMainThread(client *c) { c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED; c->tid = IOTHREAD_MAIN_THREAD_ID; freeClientDeferredObjects(c, 1); /* Free deferred objects. */ - freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */ - tryUnlinkClientFromPendingRefReply(c, 0); /* Main thread starts to manage it. */ server.io_threads_clients_num[c->tid]++; } @@ -586,10 +584,6 @@ int processClientsFromIOThread(IOThread *t) { /* Let main thread to run it, set running thread id first. */ c->running_tid = IOTHREAD_MAIN_THREAD_ID; - /* Free objects queued by IO thread for deferred freeing. */ - freeClientIODeferredObjects(c, 0); - tryUnlinkClientFromPendingRefReply(c, 0); - /* If a read error occurs, handle it in the main thread first, since we * want to print logs about client information before freeing. */ if (isClientReadErrorFatal(c)) handleClientReadError(c); diff --git a/src/keymeta.c b/src/keymeta.c index 530ea049300..4e37db91f0e 100644 --- a/src/keymeta.c +++ b/src/keymeta.c @@ -178,13 +178,14 @@ void keyMetaOnCopy(kvobj *kv, robj *srcKey, robj *dstKey, int srcDbId, int dstDb KeyMetaSpec *keymeta) { uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) { + uint32_t kv_metabits = kv->flags.metabits; + if (kv_metabits & KEY_META_MASK_EXPIRE) { if (*pMeta != KM_EXPIRE_RESET_VALUE) keyMetaSpecAdd(keymeta, KEY_META_ID_EXPIRE, *pMeta); pMeta--; } - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -207,17 +208,18 @@ void keyMetaOnCopy(kvobj *kv, robj *srcKey, robj *dstKey, int srcDbId, int dstDb /* Prepare metadata spec for rename of `kv` */ void keyMetaOnRename(struct redisDb *db, kvobj *kv, robj *oldKey, robj *newKey, KeyMetaSpec *kms) { uint64_t *pMeta = ((uint64_t *)kv) - 1; + uint32_t kv_metabits = kv->flags.metabits; /* Handle builtin expire: add only if set and value != -1, but always advance * the pointer when the expire bit is set since the slot exists either way. */ - if (kv->metabits & KEY_META_MASK_EXPIRE) { + if (kv_metabits & KEY_META_MASK_EXPIRE) { if (*pMeta != KM_EXPIRE_RESET_VALUE) keyMetaSpecAdd(kms, KEY_META_ID_EXPIRE, *pMeta); pMeta--; /* skip expire slot */ } /* Process module metadata. Default on rename: keep if no callback. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -244,17 +246,18 @@ void keyMetaOnRename(struct redisDb *db, kvobj *kv, robj *oldKey, robj *newKey, /* Prepare metadata spec for move of `kv` from srcDbId to dstDbId */ void keyMetaOnMove(kvobj *kv, robj *key, int srcDbId, int dstDbId, KeyMetaSpec *kms) { uint64_t *pMeta = ((uint64_t *)kv) - 1; + uint32_t kv_metabits = kv->flags.metabits; /* Handle builtin expire: add only if set and value != -1, but always advance * the pointer when the expire bit is set since the slot exists either way. */ - if (kv->metabits & KEY_META_MASK_EXPIRE) { + if (kv_metabits & KEY_META_MASK_EXPIRE) { if (*pMeta != KM_EXPIRE_RESET_VALUE) keyMetaSpecAdd(kms, KEY_META_ID_EXPIRE, *pMeta); pMeta--; /* skip expire slot */ } /* Process module metadata. Default on move: keep if no callback. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -292,11 +295,12 @@ void keyMetaOnMove(kvobj *kv, robj *key, int srcDbId, int dstDbId, KeyMetaSpec * void keyMetaOnUnlink(redisDb *db, robj *key, kvobj *kv) { /* Skip builtin expire slot if present; no action for expire itself here. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + uint32_t kv_metabits = kv->flags.metabits; + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Iterate module metadata and invoke per-class unlink if provided. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; /* Build operation context for modules: from_key = key name, to_key = NULL. */ @@ -333,11 +337,12 @@ void keyMetaOnUnlink(redisDb *db, robj *key, kvobj *kv) { void keyMetaOnFree(kvobj *kv) { /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + uint32_t kv_metabits = kv->flags.metabits; + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Iterate module metadata and invoke per-class free if provided. */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -558,14 +563,15 @@ int rdbLoadKeyMetadata(rio *rdb, int dbid, int numClasses, KeyMetaSpec *kms) { * Returns -1 on error, 0 on success. */ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) { + uint32_t kv_metabits = kv->flags.metabits; /* Check if there are any module metadata bits set */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return 0; /* No module metadata */ /* Skip builtin expire slot if present */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Create temporary buffer for payload (class data only, no headers) */ @@ -650,11 +656,12 @@ int rdbSaveKeyMetadata(rio *rdb, robj *key, kvobj *kv, int dbid) { int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) { /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + uint32_t kv_metabits = kv->flags.metabits; + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Iterate module metadata and invoke per-class aof_rewrite if provided */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbits == 0)) return 1; int keyMetaId = KEY_META_ID_MODULE_FIRST; @@ -686,17 +693,19 @@ int keyMetaOnAof(rio *r, robj *key, kvobj *kv, int dbid) { /* Move entire metadata from old to new kvobj as is */ void keyMetaTransition(kvobj *kvOld, kvobj *kvNew) { + uint32_t kvOld_metabits = kvOld->flags.metabits; + uint32_t kvNew_metabits = kvNew->flags.metabits; /* Precondition: */ - debugServerAssert(kvOld->metabits>>KEY_META_ID_MODULE_FIRST); - + debugServerAssert(kvOld_metabits>>KEY_META_ID_MODULE_FIRST); + /* Skip builtin expire slot if present; no action needed for expire itself. */ uint64_t *pMetaOld = ((uint64_t *)kvOld) - 1; - if (kvOld->metabits & KEY_META_MASK_EXPIRE) pMetaOld--; + if (kvOld_metabits & KEY_META_MASK_EXPIRE) pMetaOld--; uint64_t *pMetaNew = ((uint64_t *)kvNew) - 1; - if (kvNew->metabits & KEY_META_MASK_EXPIRE) pMetaNew--; - - uint32_t mbitsOld = kvOld->metabits >> KEY_META_ID_MODULE_FIRST; - uint32_t mbitsNew = kvNew->metabits >> KEY_META_ID_MODULE_FIRST; + if (kvNew_metabits & KEY_META_MASK_EXPIRE) pMetaNew--; + + uint32_t mbitsOld = kvOld_metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbitsNew = kvNew_metabits >> KEY_META_ID_MODULE_FIRST; if (likely(mbitsOld == 0)) return; int keyMetaId = KEY_META_ID_MODULE_FIRST; do { @@ -799,7 +808,7 @@ kvobj *keyMetaSetMetadata(redisDb *db, kvobj *kv, KeyMetaClassId id, uint64_t me return NULL; /* If metadata already attached, just update it in place. */ - if (kv->metabits & (1u << id)) { + if (kv->flags.metabits & (1u << id)) { *kvobjMetaRef(kv, id) = metadata; return kv; } @@ -838,7 +847,7 @@ kvobj *keyMetaSetMetadata(redisDb *db, kvobj *kv, KeyMetaClassId id, uint64_t me size_t oldsize = 0; if (server.memory_tracking_enabled) oldsize = kvobjAllocSize(kv); - kv = kvobjSet(key, kv, kv->metabits | (1u << id)); + kv = kvobjSet(key, kv, kv->flags.metabits | (1u << id)); kvstoreDictSetAtLink(db->keys, slot, kv, &keyLink, 0); if (server.memory_tracking_enabled) updateSlotAllocSize(db, slot, kv, oldsize, kvobjAllocSize(kv)); @@ -866,7 +875,7 @@ int keyMetaGetMetadata(KeyMetaClassId kmcId, kvobj *kv, uint64_t *metadata) { if (keyMetaClass[kmcId].state != CLASS_STATE_INUSE) return 0; - if (!(kv->metabits & (1u << kmcId))) + if (!(kv->flags.metabits & (1u << kmcId))) return 0; /* metadata not attached */ *metadata = *kvobjMetaRef(kv, kmcId); @@ -919,16 +928,17 @@ static void keyMetaSpecAddUnordered(KeyMetaSpec *keymeta, int metaid, uint64_t m /* Blindly reset modules metadata values to reset_value */ void keyMetaResetModuleValues(kvobj *kv) { + uint32_t kv_metabits = kv->flags.metabits; /* Precondition: only called for module metadata (bits 1-7) */ - debugServerAssert(kv->metabits & KEY_META_MASK_MODULES); + debugServerAssert(kv_metabits & KEY_META_MASK_MODULES); /* Skip expire slot (bit 0) if present, start directly at module metadata */ uint64_t *pMeta = ((uint64_t *)kv) - 1; - if (kv->metabits & KEY_META_MASK_EXPIRE) + if (kv_metabits & KEY_META_MASK_EXPIRE) pMeta--; /* Process only module metadata bits (1-7) */ - uint32_t mbits = kv->metabits >> KEY_META_ID_MODULE_FIRST; + uint32_t mbits = kv_metabits >> KEY_META_ID_MODULE_FIRST; int keyMetaId = KEY_META_ID_MODULE_FIRST; do { if (mbits & 1) diff --git a/src/keymeta.h b/src/keymeta.h index 43b4242313c..cd606ea856e 100644 --- a/src/keymeta.h +++ b/src/keymeta.h @@ -149,10 +149,11 @@ static inline uint32_t getModuleMetaBits(uint16_t metabits); /********** Inline functions **********/ static inline void keyMetaResetValues(kvobj *kv) { - if (unlikely(kv->metabits & KEY_META_MASK_MODULES)) + uint32_t metabits = kv->flags.metabits; + if (unlikely(metabits & KEY_META_MASK_MODULES)) keyMetaResetModuleValues(kv); /* Must be first meta (optimized) */ - if (kv->metabits & KEY_META_MASK_EXPIRE) + if (metabits & KEY_META_MASK_EXPIRE) ((uint64_t *)kv)[-1] = -1; } diff --git a/src/lazyfree.c b/src/lazyfree.c index 1960be1233f..d616aead88b 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -187,7 +187,7 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * possible. This rarely happens, however sometimes the implementation * of parts of the Redis core may call incrRefCount() to protect * objects, and then call dbDelete(). */ - if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { + if (free_effort > LAZYFREE_THRESHOLD && obj->flags.refcount == 1) { atomicIncr(lazyfree_objects,1); bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); } else { @@ -195,88 +195,6 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { } } -/* Duplicate client reply objects that reference database objects to avoid race - * conditions with bio threads during async flushdb. - * - * Since incrRefCount/decrRefCount are not thread-safe, and bio thread may - * free database objects while main thread/IO threads send client replies, we need to - * create independent copies of the string objects to avoid concurrent access. */ -static void protectClientReplyObjects(void) { - /* If there are no clients with pending ref replies, exit ASAP. */ - if (!listLength(server.clients_with_pending_ref_reply)) - return; - - /* Pause all IO threads to safely duplicate string objects. */ - int allpaused = 0; - if (server.io_threads_num > 1) { - serverAssert(pthread_equal(server.main_thread_id, pthread_self())); - allpaused = 1; - pauseAllIOThreads(); - } - - listNode *ln; - listIter li; - listRewind(server.clients_with_pending_ref_reply, &li); - while ((ln = listNext(&li)) != NULL) { - client *c = listNodeValue(ln); - - /* Process c->buf if it's encoded */ - if (c->buf_encoded && c->bufpos > 0) { - char *ptr = c->buf; - while (ptr < c->buf + c->bufpos) { - payloadHeader *header = (payloadHeader *)ptr; - ptr += sizeof(payloadHeader); - - if (header->payload_type == BULK_STR_REF) { - bulkStrRef *str_ref = (bulkStrRef *)ptr; - if (str_ref->obj != NULL) { - /* Duplicate the string object */ - robj *new_obj = dupStringObject(str_ref->obj); - decrRefCount(str_ref->obj); - str_ref->obj = new_obj; - } - } - ptr += header->payload_len; - } - } - - /* Process reply list */ - if (c->reply && listLength(c->reply)) { - listIter reply_li; - listNode *reply_ln; - listRewind(c->reply, &reply_li); - while ((reply_ln = listNext(&reply_li))) { - clientReplyBlock *block = listNodeValue(reply_ln); - if (block && block->buf_encoded) { - char *ptr = block->buf; - while (ptr < block->buf + block->used) { - payloadHeader *header = (payloadHeader *)ptr; - ptr += sizeof(payloadHeader); - - if (header->payload_type == BULK_STR_REF) { - bulkStrRef *str_ref = (bulkStrRef *)ptr; - if (str_ref->obj != NULL) { - /* Duplicate the string object */ - robj *new_obj = dupStringObject(str_ref->obj); - decrRefCount(str_ref->obj); - str_ref->obj = new_obj; - } - } - ptr += header->payload_len; - } - } - } - } - - /* Process references in IO deferred objects and remove client from - * pending ref list since all refs have been duplicated above. */ - freeClientIODeferredObjects(c, 0); - tryUnlinkClientFromPendingRefReply(c, 1); - } - - if (allpaused) resumeAllIOThreads(); -} - /* Empty a Redis DB asynchronously. What the function does actually is to * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ @@ -292,7 +210,6 @@ void emptyDbAsync(redisDb *db) { db->keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, flags); db->expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, slot_count_bits, flags); db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits); - protectClientReplyObjects(); /* Protect client reply objects before async free. */ emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires); } diff --git a/src/module.c b/src/module.c index 6ab92b62532..06e1e0e97a4 100644 --- a/src/module.c +++ b/src/module.c @@ -946,7 +946,7 @@ void RedisModuleCommandDispatcher(client *c) { for (int i = 0; i < c->argc; i++) { /* Only do the work if the module took ownership of the object: * in that case the refcount is no longer 1. */ - if (c->argv[i]->refcount > 1) + if (c->argv[i]->flags.refcount > 1) trimStringObjectIfNeeded(c->argv[i], 0); } } @@ -2854,7 +2854,7 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { * This API is not thread safe, access to these retained strings (if they originated * from a client command arguments) must be done with GIL locked. */ RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) { - if (str->refcount == OBJ_STATIC_REFCOUNT) { + if (str->flags.refcount == OBJ_STATIC_REFCOUNT) { return RM_CreateStringFromString(ctx, str); } @@ -2968,7 +2968,7 @@ int RM_StringCompare(const RedisModuleString *a, const RedisModuleString *b) { /* Return the (possibly modified in encoding) input 'str' object if * the string is unshared, otherwise NULL is returned. */ RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) { - if (str->refcount != 1) { + if (str->flags.refcount != 1) { serverLog(LL_WARNING, "Module attempted to use an in-place string modify operation " "with a string referenced multiple times. Please check the code " @@ -6610,7 +6610,7 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int argv[argc++] = createStringObject(cstr,strlen(cstr)); } else if (*p == 's') { robj *obj = va_arg(ap,void*); - if (obj->refcount == OBJ_STATIC_REFCOUNT) + if (obj->flags.refcount == OBJ_STATIC_REFCOUNT) obj = createStringObject(obj->ptr,sdslen(obj->ptr)); else incrRefCount(obj); diff --git a/src/networking.c b/src/networking.c index 8dd6ac9861d..2832af687c1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -176,9 +176,6 @@ client *createClient(connection *conn) { c->original_argv = NULL; c->deferred_objects = NULL; c->deferred_objects_num = 0; - c->io_deferred_objects = NULL; - c->io_deferred_objects_num = 0; - c->io_deferred_objects_size = 0; c->cmd = c->lastcmd = c->realcmd = c->lookedcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; @@ -519,15 +516,6 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { _addReplyPayloadToList(c, c->reply, s + reply_len, len - reply_len, PLAIN_REPLY); } -/* Check if the client's pending_ref_reply_node is currently linked in the list. - * A node is considered linked if it has neighbors (prev/next), or if it's the - * only node in the list (head points to it). */ -static inline int clientIsInPendingRefReplyList(client *c) { - return listNextNode(&c->pending_ref_reply_node) != NULL || - listPrevNode(&c->pending_ref_reply_node) != NULL || - listFirst(server.clients_with_pending_ref_reply) == &c->pending_ref_reply_node; -} - /* Increment reference to object and add pointer to object and * pointer to string itself to current reply buffer */ static void _addBulkStrRefToBufferOrList(client *c, robj *obj, size_t len) { @@ -556,11 +544,6 @@ static void _addBulkStrRefToBufferOrList(client *c, robj *obj, size_t len) { if (!_addBulkStrRefToBuffer(c, (void *)&str_ref, sizeof(str_ref))) { _addReplyPayloadToList(c, c->reply, (void *)&str_ref, sizeof(str_ref), BULK_STR_REF); } - - /* Track clients with pending referenced reply objects for async flushdb protection. */ - if (!clientIsInPendingRefReplyList(c)) { - listLinkNodeTail(server.clients_with_pending_ref_reply, &c->pending_ref_reply_node); - } } /* ----------------------------------------------------------------------------- @@ -1240,7 +1223,7 @@ static int isCopyAvoidPreferred(client *c, robj *obj, size_t len) { * to server.pending_push_messages when CLIENT_PUSHING is set. */ if (c->flags & CLIENT_PUSHING) return 0; - if (obj->encoding != OBJ_ENCODING_RAW || obj->refcount >= OBJ_FIRST_SPECIAL_REFCOUNT) return 0; + if (obj->encoding != OBJ_ENCODING_RAW || obj->flags.refcount >= OBJ_FIRST_SPECIAL_REFCOUNT) return 0; /* Copy avoidance is preferred for any string size starting certain number of I/O threads */ if (server.io_threads_num >= COPY_AVOID_MIN_IO_THREADS) return 1; @@ -1711,48 +1694,6 @@ void freeClientDeferredObjects(client *c, int free_array) { } } -/* Queue an robj to be freed by the main thread when client returns from IO thread. - * This is used in IO thread write path to avoid refcount race conditions. */ -#define IO_DEFERRED_OBJECTS_INIT_SIZE 8 -void ioDeferFreeRobj(client *c, robj *obj) { - if (c->io_deferred_objects_num >= c->io_deferred_objects_size) { - int new_size = !c->io_deferred_objects_size ? - IO_DEFERRED_OBJECTS_INIT_SIZE : c->io_deferred_objects_size * 2; - c->io_deferred_objects = zrealloc(c->io_deferred_objects, new_size * sizeof(robj *)); - c->io_deferred_objects_size = new_size; - } - c->io_deferred_objects[c->io_deferred_objects_num++] = obj; -} - -/* Free all objects queued by IO thread for deferred freeing. - * Called by main thread when client returns from IO thread. - * If free_array is true then free the array itself as well. */ -void freeClientIODeferredObjects(client *c, int free_array) { - if (!c->conn) return; - - for (int i = 0; i < c->io_deferred_objects_num; i++) { - robj *obj = c->io_deferred_objects[i]; - decrRefCount(obj); - } - - if (!free_array) { - /* If the utilization rate is less than 1/4, reduce the size to 1/2 to avoid thrashing */ - if (c->io_deferred_objects_size > IO_DEFERRED_OBJECTS_INIT_SIZE && - c->io_deferred_objects_num * 4 < c->io_deferred_objects_size) - { - int new_size = c->io_deferred_objects_size / 2; - c->io_deferred_objects = zrealloc(c->io_deferred_objects, new_size * sizeof(robj *)); - c->io_deferred_objects_size = new_size; - } - c->io_deferred_objects_num = 0; - } else { - zfree(c->io_deferred_objects); - c->io_deferred_objects = NULL; - c->io_deferred_objects_num = 0; - c->io_deferred_objects_size = 0; - } -} - void freeClientOriginalArgv(client *c) { /* We didn't rewrite this client */ if (!c->original_argv) return; @@ -1903,19 +1844,6 @@ void unlinkClient(client *c) { if (c->flags & CLIENT_TRACKING) disableTracking(c); } -/* Remove client from the list of clients with pending referenced replies. - * This is called when the client has finished sending all pending replies, - * or when the client is being freed. - * - * If 'force' is true, the client is removed unconditionally. - * This should only be used when we are certain that the replies no longer - * contain any referenced robj. */ -void tryUnlinkClientFromPendingRefReply(client *c, int force) { - if (clientIsInPendingRefReplyList(c) && (force || !clientHasPendingReplies(c))) { - listUnlinkNode(server.clients_with_pending_ref_reply, &c->pending_ref_reply_node); - } -} - /* Clear the client state to resemble a newly connected client. */ void clearClientConnectionState(client *c) { listNode *ln; @@ -1999,8 +1927,7 @@ static void resetReusableQueryBuf(client *c) { /* Release references to string objects inside an encoded buffer. * If running in IO thread, defer the free to main thread via io_deferred_objects. */ -static void releaseBufReferences(client *c, char *buf, size_t bufpos) { - int in_io_thread = (c && c->running_tid != IOTHREAD_MAIN_THREAD_ID); +static void releaseBufReferences(char *buf, size_t bufpos) { char *ptr = buf; while (ptr < buf + bufpos) { payloadHeader *header = (payloadHeader *)ptr; @@ -2010,10 +1937,7 @@ static void releaseBufReferences(client *c, char *buf, size_t bufpos) { bulkStrRef *str_ref = (bulkStrRef *)ptr; /* Only release if not already released. */ if (str_ref->obj != NULL) { - if (in_io_thread) - ioDeferFreeRobj(c, str_ref->obj); - else - decrRefCount(str_ref->obj); + decrRefCount(str_ref->obj); str_ref->obj = NULL; } } else { @@ -2027,7 +1951,7 @@ static void releaseBufReferences(client *c, char *buf, size_t bufpos) { /* Release all references to string objects in all encoded buffers */ static void releaseAllBufReferences(client *c) { if (c->buf_encoded) { - releaseBufReferences(c, c->buf, c->bufpos); + releaseBufReferences(c->buf, c->bufpos); } listIter iter; @@ -2036,7 +1960,7 @@ static void releaseAllBufReferences(client *c) { while ((next = listNext(&iter))) { clientReplyBlock *o = (clientReplyBlock *)listNodeValue(next); if (o && o->buf_encoded) { - releaseBufReferences(c, o->buf, o->used); + releaseBufReferences(o->buf, o->used); } } } @@ -2143,8 +2067,6 @@ void freeClient(client *c) { freeReplicaReferencedReplBuffer(c); freeClientOriginalArgv(c); freeClientDeferredObjects(c, 1); - freeClientIODeferredObjects(c, 1); - tryUnlinkClientFromPendingRefReply(c, 1); if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); #ifdef LOG_REQ_RES @@ -2399,10 +2321,9 @@ static void processEncodedBufferForWrite(ReplyIOV *reply_iov, char *start_ptr, c /* Process sent data in the encoded buffer. * Returns pointer to the current payload header being processed, or NULL if all data is processed. * If running in IO thread, defer the free to main thread via io_deferred_objects. */ -static payloadHeader *processSentDataInEncodedBuffer(client *c, char *start_ptr, char *end_ptr, +static payloadHeader *processSentDataInEncodedBuffer(char *start_ptr, char *end_ptr, size_t *sentlen, ssize_t *remaining) { - int in_io_thread = (c && c->running_tid != IOTHREAD_MAIN_THREAD_ID); char *ptr = start_ptr; while (ptr < end_ptr && *remaining > 0) { payloadHeader *head = (payloadHeader *)ptr; @@ -2426,11 +2347,7 @@ static payloadHeader *processSentDataInEncodedBuffer(client *c, char *start_ptr, return head; } *remaining -= (writen_len - *sentlen); - if (in_io_thread) { - ioDeferFreeRobj(c, str_ref->obj); - } else { - decrRefCount(str_ref->obj); - } + decrRefCount(str_ref->obj); str_ref->obj = NULL; /* Mark as released to prevent double free */ *sentlen = 0; } @@ -2523,7 +2440,7 @@ static int _writevToClient(client *c, ssize_t *nwritten) { } else { /* For encoded buffers */ char *start_ptr = c->last_header ? (char *)c->last_header : c->buf; - c->last_header = processSentDataInEncodedBuffer(c, start_ptr, c->buf + c->bufpos, &c->sentlen, &remaining); + c->last_header = processSentDataInEncodedBuffer(start_ptr, c->buf + c->bufpos, &c->sentlen, &remaining); if (!c->last_header) { /* reach end */ c->bufpos = 0; c->buf_encoded = 0; @@ -2552,7 +2469,7 @@ static int _writevToClient(client *c, ssize_t *nwritten) { } else { /* Encoded reply block */ char *start_ptr = c->last_header ? (char *)c->last_header : o->buf; - c->last_header = processSentDataInEncodedBuffer(c, start_ptr, o->buf + o->used, &c->sentlen, &remaining); + c->last_header = processSentDataInEncodedBuffer(start_ptr, o->buf + o->used, &c->sentlen, &remaining); if (!c->last_header) { /* reach end */ /* Block fully consumed, remove it */ c->reply_bytes -= o->size; @@ -2744,10 +2661,6 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } - /* Remove client from pending referenced reply clients list. */ - if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) - tryUnlinkClientFromPendingRefReply(c, 1); - /* If replica client has sent all the replication data it knows about * we send it to main thread so it can pick up new repl data ASAP. * Note, that we keep it in IO thread in case we have a pending ACK read. */ @@ -5604,7 +5517,7 @@ static void reclaimPendingCommand(client *c, pendingCommand *pcmd) { * decrease the reference count to release our reference to it. */ for (int j = 0; j < pcmd->argc; j++) { robj *o = pcmd->argv[j]; - if (o && o->refcount > 1) { + if (o && o->flags.refcount > 1) { decrRefCount(o); pcmd->argv[j] = NULL; } diff --git a/src/object.c b/src/object.c index d4915f2d675..ce822258c6e 100644 --- a/src/object.c +++ b/src/object.c @@ -25,7 +25,7 @@ /* Map a metadata ID (bit index) to its compacted slot number among set bits, * then return a pointer to that slot. Caller must ensure the ID bit is set. */ uint64_t *kvobjMetaRef(kvobj *kv, int metaId) { - uint32_t bits = kv->metabits; + uint32_t bits = kv->flags.metabits; /* Expiry is always the first metadata */ if (likely(metaId == 0)) return ((uint64_t *)kv) - 1; @@ -86,10 +86,10 @@ kvobj *kvobjCreate(int type, const sds key, void *ptr, uint32_t keyMetaBits) { kv->type = type; kv->encoding = OBJ_ENCODING_RAW; kv->ptr = ptr; - kv->refcount = 1; kv->lru = 0; - kv->iskvobj = 1; - kv->metabits = keyMetaBits; + kv->flags.iskvobj = 1; + kv->flags.metabits = keyMetaBits; + kv->flags.refcount = 1; /* The memory after the struct where we embedded data. */ char *data = (void *)(kv + 1); @@ -109,15 +109,15 @@ robj *createObject(int type, void *ptr) { o->type = type; o->encoding = OBJ_ENCODING_RAW; o->ptr = ptr; - o->refcount = 1; o->lru = 0; - o->iskvobj = 0; - o->metabits = 0; + o->flags.iskvobj = 0; + o->flags.metabits = 0; + o->flags.refcount = 1; return o; } void initObjectLRUOrLFU(robj *o) { - if (o->refcount == OBJ_SHARED_REFCOUNT) + if (o->flags.refcount == OBJ_SHARED_REFCOUNT) return; /* Set the LRU to the current lruclock (seconds resolution), or * alternatively the LFU counter. */ @@ -141,8 +141,8 @@ void initObjectLRUOrLFU(robj *o) { * */ robj *makeObjectShared(robj *o) { - serverAssert(o->refcount == 1); - o->refcount = OBJ_SHARED_REFCOUNT; + serverAssert(o->flags.refcount == 1); + o->flags.refcount = OBJ_SHARED_REFCOUNT; return o; } @@ -187,10 +187,10 @@ static kvobj *kvobjCreateEmbedString(const char *val_ptr, size_t val_len, o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; - o->refcount = 1; o->lru = 0; - o->metabits = keyMetaBits; - o->iskvobj = 1; + o->flags.refcount = 1; + o->flags.iskvobj = 1; + o->flags.metabits = keyMetaBits; /* The memory after the struct where we embedded data. */ char *data = (char *)(o + 1); @@ -227,10 +227,10 @@ robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { robj *o = zmalloc_usable(sizeof(robj) + val_sds_size, &bufsize); o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; - o->refcount = 1; o->lru = 0; - o->metabits = 0; - o->iskvobj = 0; + o->flags.iskvobj = 0; + o->flags.metabits = 0; + o->flags.refcount = 1; /* The memory after the struct where we embedded data. */ char *data = (char *)(o + 1); @@ -244,14 +244,14 @@ robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { sds kvobjGetKey(const kvobj *kv) { unsigned char *data = (void *)(kv + 1); - debugServerAssert(kv->iskvobj); + debugServerAssert(kv->flags.iskvobj); uint8_t hdr_size = *(uint8_t *)data; data += 1 + hdr_size; return (sds)data; } long long kvobjGetExpire(const kvobj *kv) { - if (kv->metabits & KEY_META_MASK_EXPIRE) { + if (kv->flags.metabits & KEY_META_MASK_EXPIRE) { return (long long) (*kvobjMetaRef((kvobj *)kv, KEY_META_ID_EXPIRE)); } else { return -1; @@ -262,13 +262,14 @@ long long kvobjGetExpire(const kvobj *kv) { * the old object's reference counter is decremented and possibly freed. Use the * returned object instead of 'val' after calling this function. */ kvobj *kvobjSetExpire(kvobj *kv, long long expire) { - /* If kv not expirable, then we need to realloc to add expire metadata */ - if (!(kv->metabits & KEY_META_MASK_EXPIRE)) { + /* If kv not expirable, then we need to realloc to add expire metadata */ + uint32_t metabits = kv->flags.metabits; + if (!(metabits & KEY_META_MASK_EXPIRE)) { /* Nothing to do if kv not expirable and expire is -1 */ if (expire == -1) return kv; - - kv = kvobjSet(kvobjGetKey(kv), kv, kv->metabits | KEY_META_MASK_EXPIRE); + + kv = kvobjSet(kvobjGetKey(kv), kv, metabits | KEY_META_MASK_EXPIRE); } /* kv is expirable. Update expire field. */ @@ -297,7 +298,7 @@ kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits) { } else { /* Create a new object with embedded key. Reuse ptr if possible. */ void *valptr; - if (val->refcount == 1) { + if (val->flags.refcount == 1) { /* Reuse the ptr. There are no other references to val. */ valptr = val->ptr; val->ptr = NULL; @@ -321,7 +322,7 @@ kvobj *kvobjSet(sds key, robj *val, uint32_t keyMetaBits) { kv->lru = val->lru; /* Transfer module metadata from `val` to new `kv` (if `val` of type kvobj with metadata). */ - if (val->metabits & KEY_META_MASK_MODULES) + if (val->flags.metabits & KEY_META_MASK_MODULES) keyMetaTransition((kvobj *) val, kv); decrRefCount(val); @@ -587,12 +588,27 @@ void freeStreamObject(robj *o) { } void incrRefCount(robj *o) { - if (o->refcount < OBJ_FIRST_SPECIAL_REFCOUNT - 1) { - o->refcount++; + uint32_t old_val = 0, new_val; + + atomicGet(o->flags_refcount, old_val); + unsigned int refcount = ((struct robjFlags*)&old_val)->refcount; + + if (refcount < OBJ_FIRST_SPECIAL_REFCOUNT - 1) { + // if (likely(refcount == 1)) { + // /* Fast path, only hold by itself. */ + // o->flags.refcount++; + // } else { + // do { + // new_val = old_val; + // ((struct robjFlags*)&new_val)->refcount++; + // } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); + // } + + o->flags.refcount++; } else { - if (o->refcount == OBJ_SHARED_REFCOUNT) { + if (refcount == OBJ_SHARED_REFCOUNT) { /* Nothing to do: this refcount is immutable. */ - } else if (o->refcount == OBJ_STATIC_REFCOUNT) { + } else if (refcount == OBJ_STATIC_REFCOUNT) { serverPanic("You tried to retain an object allocated in the stack"); } else { serverPanic("You tried to retain an object with maximum refcount"); @@ -601,22 +617,41 @@ void incrRefCount(robj *o) { } void decrRefCount(robj *o) { - if (o->refcount == OBJ_SHARED_REFCOUNT) - return; /* Nothing to do: this refcount is immutable. */ + uint32_t old_val, new_val; + + atomicGet(o->flags_refcount, old_val); + unsigned int refcount = ((struct robjFlags*)&old_val)->refcount; - if (unlikely(o->refcount <= 0)) { + if (refcount == OBJ_SHARED_REFCOUNT) + return; /* Nothing to do: this refcount is immutable. */ + if (unlikely(refcount <= 0)) { serverPanic("illegal decrRefCount for object with: type %u, encoding %u, refcount %d", - o->type, o->encoding, o->refcount); + o->type, o->encoding, refcount); } - if (--(o->refcount) == 0) { + // if (likely(refcount == 1)) { + // /* Fast path, only hold by itself. */ + // o->flags.refcount = refcount = 0; + // } else { + // do { + // new_val = old_val; + // ((struct robjFlags*)&new_val)->refcount--; + // } while (!atomicCompareExchange(uint32_t, o->flags_refcount, old_val, new_val)); + // refcount = ((struct robjFlags*)&new_val)->refcount; + // } + + o->flags.refcount--; + refcount = o->flags.refcount; + + /* old_val now contains the value before decrement (CAS updates it on failure) */ + if (refcount == 0) { void *alloc = o; - - if (o->iskvobj) { + + if (o->flags.iskvobj) { /* eval real allocation pointer */ alloc = kvobjGetAllocPtr(o); /* if kvobj has metadata attached. */ - if (getModuleMetaBits(o->metabits)) + if (getModuleMetaBits(o->flags.metabits)) keyMetaOnFree((kvobj *)o); } @@ -800,7 +835,7 @@ void dismissObject(robj *o, size_t size_hint) { /* Currently we use zmadvise_dontneed only when we use jemalloc with Linux. * so we avoid these pointless loops when they're not going to do anything. */ #if defined(USE_JEMALLOC) && defined(__linux__) - if (o->refcount != 1) return; + if (o->flags.refcount != 1) return; switch(o->type) { case OBJ_STRING: dismissStringObject(o); break; case OBJ_LIST: dismissListObject(o, size_hint); break; @@ -876,7 +911,7 @@ robj *tryObjectEncodingEx(robj *o, int try_trim) { /* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis and may end in places where * they are not handled. We handle them only as values in the keyspace. */ - if (o->refcount > 1) return o; + if (o->flags.refcount > 1) return o; /* Check if we can represent this string as a long integer. * Note that we are sure that a string larger than 20 chars is not @@ -1602,7 +1637,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) { if ((kv = kvobjCommandLookupOrReply(c, c->argv[2], shared.null[c->resp])) == NULL) return; - addReplyLongLong(c, kv->refcount); + addReplyLongLong(c, kv->flags.refcount); } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) { if ((kv = kvobjCommandLookupOrReply(c, c->argv[2], shared.null[c->resp])) == NULL) return; diff --git a/src/object.h b/src/object.h index 1e761175d03..15e41fa49eb 100644 --- a/src/object.h +++ b/src/object.h @@ -96,17 +96,23 @@ struct RedisModuleType; #define OBJ_STATIC_REFCOUNT ((1 << OBJ_REFCOUNT_BITS) - 2) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT -struct redisObject { - unsigned type:4; - unsigned encoding:4; +struct robjFlags { unsigned refcount : OBJ_REFCOUNT_BITS; unsigned iskvobj : 1; /* 1 if this struct serves as a kvobj base */ - /* metabits and lru are Relevant only when iskvobj is set: */ unsigned metabits :8; /* Bitmap of metadata (+expiry) attached to this kvobj */ +}; + +struct redisObject { + unsigned type:4; + unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ + union { + redisAtomic uint32_t flags_refcount; + struct robjFlags flags; + }; void *ptr; }; @@ -187,7 +193,7 @@ void memoryCommand(struct client *c); static inline void *kvobjGetAllocPtr(const kvobj *kv) { /* Return the base allocation pointer (start of the metadata prefix). */ - uint32_t numMetaBytes = __builtin_popcount(kv->metabits) * sizeof(uint64_t); + uint32_t numMetaBytes = __builtin_popcount(kv->flags.metabits) * sizeof(uint64_t); return (char *)kv - numMetaBytes; } diff --git a/src/rdb.c b/src/rdb.c index 45d213b9690..1710c9e310d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1408,7 +1408,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in } /* if needed save key metadata */ - if (getModuleMetaBits(val->metabits)) { + if (getModuleMetaBits(val->flags.metabits)) { if (rdbSaveKeyMetadata(rdb, key, val, dbid) == -1) return -1; } diff --git a/src/script_lua.c b/src/script_lua.c index 2da14ae3dfa..1b13bdb9070 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -858,7 +858,7 @@ void freeLuaRedisArgv(robj **argv, int argc, int argv_len) { * The object must be small, SDS-encoded, and with refcount = 1 * (we must be the only owner) for us to cache it. */ if (j < LUA_CMD_OBJCACHE_SIZE && - o->refcount == 1 && + o->flags.refcount == 1 && (o->encoding == OBJ_ENCODING_RAW || o->encoding == OBJ_ENCODING_EMBSTR) && sdslen(o->ptr) <= LUA_CMD_OBJCACHE_MAX_LEN) diff --git a/src/server.c b/src/server.c index ddcc66c3932..46ace4a3914 100644 --- a/src/server.c +++ b/src/server.c @@ -475,11 +475,13 @@ int dictEncObjKeyCompare(dictCmpCache *cache, const void *key1, const void *key2 * good reasons, because it would incrRefCount() the object, which * is invalid. So we check to make sure dictFind() works with static * objects as well. */ - if (o1->refcount != OBJ_STATIC_REFCOUNT) o1 = getDecodedObject(o1); - if (o2->refcount != OBJ_STATIC_REFCOUNT) o2 = getDecodedObject(o2); + uint32_t refcount1 = o1->flags.refcount; + uint32_t refcount2 = o2->flags.refcount; + if (refcount1 != OBJ_STATIC_REFCOUNT) o1 = getDecodedObject(o1); + if (refcount2 != OBJ_STATIC_REFCOUNT) o2 = getDecodedObject(o2); cmp = dictSdsKeyCompare(cache,o1->ptr,o2->ptr); - if (o1->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o1); - if (o2->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o2); + if (refcount1 != OBJ_STATIC_REFCOUNT) decrRefCount(o1); + if (refcount2 != OBJ_STATIC_REFCOUNT) decrRefCount(o2); return cmp; } @@ -2893,7 +2895,6 @@ void initServer(void) { server.monitors = listCreate(); server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); - server.clients_with_pending_ref_reply = listCreate(); server.clients_timeout_table = raxNew(); server.replication_allowed = 1; server.slaveseldb = -1; /* Force to emit the first SELECT command. */ diff --git a/src/server.h b/src/server.h index 836216fb06d..afc5655281a 100644 --- a/src/server.h +++ b/src/server.h @@ -1065,11 +1065,10 @@ char *getObjectTypeName(robj*); * we'll update it when the structure is changed, to avoid bugs like * bug #85 introduced exactly in this way. */ #define initStaticStringObject(_var,_ptr) do { \ - _var.refcount = OBJ_STATIC_REFCOUNT; \ _var.type = OBJ_STRING; \ _var.encoding = OBJ_ENCODING_RAW; \ - _var.metabits = 0; \ - _var.iskvobj = 0; \ + _var.lru = 0; \ + _var.flags_refcount = OBJ_STATIC_REFCOUNT; \ _var.ptr = _ptr; \ } while(0) @@ -1983,7 +1982,6 @@ struct redisServer { list *clients_to_close; /* Clients to close asynchronously */ list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ - list *clients_with_pending_ref_reply; /* Clients with referenced reply objects. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* The client that triggered the command execution (External or AOF). */ client *executing_client; /* The client executing the current command (possibly script or module). */ @@ -3102,7 +3100,6 @@ void freeClientArgv(client *c); void freeClientPendingCommands(client *c, int num_pcmds_to_free); void tryDeferFreeClientObject(client *c, int type, void *ptr); void freeClientDeferredObjects(client *c, int free_array); -void freeClientIODeferredObjects(client *c, int free_array); void sendReplyToClient(connection *conn); void *addReplyDeferredLen(client *c); void setDeferredArrayLen(client *c, void *node, long length); @@ -3198,7 +3195,6 @@ int clientHasPendingReplies(client *c); int updateClientMemUsageAndBucket(client *c); void removeClientFromMemUsageBucket(client *c, int allow_eviction); void unlinkClient(client *c); -void tryUnlinkClientFromPendingRefReply(client *c, int force); int writeToClient(client *c, int handler_installed); void linkClient(client *c); void protectClient(client *c); diff --git a/src/slowlog.c b/src/slowlog.c index eaf88cc33fc..5f469adc50f 100644 --- a/src/slowlog.c +++ b/src/slowlog.c @@ -52,7 +52,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur (unsigned long) sdslen(argv[j]->ptr) - SLOWLOG_ENTRY_MAX_STRING); se->argv[j] = createObject(OBJ_STRING,s); - } else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) { + } else if (argv[j]->flags.refcount == OBJ_SHARED_REFCOUNT) { se->argv[j] = argv[j]; } else { /* Here we need to duplicate the string objects composing the diff --git a/src/t_string.c b/src/t_string.c index 608db4c2fcb..fe50bb7fce3 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -815,7 +815,7 @@ void incrDecrCommand(client *c, long long incr) { } value += incr; - if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && + if (o && o->flags.refcount == 1 && o->encoding == OBJ_ENCODING_INT && value >= LONG_MIN && value <= LONG_MAX) { new = o;