关联函数补充
join为主基础算子
# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    # Demo script: inner, left-outer and right-outer joins on two small
    # key-value (pair) RDDs keyed by employee id.
    print('PySpark join Function Program')
    # Step 1: create the SparkContext entry point (local mode).
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    try:
        # Step 2: build the pair RDDs from in-memory Python lists
        # (parallelize distributes a local collection; no file is read).
        x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
        y = sc.parallelize([(1001, "sales"), (1002, "tech")])
        # Step 3: join the two RDDs on their keys.
        # NOTE: the element order of the collected lists is not guaranteed.
        # Inner join keeps only keys present on both sides, e.g.
        # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(x.join(y).collect())
        # Left outer join keeps every key of x; keys missing in y pair with None,
        # e.g. (1003, ('wangwu', None)) and (1004, ('zhangliu', None)).
        print(x.leftOuterJoin(y).collect())
        # Right outer join keeps every key of y; here all of them also exist in x,
        # e.g. [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(x.rightOuterJoin(y).collect())
    finally:
        # Always release the context, even if one of the actions above fails.
        sc.stop()
为什么使用缓存
如何进行缓存?
spark中提供cache方法
spark中提供persist方法
# -*- coding: utf-8 -*-
# Program function: demonstrate RDD caching (cache/persist) on a join result
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':
    # Demo script: cache the result of a join and reuse it across actions.
    print('PySpark join Function Program')
    # Step 1: create the SparkContext entry point (local mode).
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    try:
        # Step 2: build the pair RDDs from in-memory Python lists
        # (parallelize distributes a local collection; no file is read).
        x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
        y = sc.parallelize([(1001, "sales"), (1002, "tech")])
        # Step 3: join the two RDDs on their keys.
        # NOTE: the element order of the collected lists is not guaranteed.
        join_result_rdd = x.join(y)
        # e.g. [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(join_result_rdd.collect())
        print(x.leftOuterJoin(y).collect())
        # e.g. [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(x.rightOuterJoin(y).collect())
        # Step 4: mark the join result for in-memory caching.
        # RDD.cache() is shorthand for self.persist(StorageLevel.MEMORY_ONLY).
        join_result_rdd.cache()
        # Alternative with an explicit storage level:
        # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
        # cache() is lazy: an action is needed to actually materialize the cache.
        # Afterwards the cached RDD is highlighted (green) in the web UI on port 4040.
        join_result_rdd.collect()
        # Later actions on this RDD, such as count(), read from the cached data.
        print(join_result_rdd.count())
        # Keep the application alive for 10 minutes so the web UI can be inspected.
        time.sleep(600)
    finally:
        # Always release the context, even if an action fails or the sleep
        # is interrupted.
        sc.stop()
缓存级别
释放缓存
后续讲到Spark内存模型中,缓存放在Storage内存模块(Execution内存用于shuffle、join、sort等计算)
如果不再需要缓存的数据,可以释放
最近最少使用(LRU)
# Release the cached data; later actions recompute it from the RDD lineage.
join_result_rdd.unpersist()
print("释放缓存之后,直接从rdd的依赖链重新读取")
print(join_result_rdd.count())
* <img src="https://maynor.oss-cn-shenzhen.aliyuncs.com/img/20231009192818.png" alt="image-20210913104616717" style="zoom:150%;" />
何时缓存数据
持久化和Checkpoint的区别
案例测试:
先cache再checkpoint测试