From f3c8bcf33e41cf5e966419fb9c2a1834b7c9878e Mon Sep 17 00:00:00 2001 From: DSurnin Date: Thu, 5 Mar 2020 04:00:39 -0700 Subject: [PATCH 01/20] Update readme with initial repo addr; Add head-object request; Add avg time; --- README.md | 6 ++++++ s3bench.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f0939b8..3e5af4a 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ +# Initial +Cloned from +``` +https://github.com/igneous-systems/s3bench.git +``` + # S3 Bench This tool offers the ability to run very basic throughput benchmarking against an S3-compatible endpoint. It does a series of put operations followed by a diff --git a/s3bench.go b/s3bench.go index 76af88f..7c55cce 100644 --- a/s3bench.go +++ b/s3bench.go @@ -21,6 +21,7 @@ import ( const ( opRead = "Read" opWrite = "Write" + opHeadObj = "HeadObj" //max that can be deleted at a time via DeleteObjects() commitSize = 1000 ) @@ -39,6 +40,7 @@ func main() { numSamples := flag.Int("numSamples", 200, "total number of requests to send") skipCleanup := flag.Bool("skipCleanup", false, "skip deleting objects created by this tool at the end of the run") verbose := flag.Bool("verbose", false, "print verbose per thread status") + metaData := flag.Bool("metaData", false, "read obj metadata instead of obj itself") flag.Parse() @@ -64,6 +66,7 @@ func main() { bucketName: *bucketName, endpoints: strings.Split(*endpoint, ","), verbose: *verbose, + metaData: *metaData, } fmt.Println(params) fmt.Println() @@ -92,9 +95,16 @@ func main() { writeResult := params.Run(opWrite) fmt.Println() - fmt.Printf("Running %s test...\n", opRead) - readResult := params.Run(opRead) - fmt.Println() + var readResult = Result{} + if params.metaData { + fmt.Printf("Running %s test...\n", opHeadObj) + readResult = params.Run(opHeadObj) + fmt.Println() + } else { + fmt.Printf("Running %s test...\n", opRead) + readResult = params.Run(opRead) + 
fmt.Println() + } // Repeating the parameters of the test followed by the results fmt.Println(params) @@ -187,6 +197,11 @@ func (params *Params) submitLoad(op string) { Bucket: bucket, Key: key, } + } else if op == opHeadObj { + params.requests <- &s3.HeadObjectInput{ + Bucket: bucket, + Key: key, + } } else { panic("Developer error") } @@ -225,6 +240,16 @@ func (params *Params) startClient(cfg *aws.Config) { if numBytes != params.objectSize { err = fmt.Errorf("expected object length %d, actual %d", params.objectSize, numBytes) } + case *s3.HeadObjectInput: + req, resp := svc.HeadObjectRequest(r) + err = req.Send() + numBytes = 0 + if err == nil { + numBytes = *resp.ContentLength + } + if numBytes != params.objectSize { + err = fmt.Errorf("expected object length %d, actual %d, resp %v", params.objectSize, numBytes, resp) + } default: panic("Developer error") } @@ -245,6 +270,7 @@ type Params struct { bucketName string endpoints []string verbose bool + metaData bool } func (params Params) String() string { @@ -256,6 +282,7 @@ func (params Params) String() string { output += fmt.Sprintf("numClients: %d\n", params.numClients) output += fmt.Sprintf("numSamples: %d\n", params.numSamples) output += fmt.Sprintf("verbose: %d\n", params.verbose) + output += fmt.Sprintf("metaData: %d\n", params.metaData) return output } @@ -270,19 +297,24 @@ type Result struct { func (r Result) String() string { report := fmt.Sprintf("Results Summary for %s Operation(s)\n", r.operation) - report += fmt.Sprintf("Total Transferred: %0.3f MB\n", float64(r.bytesTransmitted)/(1024*1024)) - report += fmt.Sprintf("Total Throughput: %0.2f MB/s\n", (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds()) + if r.operation == opHeadObj { + report += fmt.Sprintf("Total Reqs: %d\n", len(r.opDurations)) + } else { + report += fmt.Sprintf("Total Transferred: %0.3f MB\n", float64(r.bytesTransmitted)/(1024*1024)) + report += fmt.Sprintf("Total Throughput: %0.2f MB/s\n", 
(float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds()) + } report += fmt.Sprintf("Total Duration: %0.3f s\n", r.totalDuration.Seconds()) report += fmt.Sprintf("Number of Errors: %d\n", r.numErrors) if len(r.opDurations) > 0 { report += fmt.Sprintln("------------------------------------") + report += fmt.Sprintf("%s times Avg: %0.3f s\n", r.operation, r.avg()) report += fmt.Sprintf("%s times Max: %0.3f s\n", r.operation, r.percentile(100)) + report += fmt.Sprintf("%s times Min: %0.3f s\n", r.operation, r.percentile(0)) report += fmt.Sprintf("%s times 99th %%ile: %0.3f s\n", r.operation, r.percentile(99)) report += fmt.Sprintf("%s times 90th %%ile: %0.3f s\n", r.operation, r.percentile(90)) report += fmt.Sprintf("%s times 75th %%ile: %0.3f s\n", r.operation, r.percentile(75)) report += fmt.Sprintf("%s times 50th %%ile: %0.3f s\n", r.operation, r.percentile(50)) report += fmt.Sprintf("%s times 25th %%ile: %0.3f s\n", r.operation, r.percentile(25)) - report += fmt.Sprintf("%s times Min: %0.3f s\n", r.operation, r.percentile(0)) } return report } @@ -296,6 +328,15 @@ func (r Result) percentile(i int) float64 { return r.opDurations[i] } +func (r Result) avg() float64 { + ln := float64(len(r.opDurations)) + sm := float64(0) + for _, el := range r.opDurations { + sm += el + } + return sm / ln +} + type Req interface{} type Resp struct { From 601cd7dd02a5cdf9d3f2267b2023dccd5211bae3 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Thu, 5 Mar 2020 04:07:52 -0700 Subject: [PATCH 02/20] Update readme with new command description --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 3e5af4a..5f86097 100644 --- a/README.md +++ b/README.md @@ -131,3 +131,10 @@ Put times 50th %ile: 0.001 s Put times 25th %ile: 0.001 s Put times Min: 0.001 s ``` + +##### Head-object +It is possible to send head-object requests instead of get-object. 
+For this purpose one should use *-metaData* flag
+```
+./s3bench -accessKey=KEY -accessSecret=SECRET -bucket=loadgen -endpoint=http://endpoint1:80 -numClients=2 -numSamples=10 -objectNamePrefix=loadgen -objectSize=1024 -metaData
+```
From 7a92a369db7bab3a451620a7e48dadc23c76bfc7 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 10 Mar 2020 08:02:56 -0600 Subject: [PATCH 03/20] Create bucket if not exists; Object size support Gb, Mb, Kb, b specifiers; Number of reads for each file; Ttfb - time to first byte - metrics; --- s3bench.go | 145 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 120 insertions(+), 25 deletions(-) diff --git a/s3bench.go b/s3bench.go index 7c55cce..034454b 100644 --- a/s3bench.go +++ b/s3bench.go @@ -11,6 +11,8 @@ import ( "sort" "strings" "time" + "regexp" + "strconv" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -28,6 +30,47 @@ const ( var bufferBytes []byte +// true if created +// false if existed +func (params *Params) prepareBucket(cfg *aws.Config) bool { + cfg.Endpoint = aws.String(params.endpoints[0]) + svc := s3.New(session.New(), cfg) + req, _ := svc.CreateBucketRequest( + &s3.CreateBucketInput{Bucket: aws.String(params.bucketName)}) + + err := req.Send() + + if err == nil { + return true + } else if !strings.Contains(err.Error(), "BucketAlreadyOwnedByYou:") { + panic("Failed to create bucket: " + err.Error()) + } + + return false +} + +func parse_size(sz string) int64 { + sizes := map[string]int64 { + "b": 1, + "Kb": 1024, + "Mb": 1024 * 1024, + "Gb": 1024 * 1024 * 1024, + } + re := regexp.MustCompile(`^(\d+)([bKMG]{1,2})$`) + mm := re.FindStringSubmatch(sz) + if len(mm) != 3 { + fmt.Printf("Invalid objectSize value format\n") + os.Exit(1) + } + val, err := strconv.ParseInt(string(mm[1]), 10, 64) + mult, ex := sizes[string(mm[2])] + if !ex || err != nil { + fmt.Printf("Invalid objectSize value\n") + os.Exit(1) + } + return val * mult +} + func main() { endpoint := 
flag.String("endpoint", "", "S3 endpoint(s) comma separated - http://IP:PORT,http://IP:PORT") region := flag.String("region", "igneous-test", "AWS region to use, eg: us-west-1|us-east-1, etc") @@ -35,12 +78,13 @@ func main() { accessSecret := flag.String("accessSecret", "", "the S3 access secret") bucketName := flag.String("bucket", "bucketname", "the bucket for which to run the test") objectNamePrefix := flag.String("objectNamePrefix", "loadgen_test_", "prefix of the object name that will be used") - objectSize := flag.Int64("objectSize", 80*1024*1024, "size of individual requests in bytes (must be smaller than main memory)") + objectSize := flag.String("objectSize", "80Mb", "size of individual requests (must be smaller than main memory)") numClients := flag.Int("numClients", 40, "number of concurrent clients") numSamples := flag.Int("numSamples", 200, "total number of requests to send") skipCleanup := flag.Bool("skipCleanup", false, "skip deleting objects created by this tool at the end of the run") verbose := flag.Bool("verbose", false, "print verbose per thread status") metaData := flag.Bool("metaData", false, "read obj metadata instead of obj itself") + sampleReads := flag.Int("sampleReads", 1, "number of reads of each sample") flag.Parse() @@ -59,14 +103,15 @@ func main() { params := Params{ requests: make(chan Req), responses: make(chan Resp), - numSamples: *numSamples, + numSamples: uint(*numSamples), numClients: uint(*numClients), - objectSize: *objectSize, + objectSize: parse_size(*objectSize), objectNamePrefix: *objectNamePrefix, bucketName: *bucketName, endpoints: strings.Split(*endpoint, ","), verbose: *verbose, metaData: *metaData, + sampleReads: uint(*sampleReads), } fmt.Println(params) fmt.Println() @@ -74,7 +119,7 @@ func main() { // Generate the data from which we will do the writting fmt.Printf("Generating in-memory sample data... 
") timeGenData := time.Now() - bufferBytes = make([]byte, *objectSize, *objectSize) + bufferBytes = make([]byte, params.objectSize, params.objectSize) _, err := rand.Read(bufferBytes) if err != nil { fmt.Printf("Could not allocate a buffer") @@ -89,6 +134,9 @@ func main() { Region: aws.String(*region), S3ForcePathStyle: aws.Bool(true), } + + bucket_created := params.prepareBucket(cfg) + params.StartClients(cfg) fmt.Printf("Running %s test...\n", opWrite) @@ -147,6 +195,18 @@ func main() { } } fmt.Printf("Successfully deleted %d/%d objects in %s\n", numSuccessfullyDeleted, *numSamples, time.Since(delStartTime)) + + if bucket_created { + fmt.Printf("Deleting bucket...\n") + params := &s3.DeleteBucketInput{ + Bucket: aws.String(*bucketName)} + _, err := svc.DeleteBucket(params) + if err == nil { + fmt.Printf("Succeeded\n") + } else { + fmt.Printf("Failed (%v)\n", err) + } + } } } @@ -156,9 +216,10 @@ func (params *Params) Run(op string) Result { // Start submitting load requests go params.submitLoad(op) + opSamples := params.spo(op) // Collect and aggregate stats for completed requests - result := Result{opDurations: make([]float64, 0, params.numSamples), operation: op} - for i := 0; i < params.numSamples; i++ { + result := Result{opDurations: make([]float64, 0, opSamples), operation: op} + for i := uint(0); i < opSamples; i++ { resp := <-params.responses errorString := "" if resp.err != nil { @@ -167,6 +228,7 @@ func (params *Params) Run(op string) Result { } else { result.bytesTransmitted = result.bytesTransmitted + params.objectSize result.opDurations = append(result.opDurations, resp.duration.Seconds()) + result.opTtfb = append(result.opTtfb, resp.ttfb.Seconds()) } if params.verbose { fmt.Printf("%v operation completed in %0.2fs (%d/%d) - %0.2fMB/s%s\n", @@ -178,14 +240,16 @@ func (params *Params) Run(op string) Result { result.totalDuration = time.Since(startTime) sort.Float64s(result.opDurations) + sort.Float64s(result.opTtfb) return result } // Create an 
individual load request and submit it to the client queue func (params *Params) submitLoad(op string) { bucket := aws.String(params.bucketName) - for i := 0; i < params.numSamples; i++ { - key := aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i)) + opSamples := params.spo(op) + for i := uint(0); i < opSamples; i++ { + key := aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i % params.numSamples)) if op == opWrite { params.requests <- &s3.PutObjectInput{ Bucket: bucket, @@ -221,6 +285,7 @@ func (params *Params) startClient(cfg *aws.Config) { svc := s3.New(session.New(), cfg) for request := range params.requests { putStartTime := time.Now() + var ttfb time.Duration var err error numBytes := params.objectSize @@ -230,9 +295,11 @@ func (params *Params) startClient(cfg *aws.Config) { // Disable payload checksum calculation (very expensive) req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") err = req.Send() + ttfb = time.Since(putStartTime) case *s3.GetObjectInput: req, resp := svc.GetObjectRequest(r) err = req.Send() + ttfb = time.Since(putStartTime) numBytes = 0 if err == nil { numBytes, err = io.Copy(ioutil.Discard, resp.Body) @@ -243,6 +310,7 @@ func (params *Params) startClient(cfg *aws.Config) { case *s3.HeadObjectInput: req, resp := svc.HeadObjectRequest(r) err = req.Send() + ttfb = time.Since(putStartTime) numBytes = 0 if err == nil { numBytes = *resp.ContentLength @@ -254,7 +322,7 @@ func (params *Params) startClient(cfg *aws.Config) { panic("Developer error") } - params.responses <- Resp{err, time.Since(putStartTime), numBytes} + params.responses <- Resp{err, time.Since(putStartTime), numBytes, ttfb} } } @@ -263,7 +331,7 @@ type Params struct { operation string requests chan Req responses chan Resp - numSamples int + numSamples uint numClients uint objectSize int64 objectNamePrefix string @@ -271,6 +339,7 @@ type Params struct { endpoints []string verbose bool metaData bool + sampleReads uint } func (params Params) String() 
string { @@ -281,6 +350,7 @@ func (params Params) String() string { output += fmt.Sprintf("objectSize: %0.4f MB\n", float64(params.objectSize)/(1024*1024)) output += fmt.Sprintf("numClients: %d\n", params.numClients) output += fmt.Sprintf("numSamples: %d\n", params.numSamples) + output += fmt.Sprintf("sampleReads: %d\n", params.sampleReads) output += fmt.Sprintf("verbose: %d\n", params.verbose) output += fmt.Sprintf("metaData: %d\n", params.metaData) return output @@ -293,6 +363,7 @@ type Result struct { numErrors int opDurations []float64 totalDuration time.Duration + opTtfb []float64 } func (r Result) String() string { @@ -305,33 +376,56 @@ func (r Result) String() string { } report += fmt.Sprintf("Total Duration: %0.3f s\n", r.totalDuration.Seconds()) report += fmt.Sprintf("Number of Errors: %d\n", r.numErrors) + if len(r.opDurations) > 0 { report += fmt.Sprintln("------------------------------------") - report += fmt.Sprintf("%s times Avg: %0.3f s\n", r.operation, r.avg()) - report += fmt.Sprintf("%s times Max: %0.3f s\n", r.operation, r.percentile(100)) - report += fmt.Sprintf("%s times Min: %0.3f s\n", r.operation, r.percentile(0)) - report += fmt.Sprintf("%s times 99th %%ile: %0.3f s\n", r.operation, r.percentile(99)) - report += fmt.Sprintf("%s times 90th %%ile: %0.3f s\n", r.operation, r.percentile(90)) - report += fmt.Sprintf("%s times 75th %%ile: %0.3f s\n", r.operation, r.percentile(75)) - report += fmt.Sprintf("%s times 50th %%ile: %0.3f s\n", r.operation, r.percentile(50)) - report += fmt.Sprintf("%s times 25th %%ile: %0.3f s\n", r.operation, r.percentile(25)) + report += fmt.Sprintf("%s times Avg: %0.3f s\n", r.operation, avg(r.opDurations)) + report += fmt.Sprintf("%s times Max: %0.3f s\n", r.operation, percentile(r.opDurations, 100)) + report += fmt.Sprintf("%s times Min: %0.3f s\n", r.operation, percentile(r.opDurations, 0)) + report += fmt.Sprintf("%s times 99th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 99)) + report += 
fmt.Sprintf("%s times 90th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 90)) + report += fmt.Sprintf("%s times 75th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 75)) + report += fmt.Sprintf("%s times 50th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 50)) + report += fmt.Sprintf("%s times 25th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 25)) + } + + if len(r.opTtfb) > 0 { + report += fmt.Sprintln("------------------------------------") + report += fmt.Sprintf("%s ttfb Avg: %0.3f s\n", r.operation, avg(r.opTtfb)) + report += fmt.Sprintf("%s ttfb Max: %0.3f s\n", r.operation, percentile(r.opTtfb, 100)) + report += fmt.Sprintf("%s ttfb Min: %0.3f s\n", r.operation, percentile(r.opTtfb, 0)) + report += fmt.Sprintf("%s ttfb 99th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 99)) + report += fmt.Sprintf("%s ttfb 90th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 90)) + report += fmt.Sprintf("%s ttfb 75th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 75)) + report += fmt.Sprintf("%s ttfb 50th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 50)) + report += fmt.Sprintf("%s ttfb 25th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 25)) } return report } -func (r Result) percentile(i int) float64 { +// samples per operation +func (params Params) spo(op string) uint { + if op == opWrite { + return params.numSamples + } + + return params.numSamples * params.sampleReads +} + +func percentile(dt []float64, i int) float64 { + ln := len(dt) if i >= 100 { - i = len(r.opDurations) - 1 + i = ln - 1 } else if i > 0 && i < 100 { - i = int(float64(i) / 100 * float64(len(r.opDurations))) + i = int(float64(i) / 100 * float64(ln)) } - return r.opDurations[i] + return dt[i] } -func (r Result) avg() float64 { - ln := float64(len(r.opDurations)) +func avg(dt []float64) float64 { + ln := float64(len(dt)) sm := float64(0) - for _, el := range r.opDurations { + for _, el := range dt { sm += el } return sm / ln @@ 
-343,4 +437,5 @@ type Resp struct { err error duration time.Duration numBytes int64 + ttfb time.Duration } From aaa42ba4f3ddb7303cb882019aafb6812346e1fd Mon Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 10 Mar 2020 08:19:46 -0600 Subject: [PATCH 04/20] Add one more error check to detect folder exists; --- s3bench.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/s3bench.go b/s3bench.go index 034454b..b21e233 100644 --- a/s3bench.go +++ b/s3bench.go @@ -42,7 +42,8 @@ func (params *Params) prepareBucket(cfg *aws.Config) bool { if err == nil { return true - } else if !strings.Contains(err.Error(), "BucketAlreadyOwnedByYou:") { + } else if !strings.Contains(err.Error(), "BucketAlreadyOwnedByYou:") && + !strings.Contains(err.Error(), "BucketAlreadyExists:") { panic("Failed to create bucket: " + err.Error()) } From e988426d0cf27731aa29edba2e89ed124d60d941 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Wed, 11 Mar 2020 05:11:00 -0600 Subject: [PATCH 05/20] Add clientDelay param - time in ms before each client start; --- s3bench.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/s3bench.go b/s3bench.go index b21e233..b08b7f7 100644 --- a/s3bench.go +++ b/s3bench.go @@ -13,6 +13,7 @@ import ( "time" "regexp" "strconv" + mathrand "math/rand" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -86,6 +87,7 @@ func main() { verbose := flag.Bool("verbose", false, "print verbose per thread status") metaData := flag.Bool("metaData", false, "read obj metadata instead of obj itself") sampleReads := flag.Int("sampleReads", 1, "number of reads of each sample") + clientDelay := flag.Int("clientDelay", 1, "delay in ms before client starts. 
if negative value provided delay will be randomized in interval [0, abs{clientDelay})") flag.Parse() @@ -113,6 +115,7 @@ func main() { verbose: *verbose, metaData: *metaData, sampleReads: uint(*sampleReads), + clientDelay: *clientDelay, } fmt.Println(params) fmt.Println() @@ -277,7 +280,13 @@ func (params *Params) StartClients(cfg *aws.Config) { for i := 0; i < int(params.numClients); i++ { cfg.Endpoint = aws.String(params.endpoints[i%len(params.endpoints)]) go params.startClient(cfg) - time.Sleep(1 * time.Millisecond) + if params.clientDelay > 0 { + time.Sleep(time.Duration(params.clientDelay) * + time.Millisecond) + } else if params.clientDelay < 0 { + time.Sleep(time.Duration(mathrand.Intn(-params.clientDelay)) * + time.Millisecond) + } } } @@ -341,6 +350,7 @@ type Params struct { verbose bool metaData bool sampleReads uint + clientDelay int } func (params Params) String() string { @@ -354,6 +364,7 @@ func (params Params) String() string { output += fmt.Sprintf("sampleReads: %d\n", params.sampleReads) output += fmt.Sprintf("verbose: %d\n", params.verbose) output += fmt.Sprintf("metaData: %d\n", params.metaData) + output += fmt.Sprintf("clientDelay: %d\n", params.clientDelay) return output } From f5812ab9886fcd6fac55d3332e9c7a98e259b5e6 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Sat, 21 Mar 2020 04:18:27 -0600 Subject: [PATCH 06/20] Add json format output do not print anything in non verbose mode --- s3bench.go | 284 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 170 insertions(+), 114 deletions(-) diff --git a/s3bench.go b/s3bench.go index b08b7f7..c613c79 100644 --- a/s3bench.go +++ b/s3bench.go @@ -14,6 +14,7 @@ import ( "regexp" "strconv" mathrand "math/rand" + "encoding/json" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -25,6 +26,7 @@ const ( opRead = "Read" opWrite = "Write" opHeadObj = "HeadObj" + opDelete = "DeleteObj" //max that can be deleted at a time via DeleteObjects() commitSize = 1000 ) @@ 
-88,6 +90,7 @@ func main() { metaData := flag.Bool("metaData", false, "read obj metadata instead of obj itself") sampleReads := flag.Int("sampleReads", 1, "number of reads of each sample") clientDelay := flag.Int("clientDelay", 1, "delay in ms before client starts. if negative value provided delay will be randomized in interval [0, abs{clientDelay})") + jsonOutput := flag.Bool("jsonOutput", false, "print results in forma of json") flag.Parse() @@ -116,12 +119,11 @@ func main() { metaData: *metaData, sampleReads: uint(*sampleReads), clientDelay: *clientDelay, + jsonOutput: *jsonOutput, } - fmt.Println(params) - fmt.Println() // Generate the data from which we will do the writting - fmt.Printf("Generating in-memory sample data... ") + params.printf("Generating in-memory sample data...\n") timeGenData := time.Now() bufferBytes = make([]byte, params.objectSize, params.objectSize) _, err := rand.Read(bufferBytes) @@ -129,8 +131,7 @@ func main() { fmt.Printf("Could not allocate a buffer") os.Exit(1) } - fmt.Printf("Done (%s)\n", time.Since(timeGenData)) - fmt.Println() + params.printf("Done (%s)\n", time.Since(timeGenData)) // Start the load clients and run a write test followed by a read test cfg := &aws.Config{ @@ -143,75 +144,39 @@ func main() { params.StartClients(cfg) - fmt.Printf("Running %s test...\n", opWrite) - writeResult := params.Run(opWrite) - fmt.Println() + testResults := make([]Result, 0, 3) + + params.printf("Running %s test...\n", opWrite) + testResults = append(testResults, params.Run(opWrite)) - var readResult = Result{} if params.metaData { - fmt.Printf("Running %s test...\n", opHeadObj) - readResult = params.Run(opHeadObj) - fmt.Println() + params.printf("Running %s test...\n", opHeadObj) + testResults = append(testResults, params.Run(opHeadObj)) } else { - fmt.Printf("Running %s test...\n", opRead) - readResult = params.Run(opRead) - fmt.Println() + params.printf("Running %s test...\n", opRead) + testResults = append(testResults, 
params.Run(opRead)) } - // Repeating the parameters of the test followed by the results - fmt.Println(params) - fmt.Println() - fmt.Println(writeResult) - fmt.Println() - fmt.Println(readResult) + if !*skipCleanup { + params.printf("Running %s test...\n", opDelete) + testResults = append(testResults, params.Run(opDelete)) + } // Do cleanup if required - if !*skipCleanup { - fmt.Println() - fmt.Printf("Cleaning up %d objects...\n", *numSamples) - delStartTime := time.Now() + if !*skipCleanup && bucket_created { svc := s3.New(session.New(), cfg) - numSuccessfullyDeleted := 0 - - keyList := make([]*s3.ObjectIdentifier, 0, commitSize) - for i := 0; i < *numSamples; i++ { - bar := s3.ObjectIdentifier{ - Key: aws.String(fmt.Sprintf("%s%d", *objectNamePrefix, i)), - } - keyList = append(keyList, &bar) - if len(keyList) == commitSize || i == *numSamples-1 { - fmt.Printf("Deleting a batch of %d objects in range {%d, %d}... ", len(keyList), i-len(keyList)+1, i) - params := &s3.DeleteObjectsInput{ - Bucket: aws.String(*bucketName), - Delete: &s3.Delete{ - Objects: keyList}} - _, err := svc.DeleteObjects(params) - if err == nil { - numSuccessfullyDeleted += len(keyList) - fmt.Printf("Succeeded\n") - } else { - fmt.Printf("Failed (%v)\n", err) - } - //set cursor to 0 so we can move to the next batch. 
- keyList = keyList[:0] - - } - } - fmt.Printf("Successfully deleted %d/%d objects in %s\n", numSuccessfullyDeleted, *numSamples, time.Since(delStartTime)) - - if bucket_created { - fmt.Printf("Deleting bucket...\n") - params := &s3.DeleteBucketInput{ - Bucket: aws.String(*bucketName)} - _, err := svc.DeleteBucket(params) - if err == nil { - fmt.Printf("Succeeded\n") - } else { - fmt.Printf("Failed (%v)\n", err) - } + params.printf("Deleting bucket...\n") + dltinp := &s3.DeleteBucketInput{Bucket: aws.String(*bucketName)} + _, err := svc.DeleteBucket(dltinp) + if err == nil { + params.printf("Succeeded\n") + } else { + params.printf("Failed (%v)\n", err) } } + + params.reportPrint(params.reportPrepare(testResults)) } func (params *Params) Run(op string) Result { @@ -225,21 +190,16 @@ func (params *Params) Run(op string) Result { result := Result{opDurations: make([]float64, 0, opSamples), operation: op} for i := uint(0); i < opSamples; i++ { resp := <-params.responses - errorString := "" if resp.err != nil { - result.numErrors++ - errorString = fmt.Sprintf(", error: %s", resp.err) + errStr := fmt.Sprintf("%v(%d) completed in %0.2fs with error %s", + op, i+1, resp.duration.Seconds(), resp.err) + result.opErrors = append(result.opErrors, errStr) } else { result.bytesTransmitted = result.bytesTransmitted + params.objectSize result.opDurations = append(result.opDurations, resp.duration.Seconds()) result.opTtfb = append(result.opTtfb, resp.ttfb.Seconds()) } - if params.verbose { - fmt.Printf("%v operation completed in %0.2fs (%d/%d) - %0.2fMB/s%s\n", - op, resp.duration.Seconds(), i+1, params.numSamples, - (float64(result.bytesTransmitted)/(1024*1024))/time.Since(startTime).Seconds(), - errorString) - } + params.printf("operation %s(%d) completed in %.2fs|%s\n", op, i+1, resp.duration.Seconds(), resp.err) } result.totalDuration = time.Since(startTime) @@ -251,6 +211,27 @@ func (params *Params) Run(op string) Result { // Create an individual load request and submit it to 
the client queue func (params *Params) submitLoad(op string) { bucket := aws.String(params.bucketName) + if op == opDelete { + keyList := make([]*s3.ObjectIdentifier, 0, commitSize) + for i := uint(0); i < params.numSamples; i++ { + bar := s3.ObjectIdentifier{ + Key: aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i)), + } + keyList = append(keyList, &bar) + if len(keyList) == commitSize || i == params.numSamples-1 { + params.printf("Deleting a batch of %d objects in range {%d, %d}... ", len(keyList), i-uint(len(keyList))+1, i) + params.requests <- &s3.DeleteObjectsInput{ + Bucket: aws.String(params.bucketName), + Delete: &s3.Delete{ + Objects: keyList}} + //set cursor to 0 so we can move to the next batch. + keyList = keyList[:0] + + } + } + return + } + opSamples := params.spo(op) for i := uint(0); i < opSamples; i++ { key := aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i % params.numSamples)) @@ -328,6 +309,11 @@ func (params *Params) startClient(cfg *aws.Config) { if numBytes != params.objectSize { err = fmt.Errorf("expected object length %d, actual %d, resp %v", params.objectSize, numBytes, resp) } + case *s3.DeleteObjectsInput: + req, _ := svc.DeleteObjectsRequest(r) + err = req.Send() + ttfb = time.Since(putStartTime) + numBytes = 0 default: panic("Developer error") } @@ -338,7 +324,6 @@ func (params *Params) startClient(cfg *aws.Config) { // Specifies the parameters for a given test type Params struct { - operation string requests chan Req responses chan Resp numSamples uint @@ -351,74 +336,145 @@ type Params struct { metaData bool sampleReads uint clientDelay int + jsonOutput bool } -func (params Params) String() string { - output := fmt.Sprintln("Test parameters") - output += fmt.Sprintf("endpoint(s): %s\n", params.endpoints) - output += fmt.Sprintf("bucket: %s\n", params.bucketName) - output += fmt.Sprintf("objectNamePrefix: %s\n", params.objectNamePrefix) - output += fmt.Sprintf("objectSize: %0.4f MB\n", 
float64(params.objectSize)/(1024*1024)) - output += fmt.Sprintf("numClients: %d\n", params.numClients) - output += fmt.Sprintf("numSamples: %d\n", params.numSamples) - output += fmt.Sprintf("sampleReads: %d\n", params.sampleReads) - output += fmt.Sprintf("verbose: %d\n", params.verbose) - output += fmt.Sprintf("metaData: %d\n", params.metaData) - output += fmt.Sprintf("clientDelay: %d\n", params.clientDelay) - return output +func (params Params) printf(f string, args ...interface{}) { + if params.verbose { + fmt.Printf(f, args...) + } } // Contains the summary for a given test result type Result struct { operation string bytesTransmitted int64 - numErrors int opDurations []float64 totalDuration time.Duration opTtfb []float64 + opErrors []string } -func (r Result) String() string { - report := fmt.Sprintf("Results Summary for %s Operation(s)\n", r.operation) - if r.operation == opHeadObj { - report += fmt.Sprintf("Total Reqs: %d\n", len(r.opDurations)) - } else { - report += fmt.Sprintf("Total Transferred: %0.3f MB\n", float64(r.bytesTransmitted)/(1024*1024)) - report += fmt.Sprintf("Total Throughput: %0.2f MB/s\n", (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds()) +func (r Result) report() map[string]interface{} { + ret := make(map[string]interface{}) + ret["Operation"] = r.operation + ret["Total Requests Count"] = len(r.opDurations) + if r.operation != opHeadObj && r.operation != opDelete { + ret["Total Transferred (MB)"] = float64(r.bytesTransmitted)/(1024*1024) + ret["Total Throughput (MB/s)"] = (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds() } - report += fmt.Sprintf("Total Duration: %0.3f s\n", r.totalDuration.Seconds()) - report += fmt.Sprintf("Number of Errors: %d\n", r.numErrors) + ret["Total Duration (s)"] = r.totalDuration.Seconds() if len(r.opDurations) > 0 { - report += fmt.Sprintln("------------------------------------") - report += fmt.Sprintf("%s times Avg: %0.3f s\n", r.operation, avg(r.opDurations)) - report 
+= fmt.Sprintf("%s times Max: %0.3f s\n", r.operation, percentile(r.opDurations, 100)) - report += fmt.Sprintf("%s times Min: %0.3f s\n", r.operation, percentile(r.opDurations, 0)) - report += fmt.Sprintf("%s times 99th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 99)) - report += fmt.Sprintf("%s times 90th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 90)) - report += fmt.Sprintf("%s times 75th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 75)) - report += fmt.Sprintf("%s times 50th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 50)) - report += fmt.Sprintf("%s times 25th %%ile: %0.3f s\n", r.operation, percentile(r.opDurations, 25)) + ret["Duration Max"] = percentile(r.opDurations, 100) + ret["Duration Avg"] = avg(r.opDurations) + ret["Duration Min"] = percentile(r.opDurations, 0) + ret["Duration 99th-ile"] = percentile(r.opDurations, 99) + ret["Duration 90th-ile"] = percentile(r.opDurations, 90) + ret["Duration 75th-ile"] = percentile(r.opDurations, 75) + ret["Duration 50th-ile"] = percentile(r.opDurations, 50) + ret["Duration 25th-ile"] = percentile(r.opDurations, 25) } if len(r.opTtfb) > 0 { - report += fmt.Sprintln("------------------------------------") - report += fmt.Sprintf("%s ttfb Avg: %0.3f s\n", r.operation, avg(r.opTtfb)) - report += fmt.Sprintf("%s ttfb Max: %0.3f s\n", r.operation, percentile(r.opTtfb, 100)) - report += fmt.Sprintf("%s ttfb Min: %0.3f s\n", r.operation, percentile(r.opTtfb, 0)) - report += fmt.Sprintf("%s ttfb 99th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 99)) - report += fmt.Sprintf("%s ttfb 90th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 90)) - report += fmt.Sprintf("%s ttfb 75th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 75)) - report += fmt.Sprintf("%s ttfb 50th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 50)) - report += fmt.Sprintf("%s ttfb 25th %%ile: %0.3f s\n", r.operation, percentile(r.opTtfb, 25)) + ret["Ttfb Max"] = 
percentile(r.opTtfb, 100) + ret["Ttfb Avg"] = avg(r.opTtfb) + ret["Ttfb Min"] = percentile(r.opTtfb, 0) + ret["Ttfb 99th-ile"] = percentile(r.opTtfb, 99) + ret["Ttfb 90th-ile"] = percentile(r.opTtfb, 90) + ret["Ttfb 75th-ile"] = percentile(r.opTtfb, 75) + ret["Ttfb 50th-ile"] = percentile(r.opTtfb, 50) + ret["Ttfb 25th-ile"] = percentile(r.opTtfb, 25) + } + + ret["Errors Count"] = len(r.opErrors) + ret["Errors"] = r.opErrors + return ret +} + +func (params Params) report() map[string]interface{} { + ret := make(map[string]interface{}) + ret["endpoints"] = params.endpoints + ret["bucket"] = params.bucketName + ret["objectNamePrefix"] = params.objectNamePrefix + ret["objectSize (MB)"] = float64(params.objectSize)/(1024*1024) + ret["numClients"] = params.numClients + ret["numSamples"] = params.numSamples + ret["sampleReads"] = params.sampleReads + ret["verbose"] = params.verbose + ret["metaData"] = params.metaData + ret["clientDelay"] = params.clientDelay + ret["jsonOutput"] = params.jsonOutput + return ret +} + +func (params Params) reportPrepare(tests []Result) map[string]interface{} { + report := make(map[string]interface{}) + report["Parameters"] = params.report() + testreps := make([]map[string]interface{}, 0, len(tests)) + for _, r := range tests { + testreps = append(testreps, r.report()) } + report["Tests"] = testreps return report } +func mapPrint(m map[string]interface{}, prefix string) { + for k,v := range m { + fmt.Printf("%s %-27s", prefix, k+":") + switch val := v.(type) { + case []string: + if len(val) == 0 { + fmt.Printf(" []\n") + } else { + fmt.Println() + for _, s := range val { + fmt.Printf("%s%s %s\n", prefix, prefix, s) + } + } + case map[string]interface{}: + fmt.Println() + mapPrint(val, prefix + " ") + case []map[string]interface{}: + if len(val) == 0 { + fmt.Printf(" []\n") + } else { + for _, m := range val { + fmt.Println() + mapPrint(m, prefix + " ") + } + } + case float64: + fmt.Printf(" %.3f\n", val) + default: + fmt.Printf(" %v\n", val) 
+ } + } +} + +func (params Params) reportPrint(report map[string]interface{}) { + if params.jsonOutput { + b, err := json.Marshal(report) + if err != nil { + fmt.Println("Cannot generate JSON report %v", err) + } + fmt.Println(string(b)) + return + } + + mapPrint(report, "") +} + // samples per operation func (params Params) spo(op string) uint { if op == opWrite { return params.numSamples + } else if op == opDelete { + ret := params.numSamples / commitSize + if params.numSamples % commitSize > 0 { + ret++ + } + return ret } return params.numSamples * params.sampleReads From dc14962efabdb00ddeba281dfc1776c5d168d5b0 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Mon, 23 Mar 2020 01:57:13 -0600 Subject: [PATCH 07/20] Revert parallel delete --- s3bench.go | 91 ++++++++++++++++++++++++++---------------------------- 1 file changed, 43 insertions(+), 48 deletions(-) diff --git a/s3bench.go b/s3bench.go index c613c79..9ad71d4 100644 --- a/s3bench.go +++ b/s3bench.go @@ -26,7 +26,6 @@ const ( opRead = "Read" opWrite = "Write" opHeadObj = "HeadObj" - opDelete = "DeleteObj" //max that can be deleted at a time via DeleteObjects() commitSize = 1000 ) @@ -154,25 +153,53 @@ func main() { testResults = append(testResults, params.Run(opHeadObj)) } else { params.printf("Running %s test...\n", opRead) - testResults = append(testResults, params.Run(opHeadObj)) - } - - if !*skipCleanup { - params.printf("Running %s test...\n", opDelete) - testResults = append(testResults, params.Run(opDelete)) + testResults = append(testResults, params.Run(opRead)) } // Do cleanup if required - if !*skipCleanup && bucket_created { + if !*skipCleanup { + params.printf("Cleaning up %d objects...\n", *numSamples) + delStartTime := time.Now() svc := s3.New(session.New(), cfg) - params.printf("Deleting bucket...\n") - dltinp := &s3.DeleteBucketInput{Bucket: aws.String(*bucketName)} - _, err := svc.DeleteBucket(dltinp) - if err == nil { - params.printf("Succeeded\n") - } else { - params.printf("Failed 
(%v)\n", err) + numSuccessfullyDeleted := 0 + + keyList := make([]*s3.ObjectIdentifier, 0, commitSize) + for i := 0; i < *numSamples; i++ { + bar := s3.ObjectIdentifier{ + Key: aws.String(fmt.Sprintf("%s%d", *objectNamePrefix, i)), + } + keyList = append(keyList, &bar) + if len(keyList) == commitSize || i == *numSamples-1 { + params.printf("Deleting a batch of %d objects in range {%d, %d}... ", len(keyList), i-len(keyList)+1, i) + dltpar := &s3.DeleteObjectsInput{ + Bucket: aws.String(*bucketName), + Delete: &s3.Delete{ + Objects: keyList}} + _, err := svc.DeleteObjects(dltpar) + if err == nil { + numSuccessfullyDeleted += len(keyList) + params.printf("Succeeded\n") + } else { + params.printf("Failed (%v)\n", err) + } + //set cursor to 0 so we can move to the next batch. + keyList = keyList[:0] + + } + } + params.printf("Successfully deleted %d/%d objects in %s\n", numSuccessfullyDeleted, *numSamples, time.Since(delStartTime)) + + if bucket_created { + params.printf("Deleting bucket...\n") + dltpar := &s3.DeleteBucketInput{ + Bucket: aws.String(*bucketName)} + _, err := svc.DeleteBucket(dltpar) + if err == nil { + params.printf("Succeeded\n") + } else { + params.printf("Failed (%v)\n", err) + } } } @@ -211,27 +238,6 @@ func (params *Params) Run(op string) Result { // Create an individual load request and submit it to the client queue func (params *Params) submitLoad(op string) { bucket := aws.String(params.bucketName) - if op == opDelete { - keyList := make([]*s3.ObjectIdentifier, 0, commitSize) - for i := uint(0); i < params.numSamples; i++ { - bar := s3.ObjectIdentifier{ - Key: aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i)), - } - keyList = append(keyList, &bar) - if len(keyList) == commitSize || i == params.numSamples-1 { - params.printf("Deleting a batch of %d objects in range {%d, %d}... 
", len(keyList), i-uint(len(keyList))+1, i) - params.requests <- &s3.DeleteObjectsInput{ - Bucket: aws.String(params.bucketName), - Delete: &s3.Delete{ - Objects: keyList}} - //set cursor to 0 so we can move to the next batch. - keyList = keyList[:0] - - } - } - return - } - opSamples := params.spo(op) for i := uint(0); i < opSamples; i++ { key := aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i % params.numSamples)) @@ -309,11 +315,6 @@ func (params *Params) startClient(cfg *aws.Config) { if numBytes != params.objectSize { err = fmt.Errorf("expected object length %d, actual %d, resp %v", params.objectSize, numBytes, resp) } - case *s3.DeleteObjectsInput: - req, _ := svc.DeleteObjectsRequest(r) - err = req.Send() - ttfb = time.Since(putStartTime) - numBytes = 0 default: panic("Developer error") } @@ -359,7 +360,7 @@ func (r Result) report() map[string]interface{} { ret := make(map[string]interface{}) ret["Operation"] = r.operation ret["Total Requests Count"] = len(r.opDurations) - if r.operation != opHeadObj && r.operation != opDelete { + if r.operation != opHeadObj { ret["Total Transferred (MB)"] = float64(r.bytesTransmitted)/(1024*1024) ret["Total Throughput (MB/s)"] = (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds() } @@ -469,12 +470,6 @@ func (params Params) reportPrint(report map[string]interface{}) { func (params Params) spo(op string) uint { if op == opWrite { return params.numSamples - } else if op == opDelete { - ret := params.numSamples / commitSize - if params.numSamples % commitSize > 0 { - ret++ - } - return ret } return params.numSamples * params.sampleReads From a811b188ee316d514ef06d90272a69c881eac7dc Mon Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 24 Mar 2020 09:11:48 -0600 Subject: [PATCH 08/20] Add batch size for the delete command to the params --- s3bench.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/s3bench.go b/s3bench.go index 9ad71d4..1b9d648 100644 --- a/s3bench.go +++ 
b/s3bench.go @@ -26,8 +26,6 @@ const ( opRead = "Read" opWrite = "Write" opHeadObj = "HeadObj" - //max that can be deleted at a time via DeleteObjects() - commitSize = 1000 ) var bufferBytes []byte @@ -90,6 +88,7 @@ func main() { sampleReads := flag.Int("sampleReads", 1, "number of reads of each sample") clientDelay := flag.Int("clientDelay", 1, "delay in ms before client starts. if negative value provided delay will be randomized in interval [0, abs{clientDelay})") jsonOutput := flag.Bool("jsonOutput", false, "print results in forma of json") + deleteAtOnce := flag.Int("deleteAtOnce", 1000, "number of objs to delete at once") flag.Parse() @@ -104,6 +103,11 @@ func main() { os.Exit(1) } + if *deleteAtOnce < 1 { + fmt.Println("Caanot delete less than 1 obj at once") + os.Exit(1) + } + // Setup and print summary of the accepted parameters params := Params{ requests: make(chan Req), @@ -119,6 +123,7 @@ func main() { sampleReads: uint(*sampleReads), clientDelay: *clientDelay, jsonOutput: *jsonOutput, + deleteAtOnce: *deleteAtOnce, } // Generate the data from which we will do the writting @@ -164,13 +169,13 @@ func main() { numSuccessfullyDeleted := 0 - keyList := make([]*s3.ObjectIdentifier, 0, commitSize) + keyList := make([]*s3.ObjectIdentifier, 0, params.deleteAtOnce) for i := 0; i < *numSamples; i++ { bar := s3.ObjectIdentifier{ Key: aws.String(fmt.Sprintf("%s%d", *objectNamePrefix, i)), } keyList = append(keyList, &bar) - if len(keyList) == commitSize || i == *numSamples-1 { + if len(keyList) == params.deleteAtOnce || i == *numSamples-1 { params.printf("Deleting a batch of %d objects in range {%d, %d}... 
", len(keyList), i-len(keyList)+1, i) dltpar := &s3.DeleteObjectsInput{ Bucket: aws.String(*bucketName), @@ -338,6 +343,7 @@ type Params struct { sampleReads uint clientDelay int jsonOutput bool + deleteAtOnce int } func (params Params) printf(f string, args ...interface{}) { @@ -406,6 +412,7 @@ func (params Params) report() map[string]interface{} { ret["metaData"] = params.metaData ret["clientDelay"] = params.clientDelay ret["jsonOutput"] = params.jsonOutput + ret["deleteAtOnce"] = params.deleteAtOnce return ret } From 02274ede71e35e51609e1d12ad370b85f1e28a56 Mon Sep 17 00:00:00 2001 From: Evgeniy Brazhnikov Date: Mon, 6 Apr 2020 04:50:48 -0600 Subject: [PATCH 09/20] Add object's tags support --- s3bench.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/s3bench.go b/s3bench.go index 1b9d648..1e9a310 100644 --- a/s3bench.go +++ b/s3bench.go @@ -26,6 +26,8 @@ const ( opRead = "Read" opWrite = "Write" opHeadObj = "HeadObj" + opGetObjTag = "GetObjTag" + opPutObjTag = "PutObjTag" ) var bufferBytes []byte @@ -89,6 +91,8 @@ func main() { clientDelay := flag.Int("clientDelay", 1, "delay in ms before client starts. 
if negative value provided delay will be randomized in interval [0, abs{clientDelay})") jsonOutput := flag.Bool("jsonOutput", false, "print results in forma of json") deleteAtOnce := flag.Int("deleteAtOnce", 1000, "number of objs to delete at once") + putObjTag := flag.Bool("putObjTag", false, "put object's tags") + getObjTag := flag.Bool("getObjTag", false, "get object's tags") flag.Parse() @@ -107,6 +111,10 @@ func main() { fmt.Println("Caanot delete less than 1 obj at once") os.Exit(1) } + if *metaData && *getObjTag { + fmt.Println("\"-metaData\" and \"-getObjTag\" cannt be specified simultaneously") + os.Exit(1) + } // Setup and print summary of the accepted parameters params := Params{ @@ -124,6 +132,8 @@ func main() { clientDelay: *clientDelay, jsonOutput: *jsonOutput, deleteAtOnce: *deleteAtOnce, + putObjTag: *putObjTag, + getObjTag: *getObjTag, } // Generate the data from which we will do the writting @@ -153,7 +163,14 @@ func main() { params.printf("Running %s test...\n", opWrite) testResults = append(testResults, params.Run(opWrite)) - if params.metaData { + if params.putObjTag { + params.printf("Running %s test...\n", opPutObjTag) + testResults = append(testResults, params.Run(opPutObjTag)) + } + if params.getObjTag { + params.printf("Running %s test...\n", opGetObjTag) + testResults = append(testResults, params.Run(opGetObjTag)) + } else if params.metaData { params.printf("Running %s test...\n", opHeadObj) testResults = append(testResults, params.Run(opHeadObj)) } else { @@ -262,7 +279,29 @@ func (params *Params) submitLoad(op string) { Bucket: bucket, Key: key, } - } else { + } else if op == opPutObjTag { + params.requests <- &s3.PutObjectTaggingInput{ + Bucket: bucket, + Key: key, + Tagging: &s3.Tagging{ + TagSet: []*s3.Tag{ + { + Key: aws.String("Key1"), + Value: aws.String("Value1"), + }, + { + Key: aws.String("Key2"), + Value: aws.String("Value2"), + }, + }, + }, + } + } else if op == opGetObjTag { + params.requests <- &s3.GetObjectTaggingInput{ + 
Bucket: bucket, + Key: key, + } + } else { panic("Developer error") } } @@ -320,6 +359,14 @@ func (params *Params) startClient(cfg *aws.Config) { if numBytes != params.objectSize { err = fmt.Errorf("expected object length %d, actual %d, resp %v", params.objectSize, numBytes, resp) } + case *s3.PutObjectTaggingInput: + req, _ := svc.PutObjectTaggingRequest(r) + err = req.Send() + ttfb = time.Since(putStartTime) + case *s3.GetObjectTaggingInput: + req, _ := svc.GetObjectTaggingRequest(r) + err = req.Send() + ttfb = time.Since(putStartTime) default: panic("Developer error") } @@ -344,6 +391,8 @@ type Params struct { clientDelay int jsonOutput bool deleteAtOnce int + putObjTag bool + getObjTag bool } func (params Params) printf(f string, args ...interface{}) { From 6da5405276651e28307ca702d53a0ff0991a265b Mon Sep 17 00:00:00 2001 From: Evgeniy Brazhnikov Date: Mon, 6 Apr 2020 07:56:07 -0600 Subject: [PATCH 10/20] Add object's tags support - 2 --- s3bench.go | 75 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/s3bench.go b/s3bench.go index 1e9a310..f6b9244 100644 --- a/s3bench.go +++ b/s3bench.go @@ -26,8 +26,8 @@ const ( opRead = "Read" opWrite = "Write" opHeadObj = "HeadObj" - opGetObjTag = "GetObjTag" - opPutObjTag = "PutObjTag" + opGetObjTag = "GetObjTag" + opPutObjTag = "PutObjTag" ) var bufferBytes []byte @@ -91,8 +91,9 @@ func main() { clientDelay := flag.Int("clientDelay", 1, "delay in ms before client starts. 
if negative value provided delay will be randomized in interval [0, abs{clientDelay})") jsonOutput := flag.Bool("jsonOutput", false, "print results in forma of json") deleteAtOnce := flag.Int("deleteAtOnce", 1000, "number of objs to delete at once") - putObjTag := flag.Bool("putObjTag", false, "put object's tags") - getObjTag := flag.Bool("getObjTag", false, "get object's tags") + putObjTag := flag.Bool("putObjTag", false, "put object's tags") + getObjTag := flag.Bool("getObjTag", false, "get object's tags") + numTags := flag.Int("numTags", 0, "number if tags") flag.Parse() @@ -108,13 +109,25 @@ func main() { } if *deleteAtOnce < 1 { - fmt.Println("Caanot delete less than 1 obj at once") + fmt.Println("Cann`t delete less than 1 obj at once") os.Exit(1) } - if *metaData && *getObjTag { + if *metaData && *getObjTag { fmt.Println("\"-metaData\" and \"-getObjTag\" cannt be specified simultaneously") os.Exit(1) - } + } + if *metaData && *putObjTag { + fmt.Println("\"-metaData\" and \"-putObjTag\" cannt be specified simultaneously") + os.Exit(1) + } + if *putObjTag && *numTags == 0 { + fmt.Println("You must specify \"-numTags\" in range [1..10]") + os.Exit(1) + } + if *numTags < 1 || *numTags > 10 { + fmt.Println("\"-numTags\" must be in range [1..10]") + os.Exit(1) + } // Setup and print summary of the accepted parameters params := Params{ @@ -132,8 +145,9 @@ func main() { clientDelay: *clientDelay, jsonOutput: *jsonOutput, deleteAtOnce: *deleteAtOnce, - putObjTag: *putObjTag, - getObjTag: *getObjTag, + putObjTag: *putObjTag, + getObjTag: *getObjTag, + numTags: uint(*numTags), } // Generate the data from which we will do the writting @@ -163,14 +177,14 @@ func main() { params.printf("Running %s test...\n", opWrite) testResults = append(testResults, params.Run(opWrite)) - if params.putObjTag { + if params.putObjTag { params.printf("Running %s test...\n", opPutObjTag) testResults = append(testResults, params.Run(opPutObjTag)) - } - if params.getObjTag { + } + if 
params.getObjTag { params.printf("Running %s test...\n", opGetObjTag) testResults = append(testResults, params.Run(opGetObjTag)) - } else if params.metaData { + } else if params.metaData { params.printf("Running %s test...\n", opHeadObj) testResults = append(testResults, params.Run(opHeadObj)) } else { @@ -280,28 +294,28 @@ func (params *Params) submitLoad(op string) { Key: key, } } else if op == opPutObjTag { + tagSet := make([]*s3.Tag, 0, params.numTags) + for iTag := uint(0); iTag < params.numTags; iTag++ { + key := fmt.Sprintf("%s%d", "key_", iTag); + value := fmt.Sprintf("%s%d", "value_", iTag) + tagSet = append(tagSet, &s3.Tag { + Key: &key, + Value: &value, + }) + } params.requests <- &s3.PutObjectTaggingInput{ Bucket: bucket, Key: key, - Tagging: &s3.Tagging{ - TagSet: []*s3.Tag{ - { - Key: aws.String("Key1"), - Value: aws.String("Value1"), - }, - { - Key: aws.String("Key2"), - Value: aws.String("Value2"), - }, - }, - }, + Tagging: &s3.Tagging{ + TagSet: tagSet, + }, } - } else if op == opGetObjTag { + } else if op == opGetObjTag { params.requests <- &s3.GetObjectTaggingInput{ Bucket: bucket, Key: key, } - } else { + } else { panic("Developer error") } } @@ -359,7 +373,7 @@ func (params *Params) startClient(cfg *aws.Config) { if numBytes != params.objectSize { err = fmt.Errorf("expected object length %d, actual %d, resp %v", params.objectSize, numBytes, resp) } - case *s3.PutObjectTaggingInput: + case *s3.PutObjectTaggingInput: req, _ := svc.PutObjectTaggingRequest(r) err = req.Send() ttfb = time.Since(putStartTime) @@ -391,8 +405,9 @@ type Params struct { clientDelay int jsonOutput bool deleteAtOnce int - putObjTag bool - getObjTag bool + putObjTag bool + getObjTag bool + numTags uint } func (params Params) printf(f string, args ...interface{}) { From 27f2d4c4937e235f87bd40500178b77c10f686f0 Mon Sep 17 00:00:00 2001 From: Evgeniy Brazhnikov Date: Tue, 7 Apr 2020 07:42:31 -0600 Subject: [PATCH 11/20] Add object's tags support - 3 --- s3bench.go | 37 
+++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/s3bench.go b/s3bench.go index f6b9244..17c42af 100644 --- a/s3bench.go +++ b/s3bench.go @@ -112,20 +112,20 @@ func main() { fmt.Println("Cann`t delete less than 1 obj at once") os.Exit(1) } - if *metaData && *getObjTag { - fmt.Println("\"-metaData\" and \"-getObjTag\" cannt be specified simultaneously") - os.Exit(1) - } - if *metaData && *putObjTag { - fmt.Println("\"-metaData\" and \"-putObjTag\" cannt be specified simultaneously") - os.Exit(1) + if *getObjTag { + *putObjTag = true + + if *metaData { + fmt.Println("\"-metaData\" and \"-getObjTag\" cannt be specified simultaneously") + os.Exit(1) + } } if *putObjTag && *numTags == 0 { fmt.Println("You must specify \"-numTags\" in range [1..10]") os.Exit(1) } - if *numTags < 1 || *numTags > 10 { - fmt.Println("\"-numTags\" must be in range [1..10]") + if *numTags < 1 { + fmt.Println("\"-numTags\" should be in range [1..10]") os.Exit(1) } @@ -202,8 +202,25 @@ func main() { keyList := make([]*s3.ObjectIdentifier, 0, params.deleteAtOnce) for i := 0; i < *numSamples; i++ { + objName := fmt.Sprintf("%s%d", *objectNamePrefix, i) + key := aws.String(objName) + + if params.putObjTag { + deleteObjectTaggingInput := &s3.DeleteObjectTaggingInput{ + Bucket: aws.String(*bucketName), + Key: key, + } + _, err := svc.DeleteObjectTagging(deleteObjectTaggingInput) + if params.verbose { + if err != nil { + fmt.Println(err.Error()) + } else { + fmt.Printf("Tags for %s deleted\n", objName) + } + } + } bar := s3.ObjectIdentifier{ - Key: aws.String(fmt.Sprintf("%s%d", *objectNamePrefix, i)), + Key: key, } keyList = append(keyList, &bar) if len(keyList) == params.deleteAtOnce || i == *numSamples-1 { From ffdedbd8f98e4ca5f922a8cb2abdfee33531fef6 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 7 Apr 2020 08:42:15 -0600 Subject: [PATCH 12/20] rename metaData to headObj --- s3bench.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 
deletions(-) diff --git a/s3bench.go b/s3bench.go index 17c42af..d0fa79d 100644 --- a/s3bench.go +++ b/s3bench.go @@ -86,14 +86,14 @@ func main() { numSamples := flag.Int("numSamples", 200, "total number of requests to send") skipCleanup := flag.Bool("skipCleanup", false, "skip deleting objects created by this tool at the end of the run") verbose := flag.Bool("verbose", false, "print verbose per thread status") - metaData := flag.Bool("metaData", false, "read obj metadata instead of obj itself") + headObj := flag.Bool("headObj", false, "head-object request instead of reading obj content") sampleReads := flag.Int("sampleReads", 1, "number of reads of each sample") clientDelay := flag.Int("clientDelay", 1, "delay in ms before client starts. if negative value provided delay will be randomized in interval [0, abs{clientDelay})") jsonOutput := flag.Bool("jsonOutput", false, "print results in forma of json") deleteAtOnce := flag.Int("deleteAtOnce", 1000, "number of objs to delete at once") putObjTag := flag.Bool("putObjTag", false, "put object's tags") getObjTag := flag.Bool("getObjTag", false, "get object's tags") - numTags := flag.Int("numTags", 0, "number if tags") + numTags := flag.Int("numTags", 10, "number of tags") flag.Parse() @@ -115,8 +115,8 @@ func main() { if *getObjTag { *putObjTag = true - if *metaData { - fmt.Println("\"-metaData\" and \"-getObjTag\" cannt be specified simultaneously") + if *headObj { + fmt.Println("\"-headObj\" and \"-getObjTag\" cannt be specified simultaneously") os.Exit(1) } } @@ -140,7 +140,7 @@ func main() { bucketName: *bucketName, endpoints: strings.Split(*endpoint, ","), verbose: *verbose, - metaData: *metaData, + headObj: *headObj, sampleReads: uint(*sampleReads), clientDelay: *clientDelay, jsonOutput: *jsonOutput, @@ -184,7 +184,7 @@ func main() { if params.getObjTag { params.printf("Running %s test...\n", opGetObjTag) testResults = append(testResults, params.Run(opGetObjTag)) - } else if params.metaData { + } else if 
params.headObj { params.printf("Running %s test...\n", opHeadObj) testResults = append(testResults, params.Run(opHeadObj)) } else { @@ -417,7 +417,7 @@ type Params struct { bucketName string endpoints []string verbose bool - metaData bool + headObj bool sampleReads uint clientDelay int jsonOutput bool @@ -490,7 +490,7 @@ func (params Params) report() map[string]interface{} { ret["numSamples"] = params.numSamples ret["sampleReads"] = params.sampleReads ret["verbose"] = params.verbose - ret["metaData"] = params.metaData + ret["headObj"] = params.headObj ret["clientDelay"] = params.clientDelay ret["jsonOutput"] = params.jsonOutput ret["deleteAtOnce"] = params.deleteAtOnce From 5d73e9c6253919b03ea0502c3dca6bc2a60cf025 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 7 Apr 2020 09:12:01 -0600 Subject: [PATCH 13/20] add ability to combine headObj with tags tests; minor changes in params parsing rules --- s3bench.go | 45 +++++++++++++++++---------------------------- 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/s3bench.go b/s3bench.go index d0fa79d..8e24562 100644 --- a/s3bench.go +++ b/s3bench.go @@ -93,7 +93,7 @@ func main() { deleteAtOnce := flag.Int("deleteAtOnce", 1000, "number of objs to delete at once") putObjTag := flag.Bool("putObjTag", false, "put object's tags") getObjTag := flag.Bool("getObjTag", false, "get object's tags") - numTags := flag.Int("numTags", 10, "number of tags") + numTags := flag.Int("numTags", 10, "number of tags to create, for objects it should in range [1..10]") flag.Parse() @@ -109,23 +109,12 @@ func main() { } if *deleteAtOnce < 1 { - fmt.Println("Cann`t delete less than 1 obj at once") + fmt.Println("Cannot delete less than 1 obj at once") os.Exit(1) } - if *getObjTag { - *putObjTag = true - if *headObj { - fmt.Println("\"-headObj\" and \"-getObjTag\" cannt be specified simultaneously") - os.Exit(1) - } - } - if *putObjTag && *numTags == 0 { - fmt.Println("You must specify \"-numTags\" in range [1..10]") - os.Exit(1) - } 
if *numTags < 1 { - fmt.Println("\"-numTags\" should be in range [1..10]") + fmt.Println("-numTags cannot be less than 1") os.Exit(1) } @@ -145,9 +134,10 @@ func main() { clientDelay: *clientDelay, jsonOutput: *jsonOutput, deleteAtOnce: *deleteAtOnce, - putObjTag: *putObjTag, + putObjTag: *putObjTag || *getObjTag, getObjTag: *getObjTag, numTags: uint(*numTags), + readObj: !(*putObjTag || *getObjTag || *headObj), } // Generate the data from which we will do the writting @@ -172,7 +162,7 @@ func main() { params.StartClients(cfg) - testResults := make([]Result, 0, 3) + testResults := make([]Result, 2, 5) params.printf("Running %s test...\n", opWrite) testResults = append(testResults, params.Run(opWrite)) @@ -184,10 +174,12 @@ func main() { if params.getObjTag { params.printf("Running %s test...\n", opGetObjTag) testResults = append(testResults, params.Run(opGetObjTag)) - } else if params.headObj { + } + if params.headObj { params.printf("Running %s test...\n", opHeadObj) testResults = append(testResults, params.Run(opHeadObj)) - } else { + } + if params.readObj { params.printf("Running %s test...\n", opRead) testResults = append(testResults, params.Run(opRead)) } @@ -211,17 +203,9 @@ func main() { Key: key, } _, err := svc.DeleteObjectTagging(deleteObjectTaggingInput) - if params.verbose { - if err != nil { - fmt.Println(err.Error()) - } else { - fmt.Printf("Tags for %s deleted\n", objName) - } - } - } - bar := s3.ObjectIdentifier{ - Key: key, + params.printf("Delete tags %s |err %v\n", objName, err) } + bar := s3.ObjectIdentifier{ Key: key, } keyList = append(keyList, &bar) if len(keyList) == params.deleteAtOnce || i == *numSamples-1 { params.printf("Deleting a batch of %d objects in range {%d, %d}... 
", len(keyList), i-len(keyList)+1, i) @@ -425,6 +409,7 @@ type Params struct { putObjTag bool getObjTag bool numTags uint + readObj bool } func (params Params) printf(f string, args ...interface{}) { @@ -494,6 +479,10 @@ func (params Params) report() map[string]interface{} { ret["clientDelay"] = params.clientDelay ret["jsonOutput"] = params.jsonOutput ret["deleteAtOnce"] = params.deleteAtOnce + ret["numTags"] = params.numTags + ret["putObjTag"] = params.putObjTag + ret["getObjTag"] = params.getObjTag + ret["readObj"] = params.readObj return ret } From c3cbe690df2350a88ef6002f9844c048c4be248a Mon Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 7 Apr 2020 09:54:56 -0600 Subject: [PATCH 14/20] add tagNamePrefix and tagValPrefix params --- s3bench.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/s3bench.go b/s3bench.go index 8e24562..9608323 100644 --- a/s3bench.go +++ b/s3bench.go @@ -94,6 +94,8 @@ func main() { putObjTag := flag.Bool("putObjTag", false, "put object's tags") getObjTag := flag.Bool("getObjTag", false, "get object's tags") numTags := flag.Int("numTags", 10, "number of tags to create, for objects it should in range [1..10]") + tagNamePrefix := flag.String("tagNamePrefix", "tag_name_", "prefix of the tag name that will be used") + tagValPrefix := flag.String("tagValPrefix", "tag_val_", "prefix of the tag value that will be used") flag.Parse() @@ -138,6 +140,9 @@ func main() { getObjTag: *getObjTag, numTags: uint(*numTags), readObj: !(*putObjTag || *getObjTag || *headObj), + tagNamePrefix: *tagNamePrefix, + tagValPrefix: *tagValPrefix, + } // Generate the data from which we will do the writting @@ -162,7 +167,7 @@ func main() { params.StartClients(cfg) - testResults := make([]Result, 2, 5) + testResults := make([]Result, 1, 2) params.printf("Running %s test...\n", opWrite) testResults = append(testResults, params.Run(opWrite)) @@ -297,19 +302,17 @@ func (params *Params) submitLoad(op string) { } else if op == 
opPutObjTag { tagSet := make([]*s3.Tag, 0, params.numTags) for iTag := uint(0); iTag < params.numTags; iTag++ { - key := fmt.Sprintf("%s%d", "key_", iTag); - value := fmt.Sprintf("%s%d", "value_", iTag) + tag_name := fmt.Sprintf("%s%d", params.tagNamePrefix, iTag) + tag_value := fmt.Sprintf("%s%d", params.tagValPrefix, iTag) tagSet = append(tagSet, &s3.Tag { - Key: &key, - Value: &value, + Key: &tag_name, + Value: &tag_value, }) } params.requests <- &s3.PutObjectTaggingInput{ Bucket: bucket, Key: key, - Tagging: &s3.Tagging{ - TagSet: tagSet, - }, + Tagging: &s3.Tagging{ TagSet: tagSet, }, } } else if op == opGetObjTag { params.requests <- &s3.GetObjectTaggingInput{ @@ -378,10 +381,12 @@ func (params *Params) startClient(cfg *aws.Config) { req, _ := svc.PutObjectTaggingRequest(r) err = req.Send() ttfb = time.Since(putStartTime) + numBytes = 0 case *s3.GetObjectTaggingInput: req, _ := svc.GetObjectTaggingRequest(r) err = req.Send() ttfb = time.Since(putStartTime) + numBytes = 0 default: panic("Developer error") } @@ -410,6 +415,8 @@ type Params struct { getObjTag bool numTags uint readObj bool + tagNamePrefix string + tagValPrefix string } func (params Params) printf(f string, args ...interface{}) { @@ -432,7 +439,7 @@ func (r Result) report() map[string]interface{} { ret := make(map[string]interface{}) ret["Operation"] = r.operation ret["Total Requests Count"] = len(r.opDurations) - if r.operation != opHeadObj { + if r.operation == opWrite || r.operation == opRead { ret["Total Transferred (MB)"] = float64(r.bytesTransmitted)/(1024*1024) ret["Total Throughput (MB/s)"] = (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds() } @@ -483,6 +490,8 @@ func (params Params) report() map[string]interface{} { ret["putObjTag"] = params.putObjTag ret["getObjTag"] = params.getObjTag ret["readObj"] = params.readObj + ret["tagNamePrefix"] = params.tagNamePrefix + ret["tagValPrefix"] = params.tagValPrefix return ret } From 6cb0a2317b2c038271f275422e0506485c3a4c72 Mon 
Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 7 Apr 2020 10:14:59 -0600 Subject: [PATCH 15/20] add version info --- build.sh | 12 ++++++++++++ s3bench.go | 12 ++++++++++++ 2 files changed, 24 insertions(+) create mode 100755 build.sh diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..45a6179 --- /dev/null +++ b/build.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -e + +now=$(date +'%Y-%m-%d-%T') +githash=$(git rev-parse HEAD) + +echo "Building version $now-$githash..." + +go build -ldflags "-X main.gitHash=$githash -X main.buildDate=$now" + +echo "Complete" diff --git a/s3bench.go b/s3bench.go index 9608323..cb04893 100644 --- a/s3bench.go +++ b/s3bench.go @@ -22,6 +22,11 @@ import ( "github.com/aws/aws-sdk-go/service/s3" ) +var ( + gitHash string + buildDate string +) + const ( opRead = "Read" opWrite = "Write" @@ -96,9 +101,15 @@ func main() { numTags := flag.Int("numTags", 10, "number of tags to create, for objects it should in range [1..10]") tagNamePrefix := flag.String("tagNamePrefix", "tag_name_", "prefix of the tag name that will be used") tagValPrefix := flag.String("tagValPrefix", "tag_val_", "prefix of the tag value that will be used") + version := flag.Bool("version", false, "print version info") flag.Parse() + if *version { + fmt.Printf("%s-%s\n", buildDate, gitHash) + os.Exit(0) + } + if *numClients > *numSamples || *numSamples < 1 { fmt.Printf("numClients(%d) needs to be less than numSamples(%d) and greater than 0\n", *numClients, *numSamples) os.Exit(1) @@ -497,6 +508,7 @@ func (params Params) report() map[string]interface{} { func (params Params) reportPrepare(tests []Result) map[string]interface{} { report := make(map[string]interface{}) + report["Version"] = fmt.Sprintf("%s-%s", buildDate, gitHash) report["Parameters"] = params.report() testreps := make([]map[string]interface{}, 0, len(tests)) for _, r := range tests { From 4f7d65ecd3aaa644c4d2304c6818711c2d2cfdda Mon Sep 17 00:00:00 2001 From: DSurnin Date: Wed, 8 Apr 2020 07:38:24 
-0600 Subject: [PATCH 16/20] sort output; add output format description parameter --- s3bench.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 68 insertions(+), 6 deletions(-) diff --git a/s3bench.go b/s3bench.go index cb04893..a32d914 100644 --- a/s3bench.go +++ b/s3bench.go @@ -102,6 +102,7 @@ func main() { tagNamePrefix := flag.String("tagNamePrefix", "tag_name_", "prefix of the tag name that will be used") tagValPrefix := flag.String("tagValPrefix", "tag_val_", "prefix of the tag value that will be used") version := flag.Bool("version", false, "print version info") + reportFormat := flag.String("reportFormat", "Version;Parameters;Parameters:numClients;Parameters:numSamples;Parameters:objectSize (MB);Parameters:sampleReads;Parameters:clientDelay;Parameters:readObj;Parameters:headObj;Parameters:putObjTag;Parameters:getObjTag;Tests:Operation;Tests:Total Requests Count;Tests:Errors Count;Tests:Total Throughput (MB/s);Tests:Duration Max;Tests:Duration Avg;Tests:Duration Min;Tests:Ttfb Max;Tests:Ttfb Avg;Tests:Ttfb Min;-Tests:Duration 25th-ile;-Tests:Duration 50th-ile;-Tests:Duration 75th-ile;-Tests:Ttfb 25th-ile;-Tests:Ttfb 50th-ile;-Tests:Ttfb 75th-ile;", "rearrange output fields") flag.Parse() @@ -153,6 +154,7 @@ func main() { readObj: !(*putObjTag || *getObjTag || *headObj), tagNamePrefix: *tagNamePrefix, tagValPrefix: *tagValPrefix, + reportFormat: *reportFormat, } @@ -178,7 +180,7 @@ func main() { params.StartClients(cfg) - testResults := make([]Result, 1, 2) + testResults := []Result{} params.printf("Running %s test...\n", opWrite) testResults = append(testResults, params.Run(opWrite)) @@ -428,6 +430,7 @@ type Params struct { readObj bool tagNamePrefix string tagValPrefix string + reportFormat string } func (params Params) printf(f string, args ...interface{}) { @@ -503,6 +506,7 @@ func (params Params) report() map[string]interface{} { ret["readObj"] = params.readObj ret["tagNamePrefix"] = params.tagNamePrefix ret["tagValPrefix"] 
= params.tagValPrefix + ret["reportFormat"] = params.reportFormat return ret } @@ -518,8 +522,65 @@ func (params Params) reportPrepare(tests []Result) map[string]interface{} { return report } -func mapPrint(m map[string]interface{}, prefix string) { - for k,v := range m { +func indexOf(sls []string, s string) int { + ret := -1 + for i, v := range sls { + if v == s { + ret = i + break + } + } + return ret +} + +func keysSort(keys []string, format []string) []string { + sort.Strings(keys) + cur_formated := 0 + + for _, fv := range format { + fv = strings.TrimSpace(fv) + should_del := strings.HasPrefix(fv, "-") + if should_del { + fv = fv[1:] + } + ci := indexOf(keys, fv) + if ci < 0 { + continue + } + // delete old pos + keys = append(keys[:ci], keys[ci+1:]...) + + if !should_del { + // insert new pos + keys = append(keys[:cur_formated], append([]string{fv}, keys[cur_formated:]...)...) + cur_formated++ + } + } + + return keys +} + +func formatFilter(format []string, key string) []string { + ret := []string{} + for _, v := range format { + if strings.HasPrefix(v, key + ":") { + ret = append(ret, v[len(key + ":"):]) + } else if strings.HasPrefix(v, "-" + key + ":") { + ret = append(ret, "-" + v[len("-" + key + ":"):]) + } + } + + return ret +} + +func mapPrint(m map[string]interface{}, repFormat []string, prefix string) { + var mkeys []string + for k,_ := range m { + mkeys = append(mkeys, k) + } + mkeys = keysSort(mkeys, repFormat) + for _, k := range mkeys { + v := m[k] fmt.Printf("%s %-27s", prefix, k+":") switch val := v.(type) { case []string: @@ -533,14 +594,15 @@ func mapPrint(m map[string]interface{}, prefix string) { } case map[string]interface{}: fmt.Println() - mapPrint(val, prefix + " ") + mapPrint(val, formatFilter(repFormat, k), prefix + " ") case []map[string]interface{}: if len(val) == 0 { fmt.Printf(" []\n") } else { + val_format := formatFilter(repFormat, k) for _, m := range val { fmt.Println() - mapPrint(m, prefix + " ") + mapPrint(m, val_format, 
prefix + " ") } } case float64: @@ -561,7 +623,7 @@ func (params Params) reportPrint(report map[string]interface{}) { return } - mapPrint(report, "") + mapPrint(report, strings.Split(params.reportFormat, ";"), "") } // samples per operation From 7c567bab6b813c4e3b2d46d28a69282b6f55d751 Mon Sep 17 00:00:00 2001 From: DSurnin Date: Tue, 12 May 2020 07:27:26 -0600 Subject: [PATCH 17/20] fix throughput calc --- s3bench.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/s3bench.go b/s3bench.go index a32d914..497bed1 100644 --- a/s3bench.go +++ b/s3bench.go @@ -359,7 +359,7 @@ func (params *Params) startClient(cfg *aws.Config) { putStartTime := time.Now() var ttfb time.Duration var err error - numBytes := params.objectSize + var numBytes int64 = 0 switch r := request.(type) { case *s3.PutObjectInput: @@ -368,22 +368,25 @@ func (params *Params) startClient(cfg *aws.Config) { req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") err = req.Send() ttfb = time.Since(putStartTime) + if err == nil { + numBytes = params.objectSize + } case *s3.GetObjectInput: req, resp := svc.GetObjectRequest(r) err = req.Send() ttfb = time.Since(putStartTime) - numBytes = 0 if err == nil { numBytes, err = io.Copy(ioutil.Discard, resp.Body) } - if numBytes != params.objectSize { + if err != nil { + numBytes = 0 + } else if numBytes != params.objectSize { err = fmt.Errorf("expected object length %d, actual %d", params.objectSize, numBytes) } case *s3.HeadObjectInput: req, resp := svc.HeadObjectRequest(r) err = req.Send() ttfb = time.Since(putStartTime) - numBytes = 0 if err == nil { numBytes = *resp.ContentLength } @@ -394,12 +397,10 @@ func (params *Params) startClient(cfg *aws.Config) { req, _ := svc.PutObjectTaggingRequest(r) err = req.Send() ttfb = time.Since(putStartTime) - numBytes = 0 case *s3.GetObjectTaggingInput: req, _ := svc.GetObjectTaggingRequest(r) err = req.Send() ttfb = time.Since(putStartTime) - numBytes = 0 default: 
panic("Developer error") } @@ -628,7 +629,7 @@ func (params Params) reportPrint(report map[string]interface{}) { // samples per operation func (params Params) spo(op string) uint { - if op == opWrite { + if op == opWrite || op == opPutObjTag { return params.numSamples } From cac2c387b27d6108c3768d8bdf299ad1984a0b8e Mon Sep 17 00:00:00 2001 From: DSurnin Date: Thu, 18 Jun 2020 04:09:16 -0600 Subject: [PATCH 18/20] Add checksum validation and split into several files --- data_decl.go | 65 +++++++++ report.go | 175 +++++++++++++++++++++++ s3bench.go | 386 +++++++++------------------------------------------ utils.go | 81 +++++++++++ 4 files changed, 387 insertions(+), 320 deletions(-) create mode 100644 data_decl.go create mode 100644 report.go create mode 100644 utils.go diff --git a/data_decl.go b/data_decl.go new file mode 100644 index 0000000..8f20dd9 --- /dev/null +++ b/data_decl.go @@ -0,0 +1,65 @@ +package main + +import "time" + +var ( + gitHash string + buildDate string +) + +const ( + opRead = "Read" + opWrite = "Write" + opHeadObj = "HeadObj" + opGetObjTag = "GetObjTag" + opPutObjTag = "PutObjTag" + opValidate = "Validate" +) + +type Req struct { + top string + req interface{} +} + +type Resp struct { + err error + duration time.Duration + numBytes int64 + ttfb time.Duration +} + +// Specifies the parameters for a given test +type Params struct { + requests chan Req + responses chan Resp + numSamples uint + numClients uint + objectSize int64 + objectNamePrefix string + bucketName string + endpoints []string + verbose bool + headObj bool + sampleReads uint + clientDelay int + jsonOutput bool + deleteAtOnce int + putObjTag bool + getObjTag bool + numTags uint + readObj bool + tagNamePrefix string + tagValPrefix string + reportFormat string + validate bool +} + +// Contains the summary for a given test result +type Result struct { + operation string + bytesTransmitted int64 + opDurations []float64 + totalDuration time.Duration + opTtfb []float64 + opErrors 
[]string +} diff --git a/report.go b/report.go new file mode 100644 index 0000000..142853e --- /dev/null +++ b/report.go @@ -0,0 +1,175 @@ +package main + +import ( + "fmt" + "sort" + "strings" + "encoding/json" +) + +func keysSort(keys []string, format []string) []string { + sort.Strings(keys) + cur_formated := 0 + + for _, fv := range format { + fv = strings.TrimSpace(fv) + should_del := strings.HasPrefix(fv, "-") + if should_del { + fv = fv[1:] + } + ci := indexOf(keys, fv) + if ci < 0 { + continue + } + // delete old pos + keys = append(keys[:ci], keys[ci+1:]...) + + if !should_del { + // insert new pos + keys = append(keys[:cur_formated], append([]string{fv}, keys[cur_formated:]...)...) + cur_formated++ + } + } + + return keys +} + +func formatFilter(format []string, key string) []string { + ret := []string{} + for _, v := range format { + if strings.HasPrefix(v, key + ":") { + ret = append(ret, v[len(key + ":"):]) + } else if strings.HasPrefix(v, "-" + key + ":") { + ret = append(ret, "-" + v[len("-" + key + ":"):]) + } + } + + return ret +} + +func mapPrint(m map[string]interface{}, repFormat []string, prefix string) { + var mkeys []string + for k,_ := range m { + mkeys = append(mkeys, k) + } + mkeys = keysSort(mkeys, repFormat) + for _, k := range mkeys { + v := m[k] + fmt.Printf("%s %-27s", prefix, k+":") + switch val := v.(type) { + case []string: + if len(val) == 0 { + fmt.Printf(" []\n") + } else { + fmt.Println() + for _, s := range val { + fmt.Printf("%s%s %s\n", prefix, prefix, s) + } + } + case map[string]interface{}: + fmt.Println() + mapPrint(val, formatFilter(repFormat, k), prefix + " ") + case []map[string]interface{}: + if len(val) == 0 { + fmt.Printf(" []\n") + } else { + val_format := formatFilter(repFormat, k) + for _, m := range val { + fmt.Println() + mapPrint(m, val_format, prefix + " ") + } + } + case float64: + fmt.Printf(" %.3f\n", val) + default: + fmt.Printf(" %v\n", val) + } + } +} + +func (params Params) reportPrepare(tests 
[]Result) map[string]interface{} { + report := make(map[string]interface{}) + report["Version"] = fmt.Sprintf("%s-%s", buildDate, gitHash) + report["Parameters"] = params.report() + testreps := make([]map[string]interface{}, 0, len(tests)) + for _, r := range tests { + testreps = append(testreps, r.report()) + } + report["Tests"] = testreps + return report +} + +func (params Params) reportPrint(report map[string]interface{}) { + if params.jsonOutput { + b, err := json.Marshal(report) + if err != nil { + fmt.Println("Cannot generate JSON report %v", err) + } + fmt.Println(string(b)) + return + } + + mapPrint(report, strings.Split(params.reportFormat, ";"), "") +} + +func (r Result) report() map[string]interface{} { + ret := make(map[string]interface{}) + ret["Operation"] = r.operation + ret["Total Requests Count"] = len(r.opDurations) + if r.operation == opWrite || r.operation == opRead || r.operation == opValidate { + ret["Total Transferred (MB)"] = float64(r.bytesTransmitted)/(1024*1024) + ret["Total Throughput (MB/s)"] = (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds() + } + ret["Total Duration (s)"] = r.totalDuration.Seconds() + + if len(r.opDurations) > 0 { + ret["Duration Max"] = percentile(r.opDurations, 100) + ret["Duration Avg"] = avg(r.opDurations) + ret["Duration Min"] = percentile(r.opDurations, 0) + ret["Duration 99th-ile"] = percentile(r.opDurations, 99) + ret["Duration 90th-ile"] = percentile(r.opDurations, 90) + ret["Duration 75th-ile"] = percentile(r.opDurations, 75) + ret["Duration 50th-ile"] = percentile(r.opDurations, 50) + ret["Duration 25th-ile"] = percentile(r.opDurations, 25) + } + + if len(r.opTtfb) > 0 { + ret["Ttfb Max"] = percentile(r.opTtfb, 100) + ret["Ttfb Avg"] = avg(r.opTtfb) + ret["Ttfb Min"] = percentile(r.opTtfb, 0) + ret["Ttfb 99th-ile"] = percentile(r.opTtfb, 99) + ret["Ttfb 90th-ile"] = percentile(r.opTtfb, 90) + ret["Ttfb 75th-ile"] = percentile(r.opTtfb, 75) + ret["Ttfb 50th-ile"] = percentile(r.opTtfb, 50) 
+ ret["Ttfb 25th-ile"] = percentile(r.opTtfb, 25) + } + + ret["Errors Count"] = len(r.opErrors) + ret["Errors"] = r.opErrors + return ret +} + +func (params Params) report() map[string]interface{} { + ret := make(map[string]interface{}) + ret["endpoints"] = params.endpoints + ret["bucket"] = params.bucketName + ret["objectNamePrefix"] = params.objectNamePrefix + ret["objectSize (MB)"] = float64(params.objectSize)/(1024*1024) + ret["numClients"] = params.numClients + ret["numSamples"] = params.numSamples + ret["sampleReads"] = params.sampleReads + ret["verbose"] = params.verbose + ret["headObj"] = params.headObj + ret["clientDelay"] = params.clientDelay + ret["jsonOutput"] = params.jsonOutput + ret["deleteAtOnce"] = params.deleteAtOnce + ret["numTags"] = params.numTags + ret["putObjTag"] = params.putObjTag + ret["getObjTag"] = params.getObjTag + ret["readObj"] = params.readObj + ret["tagNamePrefix"] = params.tagNamePrefix + ret["tagValPrefix"] = params.tagValPrefix + ret["reportFormat"] = params.reportFormat + ret["validate"] = params.validate + return ret +} diff --git a/s3bench.go b/s3bench.go index 497bed1..fa13c4b 100644 --- a/s3bench.go +++ b/s3bench.go @@ -3,6 +3,8 @@ package main import ( "bytes" "crypto/rand" + "crypto/sha512" + "hash" "flag" "fmt" "io" @@ -11,10 +13,7 @@ import ( "sort" "strings" "time" - "regexp" - "strconv" mathrand "math/rand" - "encoding/json" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -22,20 +21,9 @@ import ( "github.com/aws/aws-sdk-go/service/s3" ) -var ( - gitHash string - buildDate string -) - -const ( - opRead = "Read" - opWrite = "Write" - opHeadObj = "HeadObj" - opGetObjTag = "GetObjTag" - opPutObjTag = "PutObjTag" -) - var bufferBytes []byte +var data_hash_base32 string +var data_hash [sha512.Size]byte // true if created // false if existed @@ -57,35 +45,13 @@ func (params *Params) prepareBucket(cfg *aws.Config) bool { return false } -func parse_size(sz string) int64 { - sizes := 
map[string]int64 { - "b": 1, - "Kb": 1024, - "Mb": 1024 * 1024, - "Gb": 1024 * 1024 * 1024, - } - re := regexp.MustCompile(`^(\d+)([bKMG]{1,2})$`) - mm := re.FindStringSubmatch(sz) - if len(mm) != 3 { - fmt.Printf("Invalid objectSize value format\n") - os.Exit(1) - } - val, err := strconv.ParseInt(string(mm[1]), 10, 64) - mult, ex := sizes[string(mm[2])] - if !ex || err != nil { - fmt.Printf("Invalid objectSize value\n") - os.Exit(1) - } - return val * mult -} - func main() { endpoint := flag.String("endpoint", "", "S3 endpoint(s) comma separated - http://IP:PORT,http://IP:PORT") region := flag.String("region", "igneous-test", "AWS region to use, eg: us-west-1|us-east-1, etc") accessKey := flag.String("accessKey", "", "the S3 access key") accessSecret := flag.String("accessSecret", "", "the S3 access secret") bucketName := flag.String("bucket", "bucketname", "the bucket for which to run the test") - objectNamePrefix := flag.String("objectNamePrefix", "loadgen_test_", "prefix of the object name that will be used") + objectNamePrefix := flag.String("objectNamePrefix", "loadgen_test", "prefix of the object name that will be used") objectSize := flag.String("objectSize", "80Mb", "size of individual requests (must be smaller than main memory)") numClients := flag.Int("numClients", 40, "number of concurrent clients") numSamples := flag.Int("numSamples", 200, "total number of requests to send") @@ -103,6 +69,7 @@ func main() { tagValPrefix := flag.String("tagValPrefix", "tag_val_", "prefix of the tag value that will be used") version := flag.Bool("version", false, "print version info") reportFormat := flag.String("reportFormat", "Version;Parameters;Parameters:numClients;Parameters:numSamples;Parameters:objectSize (MB);Parameters:sampleReads;Parameters:clientDelay;Parameters:readObj;Parameters:headObj;Parameters:putObjTag;Parameters:getObjTag;Tests:Operation;Tests:Total Requests Count;Tests:Errors Count;Tests:Total Throughput (MB/s);Tests:Duration Max;Tests:Duration 
Avg;Tests:Duration Min;Tests:Ttfb Max;Tests:Ttfb Avg;Tests:Ttfb Min;-Tests:Duration 25th-ile;-Tests:Duration 50th-ile;-Tests:Duration 75th-ile;-Tests:Ttfb 25th-ile;-Tests:Ttfb 50th-ile;-Tests:Ttfb 75th-ile;", "rearrange output fields") + validate := flag.Bool("validate", false, "validate stored data") flag.Parse() @@ -155,7 +122,7 @@ func main() { tagNamePrefix: *tagNamePrefix, tagValPrefix: *tagValPrefix, reportFormat: *reportFormat, - + validate: *validate, } // Generate the data from which we will do the writting @@ -164,9 +131,10 @@ func main() { bufferBytes = make([]byte, params.objectSize, params.objectSize) _, err := rand.Read(bufferBytes) if err != nil { - fmt.Printf("Could not allocate a buffer") - os.Exit(1) + panic("Could not allocate a buffer") } + data_hash = sha512.Sum512(bufferBytes) + data_hash_base32 = to_b32(data_hash[:]) params.printf("Done (%s)\n", time.Since(timeGenData)) // Start the load clients and run a write test followed by a read test @@ -201,6 +169,10 @@ func main() { params.printf("Running %s test...\n", opRead) testResults = append(testResults, params.Run(opRead)) } + if params.validate { + params.printf("Running %s test...\n", opValidate) + testResults = append(testResults, params.Run(opValidate)) + } // Do cleanup if required if !*skipCleanup { @@ -295,23 +267,32 @@ func (params *Params) submitLoad(op string) { bucket := aws.String(params.bucketName) opSamples := params.spo(op) for i := uint(0); i < opSamples; i++ { - key := aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i % params.numSamples)) + key := aws.String(fmt.Sprintf("%s_%s_%d", params.objectNamePrefix, data_hash_base32, i % params.numSamples)) if op == opWrite { - params.requests <- &s3.PutObjectInput{ - Bucket: bucket, - Key: key, - Body: bytes.NewReader(bufferBytes), - } - } else if op == opRead { - params.requests <- &s3.GetObjectInput{ - Bucket: bucket, - Key: key, + params.requests <- Req{ + top: op, + req : &s3.PutObjectInput{ + Bucket: bucket, + Key: key, + 
Body: bytes.NewReader(bufferBytes), + }, } + } else if op == opRead || op == opValidate { + params.requests <- Req{ + top: op, + req: &s3.GetObjectInput{ + Bucket: bucket, + Key: key, + }, + } } else if op == opHeadObj { - params.requests <- &s3.HeadObjectInput{ - Bucket: bucket, - Key: key, - } + params.requests <- Req{ + top: op, + req: &s3.HeadObjectInput{ + Bucket: bucket, + Key: key, + }, + } } else if op == opPutObjTag { tagSet := make([]*s3.Tag, 0, params.numTags) for iTag := uint(0); iTag < params.numTags; iTag++ { @@ -322,15 +303,21 @@ func (params *Params) submitLoad(op string) { Value: &tag_value, }) } - params.requests <- &s3.PutObjectTaggingInput{ - Bucket: bucket, - Key: key, - Tagging: &s3.Tagging{ TagSet: tagSet, }, + params.requests <- Req{ + top: op, + req: &s3.PutObjectTaggingInput{ + Bucket: bucket, + Key: key, + Tagging: &s3.Tagging{ TagSet: tagSet, }, + }, } } else if op == opGetObjTag { - params.requests <- &s3.GetObjectTaggingInput{ - Bucket: bucket, - Key: key, + params.requests <- Req{ + top: op, + req: &s3.GetObjectTaggingInput{ + Bucket: bucket, + Key: key, + }, } } else { panic("Developer error") @@ -360,8 +347,10 @@ func (params *Params) startClient(cfg *aws.Config) { var ttfb time.Duration var err error var numBytes int64 = 0 + cur_op := request.top + var hasher hash.Hash = nil - switch r := request.(type) { + switch r := request.req.(type) { case *s3.PutObjectInput: req, _ := svc.PutObjectRequest(r) // Disable payload checksum calculation (very expensive) @@ -376,13 +365,25 @@ func (params *Params) startClient(cfg *aws.Config) { err = req.Send() ttfb = time.Since(putStartTime) if err == nil { - numBytes, err = io.Copy(ioutil.Discard, resp.Body) + if cur_op == opRead { + numBytes, err = io.Copy(ioutil.Discard, resp.Body) + } else if cur_op == opValidate { + hasher = sha512.New() + numBytes, err = io.Copy(hasher, resp.Body) + } } if err != nil { numBytes = 0 } else if numBytes != params.objectSize { err = fmt.Errorf("expected object 
length %d, actual %d", params.objectSize, numBytes) } + if cur_op == opValidate && err == nil { + cur_sum := hasher.Sum(nil) + if !bytes.Equal(cur_sum, data_hash[:]) { + cur_sum_enc := to_b32(cur_sum[:]) + err = fmt.Errorf("Read data checksum %s is not eq to write data checksum %s", cur_sum_enc, data_hash_base32) + } + } case *s3.HeadObjectInput: req, resp := svc.HeadObjectRequest(r) err = req.Send() @@ -408,258 +409,3 @@ func (params *Params) startClient(cfg *aws.Config) { params.responses <- Resp{err, time.Since(putStartTime), numBytes, ttfb} } } - -// Specifies the parameters for a given test -type Params struct { - requests chan Req - responses chan Resp - numSamples uint - numClients uint - objectSize int64 - objectNamePrefix string - bucketName string - endpoints []string - verbose bool - headObj bool - sampleReads uint - clientDelay int - jsonOutput bool - deleteAtOnce int - putObjTag bool - getObjTag bool - numTags uint - readObj bool - tagNamePrefix string - tagValPrefix string - reportFormat string -} - -func (params Params) printf(f string, args ...interface{}) { - if params.verbose { - fmt.Printf(f, args...) 
- } -} - -// Contains the summary for a given test result -type Result struct { - operation string - bytesTransmitted int64 - opDurations []float64 - totalDuration time.Duration - opTtfb []float64 - opErrors []string -} - -func (r Result) report() map[string]interface{} { - ret := make(map[string]interface{}) - ret["Operation"] = r.operation - ret["Total Requests Count"] = len(r.opDurations) - if r.operation == opWrite || r.operation == opRead { - ret["Total Transferred (MB)"] = float64(r.bytesTransmitted)/(1024*1024) - ret["Total Throughput (MB/s)"] = (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds() - } - ret["Total Duration (s)"] = r.totalDuration.Seconds() - - if len(r.opDurations) > 0 { - ret["Duration Max"] = percentile(r.opDurations, 100) - ret["Duration Avg"] = avg(r.opDurations) - ret["Duration Min"] = percentile(r.opDurations, 0) - ret["Duration 99th-ile"] = percentile(r.opDurations, 99) - ret["Duration 90th-ile"] = percentile(r.opDurations, 90) - ret["Duration 75th-ile"] = percentile(r.opDurations, 75) - ret["Duration 50th-ile"] = percentile(r.opDurations, 50) - ret["Duration 25th-ile"] = percentile(r.opDurations, 25) - } - - if len(r.opTtfb) > 0 { - ret["Ttfb Max"] = percentile(r.opTtfb, 100) - ret["Ttfb Avg"] = avg(r.opTtfb) - ret["Ttfb Min"] = percentile(r.opTtfb, 0) - ret["Ttfb 99th-ile"] = percentile(r.opTtfb, 99) - ret["Ttfb 90th-ile"] = percentile(r.opTtfb, 90) - ret["Ttfb 75th-ile"] = percentile(r.opTtfb, 75) - ret["Ttfb 50th-ile"] = percentile(r.opTtfb, 50) - ret["Ttfb 25th-ile"] = percentile(r.opTtfb, 25) - } - - ret["Errors Count"] = len(r.opErrors) - ret["Errors"] = r.opErrors - return ret -} - -func (params Params) report() map[string]interface{} { - ret := make(map[string]interface{}) - ret["endpoints"] = params.endpoints - ret["bucket"] = params.bucketName - ret["objectNamePrefix"] = params.objectNamePrefix - ret["objectSize (MB)"] = float64(params.objectSize)/(1024*1024) - ret["numClients"] = params.numClients - 
ret["numSamples"] = params.numSamples - ret["sampleReads"] = params.sampleReads - ret["verbose"] = params.verbose - ret["headObj"] = params.headObj - ret["clientDelay"] = params.clientDelay - ret["jsonOutput"] = params.jsonOutput - ret["deleteAtOnce"] = params.deleteAtOnce - ret["numTags"] = params.numTags - ret["putObjTag"] = params.putObjTag - ret["getObjTag"] = params.getObjTag - ret["readObj"] = params.readObj - ret["tagNamePrefix"] = params.tagNamePrefix - ret["tagValPrefix"] = params.tagValPrefix - ret["reportFormat"] = params.reportFormat - return ret -} - -func (params Params) reportPrepare(tests []Result) map[string]interface{} { - report := make(map[string]interface{}) - report["Version"] = fmt.Sprintf("%s-%s", buildDate, gitHash) - report["Parameters"] = params.report() - testreps := make([]map[string]interface{}, 0, len(tests)) - for _, r := range tests { - testreps = append(testreps, r.report()) - } - report["Tests"] = testreps - return report -} - -func indexOf(sls []string, s string) int { - ret := -1 - for i, v := range sls { - if v == s { - ret = i - break - } - } - return ret -} - -func keysSort(keys []string, format []string) []string { - sort.Strings(keys) - cur_formated := 0 - - for _, fv := range format { - fv = strings.TrimSpace(fv) - should_del := strings.HasPrefix(fv, "-") - if should_del { - fv = fv[1:] - } - ci := indexOf(keys, fv) - if ci < 0 { - continue - } - // delete old pos - keys = append(keys[:ci], keys[ci+1:]...) - - if !should_del { - // insert new pos - keys = append(keys[:cur_formated], append([]string{fv}, keys[cur_formated:]...)...) 
- cur_formated++ - } - } - - return keys -} - -func formatFilter(format []string, key string) []string { - ret := []string{} - for _, v := range format { - if strings.HasPrefix(v, key + ":") { - ret = append(ret, v[len(key + ":"):]) - } else if strings.HasPrefix(v, "-" + key + ":") { - ret = append(ret, "-" + v[len("-" + key + ":"):]) - } - } - - return ret -} - -func mapPrint(m map[string]interface{}, repFormat []string, prefix string) { - var mkeys []string - for k,_ := range m { - mkeys = append(mkeys, k) - } - mkeys = keysSort(mkeys, repFormat) - for _, k := range mkeys { - v := m[k] - fmt.Printf("%s %-27s", prefix, k+":") - switch val := v.(type) { - case []string: - if len(val) == 0 { - fmt.Printf(" []\n") - } else { - fmt.Println() - for _, s := range val { - fmt.Printf("%s%s %s\n", prefix, prefix, s) - } - } - case map[string]interface{}: - fmt.Println() - mapPrint(val, formatFilter(repFormat, k), prefix + " ") - case []map[string]interface{}: - if len(val) == 0 { - fmt.Printf(" []\n") - } else { - val_format := formatFilter(repFormat, k) - for _, m := range val { - fmt.Println() - mapPrint(m, val_format, prefix + " ") - } - } - case float64: - fmt.Printf(" %.3f\n", val) - default: - fmt.Printf(" %v\n", val) - } - } -} - -func (params Params) reportPrint(report map[string]interface{}) { - if params.jsonOutput { - b, err := json.Marshal(report) - if err != nil { - fmt.Println("Cannot generate JSON report %v", err) - } - fmt.Println(string(b)) - return - } - - mapPrint(report, strings.Split(params.reportFormat, ";"), "") -} - -// samples per operation -func (params Params) spo(op string) uint { - if op == opWrite || op == opPutObjTag { - return params.numSamples - } - - return params.numSamples * params.sampleReads -} - -func percentile(dt []float64, i int) float64 { - ln := len(dt) - if i >= 100 { - i = ln - 1 - } else if i > 0 && i < 100 { - i = int(float64(i) / 100 * float64(ln)) - } - return dt[i] -} - -func avg(dt []float64) float64 { - ln := 
float64(len(dt)) - sm := float64(0) - for _, el := range dt { - sm += el - } - return sm / ln -} - -type Req interface{} - -type Resp struct { - err error - duration time.Duration - numBytes int64 - ttfb time.Duration -} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..cad4273 --- /dev/null +++ b/utils.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "strconv" + "regexp" + "encoding/base32" +) + +func to_b32(dt []byte) string { + return base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(dt) +} + +func from_b32(s string) ([]byte, error) { + return base32.StdEncoding.WithPadding(base32.NoPadding).DecodeString(s) +} + +func parse_size(sz string) int64 { + sizes := map[string]int64 { + "b": 1, + "Kb": 1024, + "Mb": 1024 * 1024, + "Gb": 1024 * 1024 * 1024, + } + re := regexp.MustCompile(`^(\d+)([bKMG]{1,2})$`) + mm := re.FindStringSubmatch(sz) + if len(mm) != 3 { + panic("Invalid objectSize value format\n") + } + val, err := strconv.ParseInt(string(mm[1]), 10, 64) + mult, ex := sizes[string(mm[2])] + if !ex || err != nil { + panic("Invalid objectSize value\n") + } + return val * mult +} + +func (params Params) printf(f string, args ...interface{}) { + if params.verbose { + fmt.Printf(f, args...) 
+ } +} + +// samples per operation +func (params Params) spo(op string) uint { + if op == opWrite || op == opPutObjTag || op == opValidate { + return params.numSamples + } + + return params.numSamples * params.sampleReads +} + +func percentile(dt []float64, i int) float64 { + ln := len(dt) + if i >= 100 { + i = ln - 1 + } else if i > 0 && i < 100 { + i = int(float64(i) / 100 * float64(ln)) + } + return dt[i] +} + +func avg(dt []float64) float64 { + ln := float64(len(dt)) + sm := float64(0) + for _, el := range dt { + sm += el + } + return sm / ln +} + +func indexOf(sls []string, s string) int { + ret := -1 + for i, v := range sls { + if v == s { + ret = i + break + } + } + return ret +} From 6442df57c88cc35fe4a879adac9fd44f2e424808 Mon Sep 17 00:00:00 2001 From: pujamudaliar <61046306+pujamudaliar@users.noreply.github.com> Date: Thu, 25 Jun 2020 21:48:24 +0530 Subject: [PATCH 19/20] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 5f86097..1475be4 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[![Codacy Badge](https://app.codacy.com/project/badge/Grade/412fa22ba5f8452794584ed9819f149b)](https://www.codacy.com?utm_source=github.com&utm_medium=referral&utm_content=Seagate/s3bench&utm_campaign=Badge_Grade) + # Initial Cloned from ``` From dc5a27fc141b404ea16d6000b377c034b034966f Mon Sep 17 00:00:00 2001 From: Dmitrii Surnin Date: Tue, 7 Jul 2020 22:13:01 -0600 Subject: [PATCH 20/20] EOS-9790 : S3 - S3bench: enhance checksum support, cover failover scenario Added ability to run read/validate and write tests independently each other added cmd param -skipWrite added cmd param -skipRead fix tag delete on clean up Signed-off-by: Dmitrii Surnin --- data_decl.go | 2 ++ report.go | 2 ++ s3bench.go | 57 ++++++++++++++++++++++++++++++++++------------------ utils.go | 35 ++++++++++++++++++++++++++++++++ 4 files changed, 77 insertions(+), 19 deletions(-) diff --git a/data_decl.go b/data_decl.go index 
8f20dd9..557f2d1 100644 --- a/data_decl.go +++ b/data_decl.go @@ -52,6 +52,8 @@ type Params struct { tagValPrefix string reportFormat string validate bool + skipWrite bool + skipRead bool } // Contains the summary for a given test result diff --git a/report.go b/report.go index 142853e..36509e1 100644 --- a/report.go +++ b/report.go @@ -171,5 +171,7 @@ func (params Params) report() map[string]interface{} { ret["tagValPrefix"] = params.tagValPrefix ret["reportFormat"] = params.reportFormat ret["validate"] = params.validate + ret["skipWrite"] = params.skipWrite + ret["skipRead"] = params.skipRead return ret } diff --git a/s3bench.go b/s3bench.go index fa13c4b..d77fbd9 100644 --- a/s3bench.go +++ b/s3bench.go @@ -70,6 +70,8 @@ func main() { version := flag.Bool("version", false, "print version info") reportFormat := flag.String("reportFormat", "Version;Parameters;Parameters:numClients;Parameters:numSamples;Parameters:objectSize (MB);Parameters:sampleReads;Parameters:clientDelay;Parameters:readObj;Parameters:headObj;Parameters:putObjTag;Parameters:getObjTag;Tests:Operation;Tests:Total Requests Count;Tests:Errors Count;Tests:Total Throughput (MB/s);Tests:Duration Max;Tests:Duration Avg;Tests:Duration Min;Tests:Ttfb Max;Tests:Ttfb Avg;Tests:Ttfb Min;-Tests:Duration 25th-ile;-Tests:Duration 50th-ile;-Tests:Duration 75th-ile;-Tests:Ttfb 25th-ile;-Tests:Ttfb 50th-ile;-Tests:Ttfb 75th-ile;", "rearrange output fields") validate := flag.Bool("validate", false, "validate stored data") + skipWrite := flag.Bool("skipWrite", false, "do not run Write test") + skipRead := flag.Bool("skipRead", false, "do not run Read test") flag.Parse() @@ -118,41 +120,59 @@ func main() { putObjTag: *putObjTag || *getObjTag, getObjTag: *getObjTag, numTags: uint(*numTags), - readObj: !(*putObjTag || *getObjTag || *headObj), + readObj: !(*putObjTag || *getObjTag || *headObj) && !*skipRead, tagNamePrefix: *tagNamePrefix, tagValPrefix: *tagValPrefix, reportFormat: *reportFormat, validate: *validate, + 
skipWrite: *skipWrite, + skipRead: *skipRead, } - // Generate the data from which we will do the writting - params.printf("Generating in-memory sample data...\n") - timeGenData := time.Now() - bufferBytes = make([]byte, params.objectSize, params.objectSize) - _, err := rand.Read(bufferBytes) - if err != nil { - panic("Could not allocate a buffer") + if !params.skipWrite { + // Generate the data from which we will do the writting + params.printf("Generating in-memory sample data...\n") + timeGenData := time.Now() + bufferBytes = make([]byte, params.objectSize, params.objectSize) + _, err := rand.Read(bufferBytes) + if err != nil { + panic("Could not allocate a buffer") + } + data_hash = sha512.Sum512(bufferBytes) + data_hash_base32 = to_b32(data_hash[:]) + params.printf("Done (%s)\n", time.Since(timeGenData)) } - data_hash = sha512.Sum512(bufferBytes) - data_hash_base32 = to_b32(data_hash[:]) - params.printf("Done (%s)\n", time.Since(timeGenData)) - // Start the load clients and run a write test followed by a read test cfg := &aws.Config{ Credentials: credentials.NewStaticCredentials(*accessKey, *accessSecret, ""), Region: aws.String(*region), S3ForcePathStyle: aws.Bool(true), } + if data_hash_base32 == "" { + var err error + data_hash_base32, err = params.getObjectHash(cfg) + if err != nil { + panic(fmt.Sprintf("Cannot read object hash:> %v", err)) + } + var hash_from_b32 []byte + hash_from_b32, err = from_b32(data_hash_base32) + if err != nil { + panic(fmt.Sprintf("Cannot convert object hash:> %v", err)) + } + copy(data_hash[:], hash_from_b32) + } + bucket_created := params.prepareBucket(cfg) params.StartClients(cfg) testResults := []Result{} - params.printf("Running %s test...\n", opWrite) - testResults = append(testResults, params.Run(opWrite)) - + if !params.skipWrite { + params.printf("Running %s test...\n", opWrite) + testResults = append(testResults, params.Run(opWrite)) + } if params.putObjTag { params.printf("Running %s test...\n", opPutObjTag) testResults 
= append(testResults, params.Run(opPutObjTag)) @@ -184,8 +204,7 @@ func main() { keyList := make([]*s3.ObjectIdentifier, 0, params.deleteAtOnce) for i := 0; i < *numSamples; i++ { - objName := fmt.Sprintf("%s%d", *objectNamePrefix, i) - key := aws.String(objName) + key := genObjName(params.objectNamePrefix, data_hash_base32, uint(i)) if params.putObjTag { deleteObjectTaggingInput := &s3.DeleteObjectTaggingInput{ @@ -193,7 +212,7 @@ func main() { Key: key, } _, err := svc.DeleteObjectTagging(deleteObjectTaggingInput) - params.printf("Delete tags %s |err %v\n", objName, err) + params.printf("Delete tags %s |err %v\n", *key, err) } bar := s3.ObjectIdentifier{ Key: key, } keyList = append(keyList, &bar) @@ -267,7 +286,7 @@ func (params *Params) submitLoad(op string) { bucket := aws.String(params.bucketName) opSamples := params.spo(op) for i := uint(0); i < opSamples; i++ { - key := aws.String(fmt.Sprintf("%s_%s_%d", params.objectNamePrefix, data_hash_base32, i % params.numSamples)) + key := genObjName(params.objectNamePrefix, data_hash_base32, i % params.numSamples) if op == opWrite { params.requests <- Req{ top: op, diff --git a/utils.go b/utils.go index cad4273..82c899d 100644 --- a/utils.go +++ b/utils.go @@ -5,6 +5,11 @@ import ( "strconv" "regexp" "encoding/base32" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + ) func to_b32(dt []byte) string { @@ -79,3 +84,33 @@ func indexOf(sls []string, s string) int { } return ret } + +func genObjName(pref string, hsh string, idx uint) *string { + return aws.String(fmt.Sprintf("%s_%s_%d", pref, hsh, idx)) +} + +func (params *Params) getObjectHash(cfg *aws.Config) (string, error){ + cfg.Endpoint = aws.String(params.endpoints[0]) + svc := s3.New(session.New(), cfg) + + result, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(params.bucketName), + MaxKeys: aws.Int64(1), + Prefix: aws.String(params.objectNamePrefix), + }) + + if err != 
nil { + return "", err + } + if len(result.Contents) == 0 { + return "", fmt.Errorf("Empty bucket") + } + + re := regexp.MustCompile(`^.*_([A-Z2-7]+)_[0-9]+$`) + mm := re.FindStringSubmatch(*result.Contents[0].Key) + if len(mm) != 2 { + return "", fmt.Errorf("Invalid object name format") + } + + return mm[1], nil +}