-
Notifications
You must be signed in to change notification settings - Fork 3
Yoruichi/stream-compute
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
多线程流式计算模型
现在的Count引擎使用的一个框架。非常轻量级。全部使用了JDK自带的线程池和队列做实现。使用起来也相当方便。
一、QuickStart
1.
2.三大基础类
2.1 EmitItem : 发布和处理的KV对的封装
2.2 ModeHandler : 执行业务逻辑的单元 已改名为Bolt
2.3 FirstModeHandler : 读取文件/队列/存储等用于生产第一批EmitItem的单元 已改名为Spout
3.拓扑类
3.1 Topology包含了一系列的ModeHandler和一种FirstModeHandler的实现。
3.2 可以按照实际的业务需求来任意组合ModeHandler的执行顺序,但是不能动态调整,每次调整都需要重新编译
3.3 拓扑的启动可以直接main方法启动,也可以放置于容器内部署启动
二、SimpleExample
1.利用map统计单词出现的次数
public class CountModeHandler extends ModeHandler {
...
@Override
public int execute() { //业务逻辑实现方法
EmitItem item = null;
int num = 0;
String word = null;
Map<String, Integer> map = new HashMap<String, Integer>();
while ((item = getReadMessageQueue().poll()) != null) {//从前置队列中获取item
try {
word = (String) item.getMessage(0);//从item中得到要统计的word
if (map.containsKey(word))
map.put(word, map.get(word) + 1);//利用map来统计word出现的次数
else
map.put(word, 1);
num++;
} catch (Exception e) {
e.printStackTrace();
}
}
if (num > 0) {
emit(0, map);//将统计结果存放的map继续发射到下一个处理单元
}
return num;
}
...
}
2.将不同统计Mode的map结果合并成一个map
public class MergeModeHandler extends ModeHandler {
...
@Override
public int execute() {
EmitItem item = null;
int num = 0;
while ((item = getReadMessageQueue().poll()) != null) {
try {
Map<String, Integer> m = (Map<String, Integer>) item.getMessage(0);
for (String key : m.keySet()) {
if (map.containsKey(key))
map.put(key, map.get(key) + m.get(key));
else
map.put(key, m.get(key));
}
} catch (Exception e) {
e.printStackTrace();
}
num++;
}
return num;
}
...
}
3.模拟读入文章并将单词发射出去的FirstModeHandler
public class TestFirstModeHandler extends FirstModeHandler {
public String[][] message = new String[][] {
{ "one", "apple", "a", "day", "doctor", "keeps", "away" },
{ "when", "a", "man", "loves", "a", "woman" },
{ "what", "doesn't", "kill", "you", "makes", "you", "stronger" } };
@Override
public int execute() {
int index = 0;
for (int i = 0; i < message.length; i++)
for (int j = 0; j < message[i].length; j++) {
emit(index, message[i][j]);
index++;
}
return index;
}
...
}
4.将各个单元组装成拓扑结构并运行
...
public static void main(String[] args) {
TestTopology tt = new TestTopology();
TestFirstModeHandler h1 = new TestFirstModeHandler();
CountModeHandler h2 = new CountModeHandler();
MergeModeHandler h3 = new MergeModeHandler();
tt.prepare();//拓扑的准备阶段,可以初始化一些参数
h3.prepare(tt.map);//ModeHandler的准备阶段,同样可以初始化参数
tt.setFirstModeHandler(h1, 1).setModeHandler(h2, 4).setModeHandler(h3, 2); //拓扑设置各个单元的数量和顺序,先set哪个ModeHandler,哪个就先被执行。
tt.start();//启动
System.out.println("The last result is : " + tt.map);//输入统计单词的结果
tt.shutdown();//停止
}
...
5.TestTopology只简单的初始化一个map用来存放最终统计的结果就可以了
public class TestTopology extends NewTopology {
public Map<String, Integer> map;
@Override
public void prepare() {
map = new ConcurrentHashMap<String, Integer>();
};
}
三、TODOLIST
· 分流/整流/订阅 三种拓扑结构的实现
· 监控的一些整合
About
No description, website, or topics provided.
Resources
Stars
Watchers
Forks
Releases
No releases published
Packages 0
No packages published