Skip to content
Open

Sqs #555

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 87 additions & 100 deletions st/library/fromSQS.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package library

import (
"encoding/json"
"errors"
"log"
"time"

"github.com/goamz/goamz/aws"
"github.com/goamz/goamz/sqs"
Expand All @@ -13,92 +15,6 @@ import (
"github.com/nytlabs/streamtools/st/util"
)

// lots of this code stolen brazenly from JP https://github.com/jprobinson
func (b *FromSQS) listener() {
log.Println("in listener")
b.lock.Lock()
// OK I KNOW that everything inside this lock is bad. This is a quick fix. Promise to do better in the future.
log.Println("in lock")
accessKey, ok := b.auth["AccessKey"].(string)
if !ok {
b.Error("could not assert AccessKey to a string")
b.listening = false
b.lock.Unlock()
return
}
accessSecret, ok := b.auth["AccessSecret"].(string)
if !ok {
b.Error("could not assert AccessSecret to a string")
b.listening = false
b.lock.Unlock()
return
}
queueName, ok := b.auth["QueueName"].(string)
if !ok {
b.Error("could not assert queue name to a string")
b.listening = false
b.lock.Unlock()
return
}
maxN, ok := b.auth["MaxNumberOfMessages"].(string)
if !ok {
b.Error("could not assert MaxNumberOfMessages to a string")
b.listening = false
b.lock.Unlock()
return
}
log.Println("authenticating with aws")
auth := aws.Auth{AccessKey: accessKey, SecretKey: accessSecret}
sqsClient := sqs.New(auth, aws.USEast)
log.Println("getting SQS queue")
queue, err := sqsClient.GetQueue(queueName)
if err != nil {
b.Error(err)
b.listening = false
b.lock.Unlock()
return
}

log.Println("setting listening flag")
b.listening = true
b.lock.Unlock()

params := map[string]string{
"WaitTimeSeconds":"1",
"MaxNumberOfMessages":maxN,
}

var resp *sqs.ReceiveMessageResponse
log.Println("starting read loop")
for {
select {
case <-b.stop:
log.Println("Exiting SQS read loop")
return
default:
resp, err = queue.ReceiveMessageWithParameters(params)
if err != nil {
b.Error(err)
}
if len(resp.Messages) == 0 {
break
}
for _, m := range resp.Messages {
select {
case b.fromListener <- []byte(m.Body):
default:
log.Println("discarding messages")
log.Println(len(b.fromListener))
continue
}

if _, err = queue.DeleteMessage(&m); err != nil {
b.Error(err)
}
}
}
}
}

// specify those channels we're going to use to communicate with streamtools
type FromSQS struct {
Expand All @@ -111,7 +27,7 @@ type FromSQS struct {
lock sync.Mutex
listening bool
fromListener chan []byte
auth map[string]interface{}
auth map[string]string
stop chan bool
}

Expand All @@ -130,28 +46,90 @@ func (b *FromSQS) Setup() {
b.out = b.Broadcast()
b.fromListener = make(chan []byte, 1000)
b.stop = make(chan bool)
b.auth = map[string]interface{}{
b.auth = map[string]string{
"QueueName": "",
"AccessKey": "",
"AccessSecret": "",
"MaxNumberOfMessages": "10",
}
}

func (b *FromSQS) stopListening() {
log.Println("attempting to stop SQS reader")
log.Println(b.listening)
if b.listening {
log.Println("sending stop")
b.stop <- true
log.Println("sent stop")
b.listening = false
func (b *FromSQS) runReader(sem chan bool, outChan chan []byte, stopChan chan bool, auth map[string]string) {
log.Println("starting new reader")
t := time.Now()
err := listener(auth["AccessKey"], auth["AccessSecret"], auth["QueueName"], auth["MaxNumberOfMessages"], outChan, stopChan)
if err != nil {
b.Error(err)
if time.Since(t) < 1*time.Second {
log.Println("reader failed in less than one second")
b.Error(errors.New("reader died rapidly - check SQS reader parameters"))
// here we don't free up a seperate reader
} else {
log.Println("freeing reader")
<-sem
}
}
}

func stopAllReaders(stopChans []chan bool) {
for _, stopChan := range stopChans {
close(stopChan)
}
}

// lots of this code stolen brazenly from JP https://github.com/jprobinson
func listener(key, secret, queueName, maxN string, outChan chan []byte, stopChan chan bool) error {
auth := aws.Auth{AccessKey: key, SecretKey: secret}
sqsClient := sqs.New(auth, aws.USEast)
log.Println("getting SQS queue")
queue, err := sqsClient.GetQueue(queueName)
if err != nil {
return err
}

params := map[string]string{
"WaitTimeSeconds": "1",
"MaxNumberOfMessages": maxN,
}

var resp *sqs.ReceiveMessageResponse
log.Println("starting read loop")
for {
select {
case <-stopChan:
log.Println("Exiting SQS read loop")
return nil
default:
resp, err = queue.ReceiveMessageWithParameters(params)
if err != nil {
return err
}
if len(resp.Messages) == 0 {
break
}
for _, m := range resp.Messages {
select {
case outChan <- []byte(m.Body):
default:
log.Println("discarding messages")
log.Println(len(outChan))
continue
}
_, err = queue.DeleteMessage(&m)
if err != nil {
return err
}
}
}
}
}

// Run is the block's main loop. Here we listen on the different channels we set up.
func (b *FromSQS) Run() {
var err error
numReaders := 10
stopChans := make([]chan bool, 0)
semChan := make(chan bool, numReaders)

for {
select {
Expand All @@ -164,11 +142,20 @@ func (b *FromSQS) Run() {
}
}

b.stopListening()
log.Println("starting new listener")
go b.listener()
stopAllReaders(stopChans)
stopChans = make([]chan bool, 0)

go func() {
for {
semChan <- true // maintain pressure
stopChan := make(chan bool, 1)
stopChans = append(stopChans, stopChan)
go b.runReader(semChan, b.fromListener, stopChan, b.auth)
}
}()

case <-b.quit:
b.stopListening()
stopAllReaders(stopChans)
return
case msg := <-b.fromListener:
var outMsg interface{}
Expand Down