From 600c870a3739b232ba96e178822942caa100910a Mon Sep 17 00:00:00 2001 From: atauov Date: Wed, 8 Oct 2025 18:36:22 +0500 Subject: [PATCH 1/3] GetValueJSON instead of GetValue in forceCreate (LLAPILinkCreate) --- embedded/graph/crud/ll_crud.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/embedded/graph/crud/ll_crud.go b/embedded/graph/crud/ll_crud.go index 532a632..8474e56 100644 --- a/embedded/graph/crud/ll_crud.go +++ b/embedded/graph/crud/ll_crud.go @@ -561,7 +561,7 @@ func LLAPILinkCreate(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex if !forceCreate { // Check if link with this name already exists -------------- - _, err := ctx.Domain.Cache().GetValue(fmt.Sprintf(OutLinkBodyKeyPrefPattern+KeySuff1Pattern, selfID, linkName)) + _, err := ctx.Domain.Cache().GetValueJSON(fmt.Sprintf(OutLinkBodyKeyPrefPattern+KeySuff1Pattern, selfID, linkName)) if err == nil { operationKeysMutexUnlock(ctx) om.AggregateOpMsg(sfMediators.OpMsgFailed(fmt.Sprintf("link from=%s with name=%s already exists", selfID, linkName))).Reply() @@ -569,7 +569,7 @@ func LLAPILinkCreate(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex } // ---------------------------------------------------------- // Check if link with this type "type" to "to" already exists - _, err = ctx.Domain.Cache().GetValue(fmt.Sprintf(OutLinkTypeKeyPrefPattern+KeySuff2Pattern, selfID, linkType, toId)) + _, err = ctx.Domain.Cache().GetValueJSON(fmt.Sprintf(OutLinkTypeKeyPrefPattern+KeySuff2Pattern, selfID, linkType, toId)) if err == nil { operationKeysMutexUnlock(ctx) om.AggregateOpMsg(sfMediators.OpMsgFailed(fmt.Sprintf("link from=%s with name=%s to=%s with type=%s already exists, two vertices can have a link with this type and direction only once", selfID, linkName, toId, linkType))).Reply() From 61772788d317809d4295b73d8e7e91baf73f6f1d Mon Sep 17 00:00:00 2001 From: atauov Date: Wed, 8 Oct 2025 18:38:19 +0500 Subject: [PATCH 2/3] Fix function subscriptions for multiple instance mode --- statefun/runtime.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/statefun/runtime.go b/statefun/runtime.go index bd7888d..e1268a5 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -244,14 +244,18 @@ func (r *Runtime) handleSingleInstanceFunctions(ctx context.Context, revisions m // startFunctionSubscriptions starts the function subscriptions based on the configuration. func (r *Runtime) startFunctionSubscriptions(ctx context.Context, revisions map[string]uint64) error { for _, ft := range r.registeredFunctionTypes { - revision, exist := revisions[ft.name] - if !exist { - lg.Logf(lg.WarnLevel, "Function type %s is not registered; skipping", ft.name) - continue - } - if !ft.config.multipleInstancesAllowed && revision == 0 { - lg.Logf(lg.WarnLevel, "Function type %s is already running; skipping", ft.name) - continue + if ft.config.multipleInstancesAllowed { + lg.Logf(lg.DebugLevel, "Function type %s allows multiple instances, subscribing", ft.name) + } else { + revision, exist := revisions[ft.name] + if !exist { + lg.Logf(lg.WarnLevel, "Function type %s is not registered; skipping", ft.name) + continue + } + if revision == 0 { + lg.Logf(lg.WarnLevel, "Function type %s is already running; skipping", ft.name) + continue + } } if ft.config.IsSignalProviderAllowed(sfPlugins.JetstreamGlobalSignal) { From b16be3cb76ce90b08e642adb75cffae99cd3d973 Mon Sep 17 00:00:00 2001 From: atauov Date: Wed, 8 Oct 2025 18:39:37 +0500 Subject: [PATCH 3/3] Ignore errors sequence {SEQ} not found --- statefun/cache/cache.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/statefun/cache/cache.go b/statefun/cache/cache.go index bee767f..e9626af 100644 --- a/statefun/cache/cache.go +++ b/statefun/cache/cache.go @@ -351,8 +351,11 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo //lg.Logf("---CACHE_KV TF DELETE: %s, %d, %d", key, kvRecordTime, appendFlag) //system.MsgOnErrorReturn(kv.Delete(entry.Key())) - system.MsgOnErrorReturn(customNatsKv.KVDelete(cs.js, cs.kv, entry.Key())) - + if err = customNatsKv.KVDelete(cs.js, cs.kv, entry.Key()); err != nil && + !(strings.HasPrefix(err.Error(), "nats: sequence ") && strings.Contains(err.Error(), " not found")) { + // Ignore "sequence {SEQ} not found" errors - another runtime already deleted the entry + lg.Logf(lg.ErrorLevel, "Failed to delete key=%s, err=%s", key, err) + } //cs.rootValue.purgeReady //if csv := cs.getLastKeyCacheStoreValue(key); csv != nil { // csv.Purge(true)