diff --git a/README.md b/README.md index f0939b8..1475be4 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,11 @@ +[![Codacy Badge](https://app.codacy.com/project/badge/Grade/412fa22ba5f8452794584ed9819f149b)](https://www.codacy.com?utm_source=github.com&utm_medium=referral&utm_content=Seagate/s3bench&utm_campaign=Badge_Grade) + +# Initial +Cloned from +``` +https://github.com/igneous-systems/s3bench.git +``` + # S3 Bench This tool offers the ability to run very basic throughput benchmarking against an S3-compatible endpoint. It does a series of put operations followed by a @@ -125,3 +133,10 @@ Put times 50th %ile: 0.001 s Put times 25th %ile: 0.001 s Put times Min: 0.001 s ``` + +##### Head-object +It is possible to send head-object requests instead of get-object. +For this purpose one should use the *-headObj* flag +``` +./s3bench -accessKey=KEY -accessSecret=SECRET -bucket=loadgen -endpoint=http://endpoint1:80 -numClients=2 -numSamples=10 -objectNamePrefix=loadgen -objectSize=1Kb -headObj +``` diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..45a6179 --- /dev/null +++ b/build.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -e + +now=$(date +'%Y-%m-%d-%T') +githash=$(git rev-parse HEAD) + +echo "Building version $now-$githash..." 
+ +go build -ldflags "-X main.gitHash=$githash -X main.buildDate=$now" + +echo "Complete" diff --git a/data_decl.go b/data_decl.go new file mode 100644 index 0000000..557f2d1 --- /dev/null +++ b/data_decl.go @@ -0,0 +1,67 @@ +package main + +import "time" + +var ( + gitHash string + buildDate string +) + +const ( + opRead = "Read" + opWrite = "Write" + opHeadObj = "HeadObj" + opGetObjTag = "GetObjTag" + opPutObjTag = "PutObjTag" + opValidate = "Validate" +) + +type Req struct { + top string + req interface{} +} + +type Resp struct { + err error + duration time.Duration + numBytes int64 + ttfb time.Duration +} + +// Specifies the parameters for a given test +type Params struct { + requests chan Req + responses chan Resp + numSamples uint + numClients uint + objectSize int64 + objectNamePrefix string + bucketName string + endpoints []string + verbose bool + headObj bool + sampleReads uint + clientDelay int + jsonOutput bool + deleteAtOnce int + putObjTag bool + getObjTag bool + numTags uint + readObj bool + tagNamePrefix string + tagValPrefix string + reportFormat string + validate bool + skipWrite bool + skipRead bool +} + +// Contains the summary for a given test result +type Result struct { + operation string + bytesTransmitted int64 + opDurations []float64 + totalDuration time.Duration + opTtfb []float64 + opErrors []string +} diff --git a/report.go b/report.go new file mode 100644 index 0000000..36509e1 --- /dev/null +++ b/report.go @@ -0,0 +1,177 @@ +package main + +import ( + "fmt" + "sort" + "strings" + "encoding/json" +) + +func keysSort(keys []string, format []string) []string { + sort.Strings(keys) + cur_formated := 0 + + for _, fv := range format { + fv = strings.TrimSpace(fv) + should_del := strings.HasPrefix(fv, "-") + if should_del { + fv = fv[1:] + } + ci := indexOf(keys, fv) + if ci < 0 { + continue + } + // delete old pos + keys = append(keys[:ci], keys[ci+1:]...) 
+ + if !should_del { + // insert new pos + keys = append(keys[:cur_formated], append([]string{fv}, keys[cur_formated:]...)...) + cur_formated++ + } + } + + return keys +} + +func formatFilter(format []string, key string) []string { + ret := []string{} + for _, v := range format { + if strings.HasPrefix(v, key + ":") { + ret = append(ret, v[len(key + ":"):]) + } else if strings.HasPrefix(v, "-" + key + ":") { + ret = append(ret, "-" + v[len("-" + key + ":"):]) + } + } + + return ret +} + +func mapPrint(m map[string]interface{}, repFormat []string, prefix string) { + var mkeys []string + for k,_ := range m { + mkeys = append(mkeys, k) + } + mkeys = keysSort(mkeys, repFormat) + for _, k := range mkeys { + v := m[k] + fmt.Printf("%s %-27s", prefix, k+":") + switch val := v.(type) { + case []string: + if len(val) == 0 { + fmt.Printf(" []\n") + } else { + fmt.Println() + for _, s := range val { + fmt.Printf("%s%s %s\n", prefix, prefix, s) + } + } + case map[string]interface{}: + fmt.Println() + mapPrint(val, formatFilter(repFormat, k), prefix + " ") + case []map[string]interface{}: + if len(val) == 0 { + fmt.Printf(" []\n") + } else { + val_format := formatFilter(repFormat, k) + for _, m := range val { + fmt.Println() + mapPrint(m, val_format, prefix + " ") + } + } + case float64: + fmt.Printf(" %.3f\n", val) + default: + fmt.Printf(" %v\n", val) + } + } +} + +func (params Params) reportPrepare(tests []Result) map[string]interface{} { + report := make(map[string]interface{}) + report["Version"] = fmt.Sprintf("%s-%s", buildDate, gitHash) + report["Parameters"] = params.report() + testreps := make([]map[string]interface{}, 0, len(tests)) + for _, r := range tests { + testreps = append(testreps, r.report()) + } + report["Tests"] = testreps + return report +} + +func (params Params) reportPrint(report map[string]interface{}) { + if params.jsonOutput { + b, err := json.Marshal(report) + if err != nil { + fmt.Println("Cannot generate JSON report %v", err) + } + 
fmt.Println(string(b)) + return + } + + mapPrint(report, strings.Split(params.reportFormat, ";"), "") +} + +func (r Result) report() map[string]interface{} { + ret := make(map[string]interface{}) + ret["Operation"] = r.operation + ret["Total Requests Count"] = len(r.opDurations) + if r.operation == opWrite || r.operation == opRead || r.operation == opValidate { + ret["Total Transferred (MB)"] = float64(r.bytesTransmitted)/(1024*1024) + ret["Total Throughput (MB/s)"] = (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds() + } + ret["Total Duration (s)"] = r.totalDuration.Seconds() + + if len(r.opDurations) > 0 { + ret["Duration Max"] = percentile(r.opDurations, 100) + ret["Duration Avg"] = avg(r.opDurations) + ret["Duration Min"] = percentile(r.opDurations, 0) + ret["Duration 99th-ile"] = percentile(r.opDurations, 99) + ret["Duration 90th-ile"] = percentile(r.opDurations, 90) + ret["Duration 75th-ile"] = percentile(r.opDurations, 75) + ret["Duration 50th-ile"] = percentile(r.opDurations, 50) + ret["Duration 25th-ile"] = percentile(r.opDurations, 25) + } + + if len(r.opTtfb) > 0 { + ret["Ttfb Max"] = percentile(r.opTtfb, 100) + ret["Ttfb Avg"] = avg(r.opTtfb) + ret["Ttfb Min"] = percentile(r.opTtfb, 0) + ret["Ttfb 99th-ile"] = percentile(r.opTtfb, 99) + ret["Ttfb 90th-ile"] = percentile(r.opTtfb, 90) + ret["Ttfb 75th-ile"] = percentile(r.opTtfb, 75) + ret["Ttfb 50th-ile"] = percentile(r.opTtfb, 50) + ret["Ttfb 25th-ile"] = percentile(r.opTtfb, 25) + } + + ret["Errors Count"] = len(r.opErrors) + ret["Errors"] = r.opErrors + return ret +} + +func (params Params) report() map[string]interface{} { + ret := make(map[string]interface{}) + ret["endpoints"] = params.endpoints + ret["bucket"] = params.bucketName + ret["objectNamePrefix"] = params.objectNamePrefix + ret["objectSize (MB)"] = float64(params.objectSize)/(1024*1024) + ret["numClients"] = params.numClients + ret["numSamples"] = params.numSamples + ret["sampleReads"] = params.sampleReads + 
ret["verbose"] = params.verbose + ret["headObj"] = params.headObj + ret["clientDelay"] = params.clientDelay + ret["jsonOutput"] = params.jsonOutput + ret["deleteAtOnce"] = params.deleteAtOnce + ret["numTags"] = params.numTags + ret["putObjTag"] = params.putObjTag + ret["getObjTag"] = params.getObjTag + ret["readObj"] = params.readObj + ret["tagNamePrefix"] = params.tagNamePrefix + ret["tagValPrefix"] = params.tagValPrefix + ret["reportFormat"] = params.reportFormat + ret["validate"] = params.validate + ret["skipWrite"] = params.skipWrite + ret["skipRead"] = params.skipRead + return ret +} diff --git a/s3bench.go b/s3bench.go index 76af88f..d77fbd9 100644 --- a/s3bench.go +++ b/s3bench.go @@ -3,6 +3,8 @@ package main import ( "bytes" "crypto/rand" + "crypto/sha512" + "hash" "flag" "fmt" "io" @@ -11,6 +13,7 @@ import ( "sort" "strings" "time" + mathrand "math/rand" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -18,14 +21,29 @@ import ( "github.com/aws/aws-sdk-go/service/s3" ) -const ( - opRead = "Read" - opWrite = "Write" - //max that can be deleted at a time via DeleteObjects() - commitSize = 1000 -) - var bufferBytes []byte +var data_hash_base32 string +var data_hash [sha512.Size]byte + +// true if created +// false if existed +func (params *Params) prepareBucket(cfg *aws.Config) bool { + cfg.Endpoint = aws.String(params.endpoints[0]) + svc := s3.New(session.New(), cfg) + req, _ := svc.CreateBucketRequest( + &s3.CreateBucketInput{Bucket: aws.String(params.bucketName)}) + + err := req.Send() + + if err == nil { + return true + } else if !strings.Contains(err.Error(), "BucketAlreadyOwnedByYou:") && + !strings.Contains(err.Error(), "BucketAlreadyExists:") { + panic("Failed to create bucket: " + err.Error()) + } + + return false +} func main() { endpoint := flag.String("endpoint", "", "S3 endpoint(s) comma separated - http://IP:PORT,http://IP:PORT") @@ -33,15 +51,35 @@ func main() { accessKey := flag.String("accessKey", "", "the S3 
access key") accessSecret := flag.String("accessSecret", "", "the S3 access secret") bucketName := flag.String("bucket", "bucketname", "the bucket for which to run the test") - objectNamePrefix := flag.String("objectNamePrefix", "loadgen_test_", "prefix of the object name that will be used") - objectSize := flag.Int64("objectSize", 80*1024*1024, "size of individual requests in bytes (must be smaller than main memory)") + objectNamePrefix := flag.String("objectNamePrefix", "loadgen_test", "prefix of the object name that will be used") + objectSize := flag.String("objectSize", "80Mb", "size of individual requests (must be smaller than main memory)") numClients := flag.Int("numClients", 40, "number of concurrent clients") numSamples := flag.Int("numSamples", 200, "total number of requests to send") skipCleanup := flag.Bool("skipCleanup", false, "skip deleting objects created by this tool at the end of the run") verbose := flag.Bool("verbose", false, "print verbose per thread status") + headObj := flag.Bool("headObj", false, "head-object request instead of reading obj content") + sampleReads := flag.Int("sampleReads", 1, "number of reads of each sample") + clientDelay := flag.Int("clientDelay", 1, "delay in ms before client starts. 
if negative value provided delay will be randomized in interval [0, abs{clientDelay})") + jsonOutput := flag.Bool("jsonOutput", false, "print results in forma of json") + deleteAtOnce := flag.Int("deleteAtOnce", 1000, "number of objs to delete at once") + putObjTag := flag.Bool("putObjTag", false, "put object's tags") + getObjTag := flag.Bool("getObjTag", false, "get object's tags") + numTags := flag.Int("numTags", 10, "number of tags to create, for objects it should in range [1..10]") + tagNamePrefix := flag.String("tagNamePrefix", "tag_name_", "prefix of the tag name that will be used") + tagValPrefix := flag.String("tagValPrefix", "tag_val_", "prefix of the tag value that will be used") + version := flag.Bool("version", false, "print version info") + reportFormat := flag.String("reportFormat", "Version;Parameters;Parameters:numClients;Parameters:numSamples;Parameters:objectSize (MB);Parameters:sampleReads;Parameters:clientDelay;Parameters:readObj;Parameters:headObj;Parameters:putObjTag;Parameters:getObjTag;Tests:Operation;Tests:Total Requests Count;Tests:Errors Count;Tests:Total Throughput (MB/s);Tests:Duration Max;Tests:Duration Avg;Tests:Duration Min;Tests:Ttfb Max;Tests:Ttfb Avg;Tests:Ttfb Min;-Tests:Duration 25th-ile;-Tests:Duration 50th-ile;-Tests:Duration 75th-ile;-Tests:Ttfb 25th-ile;-Tests:Ttfb 50th-ile;-Tests:Ttfb 75th-ile;", "rearrange output fields") + validate := flag.Bool("validate", false, "validate stored data") + skipWrite := flag.Bool("skipWrite", false, "do not run Write test") + skipRead := flag.Bool("skipRead", false, "do not run Read test") flag.Parse() + if *version { + fmt.Printf("%s-%s\n", buildDate, gitHash) + os.Exit(0) + } + if *numClients > *numSamples || *numSamples < 1 { fmt.Printf("numClients(%d) needs to be less than numSamples(%d) and greater than 0\n", *numClients, *numSamples) os.Exit(1) @@ -53,91 +91,165 @@ func main() { os.Exit(1) } + if *deleteAtOnce < 1 { + fmt.Println("Cannot delete less than 1 obj at once") + os.Exit(1) + 
} + + if *numTags < 1 { + fmt.Println("-numTags cannot be less than 1") + os.Exit(1) + } + // Setup and print summary of the accepted parameters params := Params{ requests: make(chan Req), responses: make(chan Resp), - numSamples: *numSamples, + numSamples: uint(*numSamples), numClients: uint(*numClients), - objectSize: *objectSize, + objectSize: parse_size(*objectSize), objectNamePrefix: *objectNamePrefix, bucketName: *bucketName, endpoints: strings.Split(*endpoint, ","), verbose: *verbose, + headObj: *headObj, + sampleReads: uint(*sampleReads), + clientDelay: *clientDelay, + jsonOutput: *jsonOutput, + deleteAtOnce: *deleteAtOnce, + putObjTag: *putObjTag || *getObjTag, + getObjTag: *getObjTag, + numTags: uint(*numTags), + readObj: !(*putObjTag || *getObjTag || *headObj) && !*skipRead, + tagNamePrefix: *tagNamePrefix, + tagValPrefix: *tagValPrefix, + reportFormat: *reportFormat, + validate: *validate, + skipWrite: *skipWrite, + skipRead: *skipRead, } - fmt.Println(params) - fmt.Println() - - // Generate the data from which we will do the writting - fmt.Printf("Generating in-memory sample data... 
") - timeGenData := time.Now() - bufferBytes = make([]byte, *objectSize, *objectSize) - _, err := rand.Read(bufferBytes) - if err != nil { - fmt.Printf("Could not allocate a buffer") - os.Exit(1) + + if !params.skipWrite { + // Generate the data from which we will do the writting + params.printf("Generating in-memory sample data...\n") + timeGenData := time.Now() + bufferBytes = make([]byte, params.objectSize, params.objectSize) + _, err := rand.Read(bufferBytes) + if err != nil { + panic("Could not allocate a buffer") + } + data_hash = sha512.Sum512(bufferBytes) + data_hash_base32 = to_b32(data_hash[:]) + params.printf("Done (%s)\n", time.Since(timeGenData)) } - fmt.Printf("Done (%s)\n", time.Since(timeGenData)) - fmt.Println() - // Start the load clients and run a write test followed by a read test cfg := &aws.Config{ Credentials: credentials.NewStaticCredentials(*accessKey, *accessSecret, ""), Region: aws.String(*region), S3ForcePathStyle: aws.Bool(true), } - params.StartClients(cfg) - fmt.Printf("Running %s test...\n", opWrite) - writeResult := params.Run(opWrite) - fmt.Println() + if data_hash_base32 == "" { + var err error + data_hash_base32, err = params.getObjectHash(cfg) + if err != nil { + panic(fmt.Sprintf("Cannot read object hash:> %v", err)) + } + var hash_from_b32 []byte + hash_from_b32, err = from_b32(data_hash_base32) + if err != nil { + panic(fmt.Sprintf("Cannot convert object hash:> %v", err)) + } + copy(data_hash[:], hash_from_b32) + } + + bucket_created := params.prepareBucket(cfg) - fmt.Printf("Running %s test...\n", opRead) - readResult := params.Run(opRead) - fmt.Println() + params.StartClients(cfg) - // Repeating the parameters of the test followed by the results - fmt.Println(params) - fmt.Println() - fmt.Println(writeResult) - fmt.Println() - fmt.Println(readResult) + testResults := []Result{} + + if !params.skipWrite { + params.printf("Running %s test...\n", opWrite) + testResults = append(testResults, params.Run(opWrite)) + } + if 
params.putObjTag { + params.printf("Running %s test...\n", opPutObjTag) + testResults = append(testResults, params.Run(opPutObjTag)) + } + if params.getObjTag { + params.printf("Running %s test...\n", opGetObjTag) + testResults = append(testResults, params.Run(opGetObjTag)) + } + if params.headObj { + params.printf("Running %s test...\n", opHeadObj) + testResults = append(testResults, params.Run(opHeadObj)) + } + if params.readObj { + params.printf("Running %s test...\n", opRead) + testResults = append(testResults, params.Run(opRead)) + } + if params.validate { + params.printf("Running %s test...\n", opValidate) + testResults = append(testResults, params.Run(opValidate)) + } // Do cleanup if required if !*skipCleanup { - fmt.Println() - fmt.Printf("Cleaning up %d objects...\n", *numSamples) + params.printf("Cleaning up %d objects...\n", *numSamples) delStartTime := time.Now() svc := s3.New(session.New(), cfg) numSuccessfullyDeleted := 0 - keyList := make([]*s3.ObjectIdentifier, 0, commitSize) + keyList := make([]*s3.ObjectIdentifier, 0, params.deleteAtOnce) for i := 0; i < *numSamples; i++ { - bar := s3.ObjectIdentifier{ - Key: aws.String(fmt.Sprintf("%s%d", *objectNamePrefix, i)), + key := genObjName(params.objectNamePrefix, data_hash_base32, uint(i)) + + if params.putObjTag { + deleteObjectTaggingInput := &s3.DeleteObjectTaggingInput{ + Bucket: aws.String(*bucketName), + Key: key, + } + _, err := svc.DeleteObjectTagging(deleteObjectTaggingInput) + params.printf("Delete tags %s |err %v\n", *key, err) } + bar := s3.ObjectIdentifier{ Key: key, } keyList = append(keyList, &bar) - if len(keyList) == commitSize || i == *numSamples-1 { - fmt.Printf("Deleting a batch of %d objects in range {%d, %d}... ", len(keyList), i-len(keyList)+1, i) - params := &s3.DeleteObjectsInput{ + if len(keyList) == params.deleteAtOnce || i == *numSamples-1 { + params.printf("Deleting a batch of %d objects in range {%d, %d}... 
", len(keyList), i-len(keyList)+1, i) + dltpar := &s3.DeleteObjectsInput{ Bucket: aws.String(*bucketName), Delete: &s3.Delete{ Objects: keyList}} - _, err := svc.DeleteObjects(params) + _, err := svc.DeleteObjects(dltpar) if err == nil { numSuccessfullyDeleted += len(keyList) - fmt.Printf("Succeeded\n") + params.printf("Succeeded\n") } else { - fmt.Printf("Failed (%v)\n", err) + params.printf("Failed (%v)\n", err) } //set cursor to 0 so we can move to the next batch. keyList = keyList[:0] } } - fmt.Printf("Successfully deleted %d/%d objects in %s\n", numSuccessfullyDeleted, *numSamples, time.Since(delStartTime)) + params.printf("Successfully deleted %d/%d objects in %s\n", numSuccessfullyDeleted, *numSamples, time.Since(delStartTime)) + + if bucket_created { + params.printf("Deleting bucket...\n") + dltpar := &s3.DeleteBucketInput{ + Bucket: aws.String(*bucketName)} + _, err := svc.DeleteBucket(dltpar) + if err == nil { + params.printf("Succeeded\n") + } else { + params.printf("Failed (%v)\n", err) + } + } } + + params.reportPrint(params.reportPrepare(testResults)) } func (params *Params) Run(op string) Result { @@ -146,46 +258,85 @@ func (params *Params) Run(op string) Result { // Start submitting load requests go params.submitLoad(op) + opSamples := params.spo(op) // Collect and aggregate stats for completed requests - result := Result{opDurations: make([]float64, 0, params.numSamples), operation: op} - for i := 0; i < params.numSamples; i++ { + result := Result{opDurations: make([]float64, 0, opSamples), operation: op} + for i := uint(0); i < opSamples; i++ { resp := <-params.responses - errorString := "" if resp.err != nil { - result.numErrors++ - errorString = fmt.Sprintf(", error: %s", resp.err) + errStr := fmt.Sprintf("%v(%d) completed in %0.2fs with error %s", + op, i+1, resp.duration.Seconds(), resp.err) + result.opErrors = append(result.opErrors, errStr) } else { result.bytesTransmitted = result.bytesTransmitted + params.objectSize result.opDurations = 
append(result.opDurations, resp.duration.Seconds()) + result.opTtfb = append(result.opTtfb, resp.ttfb.Seconds()) } - if params.verbose { - fmt.Printf("%v operation completed in %0.2fs (%d/%d) - %0.2fMB/s%s\n", - op, resp.duration.Seconds(), i+1, params.numSamples, - (float64(result.bytesTransmitted)/(1024*1024))/time.Since(startTime).Seconds(), - errorString) - } + params.printf("operation %s(%d) completed in %.2fs|%s\n", op, i+1, resp.duration.Seconds(), resp.err) } result.totalDuration = time.Since(startTime) sort.Float64s(result.opDurations) + sort.Float64s(result.opTtfb) return result } // Create an individual load request and submit it to the client queue func (params *Params) submitLoad(op string) { bucket := aws.String(params.bucketName) - for i := 0; i < params.numSamples; i++ { - key := aws.String(fmt.Sprintf("%s%d", params.objectNamePrefix, i)) + opSamples := params.spo(op) + for i := uint(0); i < opSamples; i++ { + key := genObjName(params.objectNamePrefix, data_hash_base32, i % params.numSamples) if op == opWrite { - params.requests <- &s3.PutObjectInput{ - Bucket: bucket, - Key: key, - Body: bytes.NewReader(bufferBytes), + params.requests <- Req{ + top: op, + req : &s3.PutObjectInput{ + Bucket: bucket, + Key: key, + Body: bytes.NewReader(bufferBytes), + }, + } + } else if op == opRead || op == opValidate { + params.requests <- Req{ + top: op, + req: &s3.GetObjectInput{ + Bucket: bucket, + Key: key, + }, + } + } else if op == opHeadObj { + params.requests <- Req{ + top: op, + req: &s3.HeadObjectInput{ + Bucket: bucket, + Key: key, + }, + } + } else if op == opPutObjTag { + tagSet := make([]*s3.Tag, 0, params.numTags) + for iTag := uint(0); iTag < params.numTags; iTag++ { + tag_name := fmt.Sprintf("%s%d", params.tagNamePrefix, iTag) + tag_value := fmt.Sprintf("%s%d", params.tagValPrefix, iTag) + tagSet = append(tagSet, &s3.Tag { + Key: &tag_name, + Value: &tag_value, + }) + } + params.requests <- Req{ + top: op, + req: &s3.PutObjectTaggingInput{ + 
Bucket: bucket, + Key: key, + Tagging: &s3.Tagging{ TagSet: tagSet, }, + }, } - } else if op == opRead { - params.requests <- &s3.GetObjectInput{ - Bucket: bucket, - Key: key, + } else if op == opGetObjTag { + params.requests <- Req{ + top: op, + req: &s3.GetObjectTaggingInput{ + Bucket: bucket, + Key: key, + }, } } else { panic("Developer error") @@ -197,7 +348,13 @@ func (params *Params) StartClients(cfg *aws.Config) { for i := 0; i < int(params.numClients); i++ { cfg.Endpoint = aws.String(params.endpoints[i%len(params.endpoints)]) go params.startClient(cfg) - time.Sleep(1 * time.Millisecond) + if params.clientDelay > 0 { + time.Sleep(time.Duration(params.clientDelay) * + time.Millisecond) + } else if params.clientDelay < 0 { + time.Sleep(time.Duration(mathrand.Intn(-params.clientDelay)) * + time.Millisecond) + } } } @@ -206,100 +363,68 @@ func (params *Params) startClient(cfg *aws.Config) { svc := s3.New(session.New(), cfg) for request := range params.requests { putStartTime := time.Now() + var ttfb time.Duration var err error - numBytes := params.objectSize + var numBytes int64 = 0 + cur_op := request.top + var hasher hash.Hash = nil - switch r := request.(type) { + switch r := request.req.(type) { case *s3.PutObjectInput: req, _ := svc.PutObjectRequest(r) // Disable payload checksum calculation (very expensive) req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") err = req.Send() + ttfb = time.Since(putStartTime) + if err == nil { + numBytes = params.objectSize + } case *s3.GetObjectInput: req, resp := svc.GetObjectRequest(r) err = req.Send() - numBytes = 0 + ttfb = time.Since(putStartTime) if err == nil { - numBytes, err = io.Copy(ioutil.Discard, resp.Body) + if cur_op == opRead { + numBytes, err = io.Copy(ioutil.Discard, resp.Body) + } else if cur_op == opValidate { + hasher = sha512.New() + numBytes, err = io.Copy(hasher, resp.Body) + } } - if numBytes != params.objectSize { + if err != nil { + numBytes = 0 + } else if numBytes != 
params.objectSize { err = fmt.Errorf("expected object length %d, actual %d", params.objectSize, numBytes) } + if cur_op == opValidate && err == nil { + cur_sum := hasher.Sum(nil) + if !bytes.Equal(cur_sum, data_hash[:]) { + cur_sum_enc := to_b32(cur_sum[:]) + err = fmt.Errorf("Read data checksum %s is not eq to write data checksum %s", cur_sum_enc, data_hash_base32) + } + } + case *s3.HeadObjectInput: + req, resp := svc.HeadObjectRequest(r) + err = req.Send() + ttfb = time.Since(putStartTime) + if err == nil { + numBytes = *resp.ContentLength + } + if numBytes != params.objectSize { + err = fmt.Errorf("expected object length %d, actual %d, resp %v", params.objectSize, numBytes, resp) + } + case *s3.PutObjectTaggingInput: + req, _ := svc.PutObjectTaggingRequest(r) + err = req.Send() + ttfb = time.Since(putStartTime) + case *s3.GetObjectTaggingInput: + req, _ := svc.GetObjectTaggingRequest(r) + err = req.Send() + ttfb = time.Since(putStartTime) default: panic("Developer error") } - params.responses <- Resp{err, time.Since(putStartTime), numBytes} - } -} - -// Specifies the parameters for a given test -type Params struct { - operation string - requests chan Req - responses chan Resp - numSamples int - numClients uint - objectSize int64 - objectNamePrefix string - bucketName string - endpoints []string - verbose bool -} - -func (params Params) String() string { - output := fmt.Sprintln("Test parameters") - output += fmt.Sprintf("endpoint(s): %s\n", params.endpoints) - output += fmt.Sprintf("bucket: %s\n", params.bucketName) - output += fmt.Sprintf("objectNamePrefix: %s\n", params.objectNamePrefix) - output += fmt.Sprintf("objectSize: %0.4f MB\n", float64(params.objectSize)/(1024*1024)) - output += fmt.Sprintf("numClients: %d\n", params.numClients) - output += fmt.Sprintf("numSamples: %d\n", params.numSamples) - output += fmt.Sprintf("verbose: %d\n", params.verbose) - return output -} - -// Contains the summary for a given test result -type Result struct { - operation 
string - bytesTransmitted int64 - numErrors int - opDurations []float64 - totalDuration time.Duration -} - -func (r Result) String() string { - report := fmt.Sprintf("Results Summary for %s Operation(s)\n", r.operation) - report += fmt.Sprintf("Total Transferred: %0.3f MB\n", float64(r.bytesTransmitted)/(1024*1024)) - report += fmt.Sprintf("Total Throughput: %0.2f MB/s\n", (float64(r.bytesTransmitted)/(1024*1024))/r.totalDuration.Seconds()) - report += fmt.Sprintf("Total Duration: %0.3f s\n", r.totalDuration.Seconds()) - report += fmt.Sprintf("Number of Errors: %d\n", r.numErrors) - if len(r.opDurations) > 0 { - report += fmt.Sprintln("------------------------------------") - report += fmt.Sprintf("%s times Max: %0.3f s\n", r.operation, r.percentile(100)) - report += fmt.Sprintf("%s times 99th %%ile: %0.3f s\n", r.operation, r.percentile(99)) - report += fmt.Sprintf("%s times 90th %%ile: %0.3f s\n", r.operation, r.percentile(90)) - report += fmt.Sprintf("%s times 75th %%ile: %0.3f s\n", r.operation, r.percentile(75)) - report += fmt.Sprintf("%s times 50th %%ile: %0.3f s\n", r.operation, r.percentile(50)) - report += fmt.Sprintf("%s times 25th %%ile: %0.3f s\n", r.operation, r.percentile(25)) - report += fmt.Sprintf("%s times Min: %0.3f s\n", r.operation, r.percentile(0)) + params.responses <- Resp{err, time.Since(putStartTime), numBytes, ttfb} } - return report -} - -func (r Result) percentile(i int) float64 { - if i >= 100 { - i = len(r.opDurations) - 1 - } else if i > 0 && i < 100 { - i = int(float64(i) / 100 * float64(len(r.opDurations))) - } - return r.opDurations[i] -} - -type Req interface{} - -type Resp struct { - err error - duration time.Duration - numBytes int64 } diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..82c899d --- /dev/null +++ b/utils.go @@ -0,0 +1,116 @@ +package main + +import ( + "fmt" + "strconv" + "regexp" + "encoding/base32" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + 
"github.com/aws/aws-sdk-go/service/s3" + +) + +func to_b32(dt []byte) string { + return base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(dt) +} + +func from_b32(s string) ([]byte, error) { + return base32.StdEncoding.WithPadding(base32.NoPadding).DecodeString(s) +} + +func parse_size(sz string) int64 { + sizes := map[string]int64 { + "b": 1, + "Kb": 1024, + "Mb": 1024 * 1024, + "Gb": 1024 * 1024 * 1024, + } + re := regexp.MustCompile(`^(\d+)([bKMG]{1,2})$`) + mm := re.FindStringSubmatch(sz) + if len(mm) != 3 { + panic("Invalid objectSize value format\n") + } + val, err := strconv.ParseInt(string(mm[1]), 10, 64) + mult, ex := sizes[string(mm[2])] + if !ex || err != nil { + panic("Invalid objectSize value\n") + } + return val * mult +} + +func (params Params) printf(f string, args ...interface{}) { + if params.verbose { + fmt.Printf(f, args...) + } +} + +// samples per operation +func (params Params) spo(op string) uint { + if op == opWrite || op == opPutObjTag || op == opValidate { + return params.numSamples + } + + return params.numSamples * params.sampleReads +} + +func percentile(dt []float64, i int) float64 { + ln := len(dt) + if i >= 100 { + i = ln - 1 + } else if i > 0 && i < 100 { + i = int(float64(i) / 100 * float64(ln)) + } + return dt[i] +} + +func avg(dt []float64) float64 { + ln := float64(len(dt)) + sm := float64(0) + for _, el := range dt { + sm += el + } + return sm / ln +} + +func indexOf(sls []string, s string) int { + ret := -1 + for i, v := range sls { + if v == s { + ret = i + break + } + } + return ret +} + +func genObjName(pref string, hsh string, idx uint) *string { + return aws.String(fmt.Sprintf("%s_%s_%d", pref, hsh, idx)) +} + +func (params *Params) getObjectHash(cfg *aws.Config) (string, error){ + cfg.Endpoint = aws.String(params.endpoints[0]) + svc := s3.New(session.New(), cfg) + + result, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(params.bucketName), + MaxKeys: aws.Int64(1), + Prefix: 
aws.String(params.objectNamePrefix), + }) + + if err != nil { + return "", err + } + if len(result.Contents) == 0 { + return "", fmt.Errorf("Empty bucket") + } + + re := regexp.MustCompile(`^.*_([A-Z2-7]+)_[0-9]+$`) + mm := re.FindStringSubmatch(*result.Contents[0].Key) + if len(mm) != 2 { + return "", fmt.Errorf("Invalid object name format") + } + + return mm[1], nil +}