-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_list_adapter.go
More file actions
133 lines (110 loc) · 3.19 KB
/
redis_list_adapter.go
File metadata and controls
133 lines (110 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package dgmq
import (
"time"
dgctx "github.com/darwinOrg/go-common/context"
"github.com/darwinOrg/go-common/utils"
dglogger "github.com/darwinOrg/go-logger"
redisdk "github.com/darwinOrg/go-redis"
)
type redisListAdapter struct {
timeout time.Duration
}
func NewRedisListAdapter(config *MqAdapterConfig) MqAdapter {
return &redisListAdapter{
timeout: config.Timeout,
}
}
func (a *redisListAdapter) CreateTopic(ctx *dgctx.DgContext, topic string) error {
return nil
}
func (a *redisListAdapter) Publish(ctx *dgctx.DgContext, topic string, message any) error {
var strMsg string
switch message.(type) {
case string:
strMsg = message.(string)
case []byte:
strMsg = string(message.([]byte))
default:
jsonMsg, err := utils.ConvertBeanToJsonString(message)
if err != nil {
dglogger.Errorf(ctx, "ConvertBeanToJsonString error | topic: %s | err: %v", topic, err)
return err
}
strMsg = jsonMsg
}
_, err := redisdk.LPush(topic, strMsg)
if err != nil {
dglogger.Errorf(ctx, "Publish error | topic: %s | err: %v", topic, err)
}
return err
}
func (a *redisListAdapter) PublishWithTag(ctx *dgctx.DgContext, topic, tag string, message any) error {
return a.Publish(ctx, topic+"@"+tag, message)
}
func (a *redisListAdapter) PublishDelay(ctx *dgctx.DgContext, topic string, message any, delay time.Duration) error {
// TODO
return nil
}
func (a *redisListAdapter) Destroy(ctx *dgctx.DgContext, topic string) error {
_, err := redisdk.Del(topic)
if err != nil {
dglogger.Errorf(ctx, "Destroy error | topic: %s | err: %v", topic, err)
}
return err
}
func (a *redisListAdapter) Subscribe(ctx *dgctx.DgContext, topic string, handler SubscribeHandler) (SubscribeEndCallback, error) {
end := false
go func() {
for {
if end {
break
}
a.subscribe(ctx, topic, handler)
}
}()
return func() {
end = true
}, nil
}
func (a *redisListAdapter) SubscribeWithTag(ctx *dgctx.DgContext, topic, tag string, handler SubscribeHandler) (SubscribeEndCallback, error) {
return a.Subscribe(ctx, topic+"@"+tag, handler)
}
func (a *redisListAdapter) DynamicSubscribe(ctx *dgctx.DgContext, closeCh chan struct{}, topic string, handler SubscribeHandler) error {
go func() {
for {
select {
case <-closeCh:
dglogger.Infof(ctx, "closed topic: %s ", topic)
return
default:
a.subscribe(ctx, topic, handler)
}
}
}()
return nil
}
func (a *redisListAdapter) SubscribeDelay(ctx *dgctx.DgContext, topic string, sleepDuration time.Duration, handler SubscribeHandler) (SubscribeEndCallback, error) {
return nil, nil
}
func (a *redisListAdapter) Unsubscribe(_ *dgctx.DgContext, _ string) error {
return nil
}
func (a *redisListAdapter) UnsubscribeWithTag(_ *dgctx.DgContext, _, _ string) error {
return nil
}
func (a *redisListAdapter) subscribe(ctx *dgctx.DgContext, topic string, handler SubscribeHandler) {
rts, readErr := redisdk.BRPop(a.timeout, topic)
if readErr != nil {
dglogger.Debugf(ctx, "BRPop error | topic: %s | err: %v", topic, readErr)
time.Sleep(time.Second)
return
}
if rts != "" {
handlerErr := handler(ctx, rts)
if handlerErr != nil {
dglogger.Errorf(ctx, "Handle error | topic: %s | err: %v", topic, handlerErr)
}
}
}
func (a *redisListAdapter) Close() {
}