假设我们有以下拓扑
spout A -> bolt B -> bolt C -> bolt E
而bolt 是最后一个,它将信息保存在数据库中,因此不需要发出任何元组。如何实现这样的解决方案,
如果我没有定义output_fields,那么我会得到异常
Exception in thread "main" java.io.IOException: org.apache.storm.thrift.protocol.TProtocolException: Required field 'output_fields' is unset! Struct
在mongodb中使用Apache Storm 2.1.0时,我在Bolt执行中出现错误: java.lang.NullPointerException
at org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper.toTuple(SimpleMongoLookupMapper.java:46)
at org.apache.storm.mongodb.bolt.MongoLookupBolt.execute(MongoLookupBolt.java:70)
at org.apache.storm.e
我正在使用ApacheStorm0.10.0-beta1,并开始将一些拓扑转换为Flux。我决定从一个简单的拓扑开始,从一个Kafka队列中读取并写入一个不同的Kafka队列。我得到了这个错误,我有一个困难的时间找出什么是错的。拓扑yaml文件遵循错误。
Parsing file: /Users/frank/src/mapper/mapper.yaml
388 [main] INFO o.a.s.f.p.FluxParser - loading YAML from input stream...
391 [main] INFO o.a.s.f.p.FluxParser - Not pe
我需要在LocalCluster模式下运行多个storm项目。
ProjectA->TopologyA->Spout1(listing to some stream X )->Bolt1->Bolt2
ProjectB->TopologyB->Spout1(listing to some stream Y )->Bolt1->Bolt2
ProjectC->TopologyC->Spout1(listing to some stream Z )->Bolt1->Bolt2
当我运行项目A、B和C时,不知何故,流被其他项目的
I am using Leiningen 2.5.3 on Java 1.7.0_79 OpenJDK 64-Bit Server VM.
我想用lein uberjar排除风暴核心罐。下面是我的project.clj
(defproject kafka2hdfs "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
我正在尝试调试非常简单的拓扑(1口1螺栓)
public class JoinerTopologyTest {
public static void main(String[] args) throws IOException {
Config conf = new Config();
conf.setNumWorkers(5);
conf.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("SPOUT-1",new My
我想使用kundera框架将一个对象插入到cassandra中,但是我收到了以下错误:
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:448) ~[storm-core-1.0.0.jar:1.0.0]
at org.apache.storm.utils.D
现在我想在一个污点中使用Drools,它在LocalCluster中正常工作,但是当我把它放到生产集群中时,它有错误。污点是:
public class DealLostBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
private OutputCollector colle
我刚刚学习了storm,并测试了来自的示例代码。它使用setDebug(true)来启用调试日志记录。
但我对以下调试信息感到困惑:
13867 [storm.starter.WordCountTopology.main()] INFO backtype.storm.daemon.nimbus -
Setting new assignment for topology id word-count-1-1403745226:
#backtype.storm.daemon.common.Assignment{:master-code-dir "/tmp/4ef1358b-92ce-40