From 702ec09383080815474c200add39472e41977ab6 Mon Sep 17 00:00:00 2001 From: Jon Poole Date: Wed, 6 Dec 2023 14:35:34 +0000 Subject: [PATCH 1/3] Heap pool --- src/parse/asp/benchmark_test.go | 26 ++++---- src/parse/asp/heap/heap.go | 103 ++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+), 10 deletions(-) diff --git a/src/parse/asp/benchmark_test.go b/src/parse/asp/benchmark_test.go index f43166cdae..4811a9674e 100644 --- a/src/parse/asp/benchmark_test.go +++ b/src/parse/asp/benchmark_test.go @@ -6,10 +6,12 @@ import ( "fmt" "github.com/thought-machine/please/rules" "github.com/thought-machine/please/src/core" + "github.com/thought-machine/please/src/parse/asp/heap" "os" "path/filepath" "sync" "testing" + "time" ) var code = ` @@ -95,6 +97,7 @@ func BenchmarkParseWithArena(b *testing.B) { func parseInParallel(threads, repeats int, useArena bool) { wg := new(sync.WaitGroup) wg.Add(threads) + pool := heap.NewPool(threads, -1, time.Second) for j := 0; j < threads; j++ { go func() { for k := 0; k < repeats; k++ { @@ -103,9 +106,9 @@ func parseInParallel(threads, repeats int, useArena bool) { parseFileInput(r, nil) continue } - a := arena.NewArena() - parseFileInput(r, a) - a.Free() + heap := pool.Get() + parseFileInput(r, heap.Arena) + pool.Return(heap) } wg.Done() }() @@ -119,7 +122,7 @@ func BenchmarkParseAndInterpretWithArena(b *testing.B) { arena.NewArena().Free() b.ResetTimer() - parseAndInterpretInParallel(10, b.N, true, p) + parseAndInterpretInParallel(10, b.N*1000, true, p) } func BenchmarkParseAndInterpretWithoutArena(b *testing.B) { @@ -128,7 +131,7 @@ func BenchmarkParseAndInterpretWithoutArena(b *testing.B) { arena.NewArena().Free() b.ResetTimer() - parseAndInterpretInParallel(10, b.N, false, p) + parseAndInterpretInParallel(10, b.N*1000, false, p) } func newParserWithGo() *Parser { @@ -183,19 +186,22 @@ func newParserWithGo() *Parser { func parseAndInterpretInParallel(threads, repeats int, withArena bool, p *Parser) { wg := 
new(sync.WaitGroup) wg.Add(threads) + pool := heap.NewPool(threads, -1, time.Second) for thread := 0; thread < threads; thread++ { go func(thread int) { for repeat := 0; repeat < repeats; repeat++ { pkg := fmt.Sprintf("src/asp/parse_%v_%v", thread, repeat) - var heap *arena.Arena + var heap *heap.Heap + var arena *arena.Arena if withArena { - heap = arena.NewArena() + heap = pool.Get() + arena = heap.Arena } - s := p.interpreter.scope.newScope(core.NewPackage(pkg), heap, core.ParseModeNormal, filepath.Join(pkg, "BUILD"), 10) + s := p.interpreter.scope.newScope(core.NewPackage(pkg), arena, core.ParseModeNormal, filepath.Join(pkg, "BUILD"), 10) // This call to ParseData currently doesn't use the arena - stmts, err := s.interpreter.parser.ParseData(heap, []byte(code), "BUILD") + stmts, err := s.interpreter.parser.ParseData(arena, []byte(code), "BUILD") if err != nil { panic(err) } @@ -206,7 +212,7 @@ func parseAndInterpretInParallel(threads, repeats int, withArena bool, p *Parser t := s.state.Graph.TargetOrDie(core.NewBuildLabel(pkg, "asp")) _ = t if withArena { - heap.Free() + pool.Return(heap) } } wg.Done() diff --git a/src/parse/asp/heap/heap.go b/src/parse/asp/heap/heap.go index a627b7c37e..f26679e7af 100644 --- a/src/parse/asp/heap/heap.go +++ b/src/parse/asp/heap/heap.go @@ -1,9 +1,112 @@ +// Package heap provides a pool to acquire memory arenas to use to allocate objects when parsing and interpreting asp. +// This package pools the arenas to improve efficiency, as short-lived arenas, that allocate a handful of small objects +// don't perform well. By re-using arenas between package parses, we can have them live longer and allocate more +// objects.
package heap import ( "arena" + "sync" + "time" + + "github.com/thought-machine/please/src/cli/logging" ) +var log = logging.Log + +// Heap represents a memory arena we can use for asp heap allocated objects +type Heap struct { + usages int + lastUsage time.Time + Arena *arena.Arena + mux sync.Mutex +} + +func (h *Heap) free() { + if h.Arena == nil { + return + } + h.Arena.Free() + h.Arena = nil + h.usages = 0 + h.lastUsage = time.Time{} +} + +// Pool is a struct for managing a pool of heaps, freeing them after a set number of usages, or if they've not been used +// for a set duration. This allows the heaps to be freed once we finish parsing. +type Pool struct { + heaps []*Heap + available chan *Heap + heapsMux sync.Mutex + // Ideally this would be based on bytes allocated, but we don't have access to those stats. + UsagesBeforeFree int + IdleTimeUntilFree time.Duration +} + +// NewPool creates a new dynamically sized pool of heaps that can be used to allocated memory during parsing and +// interpreting asp code. +// +// usagesBeforeFree: The number of times a heap will be used before the pool frees the underlying arena. This can be +// +// negative, in which case the arena will never be freed by this heuristic. +// +// idleTimeUntilFree: The duration in which the underlying arena will be freed if this heap is not used. Idle time is +// +// calculated from when the heap was returned to the pool. 
+func NewPool(size, usagesBeforeFree int, idleTimeUntilFree time.Duration) *Pool { + pool := &Pool{ + heaps: make([]*Heap, size), + UsagesBeforeFree: usagesBeforeFree, + IdleTimeUntilFree: idleTimeUntilFree, + available: make(chan *Heap, size), + } + for i := 0; i < size; i++ { + pool.heaps[i] = new(Heap) + pool.available <- pool.heaps[i] + } + + go pool.freeIdleHeaps() + + return pool +} + +func (p *Pool) freeIdleHeaps() { + t := time.NewTicker(time.Second) + for { + select { + case <-t.C: + for _, h := range p.heaps { + if time.Since(h.lastUsage) < p.IdleTimeUntilFree { + continue + } + if !h.mux.TryLock() { + continue // Something must be using it so it's not idle + } + h.free() + h.mux.Unlock() + } + } + } +} + +func (p *Pool) Get() *Heap { + var heap = <-p.available + heap.mux.Lock() + if heap.Arena == nil { + heap.Arena = arena.NewArena() + } + return heap +} + +func (p *Pool) Return(heap *Heap) { + heap.usages++ + if heap.usages > p.UsagesBeforeFree { + heap.free() + } + heap.mux.Unlock() + p.available <- heap +} + func MakeSlice[T any](a *arena.Arena, len, cap int) []T { if a == nil { return make([]T, len, cap) From 0f94d3ac4a98d82ca39588e955439c865585124d Mon Sep 17 00:00:00 2001 From: Jon Poole Date: Wed, 6 Dec 2023 15:41:46 +0000 Subject: [PATCH 2/3] Heap pooling --- src/core/cycle_detector.go | 6 +++--- src/parse/asp/benchmark_test.go | 15 +++++++-------- src/parse/asp/heap/heap.go | 9 +++++++++ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/core/cycle_detector.go b/src/core/cycle_detector.go index 0a0c926e97..848ff667a6 100644 --- a/src/core/cycle_detector.go +++ b/src/core/cycle_detector.go @@ -16,7 +16,7 @@ func (c *cycleDetector) Check() *errCycle { if c.stopped { return nil } - log.Debug("Running cycle detection...") + //log.Debug("Running cycle detection...") complete := map[*BuildTarget]struct{}{} partial := map[*BuildTarget]struct{}{} @@ -55,12 +55,12 @@ func (c *cycleDetector) Check() *errCycle { } if _, present := 
complete[target]; !present { if cycle, _ := visit(target); cycle != nil { - log.Debug("Cycle detection complete, cycle found: %s", cycle) + //log.Debug("Cycle detection complete, cycle found: %s", cycle) return &errCycle{Cycle: cycle} } } } - log.Debug("Cycle detection complete, no cycles found") + //log.Debug("Cycle detection complete, no cycles found") return nil } diff --git a/src/parse/asp/benchmark_test.go b/src/parse/asp/benchmark_test.go index 4811a9674e..856fed2c71 100644 --- a/src/parse/asp/benchmark_test.go +++ b/src/parse/asp/benchmark_test.go @@ -77,7 +77,6 @@ go_test( ` func BenchmarkParse(b *testing.B) { - arena.NewArena().Free() b.ReportAllocs() for i := 0; i < b.N; i++ { @@ -86,7 +85,6 @@ func BenchmarkParse(b *testing.B) { } func BenchmarkParseWithArena(b *testing.B) { - arena.NewArena().Free() b.ReportAllocs() for i := 0; i < b.N; i++ { @@ -119,19 +117,17 @@ func parseInParallel(threads, repeats int, useArena bool) { func BenchmarkParseAndInterpretWithArena(b *testing.B) { b.ReportAllocs() p := newParserWithGo() - arena.NewArena().Free() b.ResetTimer() - parseAndInterpretInParallel(10, b.N*1000, true, p) + parseAndInterpretInParallel(b, 10, b.N*10, true, p) } func BenchmarkParseAndInterpretWithoutArena(b *testing.B) { b.ReportAllocs() p := newParserWithGo() - arena.NewArena().Free() b.ResetTimer() - parseAndInterpretInParallel(10, b.N*1000, false, p) + parseAndInterpretInParallel(b,10, b.N*10, false, p) } func newParserWithGo() *Parser { @@ -183,10 +179,10 @@ func newParserWithGo() *Parser { return p } -func parseAndInterpretInParallel(threads, repeats int, withArena bool, p *Parser) { +func parseAndInterpretInParallel(b *testing.B, threads, repeats int, withArena bool, p *Parser) { wg := new(sync.WaitGroup) wg.Add(threads) - pool := heap.NewPool(threads, -1, time.Second) + pool := heap.NewPool(threads, 250, time.Second) for thread := 0; thread < threads; thread++ { go func(thread int) { for repeat := 0; repeat < repeats; repeat++ { @@ -220,4 
+216,7 @@ func parseAndInterpretInParallel(threads, repeats int, withArena bool, p *Parser } wg.Wait() + + b.ReportMetric(float64(pool.Stats.Frees.Load()), "Arena-frees") + b.ReportMetric(float64(pool.Stats.NewArena.Load()), "Arena-creates") } diff --git a/src/parse/asp/heap/heap.go b/src/parse/asp/heap/heap.go index f26679e7af..4ac78689cf 100644 --- a/src/parse/asp/heap/heap.go +++ b/src/parse/asp/heap/heap.go @@ -7,6 +7,7 @@ package heap import ( "arena" "sync" + "sync/atomic" "time" "github.com/thought-machine/please/src/cli/logging" @@ -41,6 +42,12 @@ type Pool struct { // Ideally this would be based on bytes allocated, but we don't have access to those stats. UsagesBeforeFree int IdleTimeUntilFree time.Duration + Stats Stats +} + +type Stats struct { + Frees atomic.Uint64 + NewArena atomic.Uint64 } // NewPool creates a new dynamically sized pool of heaps that can be used to allocated memory during parsing and @@ -94,6 +101,7 @@ func (p *Pool) Get() *Heap { heap.mux.Lock() if heap.Arena == nil { heap.Arena = arena.NewArena() + p.Stats.NewArena.Add(1) } return heap } @@ -102,6 +110,7 @@ func (p *Pool) Return(heap *Heap) { heap.usages++ if heap.usages > p.UsagesBeforeFree { heap.free() + p.Stats.Frees.Add(1) } heap.mux.Unlock() p.available <- heap From 0071131435922bb13bf0c31622c51bb52c546150 Mon Sep 17 00:00:00 2001 From: Jon Poole Date: Wed, 6 Dec 2023 16:07:31 +0000 Subject: [PATCH 3/3] Add gc and mem stats --- src/parse/asp/benchmark_test.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/parse/asp/benchmark_test.go b/src/parse/asp/benchmark_test.go index 856fed2c71..167897e343 100644 --- a/src/parse/asp/benchmark_test.go +++ b/src/parse/asp/benchmark_test.go @@ -9,6 +9,8 @@ import ( "github.com/thought-machine/please/src/parse/asp/heap" "os" "path/filepath" + "runtime" + "runtime/debug" "sync" "testing" "time" @@ -182,7 +184,7 @@ func newParserWithGo() *Parser { func parseAndInterpretInParallel(b *testing.B, threads, 
repeats int, withArena bool, p *Parser) { wg := new(sync.WaitGroup) wg.Add(threads) - pool := heap.NewPool(threads, 250, time.Second) + pool := heap.NewPool(threads, 10, time.Second) for thread := 0; thread < threads; thread++ { go func(thread int) { for repeat := 0; repeat < repeats; repeat++ { @@ -217,6 +219,15 @@ func parseAndInterpretInParallel(b *testing.B, threads, repeats int, withArena b wg.Wait() + + gcStats := debug.GCStats{} + memStats := runtime.MemStats{} + debug.ReadGCStats(&gcStats) + runtime.ReadMemStats(&memStats) + b.ReportMetric(float64(pool.Stats.Frees.Load()), "Arena-frees") b.ReportMetric(float64(pool.Stats.NewArena.Load()), "Arena-creates") + b.ReportMetric(float64(memStats.Sys)/1024.0/1024.0/1024.0, "In-use") + b.ReportMetric(float64(gcStats.NumGC), "GCs") + }