本文是 storm 入门第一篇,因为 Storm 的本地模式体验极其简单, 故而我希望第一篇我们先来体验一下 Storm,而不是其他分布式技术那样, 开门就是架构,简介....
这里我们直接用 Maven 管理,直接在我们项目的 pom.xml 文件下加入:
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
</dependency> //继承自 BaseRichSpout
static class MySpout extends BaseRichSpout {
private Map map;
private TopologyContext topologyContext;
private SpoutOutputCollector spoutOutputCollector;
//初始化函数
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.map = map;
this.topologyContext = topologyContext;
this.spoutOutputCollector = spoutOutputCollector;
}
//模拟的外部数据
String[] outData = new String[]{"张三", "李四", "王五"};
String[] outData2 = new String[]{"12", "13", "22"};
/**
* 我们可以在这里来模拟从外部获取数据并发送到 bolt
* 该函数会在 storm 运行期间被循环调用
*/
public void nextTuple() {
String name = outData[(int) (Math.random() * 3)];
String age = outData2[(int) (Math.random() * 3)];
//将数据封装到 Tuple 里面
Values v = new Values(name,age);
// 将数据发送出去
spoutOutputCollector.emit(v);
//休眠一下,便于观察
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 数据声明
* 发送的数据,通过这里的声明 告诉下游,我这个数据是什么
* 相当于 表中的字段名
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//这里我们发送的是 姓名 和 年龄,主要顺序不能乱
outputFieldsDeclarer.declare(new Fields("name","age"));
}
} //继承自 BaseRichBolt
static class MyBolt extends BaseRichBolt {
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
//初始化函数
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
// 处理数据
public void execute(Tuple input) {
//获取上游发送的 name 字段
String name = input.getStringByField("name");
//获取上游发送的 age 字段
String age = input.getStringByField("age");
//这里我们简单的打印一下就好
System.out.println(name + "****"+age);
// 如果你还要继续往下发送 那么:collector.emit() 就可以
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//和 Spout 一样, 如果你还要继续往下发送数据,
// 那么你就要在这里声明的发送的数据是什么
// 我们这里不往下游发送,所以可以不用写
}
}
//在主程序里面进行组装提交
public static void main(String[] args) {
// Topology 的构建者
TopologyBuilder builder = new TopologyBuilder();
// 设置 Spout ,并为其命名为 textSpout
builder.setSpout("textSpout", new MySpout(), 3);
//设置 Bolt,并为其命名为 MyBolt,
builder.setBolt("MyBolt", new MyBolt(), 3)
// 设置其 Stream Grouping 为 shuffleGrouping,并且是从 textSpout 接受数据
.shuffleGrouping("textSpout");
// 创建Topology
StormTopology topology = builder.createTopology()
// 创建一个 本地集群
LocalCluster localCluster = new LocalCluster();
Config map = new Config();
map.setNumAckers(1);
//将 topology 提交到集群运行
localCluster.submitTopology("test", map, topology );
}
有没有感觉很简单?和我们平时写的本地代码基本没什么区别...
上面我们已经体验过 Storm 的本地模式了,虽然我们的代码极其简陋,但是最少让我们了解了 Storm 的编程模型到底是怎么样的了!再怎么复杂的东西,我们也可以从上面这个简陋的代码一步步衍生出来,下面我们看一下下面这幅图

从上面我们可以看到: 一个水龙头代表一个 Spout,一个闪电代表一个 Bolt, Spout 和 Bolt 通过 数据Tuple 的通道建立起了一条条数据流。 该图可以很好的说明 Storm 的工作模式, 通过 Spout 和 Bolt 可以构建起各种数据流以满足我们的业务需求。