首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Storm入门(一):编程模型

Storm入门(一):编程模型

作者头像
solve
发布2019-10-30 19:02:41
发布2019-10-30 19:02:41
5570
举报
文章被收录于专栏:大数据技术栈大数据技术栈

前言

本文是 storm 入门第一篇,因为 Storm 的本地模式体验极其简单, 故而我希望第一篇我们先来体验一下 Storm,而不是其他分布式技术那样, 开门就是架构,简介....

1 Storm初体验之本地运行

1.1 下载 Storm Jar 包

这里我们直接用 Maven 管理,直接在我们项目的 pom.xml 文件下加入:

代码语言:javascript
复制
   <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
        </dependency>

1.2 创建 Topology

  • 什么是 Topology ? Storm 为了方便编程,将 Storm 的程序封装成一个个的 Topology,这个Topology 也就是我们本文的重点 编程模型。(其实质是一个 DAG 有向无环图)
  • Topology 是怎么样的? Topology 包含以下几个结构:
    1. 数据 Tuple:在 Storm 中,所有的数据都是以 Tuple 的形式进行传输的
    2. 数据发送者 Spout:这个数据发送只是相对 Storm 本身来说的,Spout 一般从指定的外部数据源读取数据封装成 Tuple,进行数据的发送。
    3. 数据处理组件 Bolt:Spout 的数据会发送到 Bolt,Bolt 就是用来做数据处理的组件,为了提高效率,一般 Bolt 只会处理一些单一的功能,然后会将数据继续往下一个 Bolt发送,形成一个 Bolt 链。
    4. Stream Grouping:在数据从 Spout 到 Bolt 或者 从 Bolt 到 Bolt 的时候可以指定数据的流向规则,这个规则就是 Stream Grouping。
    5. Stream 数据流:从 Spout 发出,到 Bolt 处理完形成的数据通道就是一个数据流,一个Spout 可以发送多个数据流。
  • Topology 如何创建? 好了,现在我们正式开始编码吧...
    1. 创建 Spout:
代码语言:javascript
复制
 //继承自 BaseRichSpout 
static class MySpout extends BaseRichSpout {

      private Map map;
      private TopologyContext topologyContext;
      private SpoutOutputCollector spoutOutputCollector;

      //初始化函数
      public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
          this.map = map;
          this.topologyContext = topologyContext;
          this.spoutOutputCollector = spoutOutputCollector;
      }

      //模拟的外部数据
      String[] outData = new String[]{"张三", "李四", "王五"};
      String[] outData2 = new String[]{"12", "13", "22"};

      /**
       * 我们可以在这里来模拟从外部获取数据并发送到 bolt
       * 该函数会在 storm 运行期间被循环调用
       */
      public void nextTuple() {
          String name = outData[(int) (Math.random() * 3)];
          String age = outData2[(int) (Math.random() * 3)];
          //将数据封装到 Tuple 里面
          Values v = new Values(name,age);
          // 将数据发送出去
          spoutOutputCollector.emit(v);

          //休眠一下,便于观察
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }

      /**
       * 数据声明
       * 发送的数据,通过这里的声明 告诉下游,我这个数据是什么
       * 相当于 表中的字段名
       */
      public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
          //这里我们发送的是  姓名 和 年龄,主要顺序不能乱
          outputFieldsDeclarer.declare(new Fields("name","age"));
      }
  }
  1. 创建 Bolt
代码语言:javascript
复制
  //继承自 BaseRichBolt
  static class MyBolt extends BaseRichBolt {

      private Map stormConf;
      private TopologyContext context;
      private OutputCollector collector;
      //初始化函数
      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
          this.stormConf = stormConf;
          this.context = context;
          this.collector = collector;
      }
      // 处理数据  
      public void execute(Tuple input) {
          //获取上游发送的 name 字段
          String name = input.getStringByField("name");
          //获取上游发送的 age 字段
          String age = input.getStringByField("age");
          //这里我们简单的打印一下就好
          
          System.out.println(name + "****"+age);
          // 如果你还要继续往下发送 那么:collector.emit() 就可以
      }
      
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
              //和 Spout 一样, 如果你还要继续往下发送数据,
              // 那么你就要在这里声明的发送的数据是什么
              // 我们这里不往下游发送,所以可以不用写
      }
  }
 
代码语言:javascript
复制
  //在主程序里面进行组装提交
  public static void main(String[] args) {
      // Topology 的构建者
      TopologyBuilder builder = new TopologyBuilder();
      // 设置 Spout ,并为其命名为 textSpout
      builder.setSpout("textSpout", new MySpout(), 3);
      //设置 Bolt,并为其命名为 MyBolt,
      builder.setBolt("MyBolt", new MyBolt(), 3)
      // 设置其 Stream Grouping 为 shuffleGrouping,并且是从 textSpout 接受数据
      .shuffleGrouping("textSpout");
      // 创建Topology
      StormTopology topology  = builder.createTopology()
      // 创建一个 本地集群
      LocalCluster localCluster = new LocalCluster();
      Config map = new Config();
      map.setNumAckers(1);
      //将 topology   提交到集群运行
      localCluster.submitTopology("test", map, topology  );
  }

有没有感觉很简单?和我们平时写的本地代码基本没什么区别...

Storm 计算模型

上面我们已经体验过 Storm 的本地模式了,虽然我们的代码极其简陋,但是最少让我们了解了 Storm 的编程模型到底是怎么样的了!再怎么复杂的东西,我们也可以从上面这个简陋的代码一步步衍生出来,下面我们看一下下面这幅图

从上面我们可以看到: 一个水龙头代表一个 Spout,一个闪电代表一个 Bolt, Spout 和 Bolt 通过 数据Tuple 的通道建立起了一条条数据流。 该图可以很好的说明 Storm 的工作模式, 通过 Spout 和 Bolt 可以构建起各种数据流以满足我们的业务需求。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.06.20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 1 Storm初体验之本地运行
    • 1.1 下载 Storm Jar 包
    • 1.2 创建 Topology
  • Storm 计算模型
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档