From 7915ad9b5deea4803a4f4026eb65c299ace3342f Mon Sep 17 00:00:00 2001 From: atauov Date: Wed, 12 Nov 2025 20:43:45 +0500 Subject: [PATCH] fix locks in active-passive mode --- statefun/runtime.go | 67 ++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/statefun/runtime.go b/statefun/runtime.go index b544a56..9c6cb20 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -142,7 +142,7 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { } else { r.config.activeRevID = revID defer func() { - system.MsgOnErrorReturn(KeyMutexUnlock(ctx, r, system.GetHashStr(RuntimeName), revID)) + system.MsgOnErrorReturn(KeyMutexUnlock(ctx, r, system.GetHashStr(RuntimeName), r.config.activeRevID)) }() } } else { @@ -249,14 +249,16 @@ func (r *Runtime) handleSingleInstanceFunctions(ctx context.Context, revisions m // startFunctionSubscriptions starts the function subscriptions based on the configuration. func (r *Runtime) startFunctionSubscriptions(ctx context.Context, revisions map[string]uint64) error { for _, ft := range r.registeredFunctionTypes { - revision, exist := revisions[ft.name] - if !exist { - lg.Logf(lg.WarnLevel, "Function type %s is not registered; skipping", ft.name) - continue - } - if !ft.config.multipleInstancesAllowed && revision == 0 { - lg.Logf(lg.WarnLevel, "Function type %s is already running; skipping", ft.name) - continue + if !ft.config.multipleInstancesAllowed { + revision, exist := revisions[ft.name] + if !exist { + lg.Logf(lg.WarnLevel, "Function type %s is not registered; skipping", ft.name) + continue + } + if revision == 0 { + lg.Logf(lg.WarnLevel, "Function type %s is already running; skipping", ft.name) + continue + } } if ft.config.IsSignalProviderAllowed(sfPlugins.JetstreamGlobalSignal) { @@ -374,6 +376,8 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi case <-r.shutdown: return case <-ticker.C: + subscribeRequired := false //if true, need to subscribe on all functions + if r.config.activePassiveMode { if r.config.isActiveInstance { newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(RuntimeName), r.config.activeRevID) @@ -384,38 +388,33 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi } } else { newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(RuntimeName), true) - if err != nil { - if errors.Is(err, ErrMutexLocked) { - lg.Logf(lg.DebugLevel, "Cant lock. Another runtime is already active") - continue - } else { - lg.Logf(lg.ErrorLevel, "KeyMutexLock failed for %s: %v", RuntimeName, err) - return - } - } else { + if err == nil { r.config.isActiveInstance = true r.config.activeRevID = newRevID + subscribeRequired = true + } else if !errors.Is(err, ErrMutexLocked) { + lg.Logf(lg.ErrorLevel, "KeyMutexLock failed for %s: %v", RuntimeName, err) + return } } } - subscribeRequired := false //if true, need to subscribe on all functions - for ftName, revID := range revisions { - if revID != 0 { - newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(ftName), revID) - if err != nil { - lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", ftName, err) - } else { - revisions[ftName] = newRevID - } - } else { - newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(ftName), true) - if err != nil { - lg.Logf(lg.TraceLevel, "KeyMutexLock failed for %s: %v", ftName, err) //try to take the lock + if r.config.isActiveInstance { + for ftName, revID := range revisions { + if revID != 0 { + newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(ftName), revID) + if err != nil { + lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", ftName, err) + } else { + revisions[ftName] = newRevID + } } else { - subscribeRequired = true - revisions[ftName] = newRevID - lg.Logf(lg.DebugLevel, "KeyMutexLock succeeded for %s", ftName) + newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(ftName), true) + if err == nil { + subscribeRequired = true + revisions[ftName] = newRevID + lg.Logf(lg.DebugLevel, "KeyMutexLock succeeded for %s", ftName) + } } } }