Apache Flink是一个开源的针对批量数据和流数据的处理引擎,已经发展为ASF的顶级项目之一。Flink 的核心是在数据流上提供了数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。
Flink的技术栈:
Flink的主要API:
此外,Flink还针对特定的应用领域提供了领域库,例如:
Flink的部署方式:
另外,Flink也可以方便地和Hadoop生态圈中其他项目集成,例如Flink可以读取存储在HDFS或HBase中的静态数据,以Kafka作为流式的数据源,直接重用MapReduce或Storm代码,或是通过YARN申请集群资源等。
当Flink集群启动后,首先会启动一个JobManger和一个或多个的 TaskManager。由Client提交任务给JobManager,JobManager再调度任务到各个TaskManager去执行,然后TaskManager将心跳和统计信息汇报给 JobManager。TaskManager之间以流的形式进行数据的传输。上述三者均为独立的JVM进程。
使用Flink的自带例子: flink-stream-examples/WordCount,这是一个从字符串数组读取句子计算每个单词出现次数的例子。
1、启动flink bin/start-local.sh
2、运行WordCount bin/flink run examples/streaming/WordCount.jar
3、执行完之后查看统计结果 cat log/flink-$USER-jobmanager-0-$USER-VirtualBox.out
4、那么我们访问localhost:8081可以查看到此job的执行计划
WordCount代码:
public static void main(String[] args) throws Exception {
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataStream<String> text;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
text = env.fromElements(WordCountData.WORDS);
}
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer()).setParallelism(2)
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0).sum(1).setParallelism(2);
// emit result
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
// execute program
env.execute("Streaming WordCount");
}
考虑到业界当前主流的流式处理引擎为Apache Storm,Flink为了更好的与业界衔接,在流处理上对Storm是做了兼容,通过复用代码的方式即可实现Storm在Flink运行环境的执行,这个也大大降低了Storm使用者过渡到Flink的难度;同理Flink也可以运行我们数平的JStorm。
1、先来对比一下Apache Flink 与Apache Storm的异同:
2、由上可得,虽然两者运行实体的结构及代码有一定的差别,但归根到底两者运行的都是有向无环图(DAG),所以从Storm的Topology相关类转换成Flink执行的DataStream相关类是可以作转换的。 以下是粗略的转换过程:Storm Topology -> Flink Topology -> DataStream StreamGraph
3、举个例子:已有WordCountTopology,需要提交到Flink集群,那么只需下面几行代码:
final TopologyBuilder builder = WordCountTopology.buildTopology();//构造storm的topology
Map conf = new HashMap();
conf.put("nimbus.host", xxxx);//optional,master server
conf.put("nimbus.thrift.port", xxxx);//optional, master server port
FlinkSubmitter.submitTopology(topologyId, conf, FlinkTopology.createTopology(b
gy转换成FlinkTopology再提交
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。