准备三台服务器,角色分配如下:
yun01:Zookeeper、nimbus。
yun02:Zookeeper、supervisor。
yun03:Zookeeper、supervisor。
略
参见: Zookeeper集群的搭建
解压安装包即可
修改$STORM_HOME/conf目录下的storm.yaml文件。
storm.zookeeper.services:
配置zookeeper集群的主机名称。
nimbus.host:
指定了集群中nimbus的节点。
supervisor.slots.ports:
配置控制每个supervisor节点运行多少个worker进程。这个配置定义为worker监听的端口的列表,监听端口的个数控制了supervisor节点上有多少个worker的插槽。默认的storm使用6700~6703端口,每个supervisor节点上有4个worker插槽。
storm.local.dir:
storm工作时产生的工作文件存放的位置,注意,要避免配置到/tmp下。
配置样式如下:
#Zookeeper集群的ip地址或者主机名
storm.zookeeper.services:
- "yun01"
- "yun02"
- "yun03"
#主节点的ip或者主机名
nimbus.host: "yun01"
#个节点的工作端口
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
#产生文件存储位置
storm.local.dir: "/home/software/storm/tmp"
注意:每行主要语句开始的时候不要有空格,否则会有意想不到的错误发生。
nimbus.childopts(default: -Xms1024m):
这项JVM配置会添加在启动nimbs守护进程的java命令行中。
ui.port(default:8080):
这项配置指定了Storm UI的Web服务器监听的端口。
ui.childopts(default:-Xms1024m):
这项JVM配置会添加在StormUI服务启动的Java命令行中。
supervisor.childopts(default:-Xms768m):
这项JVM配置会添加Supervisor服务启动的Java命令行中。
worker.childopts(default:-Xms768m):
这项JVM配置会添加worker服务启动的Java命令行中。
topology.message.timeout.secs(default:30):
这个配置项定义了一个tuple树需要应答最大时间秒数限制,超过这个时间则认为超时失败。
topology.max.spout.pending(default:null):
在默认值null的情况下,spout每当产生新的tuple时会立即向后端发送,由于下游bolt执行可能具有延迟,可能导致topology过载,从而导致消息处理超时。如果手动将该值改为非null正整数时,会通过暂停spout发送数据来限制同时处理的tuple不能超过这个数,从而达到为Spout限速的作用。
topology.enable.message.timeouts(default:true):
这个选项用来锚定的tuple的超时时间。如果设置为false,则锚定的tuple不会超时。
在启动storm之前确保storm使用的zookeeper已经启动且可以使用。
storm nimbus
启动nimbus守护进程。
storm supervisor
启动supervisor守护进程。
storm ui
启动stormui的守护进程,从而可以通过webUI界面来监控storm运行过程。
storm drpc
启动一个DRPC服务守护进程。
storm jar topology_jar topology_class[arguments...]
向集群提交topology。它会使用指定的参数运行topology_class中的main()方法,同时上传topology_jar文件到nimbus以分发到整个集群。提交后,Storm集群会激活并且开始运行topology。topology中的main()方法需要调用StormSubmitter.submitTopology()方法,并且为topology提供集群内唯一的名称。
storm kill topology_name[-w wait_time]
用来关闭已经部署的topology。
storm deactivate topology_name
停止指定topology的spout发送tuple
storm activate topology_name
恢复指定topology的spout发送tuple。
storm rebalance topology_name[-w wait_time][-n worker_count][-e component_name=executor_count]
指定storm在集群的worker之间重新平均地分配任务,不需要关闭或者重新提交现有的topology。当执行rebalance命令时,Storm会先取消激活topology,等待配置的的时间使剩余的tuple处理完成,然后再supervisor节点中均匀的重新分配worker。重新分配后,Storm会将topology恢复到之前的激活状态。
storm remoteconfvalue conf-name
用来查看远程集群中的配置参数值。
案例:改造之前的单词计数案例,将其在集群中运行。
修改提交topology的代码:
StormSubmitter.submitTopology("mywc", conf, topology);
将程序打成jar包,同时设置jar包的主类,将jar包上传到集群中。
通过命令执行jar包:
storm jar /root/work/stormwc.jar cn.tedu.storm.wc.WordCountTopology
执行一段时间后,可以通过如果下命令关闭topology
storm kill mywc