前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 实时流Wordcount案例

Flink 实时流Wordcount案例

作者头像
编程那点事
发布2023-02-25 16:09:26
3900
发布2023-02-25 16:09:26
举报
文章被收录于专栏:java编程那点事

Scala版本

代码语言:javascript
复制
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCountScala {
 def main(args: Array[String]) : Unit = {
 // 定义一个数据类型保存单词出现的次数
 case class WordWithCount(word: String, count: Long)
 // port 表示需要连接的端口
 val port: Int = try {
   ParameterTool.fromArgs(args).getInt("port")
 } catch {
   case e: Exception => {
     System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
     return
   }
 }
 // 获取运行环境
 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
 // 连接此socket获取输入数据
 val text = env.socketTextStream("node21", port, '\n')
 //需要加上这一行隐式转换 否则在调用flatmap方法的时候会报错
 import org.apache.flink.api.scala._
 // 解析数据, 分组, 窗口化, 并且聚合求SUM
 val windowCounts = text
 .flatMap { w => w.split("\\s") }
 .map { w => WordWithCount(w, 1) }
 .keyBy("word")
 .timeWindow(Time.seconds(5), Time.seconds(1))
 .sum("count")
 // 打印输出并设置使用一个并行度
 windowCounts.print().setParallelism(1)
 env.execute("Socket Window WordCount")
 }
}

java版本

代码语言:javascript
复制
import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

/**

* Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来

*       先在node21机器上执行nc -l 9000

*/

public class StreamingWindowWordCountJava {

 public static void main(String[] args) throws Exception {

 //定义socket的端口号

 int port;

 try{

     ParameterTool parameterTool = ParameterTool.fromArgs(args);

     port = parameterTool.getInt("port");

 }catch (Exception e){

     System.err.println("没有指定port参数,使用默认值9000");

     port = 9000;

 }

 //获取运行环境

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 //连接socket获取输入的数据

 DataStreamSource<String> text = env.socketTextStream("node21", port, "\n");

 //计算数据

 DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {

   public void flatMap(String value, Collector<WordWithCount> out) throws Exception {

       String[] splits = value.split("\\s");

       for (String word:splits) {

           out.collect(new WordWithCount(word,1L));

       }

   }

 })//打平操作,把每行的单词转为<word,count>类型的数据

       //针对相同的word数据进行分组

       .keyBy("word")

       //指定计算数据的窗口大小和滑动窗口大小

       .timeWindow(Time.seconds(2),Time.seconds(1))

       .sum("count");

 //把数据打印到控制台,使用一个并行度

 windowCount.print().setParallelism(1);

 //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

 env.execute("streaming word count");

}

/**

* 主要为了存储单词以及单词出现的次数

*/

 static class WordWithCount{

   public String word;

   public long count;

   public WordWithCount(){}

   public WordWithCount(String word, long count) {

       this.word = word;

       this.count = count;

   }

   @Override

   public String toString() {

       return "WordWithCount{" +

               "word='" + word + '\'' +

               ", count=" + count +

               '}';

   }

 }

}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-03-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档