-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmulti_coster.go
More file actions
58 lines (51 loc) · 1.19 KB
/
multi_coster.go
File metadata and controls
58 lines (51 loc) · 1.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package infra_sdk
import (
"context"
"errors"
"sync"
)
// MultiCoster runs cost queries against multiple costers.
// Results are combined into a single CostResult.
type MultiCoster struct {
// Costers is the set of costers that GetCosts queries.
Costers []Coster
}
// costerResult carries the outcome of a single coster's GetCosts call from
// its goroutine back to MultiCoster.GetCosts over a channel.
type costerResult struct {
// costResult is the result returned by the coster.
costResult *CostResult
// err is the error returned by the coster.
err error
}
// GetCosts runs query against every coster in c.Costers concurrently and
// merges the datapoints of all successful responses into one CostResult.
//
// The merged result is always returned, even when some costers fail, so it
// may be partial. The errors of the failed costers are combined with
// errors.Join; the returned error is nil when every coster succeeded.
// A coster that returns an error contributes only that error: any result it
// returned alongside the error is ignored. A coster that returns a nil result
// together with a nil error contributes nothing.
func (c *MultiCoster) GetCosts(ctx context.Context, query CostQuery) (*CostResult, error) {
	// One buffer slot per coster: every goroutine sends exactly once, so no
	// send can block regardless of how fast the merge loop below drains.
	results := make(chan costerResult, len(c.Costers))

	var wg sync.WaitGroup
	// Run each coster concurrently.
	for _, cur := range c.Costers {
		wg.Add(1)
		// cur is passed as an argument so each goroutine works on its own copy.
		go func(coster Coster) {
			defer wg.Done()
			costResult, err := coster.GetCosts(ctx, query)
			results <- costerResult{
				costResult: costResult,
				err:        err,
			}
		}(cur)
	}

	// Close the channel once every coster has reported, which is what
	// terminates the range loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Merge results as they arrive.
	var errs []error
	combinedResult := NewCostResult()
	for res := range results {
		if res.err != nil {
			errs = append(errs, res.err)
			continue
		}
		// Guard against a coster returning (nil, nil): dereferencing the nil
		// result below would panic instead of simply merging nothing.
		if res.costResult == nil {
			continue
		}
		for _, series := range res.costResult.Series {
			for _, point := range series.Points {
				combinedResult.MergeDatapoint(series.MetricName, series.GroupKeys, point)
			}
		}
	}
	return combinedResult, errors.Join(errs...)
}