Skip to content

Commit 1d5a987

Browse files
Rich-T-kidRichard Baah
authored andcommitted
Merge pull request #49 from Rich-T-kid/garbage-collection-thread
Feat: Implement garbage collection for s3
2 parents 2e92fb9 + 3290029 commit 1d5a987

File tree

6 files changed

+109
-8
lines changed

6 files changed

+109
-8
lines changed

README.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,4 @@ docker buildx build \
189189
--push \
190190
.
191191

192-
# ! TODO:
193-
194-
# fix aliasing (group_by,universal_Aggrs,projects)
195-
196-
# push up to ec2 instance to run parser
197-
198-
# fix the casting issues for taking in floats/ints (cast ints to floats if needed)
192+
# TODO: remove env stuff

src/Backend/opti-sql-go/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type serverConfig struct {
2828
Host string `yaml:"host"`
2929
Timeout int `yaml:"timeout"`
3030
MaxRequestSizeMB uint64 `yaml:"max_request_size_mb"` // max size of a file upload. passed in by grpc request
31+
RedisAddr string `yaml:"redis_addr"`
3132
}
3233
type batchConfig struct {
3334
Size int `yaml:"size"`
@@ -68,6 +69,7 @@ var configInstance *Config = &Config{
6869
Host: "0.0.0.0",
6970
Timeout: 30,
7071
MaxRequestSizeMB: 15,
72+
RedisAddr: "104.236.210.9",
7173
},
7274
Batch: batchConfig{
7375
Size: 1024 * 8, // rows per bathch
@@ -151,6 +153,9 @@ func mergeConfig(dst *Config, src map[string]interface{}) {
151153
if v, ok := server["max_request_size_mb"].(int); ok {
152154
dst.Server.MaxRequestSizeMB = uint64(v)
153155
}
156+
if v, ok := server["redis_addr"].(string); ok {
157+
dst.Server.RedisAddr = v
158+
}
154159
}
155160

156161
// =============================

src/Backend/opti-sql-go/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ require (
2727
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13 // indirect
2828
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.13 // indirect
2929
github.com/aws/smithy-go v1.23.2 // indirect
30+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
31+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
3032
github.com/go-ini/ini v1.67.0 // indirect
3133
github.com/go-jose/go-jose/v4 v4.1.3 // indirect
3234
github.com/goccy/go-json v0.10.3 // indirect
@@ -41,6 +43,7 @@ require (
4143
github.com/minio/minio-go v6.0.14+incompatible // indirect
4244
github.com/mitchellh/go-homedir v1.1.0 // indirect
4345
github.com/pierrec/lz4/v4 v4.1.21 // indirect
46+
github.com/redis/go-redis/v9 v9.17.3 // indirect
4447
github.com/zeebo/xxh3 v1.0.2 // indirect
4548
go.uber.org/multierr v1.11.0 // indirect
4649
go.uber.org/zap v1.27.1 // indirect

src/Backend/opti-sql-go/go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,13 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2 h1:DhdbtDl4FdNlj31+xiRXANxEE+eC7
3232
github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2/go.mod h1:+wArOOrcHUevqdto9k1tKOF5++YTe9JEcPSc9Tx2ZSw=
3333
github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM=
3434
github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
35+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
36+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3537
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3638
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3739
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
40+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
41+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
3842
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
3943
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
4044
github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs=
@@ -70,6 +74,8 @@ github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ
7074
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
7175
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
7276
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
77+
github.com/redis/go-redis/v9 v9.17.3 h1:fN29NdNrE17KttK5Ndf20buqfDZwGNgoUr9qjl1DQx4=
78+
github.com/redis/go-redis/v9 v9.17.3/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
7379
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
7480
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
7581
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package substrait
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"opti-sql-go/config"
7+
"time"
8+
9+
"github.com/minio/minio-go"
10+
"github.com/redis/go-redis/v9"
11+
"go.uber.org/zap"
12+
)
13+
14+
// garbage collection for removing files from s3 storage after expiration
15+
var dontTouchTestFiles = []string{"country_full.csv", "userdata.parquet", "example.txt", "random_test"}
16+
17+
const ignoreFolder = "result-file-cache"
18+
const loggerPrefix = "Garbage-Collection"
19+
const waitTime = time.Second * 5
20+
21+
func garbageCollection() {
22+
logger := config.GetLogger()
23+
logger.Info(fmt.Sprintf("[%v]starting garbage collection, won't touch these files %v", loggerPrefix, dontTouchTestFiles))
24+
config := config.GetConfig()
25+
redisInstance := redis.NewClient(&redis.Options{
26+
Addr: config.Server.RedisAddr + ":6379",
27+
Password: "", // no password
28+
DB: 0, // use default DB
29+
Protocol: 2,
30+
})
31+
secretes := config.Secretes
32+
accessKey := secretes.AccessKey
33+
secretKey := secretes.SecretKey
34+
endpoint := secretes.EndpointURL
35+
bucket := secretes.BucketName
36+
useSSL := true
37+
38+
client, err := minio.New(endpoint, accessKey, secretKey, useSSL)
39+
if err != nil {
40+
logger.Fatal("failed to construct s3 client to delete old files", zap.String("error message", fmt.Sprintf("%v", err)))
41+
}
42+
var failedAttempts = 0
43+
for {
44+
start:
45+
if failedAttempts > 5 {
46+
logger.Warn("removing files has failed over 5 times, check redis and s3 for issues !!!")
47+
}
48+
fmt.Printf("waiting %v minutes before check for files to clear from s3", waitTime.Minutes())
49+
time.Sleep(waitTime)
50+
start := time.Now()
51+
entries, err := redisInstance.LRange(context.TODO(), ignoreFolder, 0, -1).Result()
52+
if err != nil {
53+
logger.Error(fmt.Sprintf("failed to read in files from %v", ignoreFolder), zap.Int("fail counter", failedAttempts))
54+
failedAttempts++
55+
goto start // try again
56+
}
57+
// read all the files in s3
58+
doneChan := make(chan struct{})
59+
readCount := 0
60+
var nonValidFiles []string
61+
validMap := buildMap(dontTouchTestFiles, entries)
62+
for fileName := range client.ListObjects(bucket, "", true, doneChan) {
63+
if !validMap[fileName.Key] {
64+
nonValidFiles = append(nonValidFiles, fileName.Key)
65+
}
66+
readCount++
67+
}
68+
var removedFiles = 0
69+
for _, invalidFile := range nonValidFiles {
70+
err := client.RemoveObject(bucket, invalidFile)
71+
if err != nil {
72+
logger.Warn(fmt.Sprintf("error removing %v from s3: %v", invalidFile, err))
73+
// log and move on
74+
} else {
75+
removedFiles++
76+
}
77+
}
78+
failedAttempts = 0 // reset failed attempts back to zero
79+
logger.Info("Garbage Collection metrics", zap.Any("to-keep map", validMap), zap.Int("total-files count", readCount), zap.Int("removed-files count", removedFiles), zap.Any("time-taken", time.Since(start)))
80+
81+
}
82+
83+
}
84+
func buildMap(source1 []string, source2 []string) map[string]bool {
85+
result := make(map[string]bool)
86+
for _, k := range source1 {
87+
result[k] = true
88+
}
89+
for _, k := range source2 {
90+
result[k] = true
91+
}
92+
return result
93+
}

src/Backend/opti-sql-go/substrait/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,9 @@ func Start() chan struct{} {
181181
RegisterSSOperationServer(grpcServer, ss)
182182

183183
stopChan := make(chan struct{})
184-
185184
log.Printf("Substrait server listening on port %d", c.Server.Port)
186185
go unifiedShutdownHandler(ss, grpcServer, stopChan)
186+
go garbageCollection()
187187
go func() {
188188
if err := grpcServer.Serve(*ss.listener); err != nil {
189189
log.Fatalf("Failed to serve: %v", err)

0 commit comments

Comments
 (0)