diff --git a/internal/dcs/zk.go b/internal/dcs/zk.go index 68275183..c5503e14 100644 --- a/internal/dcs/zk.go +++ b/internal/dcs/zk.go @@ -22,6 +22,7 @@ type zkDCS struct { config *ZookeeperConfig conn *zk.Conn eventsChan <-chan zk.Event + lockHeld sync.Map disconnectCallback func() error isConnected bool connectedChans []chan struct{} @@ -219,6 +220,7 @@ func (z *zkDCS) handleSessionEvent(ev zk.Event) { z.connectedLock.Lock() if z.isConnected && z.closeTimer != nil { defer z.logger.Info("session lost") + z.lockHeld.Clear() z.isConnected = false err := z.disconnectCallback() if err != nil { @@ -338,6 +340,10 @@ func (z *zkDCS) retryDelete(path string, version int32) (err error) { func (z *zkDCS) AcquireLock(path string) bool { fullPath := z.buildFullPath(path) + _, hasLock := z.lockHeld.Load(fullPath) + if hasLock { + return true + } self := z.getSelfLockOwner() data, _, err := z.retryGet(fullPath) if err != nil && err != zk.ErrNoNode { @@ -356,6 +362,7 @@ func (z *zkDCS) AcquireLock(path string) bool { } return false } + z.lockHeld.Store(fullPath, struct{}{}) return true } owner := LockOwner{} @@ -363,11 +370,16 @@ func (z *zkDCS) AcquireLock(path string) bool { z.logger.Errorf("malformed lock data %s (%s): %v", fullPath, data, err) return false } - return owner == self + if owner == self { + z.lockHeld.Store(fullPath, struct{}{}) + return true + } + return false } func (z *zkDCS) ReleaseLock(path string) { fullPath := z.buildFullPath(path) + z.lockHeld.Delete(fullPath) data, stat, err := z.retryGet(fullPath) if err != nil && err != zk.ErrNoNode { z.logger.Errorf("failed to get lock info %s: %v", fullPath, err)