-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresque_processor.go
More file actions
119 lines (96 loc) · 2.76 KB
/
resque_processor.go
File metadata and controls
119 lines (96 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package resque_status
import (
"encoding/json"
"github.com/Shop2market/goworker"
)
// ResqueProcessor - Main Processor type
type ResqueProcessor struct {
JobName string
Lock
Handler
}
// JobParams - arguments from resque jobs
type JobParams map[string]interface{}
// ExpiresIn - expiration config for jobs, default 24h
var ExpiresIn *int64
func init() {
var defaultExpiresIn int64
defaultExpiresIn = 24 * 60 * 60 // 24 hours in seconds
ExpiresIn = &defaultExpiresIn
}
// Handler Job handler function
type Handler func(JobParams) error
func NewResqueProcessor(jobName, lockKeyPrefix string, keyParamNames []string, handler Handler) *ResqueProcessor {
return &ResqueProcessor{JobName: jobName, Lock: Lock{LockKeyPrefix: lockKeyPrefix, KeyParamNames: keyParamNames}, Handler: handler}
}
func (rp *ResqueProcessor) Process(queue string, args ...interface{}) error {
jobUUID := args[0].(string)
err := rp.updateStatus(jobUUID, "working")
if err != nil {
return err
}
defer rp.updateStatus(jobUUID, "completed")
params := args[1].(map[string]interface{})
defer rp.unlock(params)
return rp.Handler(params)
}
func (rp *ResqueProcessor) unlock(params map[string]interface{}) error {
conn, err := goworker.GetConn()
defer goworker.PutConn(conn)
if err != nil {
return err
}
_, err = conn.Do("DEL", rp.Key(params))
return err
}
func (rp *ResqueProcessor) updateStatus(uuid, statusString string) error {
conn, err := goworker.GetConn()
defer goworker.PutConn(conn)
if err != nil {
return err
}
serializedStatus, err := rp.readJobStatus(conn, uuid)
if err != nil {
return err
}
serializedStatus.Status = statusString
serializedStatus.Name = rp.JobName
err = rp.saveJobStatus(conn, uuid, serializedStatus)
if err != nil {
return err
}
return nil
}
func (rp *ResqueProcessor) saveJobStatus(conn *goworker.RedisConn, uuid string, serializedStatus status) error {
statusBytes, err := json.Marshal(serializedStatus)
if err != nil {
return err
}
_, err = conn.Do("SET", "resque:status:"+uuid, statusBytes)
if err != nil {
return err
}
if ExpiresIn != nil {
_, err = conn.Do("EXPIRE", "resque:status:"+uuid, *ExpiresIn)
if err != nil {
return err
}
}
return nil
}
func (rp *ResqueProcessor) readJobStatus(conn *goworker.RedisConn, uuid string) (status, error) {
jobStatus, err := conn.Do("GET", "resque:status:"+uuid)
if err != nil || jobStatus == nil {
return status{}, err
}
serializedStatus := status{}
json.Unmarshal(jobStatus.([]byte), &serializedStatus)
return serializedStatus, nil
}
type status struct {
Time int64 `json:"time"`
Status string `json:"status"`
Name string `json:"name"`
UUID string `json:"uuid"`
Options map[string]interface{} `json:"options"`
}