diff --git a/internal/dcs/config.go b/internal/dcs/config.go index 03d9c28..2f3554b 100644 --- a/internal/dcs/config.go +++ b/internal/dcs/config.go @@ -25,6 +25,7 @@ type ZookeeperConfig struct { BackoffMultiplier float64 `config:"backoff_multiplier" yaml:"backoff_multiplier"` BackoffRandFactor float64 `config:"backoff_rand_factor" yaml:"backoff_rand_factor"` SessionTimeout time.Duration `config:"session_timeout" yaml:"session_timeout"` + LockHeldTTL time.Duration `config:"lock_held_ttl" yaml:"lock_held_ttl"` Auth bool `config:"auth" yaml:"auth"` UseSSL bool `config:"use_ssl" yaml:"use_ssl"` VerifyCerts bool `config:"verify_certs" yaml:"verify_certs"` @@ -57,6 +58,7 @@ func DefaultZookeeperConfig() (ZookeeperConfig, error) { config := ZookeeperConfig{ Hostname: hostname, SessionTimeout: 2 * time.Second, + LockHeldTTL: 30 * time.Second, BackoffInterval: backoff.DefaultInitialInterval, BackoffRandFactor: backoff.DefaultRandomizationFactor, BackoffMultiplier: backoff.DefaultMultiplier, diff --git a/internal/dcs/zk.go b/internal/dcs/zk.go index 7202c25..3739339 100644 --- a/internal/dcs/zk.go +++ b/internal/dcs/zk.go @@ -343,9 +343,11 @@ func (z *zkDCS) retryDelete(path string, version int32) (err error) { func (z *zkDCS) AcquireLock(path string) bool { fullPath := z.buildFullPath(path) - _, hasLock := z.lockHeld.Load(fullPath) - if hasLock { - return true + if cached, hasLock := z.lockHeld.Load(fullPath); hasLock { + if time.Since(cached.(time.Time)) < z.config.LockHeldTTL { + return true + } + z.lockHeld.Delete(fullPath) } self := z.getSelfLockOwner() data, _, err := z.retryGet(fullPath) @@ -365,7 +367,7 @@ func (z *zkDCS) AcquireLock(path string) bool { } return false } - z.lockHeld.Store(fullPath, struct{}{}) + z.lockHeld.Store(fullPath, time.Now()) return true } owner := LockOwner{} @@ -374,7 +376,7 @@ func (z *zkDCS) AcquireLock(path string) bool { return false } if owner == self { - z.lockHeld.Store(fullPath, struct{}{}) + z.lockHeld.Store(fullPath, time.Now()) return true } return false