From d74b69e345b92b29a91fa5ba00253d7c747a2827 Mon Sep 17 00:00:00 2001 From: Mike Dewar Date: Sun, 7 Sep 2014 09:17:48 -0400 Subject: [PATCH 1/5] new mechanism for the fromSQS block --- st/library/fromSQS.go | 114 +++++++++++++++++------------------------- 1 file changed, 47 insertions(+), 67 deletions(-) diff --git a/st/library/fromSQS.go b/st/library/fromSQS.go index 14692c4d..7a34f49e 100644 --- a/st/library/fromSQS.go +++ b/st/library/fromSQS.go @@ -14,86 +14,46 @@ import ( ) // lots of this code stolen brazenly from JP https://github.com/jprobinson -func (b *FromSQS) listener() { - log.Println("in listener") - b.lock.Lock() - // OK I KNOW that everything inside this lock is bad. This is a quick fix. Promise to do better in the future. - log.Println("in lock") - accessKey, ok := b.auth["AccessKey"].(string) - if !ok { - b.Error("could not assert AccessKey to a string") - b.listening = false - b.lock.Unlock() - return - } - accessSecret, ok := b.auth["AccessSecret"].(string) - if !ok { - b.Error("could not assert AccessSecret to a string") - b.listening = false - b.lock.Unlock() - return - } - queueName, ok := b.auth["QueueName"].(string) - if !ok { - b.Error("could not assert queue name to a string") - b.listening = false - b.lock.Unlock() - return - } - maxN, ok := b.auth["MaxNumberOfMessages"].(string) - if !ok { - b.Error("could not assert MaxNumberOfMessages to a string") - b.listening = false - b.lock.Unlock() - return - } - log.Println("authenticating with aws") - auth := aws.Auth{AccessKey: accessKey, SecretKey: accessSecret} +func listener(key, secret, queueName, maxN string, outChan chan []byte, stopChan chan bool) error { + auth := aws.Auth{AccessKey: key, SecretKey: secret} sqsClient := sqs.New(auth, aws.USEast) log.Println("getting SQS queue") queue, err := sqsClient.GetQueue(queueName) if err != nil { - b.Error(err) - b.listening = false - b.lock.Unlock() - return + return err } - log.Println("setting listening flag") - b.listening = true - b.lock.Unlock() - params := map[string]string{ - "WaitTimeSeconds":"1", - "MaxNumberOfMessages":maxN, + "WaitTimeSeconds": "1", + "MaxNumberOfMessages": maxN, } var resp *sqs.ReceiveMessageResponse log.Println("starting read loop") for { select { - case <-b.stop: + case <-stopChan: log.Println("Exiting SQS read loop") - return + return nil default: resp, err = queue.ReceiveMessageWithParameters(params) if err != nil { - b.Error(err) + return err } if len(resp.Messages) == 0 { break } for _, m := range resp.Messages { select { - case b.fromListener <- []byte(m.Body): + case outChan <- []byte(m.Body): default: log.Println("discarding messages") - log.Println(len(b.fromListener)) + log.Println(len(outChan)) continue } - - if _, err = queue.DeleteMessage(&m); err != nil { - b.Error(err) + _, err = queue.DeleteMessage(&m) + if err != nil { + return err } } } @@ -111,7 +71,7 @@ type FromSQS struct { lock sync.Mutex listening bool fromListener chan []byte - auth map[string]interface{} + auth map[string]string stop chan bool } @@ -130,7 +90,7 @@ func (b *FromSQS) Setup() { b.out = b.Broadcast() b.fromListener = make(chan []byte, 1000) b.stop = make(chan bool) - b.auth = map[string]interface{}{ + b.auth = map[string]string{ "QueueName": "", "AccessKey": "", "AccessSecret": "", @@ -138,20 +98,32 @@ func (b *FromSQS) Setup() { } } -func (b *FromSQS) stopListening() { - log.Println("attempting to stop SQS reader") - log.Println(b.listening) - if b.listening { - log.Println("sending stop") - b.stop <- true - log.Println("sent stop") - b.listening = false +func (b *FromSQS) runReader(sem chan bool, outChan chan []byte, stopChan chan bool, auth map[string]string) { + log.Println("starting new reader") + err := listener(auth["AccessKey"], auth["AccessSecret"], auth["QueueName"], auth["MaxNumberOfMessages"], outChan, stopChan) + if err != nil { + b.Error(err) + } + log.Println("freeing reader") + <-sem +} + +func stopAllReaders(stopChans []chan bool) { + for _, stopChan := range stopChans { + select { + case stopChan <- true: + default: + continue // someone has already tried to stop this reader + } } } // Run is the block's main loop. Here we listen on the different channels we set up. func (b *FromSQS) Run() { var err error + numReaders := 10 + stopChans := make([]chan bool, 0) + semChan := make(chan bool, numReaders) for { select { @@ -164,11 +136,19 @@ func (b *FromSQS) Run() { } } - b.stopListening() - log.Println("starting new listener") - go b.listener() + stopAllReaders(stopChans) + + go func() { + for { + semChan <- true // maintain pressure + stopChan := make(chan bool, 1) + stopChans = append(stopChans, stopChan) + go b.runReader(semChan, b.fromListener, stopChan, b.auth) + } + }() + case <-b.quit: - b.stopListening() + stopAllReaders(stopChans) return case msg := <-b.fromListener: var outMsg interface{} From 1ba11189460a9c8ebf01a3a892106124dda115f1 Mon Sep 17 00:00:00 2001 From: Mike Dewar Date: Sun, 7 Sep 2014 13:10:29 -0400 Subject: [PATCH 2/5] if a reader exits without an error, don't free up space for a new reader --- st/library/fromSQS.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/st/library/fromSQS.go b/st/library/fromSQS.go index 7a34f49e..a3c77729 100644 --- a/st/library/fromSQS.go +++ b/st/library/fromSQS.go @@ -103,9 +103,10 @@ func (b *FromSQS) runReader(sem chan bool, outChan chan []byte, stopChan chan bo err := listener(auth["AccessKey"], auth["AccessSecret"], auth["QueueName"], auth["MaxNumberOfMessages"], outChan, stopChan) if err != nil { b.Error(err) + log.Println("freeing reader") + <-sem } - log.Println("freeing reader") - <-sem + } func stopAllReaders(stopChans []chan bool) { From fb4d8f0f15c18f546cf921da9088771057162cfc Mon Sep 17 00:00:00 2001 From: Mike Dewar Date: Sun, 7 Sep 2014 17:26:59 -0400 Subject: [PATCH 3/5] closing stopChan now instead of sending a true and a little time check to make sure our readers aren't failing very quickly --- st/library/fromSQS.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/st/library/fromSQS.go b/st/library/fromSQS.go index a3c77729..c7c86232 100644 --- a/st/library/fromSQS.go +++ b/st/library/fromSQS.go @@ -2,7 +2,9 @@ package library import ( "encoding/json" + "errors" "log" + "time" "github.com/goamz/goamz/aws" "github.com/goamz/goamz/sqs" @@ -100,22 +102,25 @@ func (b *FromSQS) Setup() { func (b *FromSQS) runReader(sem chan bool, outChan chan []byte, stopChan chan bool, auth map[string]string) { log.Println("starting new reader") + t := time.Now() err := listener(auth["AccessKey"], auth["AccessSecret"], auth["QueueName"], auth["MaxNumberOfMessages"], outChan, stopChan) if err != nil { b.Error(err) - log.Println("freeing reader") - <-sem + if time.Since(t) < 1*time.Second { + log.Println("reader failed in less than one second") + b.Error(errors.New("reader died rapidly - check SQS reader parameters")) + // here we don't free up a seperate reader + } else { + log.Println("freeing reader") + <-sem + } } } func stopAllReaders(stopChans []chan bool) { for _, stopChan := range stopChans { - select { - case stopChan <- true: - default: - continue // someone has already tried to stop this reader - } + close(stopChan) } } @@ -138,6 +143,7 @@ func (b *FromSQS) Run() { } stopAllReaders(stopChans) + stopChans = make([]chan bool, 0) go func() { for { From 7e58d0358c7d144d47f957d2b4c723687e5e3c31 Mon Sep 17 00:00:00 2001 From: Mike Dewar Date: Tue, 7 Oct 2014 13:08:23 +0000 Subject: [PATCH 4/5] stuffs --- st/library/fromSQS.go | 93 ++++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 46 deletions(-) diff --git a/st/library/fromSQS.go b/st/library/fromSQS.go index c7c86232..01c9382e 100644 --- a/st/library/fromSQS.go +++ b/st/library/fromSQS.go @@ -15,52 +15,6 @@ import ( "github.com/nytlabs/streamtools/st/util" ) -// lots of this code stolen brazenly from JP https://github.com/jprobinson -func listener(key, secret, queueName, maxN string, outChan chan []byte, stopChan chan bool) error { - auth := aws.Auth{AccessKey: key, SecretKey: secret} - sqsClient := sqs.New(auth, aws.USEast) - log.Println("getting SQS queue") - queue, err := sqsClient.GetQueue(queueName) - if err != nil { - return err - } - - params := map[string]string{ - "WaitTimeSeconds": "1", - "MaxNumberOfMessages": maxN, - } - - var resp *sqs.ReceiveMessageResponse - log.Println("starting read loop") - for { - select { - case <-stopChan: - log.Println("Exiting SQS read loop") - return nil - default: - resp, err = queue.ReceiveMessageWithParameters(params) - if err != nil { - return err - } - if len(resp.Messages) == 0 { - break - } - for _, m := range resp.Messages { - select { - case outChan <- []byte(m.Body): - default: - log.Println("discarding messages") - log.Println(len(outChan)) - continue - } - _, err = queue.DeleteMessage(&m) - if err != nil { - return err - } - } - } - } -} // specify those channels we're going to use to communicate with streamtools type FromSQS struct { @@ -124,6 +78,53 @@ func stopAllReaders(stopChans []chan bool) { } } +// lots of this code stolen brazenly from JP https://github.com/jprobinson +func listener(key, secret, queueName, maxN string, outChan chan []byte, stopChan chan bool) error { + auth := aws.Auth{AccessKey: key, SecretKey: secret} + sqsClient := sqs.New(auth, aws.USEast) + log.Println("getting SQS queue") + queue, err := sqsClient.GetQueue(queueName) + if err != nil { + return err + } + + params := map[string]string{ + "WaitTimeSeconds": "1", + "MaxNumberOfMessages": maxN, + } + + var resp *sqs.ReceiveMessageResponse + log.Println("starting read loop") + for { + select { + case <-stopChan: + log.Println("Exiting SQS read loop") + return nil + default: + resp, err = queue.ReceiveMessageWithParameters(params) + if err != nil { + return err + } + if len(resp.Messages) == 0 { + break + } + for _, m := range resp.Messages { + select { + case outChan <- []byte(m.Body): + default: + log.Println("discarding messages") + log.Println(len(outChan)) + continue + } + _, err = queue.DeleteMessage(&m) + if err != nil { + return err + } + } + } + } +} + // Run is the block's main loop. Here we listen on the different channels we set up. func (b *FromSQS) Run() { var err error From 7f6ad98abb17c72fd4931b813a2b15022973d593 Mon Sep 17 00:00:00 2001 From: Mike Dewar Date: Fri, 10 Oct 2014 07:10:43 -0400 Subject: [PATCH 5/5] not sure --- st/library/fromSQS.go | 1 - 1 file changed, 1 deletion(-) diff --git a/st/library/fromSQS.go b/st/library/fromSQS.go index c7c86232..81f3bcd8 100644 --- a/st/library/fromSQS.go +++ b/st/library/fromSQS.go @@ -115,7 +115,6 @@ func (b *FromSQS) runReader(sem chan bool, outChan chan []byte, stopChan chan bo <-sem } } - } func stopAllReaders(stopChans []chan bool) {