Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions http/respChain.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func getBuffer() *bytes.Buffer {
func putBuffer(buf *bytes.Buffer) {
cap := buf.Cap()
if cap > DefaultMaxBodySize {
bufPool.Discard()
return
}

Expand All @@ -179,6 +180,8 @@ func putBuffer(buf *bytes.Buffer) {
default:
// NOTE(dwisiswant0): Pool is full of large buffers, discard this
// one. It will be GC'ed, preventing memory accumulation.
// Release the semaphore slot to prevent deadlock.
bufPool.Discard()
}
return
}
Expand Down
68 changes: 68 additions & 0 deletions http/respChain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/projectdiscovery/utils/sync/sizedpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1227,3 +1228,70 @@ func TestResponseChain_StringSafety(t *testing.T) {
assert.Equal(t, bodyContent, bodyStr, "BodyString() content changed after buffer reuse - unsafe memory sharing detected")
assert.Contains(t, headersStr, headerValue, "HeadersString() content changed after buffer reuse - unsafe memory sharing detected")
}

// TestSemaphoreLeakDeadlock reproduces the semaphore leak deadlock.
//
// Source: https://github.com/projectdiscovery/utils/issues/714#issue-3747413857
// by @Ezzer17. Polished some.
func TestSemaphoreLeakDeadlock(t *testing.T) {
// Save original state
origBufPool := bufPool
origSem := largeBufferSem

var p = &sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
var err error

bufPool, err = sizedpool.New(
sizedpool.WithPool[*bytes.Buffer](p),
sizedpool.WithSize[*bytes.Buffer](20),
)
if err != nil {
t.Fatal(err)
}
setLargeBufferSemSize(1)

numResponses := 2
maxIterations := 100

for iteration := 1; iteration <= maxIterations; iteration++ {
done := make(chan bool, 1)
currentIteration := iteration

go func() {
responses := make([]*ResponseChain, numResponses)
for i := range responses {
t.Logf("iteration %d, response %d", currentIteration, i)
largeBody := strings.Repeat("X", largeBufferThreshold+1)
resp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(strings.NewReader(largeBody)),
Header: http.Header{},
}

rc := NewResponseChain(resp, -1)
_ = rc.Fill()
responses[i] = rc
}

for _, rc := range responses {
rc.Close()
}

done <- true
}()

select {
case <-done:
// Iteration completed successfully, continue to next
case <-time.After(time.Second):
t.Fatalf("Deadlock detected at iteration %d", currentIteration)
}
}

bufPool = origBufPool
largeBufferSem = origSem
}
20 changes: 19 additions & 1 deletion sync/sizedpool/sizedpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/projectdiscovery/utils/sync/semaphore"
)

// PoolOption represents an option for configuring the [SizedPool].
type PoolOption[T any] func(*SizedPool[T]) error

// WithSize sets the maximum size of the [SizedPool].
func WithSize[T any](size int64) PoolOption[T] {
return func(sz *SizedPool[T]) error {
if size <= 0 {
Expand All @@ -24,18 +26,22 @@ func WithSize[T any](size int64) PoolOption[T] {
}
}

// WithPool sets the underlying [sync.Pool] for the [SizedPool].
func WithPool[T any](p *sync.Pool) PoolOption[T] {
return func(sz *SizedPool[T]) error {
sz.pool = p
return nil
}
}

// SizedPool is a pool with a maximum size that blocks on Get when the pool is
// exhausted.
type SizedPool[T any] struct {
sem *semaphore.Semaphore
pool *sync.Pool
}

// New creates a new SizedPool with the given options.
func New[T any](options ...PoolOption[T]) (*SizedPool[T], error) {
sz := &SizedPool[T]{}
for _, option := range options {
Expand All @@ -46,6 +52,8 @@ func New[T any](options ...PoolOption[T]) (*SizedPool[T], error) {
return sz, nil
}

// Get retrieves an item from the pool, blocking if necessary until an item is
// available.
func (sz *SizedPool[T]) Get(ctx context.Context) (T, error) {
if sz.sem != nil {
if err := sz.sem.Acquire(ctx, 1); err != nil {
Expand All @@ -56,11 +64,21 @@ func (sz *SizedPool[T]) Get(ctx context.Context) (T, error) {
return sz.pool.Get().(T), nil
}

// Put returns an item to the pool and releases the semaphore slot.
func (sz *SizedPool[T]) Put(x T) {
sz.Discard()
sz.pool.Put(x)
}

// Discard releases the semaphore slot without returning the item to the pool.
//
// Use this when you need to discard an item obtained via [Get] without reusing
// it. This prevents semaphore leaks when items are intentionally not returned
// to the pool.
func (sz *SizedPool[T]) Discard() {
if sz.sem != nil {
sz.sem.Release(1)
}
sz.pool.Put(x)
}

// Vary capacity by x - it's internally enqueued as a normal Acquire/Release operation as other Get/Put
Expand Down
Loading