本文主要演示一下storm drpc实例
version: '2'
services:
supervisor:
image: storm
container_name: supervisor
command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
depends_on:
- nimbus
- zookeeper
links:
- nimbus
- zookeeper
restart: always
ports:
- 6700:6700
- 6701:6701
- 6702:6702
- 6703:6703
- 8000:8000
drpc:
image: storm
container_name: drpc
command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
depends_on:
- nimbus
- supervisor
- zookeeper
links:
- nimbus
- supervisor
- zookeeper
restart: always
ports:
- 3772:3772
- 3773:3773
- 3774:3774
好让外部的DRPCClient访问
)、drpc.invocations.port(让worker访问
) @Test
public void testDeployDRPCStateQuery() throws InterruptedException, TException {
TridentTopology topology = new TridentTopology();
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
//NOTE transforms a Stream into a TridentState object
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
StormTopology stormTopology = topology.build();
//远程提交 mvn clean package -Dmaven.test.skip=true
//storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交
System.setProperty("storm.jar",TOPOLOGY_JAR);
Config conf = new Config();
conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1
conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627
conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个
conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181
StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
}
@Test
public void testLaunchDrpcClient() throws TException {
Config conf = new Config();
//NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,不然client直接跑空指针
conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());
conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);
conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);
conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);
conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
System.out.println(client.execute("words", "cat dog the man"));
}
DRPCClient使用的是thrift协议调用
)原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。