diff --git a/http/respChain.go b/http/respChain.go index 63cff9e0..ef73bf08 100644 --- a/http/respChain.go +++ b/http/respChain.go @@ -167,6 +167,7 @@ func getBuffer() *bytes.Buffer { func putBuffer(buf *bytes.Buffer) { cap := buf.Cap() if cap > DefaultMaxBodySize { + bufPool.Discard() return } @@ -179,6 +180,8 @@ func putBuffer(buf *bytes.Buffer) { default: // NOTE(dwisiswant0): Pool is full of large buffers, discard this // one. It will be GC'ed, preventing memory accumulation. + // Release the semaphore slot to prevent deadlock. + bufPool.Discard() } return } diff --git a/http/respChain_test.go b/http/respChain_test.go index 777830df..f7dd3251 100644 --- a/http/respChain_test.go +++ b/http/respChain_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/projectdiscovery/utils/sync/sizedpool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1227,3 +1228,70 @@ func TestResponseChain_StringSafety(t *testing.T) { assert.Equal(t, bodyContent, bodyStr, "BodyString() content changed after buffer reuse - unsafe memory sharing detected") assert.Contains(t, headersStr, headerValue, "HeadersString() content changed after buffer reuse - unsafe memory sharing detected") } + +// TestSemaphoreLeakDeadlock reproduces the semaphore leak deadlock. +// +// Source: https://github.com/projectdiscovery/utils/issues/714#issue-3747413857 +// by @Ezzer17. Polished some. 
+func TestSemaphoreLeakDeadlock(t *testing.T) {
+	// Swap in a test-local pool/semaphore. Restore via t.Cleanup so the
+	// originals come back even when t.Fatal/t.Fatalf aborts the test early.
+	origBufPool := bufPool
+	origSem := largeBufferSem
+	t.Cleanup(func() {
+		bufPool = origBufPool
+		largeBufferSem = origSem
+	})
+
+	p := &sync.Pool{
+		New: func() any {
+			return new(bytes.Buffer)
+		},
+	}
+
+	var err error
+	bufPool, err = sizedpool.New(
+		sizedpool.WithPool[*bytes.Buffer](p),
+		sizedpool.WithSize[*bytes.Buffer](20),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	setLargeBufferSemSize(1)
+
+	const numResponses = 2
+	const maxIterations = 100
+	largeBody := strings.Repeat("X", largeBufferThreshold+1)
+
+	for iteration := 1; iteration <= maxIterations; iteration++ {
+		// Buffered so the worker never blocks on send after a timeout.
+		done := make(chan struct{}, 1)
+
+		// The worker makes no testing.T calls: it may outlive the test.
+		go func() {
+			responses := make([]*ResponseChain, numResponses)
+			for i := range responses {
+				resp := &http.Response{
+					StatusCode: 200,
+					Body:       io.NopCloser(strings.NewReader(largeBody)),
+					Header:     http.Header{},
+				}
+				rc := NewResponseChain(resp, -1)
+				_ = rc.Fill() // only completion matters, not the result
+				responses[i] = rc
+			}
+
+			for _, rc := range responses {
+				rc.Close()
+			}
+			done <- struct{}{}
+		}()
+
+		select {
+		case <-done:
+			// Iteration completed successfully, continue to next
+		case <-time.After(time.Second):
+			t.Fatalf("Deadlock detected at iteration %d", iteration)
+		}
+	}
+}
diff --git a/sync/sizedpool/sizedpool.go b/sync/sizedpool/sizedpool.go
index 3f70a46e..06141642 100644
--- a/sync/sizedpool/sizedpool.go
+++ b/sync/sizedpool/sizedpool.go
@@ -8,8 +8,10 @@ import (
 	"github.com/projectdiscovery/utils/sync/semaphore"
 )
 
+// PoolOption represents an option for configuring the [SizedPool].
 type PoolOption[T any] func(*SizedPool[T]) error
 
+// WithSize sets the maximum size of the [SizedPool].
 func WithSize[T any](size int64) PoolOption[T] {
 	return func(sz *SizedPool[T]) error {
 		if size <= 0 {
@@ -24,6 +26,7 @@ func WithSize[T any](size int64) PoolOption[T] {
 	}
 }
 
+// WithPool sets the underlying [sync.Pool] for the [SizedPool].
 func WithPool[T any](p *sync.Pool) PoolOption[T] {
 	return func(sz *SizedPool[T]) error {
 		sz.pool = p
@@ -31,11 +34,14 @@ func WithPool[T any](p *sync.Pool) PoolOption[T] {
 	}
 }
 
+// SizedPool is a pool with a maximum size that blocks on Get when the pool is
+// exhausted.
 type SizedPool[T any] struct {
 	sem  *semaphore.Semaphore
 	pool *sync.Pool
 }
 
+// New creates a new SizedPool with the given options.
 func New[T any](options ...PoolOption[T]) (*SizedPool[T], error) {
 	sz := &SizedPool[T]{}
 	for _, option := range options {
@@ -46,6 +52,8 @@ func New[T any](options ...PoolOption[T]) (*SizedPool[T], error) {
 	return sz, nil
 }
 
+// Get retrieves an item from the pool, blocking if necessary until an item is
+// available.
 func (sz *SizedPool[T]) Get(ctx context.Context) (T, error) {
 	if sz.sem != nil {
 		if err := sz.sem.Acquire(ctx, 1); err != nil {
@@ -56,11 +64,21 @@ func (sz *SizedPool[T]) Get(ctx context.Context) (T, error) {
 	return sz.pool.Get().(T), nil
 }
 
+// Put frees the semaphore slot (see [SizedPool.Discard]) and returns x to the pool.
 func (sz *SizedPool[T]) Put(x T) {
+	sz.Discard()
+	sz.pool.Put(x)
+}
+
+// Discard releases one semaphore slot without returning an item to the pool.
+// It does nothing when the pool has no semaphore configured.
+//
+// Call it for an item obtained via [SizedPool.Get] that will not be handed to
+// [SizedPool.Put]; otherwise the slot is never released and later Gets may block.
+func (sz *SizedPool[T]) Discard() {
 	if sz.sem != nil {
 		sz.sem.Release(1)
 	}
-	sz.pool.Put(x)
 }
 
 // Vary capacity by x - it's internally enqueued as a normal Acquire/Release operation as other Get/Put