-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaa.go
More file actions
91 lines (84 loc) · 2.58 KB
/
aa.go
File metadata and controls
91 lines (84 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main
import (
"AdxDc/app/rokmq"
"AdxDc/controllers"
_ "AdxDc/routers"
"github.com/astaxie/beego"
"strings"
)
func main() {
//MQ消费
topics := strings.Split(beego.AppConfig.String("topic"), ",")
ConsumeWithOrderly(topics...)
// 启动rpc
//go controllers.NewGRpcServer()
beego.Run()
}
// MQ消费:
func ConsumeWithOrderly(topics ...string) {
for _, v := range topics {
switch v {
case rokmq.Topic:
// mq消费 历史统计
mq := controllers.RocketmqController{}
consumerOption := rokmq.ConsumerOption{
GroupId: rokmq.GroupId,
Topic: rokmq.Topic,
Tags: []string{rokmq.TagAction, rokmq.TagTRACK},
Model: rokmq.CoCurrentlyConsumer, // 异步消费
//Model: rokmq.OrderlyConsumer, // 严格顺序消费
}
go rokmq.TcpConsumeWithOrderlyV2(consumerOption, mq.RocketMqCallback)
case rokmq.TopicView:
// view 消费
mq := controllers.RocketmqController{}
consumerOption := rokmq.ConsumerOption{
GroupId: rokmq.GroupId02,
Topic: rokmq.TopicView,
Tags: []string{rokmq.TagView},
Model: rokmq.CoCurrentlyConsumer,
}
go rokmq.TcpConsumeWithOrderlyV2(consumerOption, mq.RocketMqCallback)
case rokmq.TopicUserRecall:
//mq消费 召回老用户
userRecall := controllers.UserRecallController{}
consumerOption := rokmq.ConsumerOption{
GroupId: rokmq.GroupId03,
Topic: rokmq.TopicUserRecall,
Tags: []string{rokmq.TagUserRecall},
Model: rokmq.CoCurrentlyConsumer,
}
go rokmq.TcpConsumeWithOrderlyV2(consumerOption, userRecall.RocketMqCallback)
case rokmq.TopicClick:
// Click 消费
mq := controllers.RocketmqController{}
consumerOption := rokmq.ConsumerOption{
GroupId: rokmq.GroupId04,
Topic: rokmq.TopicClick,
Tags: []string{rokmq.TagClick},
Model: rokmq.CoCurrentlyConsumer,
}
go rokmq.TcpConsumeWithOrderlyV2(consumerOption, mq.RocketMqCallback)
case rokmq.TopicSem:
// Sem 消费
sem := controllers.SemController{}
consumerOption := rokmq.ConsumerOption{
GroupId: rokmq.GroupId05,
Topic: rokmq.TopicSem,
Tags: []string{rokmq.TagSem},
Model: rokmq.CoCurrentlyConsumer,
}
go rokmq.TcpConsumeWithOrderlyV2(consumerOption, sem.RocketMqCallback)
case rokmq.TopicDepthCall:
// 深度回传消费
depth := controllers.DepthCallController{}
consumerOption := rokmq.ConsumerOption{
GroupId: rokmq.GroupId06,
Topic: rokmq.TopicDepthCall,
Tags: []string{rokmq.TagDepthCall},
Model: rokmq.CoCurrentlyConsumer,
}
go rokmq.TcpConsumeWithOrderlyV2(consumerOption, depth.RocketMqCallback)
}
}
}