版权声明:如需转载本文章,请保留出处! https://blog.csdn.net/xc_zhou/article/details/89966108
全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。但是,hadoop不擅长实时计算,因为它天然就是为批处理而生的,这也是业界一致的共识。否则最近这两年也不会有s4,storm,puma这些实时计算系统如雨后春笋般冒出来啦。先抛开s4,storm,puma这些系统不谈,我们首先来看一下,如果让我们自己设计一个实时计算系统,我们要解决哪些问题。
低延迟:都说了是实时计算系统了,延迟是一定要低的。 高性能:性能不高就是浪费机器,浪费机器是要受批评的哦。 分布式:系统都是为应用场景而生的,如果你的应用场景、你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。我们所说的是单机搞不定的情况。 可扩展:伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。 容错:这是分布式系统中通用问题。一个节点挂了不能影响我的应用。
好,如果仅仅需要解决这5个问题,可能会有无数种方案,而且各有千秋,随便举一种方案,使用消息队列+分布在各个机器上的工作进程就ok啦。我们再继续往下看。
1、容易在上面开发应用程序。亲,你设计的系统需要应用程序开发人员考虑各个处理组件的分布、消息的传递吗?如果是,那有点麻烦啊,开发人员可能会用不好,也不会想去用。
2、消息不丢失。用户发布的一个宝贝消息不能在实时处理的时候给丢了,对吧?更严格一点,如果是一个精确数据统计的应用,那么它处理的消息要不多不少才行。这个要求有点高哦。
在2011年Storm开源之前,由于Hadoop的火红,整个业界都在喋喋不休地谈论大数据。Hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据。但是,Hadoop的缺点也和它的优点同样鲜明——延迟大,响应缓慢,运维复杂。
有需求也就有创造,在Hadoop基本奠定了大数据霸主地位的时候,很多的开源项目都是以弥补Hadoop的实时性为目标而被创造出来。而在这个节骨眼上Storm横空出世了。
分布式系统:可横向拓展,现在的项目不带个分布式特性都不好意思开源。 运维简单:Storm的部署的确简单。虽然没有Mongodb的解压即用那么简单,但是它也就是多安装两个依赖库而已。 高度容错:模块都是无状态的,随时宕机重启。 无数据丢失:Storm创新性提出的ack消息追踪框架和复杂的事务性处理,能够满足很多级别的数据处理需求。不过,越高的数据处理需求,性能下降越严重。 多语言:实际上,Storm的多语言更像是临时添加上去似的。因为,你的提交部分还是要使用Java实现。
Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。Storm经常用于在实时分析、在线机器学习、持续计算、分布式远程调用和ETL等领域。Storm的部署管理非常简单,而且,在同类的流式计算工具,Storm的性能也是非常出众的。
Storm主要分为两种组件Nimbus和Supervisor。这两种组件都是快速失败的,没有状态。任务状态和心跳信息等都保存在Zookeeper上的,提交的代码资源都在本地机器的硬盘上。
下图是一个Topology设计的逻辑图的例子。
下图是Topology的提交流程图。
下图是Storm的数据交互图。可以看出两个模块Nimbus和Supervisor之间没有直接交互。状态都是保存在Zookeeper上。Worker之间通过ZeroMQ传送数据。
虽然,有些地方做得还是不太好,例如,底层使用的ZeroMQ不能控制内存使用(下个release版本,引入了新的消息机制使用netty代替ZeroMQ),多语言支持更多是噱头,Nimbus还不支持HA。但是,就像当年的Hadoop那样,很多公司选择它是因为它是唯一的选择。而这些先期使用者,反过来促进了Storm的发展。
Storm被广泛应用于实时分析,在线机器学习,持续计算、分布式远程调用等领域。来看一些实际的应用:
如果,业务场景中需要低延迟的响应,希望在秒级或者毫秒级完成分析、并得到响应,而且希望能够随着数据量的增大而拓展。那就可以考虑下,使用Storm了。
我们只需要实现每个分析的过程,而Storm帮我们把消息的传送和接受都完成了。更加激动人心的是,你只需要增加某个Bolt的并行度就能够解决掉某个结点上的性能瓶颈。
在流式处理领域里,Storm的直接对手是S4。不过,S4冷淡的社区、半成品的代码,在实际商用方面输给Storm不止一条街。
如果把范围扩大到实时处理,Storm就一点都不寂寞了。
当然,Storm也有Yarn-Storm项目,能让Storm运行在Hadoop2.0的Yarn框架上,可以让Hadoop的MapReduce和Storm共享资源。
知乎上有一个挺好的问答: 问:实时处理系统(类似s4, storm)对比直接用MQ来做好处在哪里? 答:好处是它帮你做了: 1) 集群控制。2) 任务分配。3) 任务分发 4) 监控 等等。
需要知道Storm不是一个完整的解决方案。使用Storm你需要加入消息队列做数据入口,考虑如何在流中保存状态,考虑怎样将大问题用分布式去解决。解决这些问题的成本可能比增加一个服务器的成本还高。但是,一旦下定决定使用了Storm并解决了那些恼人的细节,你就能享受到Storm给你带来的简单,可拓展等优势了。
技术的发展日新月异,数据处理领域越来越多优秀的开源产品。Storm的过去是成功的,将来会如何发展,我们拭目以待吧。
zookeeper是storm运行强依赖
安装zookeeper:请参考:https://blog.csdn.net/xc_zhou/article/details/81916189
为了让zookeeper异常退出后能自动重启,需要安装deamontools
wget http://cr.yp.to/daemontools/daemontools-0.76.tar.gz
tar zxvf daemontools-0.76.tar.gz
cd admin/daemontools-0.76/
vim src/error.h 找到:extern int errno; 改成:#include <errno.h>
执行
package/install
这时已经安装好了
[root@centos7vm daemontools-0.76]# which supervise
/usr/local/bin/supervise
创建/data/service/zookeeper/run文件,内容为:
#!/bin/bash
exec 2>&1
exec /data/zookeeper-3.4.8/bin/zkServer.sh start
增加执行权限
chmod +x /data/service/zookeeper/run
杀了之前手工启动的zookeeper,然后执行
cd /data/service/zookeeper
nohup supervise /data/service/zookeeper &
这时zookeeper被supervice启动了,尝试杀一次zookeeper后还会自动起来
wget http://apache.fayea.com/storm/apache-storm-1.2.2/apache-storm-1.2.2.tar.gz
tar -zxvf apache-storm-1.2.2.tar.gz
cd apache-storm-1.2.2
修改conf/storm.yaml,添加如下配置
storm.zookeeper.servers:
- "127.0.0.1"
nimbus.seeds: ["127.0.0.1"]
ui.port: 9090
#nimbus.host: "localhost"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
启动storm,执行
./bin/storm nimbus &
./bin/storm supervisor &
./bin/storm ui &
打开web界面,http://ip:8080
界面如下:
安装完成
storm系统由一个nimbus节点和多个supervisor节点组成,上面因为是部署单机版本,所以只启动了一个supervisor。他们之间是通过zookeeper协调运行的,所以必须依赖zookeeper。nimbus负责分配任务和监控任务,本身不做计算,supervisor负责真正的计算任务。
storm上运行的任务和map-reduce的不同在于它运行的是一种topology任务,也就是一种有向无环图形式的任务服务。
上面配置文件中配置的supervisor.slots.ports包含了4个port,也就是这个supervisor可以监听4个端口同时并发的执行4个任务,因此在web界面里我们看到Free slots是4
在map-reduce系统上运行的任务我们叫做mapper和reducer,相对之下,在storm上运行的任务叫做spout(涛涛不绝地喷口)和bolt(螺栓),在拓扑里传递的消息叫做tuple。spout其实就是信息产生的源头,而bolt就是处理逻辑
Python目前有两个库,一个是pyleus,一个是streamparse。前者在github上已经有两年都不更新了,只支持到storm 0.9。后者一直在更新,所以对于最新的strom 1.1.0, 没有多的选项了。
pip install streamparse
然后开始跑demo:
sparse quickstart wordcount
cd wordcount
sparse run
结果直接跳错,告诉我缺少lein,搞得我一脸懵逼。google了一下,才知道这是Clojure的包管理工具。于是直接去lein官网
lein的安装有两种方式,一种是用脚本下载安装,一种是要加PPA。原本lein也提供apt直接的安装了,结果各种历史原因,所以呵呵了。。。
作为懒人,首选脚本下载。结果速度奇慢无比。。。看着安装进度需要一天,我果断放弃。。。
只能选择PPA添加apt安装了:
sudo add-apt-repository ppa:mikegedelman/leiningen-git-stable
sudo apt-get update
sudo apt-get install lein
完成后继续跑sparse run命令。。结果还是不行,去stackoverflow翻了一通后发现,需要配置config.json。但是streamparse的demo里没提,说创建完项目直接就能跑,我顿时感觉有点坑啊。。。
配置config.json如下:
{
"serializer": "json",
"topology_specs": "topologies/",
"virtualenv_specs": "virtualenvs/",
"envs": {
"prod": {
"user": "user_xxx",
"ssh_password": "password",
"nimbus": "localhost",
"workers": ["localhost"],
"log": {
"path": "",
"max_bytes": 1000000,
"backup_count": 10,
"level": "info"
},
"virtualenv_root": "/path_xx/virtualenvs"
}
}
}
然后试着运行:
sparse run
然后立刻被打脸,说ssh user_xxx@localhost要输入密码,如果sshd_config如果没配置,即便输入正确的密码也会失败。这里可以参考如何ssh本地主机
配置完免密码登录后,连密码一栏都不用搞了,再次运行。
机器会花一定时间来编译JAR文件,然后就能看到实时流的输出了。
但是这只是试运行,如果要发布拓扑到storm集群上,则要运行:
sparse submit
结果又跳了一个错,说pip版本太低。。。
streamparse会在节点上构建python的虚拟环境, 然后在节点上安装好所有需要的python库。看脚本执行的顺序,会在生成虚拟环境后自动升级pip。但是不知道为何没有执行成功。所以我只能手动去对应的路径里升级pip:
cd /path_xx/virtualenvs/wordcount/bin
source activate
pip install --upgrade pip
deactivate
最后再次运行:
sparse submit
没有报错就表示已经提交拓扑到storm上了,打开ui地址,可以看到拓扑一栏里已经显示有wordcount的拓扑在运行。
参考:https://www.cnblogs.com/langtianya/p/5199529.html
https://www.jianshu.com/p/224995b66a84
https://blog.csdn.net/jiangjingxuan/article/details/54729034
https://blog.csdn.net/gohigher2018/article/details/80384256
https://blog.csdn.net/weixin_33947521/article/details/87218296
Apache Storm 官方文档中文版:https://www.cntofu.com/book/108/readme.html
https://blog.csdn.net/wanglha/article/details/51382335
https://github.com/weyo/Storm-Documents