diff --git a/st/library/fromSQS.go b/st/library/fromSQS.go index 14692c4d..6dd9a999 100644 --- a/st/library/fromSQS.go +++ b/st/library/fromSQS.go @@ -2,7 +2,9 @@ package library import ( "encoding/json" + "errors" "log" + "time" "github.com/goamz/goamz/aws" "github.com/goamz/goamz/sqs" @@ -13,92 +15,6 @@ import ( "github.com/nytlabs/streamtools/st/util" ) -// lots of this code stolen brazenly from JP https://github.com/jprobinson -func (b *FromSQS) listener() { - log.Println("in listener") - b.lock.Lock() - // OK I KNOW that everything inside this lock is bad. This is a quick fix. Promise to do better in the future. - log.Println("in lock") - accessKey, ok := b.auth["AccessKey"].(string) - if !ok { - b.Error("could not assert AccessKey to a string") - b.listening = false - b.lock.Unlock() - return - } - accessSecret, ok := b.auth["AccessSecret"].(string) - if !ok { - b.Error("could not assert AccessSecret to a string") - b.listening = false - b.lock.Unlock() - return - } - queueName, ok := b.auth["QueueName"].(string) - if !ok { - b.Error("could not assert queue name to a string") - b.listening = false - b.lock.Unlock() - return - } - maxN, ok := b.auth["MaxNumberOfMessages"].(string) - if !ok { - b.Error("could not assert MaxNumberOfMessages to a string") - b.listening = false - b.lock.Unlock() - return - } - log.Println("authenticating with aws") - auth := aws.Auth{AccessKey: accessKey, SecretKey: accessSecret} - sqsClient := sqs.New(auth, aws.USEast) - log.Println("getting SQS queue") - queue, err := sqsClient.GetQueue(queueName) - if err != nil { - b.Error(err) - b.listening = false - b.lock.Unlock() - return - } - - log.Println("setting listening flag") - b.listening = true - b.lock.Unlock() - - params := map[string]string{ - "WaitTimeSeconds":"1", - "MaxNumberOfMessages":maxN, - } - - var resp *sqs.ReceiveMessageResponse - log.Println("starting read loop") - for { - select { - case <-b.stop: - log.Println("Exiting SQS read loop") - return - default: - resp, err = queue.ReceiveMessageWithParameters(params) - if err != nil { - b.Error(err) - } - if len(resp.Messages) == 0 { - break - } - for _, m := range resp.Messages { - select { - case b.fromListener <- []byte(m.Body): - default: - log.Println("discarding messages") - log.Println(len(b.fromListener)) - continue - } - - if _, err = queue.DeleteMessage(&m); err != nil { - b.Error(err) - } - } - } - } -} // specify those channels we're going to use to communicate with streamtools type FromSQS struct { @@ -111,7 +27,7 @@ type FromSQS struct { lock sync.Mutex listening bool fromListener chan []byte - auth map[string]interface{} + auth map[string]string stop chan bool } @@ -130,7 +46,7 @@ func (b *FromSQS) Setup() { b.out = b.Broadcast() b.fromListener = make(chan []byte, 1000) b.stop = make(chan bool) - b.auth = map[string]interface{}{ + b.auth = map[string]string{ "QueueName": "", "AccessKey": "", "AccessSecret": "", @@ -138,20 +54,82 @@ func (b *FromSQS) Setup() { } } -func (b *FromSQS) stopListening() { - log.Println("attempting to stop SQS reader") - log.Println(b.listening) - if b.listening { - log.Println("sending stop") - b.stop <- true - log.Println("sent stop") - b.listening = false +func (b *FromSQS) runReader(sem chan bool, outChan chan []byte, stopChan chan bool, auth map[string]string) { + log.Println("starting new reader") + t := time.Now() + err := listener(auth["AccessKey"], auth["AccessSecret"], auth["QueueName"], auth["MaxNumberOfMessages"], outChan, stopChan) + if err != nil { + b.Error(err) + if time.Since(t) < 1*time.Second { + log.Println("reader failed in less than one second") + b.Error(errors.New("reader died rapidly - check SQS reader parameters")) + // here we don't free up a seperate reader + } else { + log.Println("freeing reader") + <-sem + } + } +} + +func stopAllReaders(stopChans []chan bool) { + for _, stopChan := range stopChans { + close(stopChan) + } +} + +// lots of this code stolen brazenly from JP https://github.com/jprobinson +func listener(key, secret, queueName, maxN string, outChan chan []byte, stopChan chan bool) error { + auth := aws.Auth{AccessKey: key, SecretKey: secret} + sqsClient := sqs.New(auth, aws.USEast) + log.Println("getting SQS queue") + queue, err := sqsClient.GetQueue(queueName) + if err != nil { + return err + } + + params := map[string]string{ + "WaitTimeSeconds": "1", + "MaxNumberOfMessages": maxN, + } + + var resp *sqs.ReceiveMessageResponse + log.Println("starting read loop") + for { + select { + case <-stopChan: + log.Println("Exiting SQS read loop") + return nil + default: + resp, err = queue.ReceiveMessageWithParameters(params) + if err != nil { + return err + } + if len(resp.Messages) == 0 { + break + } + for _, m := range resp.Messages { + select { + case outChan <- []byte(m.Body): + default: + log.Println("discarding messages") + log.Println(len(outChan)) + continue + } + _, err = queue.DeleteMessage(&m) + if err != nil { + return err + } + } + } } } // Run is the block's main loop. Here we listen on the different channels we set up. func (b *FromSQS) Run() { var err error + numReaders := 10 + stopChans := make([]chan bool, 0) + semChan := make(chan bool, numReaders) for { select { @@ -164,11 +142,20 @@ func (b *FromSQS) Run() { } } - b.stopListening() - log.Println("starting new listener") - go b.listener() + stopAllReaders(stopChans) + stopChans = make([]chan bool, 0) + + go func() { + for { + semChan <- true // maintain pressure + stopChan := make(chan bool, 1) + stopChans = append(stopChans, stopChan) + go b.runReader(semChan, b.fromListener, stopChan, b.auth) + } + }() + case <-b.quit: - b.stopListening() + stopAllReaders(stopChans) return case msg := <-b.fromListener: var outMsg interface{}