由于做大数据开发,使用最多的语言就是scala和python,java。 刚开始由于spark是scala开发的,就去学习了scala,然后看spark的源码。 后面就是基于spark-core写原生scala的应用。总体来时, scala使用是真的丝滑和顺手。尤其函数式编程。我开发flink也是使用scala。目前很少使用Java了。 我下面就系统介绍一下,scala的内容学习, 我自己也做一个系统的学习补充吧。
这篇主要从scala入门介绍。
scala官网: https://www.scala-lang.org/
Scala combines object-oriented and functional programming in one concise, high-level language. Scala's static types help avoid bugs in complex applications, and its JVM and JavaScript runtimes let you build high-performance systems with easy access to huge ecosystems of libraries.
创始人马丁·奥德斯基(Martin Odersky)是编译器及编程的狂热爱好者,长时间的编程之后,希望发明一种语言,能够让写程序这样的基础工作变得高效,简单。所以当接触到JAVA语言后,对JAVA这门便携式,运行在网络,且存在垃圾回收的语言产生了极大的兴趣,所以决定将函数式编程语言的特点融合到JAVA中,由此发明了两种语言(Pizza & Scala)
它运行在Java虚拟机(JavaVirtualMachine)之上,轻松实现和丰富的Java类库互联互通。scala和java程序都是编译成.class文件,然后在jvm上运行。
所以,如果没有java基础,还是建议先学java基础。 scala很多实现机制都是在java基础上实现的。比如,伴生对象,成员变量,静态方法等, 这些都要反编译 .class文件去看编译的java代码的。
注意: 这里介绍了函数式编程概念, 大家下去可以了解一下 命令式编程和函数式编程的概念和不同。
scala的安装方式有很多种, 大家注意版本,比如做spark开发,以及一些apache的软件都有scala版本的要求。 注意版本匹配。
mac:
brew update
brew install scala
另一种就是下载 二进制包 直接安装
具体安装以及idea配置,就不说了。我认为你会的。
spark代码
object Wd {
def main(args: Array[String]): Unit = {
// 配置, 运行环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 上下文对象
val sc = new SparkContext(sparkConf)
// 读取文件,将文件内容一行一行的读取出来
// 如果从本地中使用 file://
val lines: RDD[String] = sc.textFile("file://Users/youdi/Project/javaProject/sparkOne/input")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map((_, 1)) // tuple 2
// 分组聚合
val wordSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
val array: Array[(String, Int)] = wordSum.collect()
array.foreach(println)
}
}
flink代码:
package org.youdi.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
val parameters: ParameterTool = ParameterTool.fromArgs(args)
val host: String = parameters.get("host")
val port: Int = parameters.getInt("port")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// env.disableOperatorChaining()
// 接收socket数据流
val textDataStream: DataStream[String] = env.socketTextStream("localhost", 7777)
val wordCountStream: DataStream[(String, Int)] = textDataStream.flatMap(_.split(" "))
.filter(_.nonEmpty).disableChaining()
.map((_, 1)).startNewChain()
.keyBy(0)
.sum(1)
wordCountStream.print().setParallelism(1)
// 打印输出
env.execute("stream word count job")
// 6> (youdi,1) 线程数据
//1> (nihao,1)
}
}