在多年的学习路上,也掌握了几门比较常见的语言,例如Java、Python以及前端Vue生态中包含的语言。很多时候,各种语言相似功能的框架都会被放在一起比较,来评判语言本身的优劣。
在我的实际学习应用中,发现每种语言都有自己适合的领域。例如Java拥有庞大丰富的后台生态,所以常用来构建后端服务。Python轻量且简单易上手,常应用于数据分析、爬虫、机器学习等场景。除此之外,还有一些小众语言也在各自的领域熠熠生辉。
在还没有接触大数据开发之前,是从来没有听说过scala这门语言。后来在实时开发Spark、Flink领域,在官方提供Java、Python和scala中,我对scala情有独钟,仿佛scala天生就是为流数据处理而生。
所以本篇文章就从scala独有的特性入手,结合一些开发中的小技巧,且看为什么寂寂无名的scala,能在流处理中与Java争雄。
第一次接触implicit的时候:“啥是隐式转换?”后来学完之后,悟了:“藏起来的类型转换”,先从类型转换开始入门。
假设我定义了一个字符串变量a,如果将a转换成int类型,并需要赋值给一个变量。在Java中,我需要:
String a = "6";
int a_ = Integer.parseInt(a)
在python中,我需要:
a = '6'
a = int(a)
在上面的两种语言中,从string到int有个共同点,就是都需要调用方法来类型转换,且Java还需要重新创建一个int类型的变量,来接收转换后的数值。
因为Python是个动态类型语言,所以在Python中可以直接使用变量a,来接收转换后的数值,而在Java是静态类型语言,在定义变量时,变量的类型就已经声明确定了。如果直接将int赋值给String类型的变量a,在类型检查时就会报错。
scala也是个静态类型语言,在scala虽然使用val或var来定义变量,但实际上只是在变量定义时,可以省略数据类型,然后由scala编译器来自动声明。所以在上面的样例中,scala和java的的情况是一样的。如果非要实现python那种一个变量,两种类型的动态类型效果,接着往下看:
var a: Int = "6"
a += 1
print(a)
在上面的代码中,我直接将String类型的数值,赋值给了int类型的变量,???这比Python更动态了吧,而且scala最后也是转换成Java运行,这能不报错?
编译无问题、运行无报错,String类型的6还变成了int类型,最后输出结果7。正常情况下,在赋值的那一步就开始报错了,但是有了scala的隐式转换,scala编译器就自动转换了。在上述代码中,我定义了一个隐式转换方法:
使用 implicit定义一个方法,方法参数类型就是要被转换的数据类型,方法返回值就是要被赋值目标变量的类型。当检测到String类型要赋值给Int类型变量时,就会调用这个隐式转换函数,将String类型转换成int。
如果删除了这个隐式转换方法,和Java一样会报错。
隐式参数就是在定义方法形参时,在形参名称前面使用implicit修饰。然后在调用这个方法的时候,如果你传入了参数,就是一个正常的方法调用。如果没有传入参数,会自动寻找邻近的、同类型的、implicit修饰的变量,当做方法的参数自动传入。
如图:我定义了一个say方法,有一个String类型的参数使用implicit修饰。使用say("aqi")调用方法时,正常执行,如果我直接就写一个say,也不加括号、也不传入参数,就报错了。
报错的意思是就是:没有发现String类型的隐式参数。我们就在调用say之前,定义一个隐式变量作为参数。
def say(implicit s: String) = println(s)
implicit val a = "aqi aqi"
say
使用implicit修饰了String类型变量a,直接使用say调用方法,会自动将a当做参数传入,最后正常输出。
如图,最后say的形参自动绑定implicit修饰的变量a,传入say()中输出结果。
既然开胃菜吃完,接着就从scala最简单的语法看起。
虽然Scala依赖于JDK,且能引用Java类,但是除了字符串要用双引号之外,感觉scala和Java没有太大的相似之处。下面是scala定义变量的样例:
var a = 1
val b = new util.HashMap[String, Int]
val定义常量、var定义变量。a是一个Int类型,b是一个Java的HashMap,熟悉Java的朋友可能会指出:”HashMap后面少加了一个括号!“。在Scala中,如果使用无参构造器,是可以省略掉括号的。
scala摒弃了Java这种public static void定义函数方式,而是和Python一样使用关键字def。在此基础上还有进一步的优化,就是返回值不用return。
val a = 1
def aqi() = {
a
}
print(aqi)
输出结果为1,这里的a就是被aqi这个函数的返回值。
不仅如此,在定义函数时,我还可以将不同参数放在不同的括号里面:
def add(x:Int)(y: Int): Int = x + y
add(1)(2)
最后输出结果为3。到这里可能有疑问,这个花里胡哨的有啥用呢?后面在进阶用法中会讲到它的妙用。
在scala中的方法定义中,除了使用常见的数据类型作为参数,还可以使用函数作为参数。例如我定义一个方法:
def say(func: () => Unit) = {
println("say....")
func()
}
在定义时say()时,定义形参func是一个函数。所以在调用时,就要传入一个函数。,
val func = () => println("aqi")
say(func)
定义一个func函数变量,然后在调用say()时传入,运行结果:
在scala中,有三种方式定义一个类,分别是:class、object、case class,class和object通常被定义在一个源文件中,且名称相同。class是object的伴生类,object是class的伴生对象。这些概念的东西很拗口,我在理解这一块的时候也是费了一番功夫。这里先不管,直接从用法来记住这些概念。
定义一个class:
class aqi {
def say(word: String) ={
print(word)
}
}
按照Java的用法,如果想要调用say(),我们需要先new aqi()来创建对象,然后.say(xxx)调用方法。
val aqi = new aqi()
aqi.say("hello aqi")
最后输出hello aqi。But sorry,在scala中虽然可以这样用,但是建议不要这么用。通常使用object的方式来创建class。
我们在上面的class文件中再创建一个同名的object。
// 伴生对象
object aqi {
def apply(word: String): aqi = {
val aqi_ = new aqi
aqi_.say(word)
aqi_
}
}
在伴生对象中有个apply函数,是scala中的语法糖,通过object创建对象,实际上直接调用的是apply()。
以下面代码为例:
val aqi_ = aqi("hello aqi")
这里的aqi前面没有new,所以引用的是object而不是class,因为apply需要一个String参数,所以所以传入一个String。然后再apply中使用new,来创建一个aqi的class对象aqi,调用say(),并返回aqi。
从上面样例可以看出,class和object是相互依存的,object的apply必须返回一个对象,而class被apply用来创造对象,两者是伴生关系,又根据名称翻译,所以class是伴生类,object是伴生对象。
除此之外,object提供apply来创建对象,也同样提供了unapply来结构对象。同时,object是单例,且只有object才有main()来启动应用。
而case class会自动生成伴生对象,并实现了。
case class Person(name:String, age:Int)
编译之后查看class文件,自动生成了伴生对象MODULE$,并实现了apply、unapply、equals、hashcode方法,以及实现Java的Serializable接口和scala的Product接口。
case class在Spark开发中常用来定义实体类。
在阅读Spark源码的时候,发现了很多scala很多有趣的用法,这里就拿出其中具有代表性的两个用法:柯里化和贷出模式。
柯里化指将原来接受两个参数的函数,变成新的接受一个参数的函数的过程。在上面函数定义时讲到,一个函数的多个形参,可以放在两个括号里。
先从柯里化代码来了解概念。所以柯里化的过程就是将一个
def func1(x: Int)(y: Int) = x + y
val func2 = func1(1)_
定义一个func1(),x、y两个参数列表,可以通过func1(1)(2)调用,返回值为3。柯里化,就是我将func1其中的一个参数,先写死,然后另一个参数使用占位符_表示, 表示第二个参数先不传, 返回值是一个函数值,然后赋值给func2,func2就变成了只需要传一个参数的函数。
如图所示,就是上述柯里化代码的一个运行结果。
贷出模式主要涉及到资源的获取、使用和释放,通常应用于文件、数据库连接等资源的管理过程。我们在一个方法中定义了连接的获取和关闭,这个方法中的形参是个函数,我们就在方法中,把获取的连接等资源,就“贷”给形参的函数,然后在调用这个方法传入函数时,在函数体直接使用连接进行操作。
而连接的初始化、关闭都在方法中进行,做到一个资源的管控,不理解看代码:
def withFileReader[T](fileName: String)(func: BufferedReader => T): T = {
val fileReader = new BufferedReader(new FileReader(fileName))
try {
// 将Reader对象借给了func形参
func(fileReader)
} finally {
fileReader.close()
}
}
// 调用withFileReader,使用贷出模式读取文件
val result = withFileReader("aqi.txt") { reader =>
reader.readLine()
}
这样,在调用withFileReader传入的形参函数体中,我们可以使用withFileReader中,贷出的Reader对象来读取文件。
在开头提到,在Spark/Flink中,提供了Java、Python、scala三种开发语言,原则上就是你会哪种语言就用哪种语言开发。在刚开始学习spark开发的时候,已经掌握了Java和Python,但是我还是又学了scala。原因有二:
下面是Spark官方文档提供的三段代码,三段代码做了相同的事情,是一个RDD到DataFrame实现SparkSQL计算的代码。我们无需理解代码的逻辑,只看每种代码的开发复杂度和可读性。
用Java来做流处理开发,代码有些繁多,每一个变量都要明确声明数据类型。
/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
private String word;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
...
/** DataFrame operations inside your streaming program */
JavaDStream<String> words = ...
words.foreachRDD((rdd, time) -> {
// Get the singleton instance of SparkSession
SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
JavaRow record = new JavaRow();
record.setWord(word);
return record;
});
DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words");
// Do word count on table using SQL and print it
DataFrame wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
wordCountsDataFrame.show();
});
在RDD到DataFrame的转换中,Java还需要定义一个实体类。
Python是流开发中我最不建议的一种,非必要不使用,Python代码最后还是被转换成Java来运行。
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
...
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
代码量少了很多,但是可读性稍微差一点。
最后就是scala,我不说,你自己看!
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
整体代码从简洁性和可读性,远超Java和Python。虽然和Java一样是一个静态类型语言,但是RDD转换DataFram的时候,无需定义实体类,直接一个toDF完成。
这就是我个人对使用scala时,总结的部分开发小技巧和比较有意思的用法。整体来说,scala在大数据流处理开发领域绝对是秒杀Java和Python的。而且scala虽然依赖于Java,但是其开发灵活性和代码简洁性是要超过Java的。
所以,scala真的是一门比较值得学习的语言。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。