From 104b418cf780f132bed9637e2dab153f0dfbe4cf Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 11 Jul 2025 08:19:06 -0500 Subject: [PATCH 1/8] Fix concurrency issues leading to panic in Coffer --- core/coffer.go | 25 ++++++++++---------- core/coffer_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 12 deletions(-) diff --git a/core/coffer.go b/core/coffer.go index a87933d..8f9a895 100644 --- a/core/coffer.go +++ b/core/coffer.go @@ -48,12 +48,11 @@ func NewCoffer() *Coffer { // Init is used to reset the value stored inside a Coffer to a new random 32 byte value, overwriting the old. func (s *Coffer) Init() error { - if s.Destroyed() { - return ErrCofferExpired - } - s.Lock() defer s.Unlock() + if s.destroyed() { + return ErrCofferExpired + } if err := Scramble(s.left.Data()); err != nil { return err @@ -76,14 +75,13 @@ func (s *Coffer) Init() error { View returns a snapshot of the contents of a Coffer inside a Buffer. As usual the Buffer should be destroyed as soon as possible after use by calling the Destroy method. */ func (s *Coffer) View() (*Buffer, error) { - if s.Destroyed() { - return nil, ErrCofferExpired - } - b, _ := NewBuffer(32) s.Lock() defer s.Unlock() + if s.destroyed() { + return nil, ErrCofferExpired + } // data = hash(right) XOR left h := Hash(s.right.Data()) @@ -100,12 +98,11 @@ func (s *Coffer) View() (*Buffer, error) { Rekey is used to re-key a Coffer. Ideally this should be done at short, regular intervals. */ func (s *Coffer) Rekey() error { - if s.Destroyed() { - return ErrCofferExpired - } - s.Lock() defer s.Unlock() + if s.destroyed() { + return ErrCofferExpired + } if err := Scramble(s.rand.Data()); err != nil { return err @@ -174,6 +171,10 @@ func (s *Coffer) Destroyed() bool { s.Lock() defer s.Unlock() + return s.destroyed() +} + +func (s *Coffer) destroyed() bool { if s.left == nil || s.right == nil { return true } diff --git a/core/coffer_test.go b/core/coffer_test.go index ded0e07..9516491 100644 --- a/core/coffer_test.go +++ b/core/coffer_test.go @@ -2,7 +2,10 @@ package core import ( "bytes" + "context" + "sync" "testing" + "time" ) func TestNewCoffer(t *testing.T) { @@ -169,3 +172,57 @@ func TestCofferDestroy(t *testing.T) { t.Error("some partition not destroyed") } } + +func TestCofferConcurrent(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + funcs := []func(s *Coffer) error{ + func(s *Coffer) error { + return s.Init() + }, + func(s *Coffer) error { + return s.Rekey() + }, + func(s *Coffer) error { + _, err := s.View() + return err + }, + } + wg := &sync.WaitGroup{} + for _, fn := range funcs { + for i := 0; i != 100; i++ { + s := NewCoffer() + wg.Add(1) + go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer) { + defer wg.Done() + for { + select { + case <-time.After(time.Millisecond): + err := fn(s) + if err != nil { + if err == ErrCofferExpired { + return + } + t.Fatalf("unexpected error: %v", err) + } + case <-ctx.Done(): + } + + } + }(ctx, wg, s) + + wg.Add(1) + go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer, i int) { + defer wg.Done() + select { + case <-time.After(time.Duration(i) * time.Millisecond): + + case <-ctx.Done(): + } + s.Destroy() + }(ctx, wg, s, i) + } + } + wg.Wait() +} From 35ea5e401d208f271a5dd88e42e9652556c0936f Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 11 Jul 2025 14:06:11 -0500 Subject: [PATCH 2/8] re order to deal with buffer allocation --- core/coffer.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/coffer.go b/core/coffer.go index 8f9a895..854dfb4 100644 --- a/core/coffer.go +++ b/core/coffer.go @@ -74,14 +74,13 @@ func (s *Coffer) Init() error { /* View returns a snapshot of the contents of a Coffer inside a Buffer. As usual the Buffer should be destroyed as soon as possible after use by calling the Destroy method. */ -func (s *Coffer) View() (*Buffer, error) { - b, _ := NewBuffer(32) - +func (s *Coffer) View() (*Buffer, error) { s.Lock() defer s.Unlock() if s.destroyed() { return nil, ErrCofferExpired } + b, _ := NewBuffer(32) // data = hash(right) XOR left h := Hash(s.right.Data()) From bece222185f93d9aa22c1b00ed58a698a8c6e61d Mon Sep 17 00:00:00 2001 From: Awn Umar Date: Fri, 11 Jul 2025 20:25:24 +0100 Subject: [PATCH 3/8] use done channel --- core/coffer_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/coffer_test.go b/core/coffer_test.go index 9516491..3896fa4 100644 --- a/core/coffer_test.go +++ b/core/coffer_test.go @@ -193,8 +193,9 @@ func TestCofferConcurrent(t *testing.T) { for _, fn := range funcs { for i := 0; i != 100; i++ { s := NewCoffer() + done := make(chan struct{}) wg.Add(1) - go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer) { + go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer, done <-chan struct{}) { defer wg.Done() for { select { @@ -206,22 +207,24 @@ func TestCofferConcurrent(t *testing.T) { } t.Fatalf("unexpected error: %v", err) } + case <-done: + return case <-ctx.Done(): + return } - } - }(ctx, wg, s) + }(ctx, wg, s, done) wg.Add(1) - go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer, i int) { + go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer, i int, done chan<- struct{}) { defer wg.Done() select { case <-time.After(time.Duration(i) * time.Millisecond): - case <-ctx.Done(): } + close(done) s.Destroy() - }(ctx, wg, s, i) + }(ctx, wg, s, i, done) } } wg.Wait() From b407bc8ba7cbeee5fb2541480851b6aeb3bf2b9d Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 11 Jul 2025 15:27:08 -0500 Subject: [PATCH 4/8] Fix a race around the test loop variable --- core/coffer_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/coffer_test.go b/core/coffer_test.go index 9516491..37ab8ce 100644 --- a/core/coffer_test.go +++ b/core/coffer_test.go @@ -194,12 +194,12 @@ func TestCofferConcurrent(t *testing.T) { for i := 0; i != 100; i++ { s := NewCoffer() wg.Add(1) - go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer) { + go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer, target func(s *Coffer) error) { defer wg.Done() for { select { case <-time.After(time.Millisecond): - err := fn(s) + err := target(s) if err != nil { if err == ErrCofferExpired { return @@ -207,10 +207,11 @@ func TestCofferConcurrent(t *testing.T) { t.Fatalf("unexpected error: %v", err) } case <-ctx.Done(): + return } } - }(ctx, wg, s) + }(ctx, wg, s, fn) wg.Add(1) go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer, i int) { From e9d1adcf8880670a06d05da8c316e6c8973ff1ce Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 11 Jul 2025 15:30:17 -0500 Subject: [PATCH 5/8] format test --- core/coffer_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/coffer_test.go b/core/coffer_test.go index 1ff7148..be9ec0e 100644 --- a/core/coffer_test.go +++ b/core/coffer_test.go @@ -206,21 +206,20 @@ func TestCofferConcurrent(t *testing.T) { return } t.Fatalf("unexpected error: %v", err) - } + } case <-ctx.Done(): return } } }(ctx, wg, s, fn) - wg.Add(1) go func(ctx context.Context, wg *sync.WaitGroup, s *Coffer, i int) { defer wg.Done() select { case <-time.After(time.Duration(i) * time.Millisecond): case <-ctx.Done(): - } + } s.Destroy() }(ctx, wg, s, i) } From 4d5d60a46d69a9a01fbbdf824f99faed3e5e3a51 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 12 Jul 2025 08:43:00 -0500 Subject: [PATCH 6/8] lower test concurrency by default --- core/coffer_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/coffer_test.go b/core/coffer_test.go index be9ec0e..f256956 100644 --- a/core/coffer_test.go +++ b/core/coffer_test.go @@ -3,6 +3,8 @@ package core import ( "bytes" "context" + "os" + "strconv" "sync" "testing" "time" @@ -174,6 +176,15 @@ func TestCofferDestroy(t *testing.T) { } func TestCofferConcurrent(t *testing.T) { + testConcurrency := 3 + envVar := os.Getenv("TEST_CONCURRENCY") + envVarValue, err := strconv.Atoi(envVar) + if envVarValue > 0 { + testConcurrency = envVarValue + t.Logf("test concurrency set to %v", testConcurrency) + } else { + t.Logf("cannot use test concurrency %v: %v", envVar, err) + } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -190,8 +201,9 @@ func TestCofferConcurrent(t *testing.T) { }, } wg := &sync.WaitGroup{} + for _, fn := range funcs { - for i := 0; i != 100; i++ { + for i := 0; i != testConcurrency; i++ { s := NewCoffer() wg.Add(1) From 43cafe56a565522ca7996ba485b2d93a3799c631 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 12 Jul 2025 10:18:09 -0500 Subject: [PATCH 7/8] add lock usage for checking if the buffer is destroyed --- core/buffer.go | 6 ++++++ core/coffer.go | 4 ++-- core/coffer_test.go | 14 ++++++++------ 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/core/buffer.go b/core/buffer.go index 4e15ce8..d9ee018 100644 --- a/core/buffer.go +++ b/core/buffer.go @@ -251,6 +251,12 @@ func (b *Buffer) Mutable() bool { return b.mutable } +func (b *Buffer) IsDestroyed() bool { + b.RLock() + defer b.RUnlock() + return b.data == nil +} + // BufferList stores a list of buffers in a thread-safe manner. type bufferList struct { sync.RWMutex diff --git a/core/coffer.go b/core/coffer.go index 854dfb4..72ab429 100644 --- a/core/coffer.go +++ b/core/coffer.go @@ -74,7 +74,7 @@ func (s *Coffer) Init() error { /* View returns a snapshot of the contents of a Coffer inside a Buffer. As usual the Buffer should be destroyed as soon as possible after use by calling the Destroy method. */ -func (s *Coffer) View() (*Buffer, error) { +func (s *Coffer) View() (*Buffer, error) { s.Lock() defer s.Unlock() if s.destroyed() { @@ -178,5 +178,5 @@ func (s *Coffer) destroyed() bool { return true } - return s.left.data == nil || s.right.data == nil + return s.left.IsDestroyed() || s.right.IsDestroyed() } diff --git a/core/coffer_test.go b/core/coffer_test.go index f256956..ecb6acd 100644 --- a/core/coffer_test.go +++ b/core/coffer_test.go @@ -178,12 +178,14 @@ func TestCofferDestroy(t *testing.T) { func TestCofferConcurrent(t *testing.T) { testConcurrency := 3 envVar := os.Getenv("TEST_CONCURRENCY") - envVarValue, err := strconv.Atoi(envVar) - if envVarValue > 0 { - testConcurrency = envVarValue - t.Logf("test concurrency set to %v", testConcurrency) - } else { - t.Logf("cannot use test concurrency %v: %v", envVar, err) + if len(envVar) > 0 { + envVarValue, err := strconv.Atoi(envVar) + if envVarValue > 0 { + testConcurrency = envVarValue + t.Logf("test concurrency set to %v", testConcurrency) + } else { + t.Logf("cannot use test concurrency %v: %v", envVar, err) + } } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() From fed96e5c9a1fd36938f2b92b2bf0a23f1a0cffe1 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 13 Jul 2025 17:27:27 -0500 Subject: [PATCH 8/8] do not export isDestroyed() function --- core/buffer.go | 3 ++- core/coffer.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/buffer.go b/core/buffer.go index d9ee018..afea842 100644 --- a/core/buffer.go +++ b/core/buffer.go @@ -251,7 +251,8 @@ func (b *Buffer) Mutable() bool { return b.mutable } -func (b *Buffer) IsDestroyed() bool { +// isDestroyed returns true if the buffer is destroyed +func (b *Buffer) isDestroyed() bool { b.RLock() defer b.RUnlock() return b.data == nil diff --git a/core/coffer.go b/core/coffer.go index 72ab429..1efdffa 100644 --- a/core/coffer.go +++ b/core/coffer.go @@ -178,5 +178,5 @@ func (s *Coffer) destroyed() bool { return true } - return s.left.IsDestroyed() || s.right.IsDestroyed() + return s.left.isDestroyed() || s.right.isDestroyed() }