首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Transformation转换算子之Value类型

Transformation转换算子之Value类型

作者头像
用户1483438
发布2022-04-27 14:53:46
发布2022-04-27 14:53:46
64600
代码可运行
举报
文章被收录于专栏:大数据共享大数据共享
运行总次数:0
代码可运行

定义SparkContext

代码语言:javascript
代码运行次数:0
运行
复制
  val conf=new SparkConf().setMaster("local[4]").setAppName("test")
  val sc=new SparkContext(conf)

Map算子

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def mapTest(): Unit ={
    // 创建本地集合 RDD ,最小分区数为4
    val list=List(1,2,3,4,5,6,7,8)
    val rdd1=sc.makeRDD(list,4)

    // 使用 map 算子
    val mapRdd: RDD[Unit] = rdd1.map(i => {
      println(i)
    })

    // 调用
    mapRdd.collect()
  }

运行结果;以为是分区计算,所以结果是无序的。

代码语言:javascript
代码运行次数:0
运行
复制
5
3
7
1
8
4
6
2

查看每个分区处理哪些数据

代码语言:javascript
代码运行次数:0
运行
复制
rdd1.mapPartitionsWithIndex((index,it)=>{
    println(s"index=$index data=${it.toList}")
    it
}).collect

运行结果

代码语言:javascript
代码运行次数:0
运行
复制
index=3 data=List(7, 8)
index=0 data=List(1, 2)
index=2 data=List(5, 6)
index=1 data=List(3, 4)

获取map算子返回

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def mapTest(): Unit ={
    // 创建本地集合 RDD ,最小分区数为4
    val list=List(1,2,3,4,5,6,7,8)
    val rdd1=sc.makeRDD(list,4)

    // 使用 map 算子
    val mapRdd: RDD[Int] = rdd1.map(i =>i*i)

    // 调用
    println(mapRdd.collect().toList)
  }
代码语言:javascript
代码运行次数:0
运行
复制
List(1, 4, 9, 16, 25, 36, 49, 64)

其基本操作和scala的map一致。 map 算子 源码

代码语言:javascript
代码运行次数:0
运行
复制
 def map[U: ClassTag](f: T => U): RDD[U] = withScope {
   // clean 用于清理函数闭包,以为闭包无法进行序列化。
    val cleanF = sc.clean(f)
   // 创建 MapPartitionsRDD 对象;
   // this:创建新的RDD时绑定当前RDD的依赖关系
   // 需要传入一个函数,有三个参数
   // 第一个_:指定TaskContext 
   // 第二个_:指定TaskContext 
   // iter:当前分区的迭代器(内容如下);然后调用scala中的map而不是spark的map算子。
   //index=3 data=List(7, 8)
   //index=0 data=List(1, 2)
   //index=2 data=List(5, 6)
   //index=1 data=List(3, 4)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }

思考一个问题?map算子并没有指定分区,但是却是还是4个分区? 首先 map的数据来源于rdd1;rdd1指定了分区。

代码语言:javascript
代码运行次数:0
运行
复制
val rdd1=sc.makeRDD(list,4)

然后map绑定当前rdd的关联关系

代码语言:javascript
代码运行次数:0
运行
复制
// 由rdd1 调用
val mapRdd: RDD[Int] = rdd1.map(i =>i*i)
// 返回一个新的 rdd时绑定了当前的rdd 也就是this
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))

最重要的还是 MapPartitionsRDD 继承的RDD就是rdd1 ,可以通过 prev看到。

代码语言:javascript
代码运行次数:0
运行
复制
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( 
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U], 
    其他参数
) extends RDD[U](prev) {

所以map算子的分区大小是其父类指定的分区大小。


mapPartitions 算子

案例:使用mapPartitions,通过id查询用户信息

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def mysqlQueryByMapPartitions(): Unit ={
    // 读取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

    val arr=lines.mapPartitions(it => {
      // 加载驱动类
      Class.forName("com.mysql.jdbc.Driver")

      //连接数据库
      val connection= DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")

      val statement = connection.prepareStatement("select * from user_info where id=?")

      try{
        while (it.hasNext) {
          //主键id
          val id: String = it.next()
          // 设置参数
          statement.setInt(1,id.toInt)
          // 执行查询
          val result: ResultSet = statement.executeQuery()

          while (result.next()) {
            val id = result.getInt("id")
            val nickName = result.getString("nick_name")
            val name = result.getString("name")
            println(s"id=$id,nickName=$nickName,name=$name")
          }
        }

      }catch {
        case e:Exception => {
          println("数据库查询失败")
          println(e.printStackTrace())
        }
      }finally {
        //关闭资源
        if(statement!=null) statement.close()
        if(connection!=null) connection.close()
      }
      it
    }).collect()

    println(arr.toList)

  }

我是将id 一行行存储到该文件中的。数据量不大,一百个id。

代码语言:javascript
代码运行次数:0
运行
复制
val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

数据就不复制出来了,就看看数据库资源关闭了几次。总共有四个分区,所以最终数据库关闭了4次。

代码语言:javascript
代码运行次数:0
运行
复制
关闭数据库资源....
关闭数据库资源....
关闭数据库资源....
关闭数据库资源....

mapPartitions换成 map

代码语言:javascript
代码运行次数:0
运行
复制
@Test
  def mysqlQueryByMap(): Unit ={
    // 读取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

    val arr=lines.map(id => {
      // 加载驱动类
      Class.forName("com.mysql.jdbc.Driver")

      //连接数据库
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")

      val statement = connection.prepareStatement("select * from user_info where id=?")

      try {
        // 设置参数
        statement.setInt(1, id.toInt)
        // 执行查询
        val result: ResultSet = statement.executeQuery()

        while (result.next()) {
          val id = result.getInt("id")
          val nickName = result.getString("nick_name")
          val name = result.getString("name")
          println(s"id=$id,nickName=$nickName,name=$name")
        }
      }catch {
        case e:Exception => {
          println("数据库查询失败")
          println(e.printStackTrace())
        }
      }finally {
        println("关闭数据库资源....")
        //关闭资源
        if(statement!=null) statement.close()
        if(connection!=null) connection.close()
      }
    }).collect()
  }

数据库的建立与销毁来来回回一百次(可以自己试试。

代码语言:javascript
代码运行次数:0
运行
复制
关闭数据库资源....
关闭数据库资源....
关闭数据库资源....
关闭数据库资源....
关闭数据库资源....
关闭数据库资源....
关闭数据库资源....
...

数据量少所以,没有问题,但是若数据不是一百而是上百万,千万呢?肯定是不行的。 可能将资源链接丢入map中才会造成这样的原因。 如果把connection 提出去会怎么样?

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def mysqlQueryByMap(): Unit ={
    // 读取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

    //连接数据库
    val connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")

    val statement = connection.prepareStatement("select * from user_info where id=?")


    try {

      val arr=lines.map(id => {
        // 加载驱动类
        Class.forName("com.mysql.jdbc.Driver")
        // 设置参数
        statement.setInt(1, id.toInt)
        // 执行查询
        val result: ResultSet = statement.executeQuery()

        while (result.next()) {
          val id = result.getInt("id")
          val nickName = result.getString("nick_name")
          val name = result.getString("name")
          println(s"id=$id,nickName=$nickName,name=$name")
        }

      }).collect()

    }catch {
      case e:Exception => {
        println("数据库查询失败")
        println(e.printStackTrace())
      }
    }finally {
      println("关闭数据库资源....")
      //关闭资源
      if(statement!=null) statement.close()
      if(connection!=null) connection.close()
    }
  }

报了一个错,该错误的原因是jdbc,并没有实现序列化,无法进行传输。

代码语言:javascript
代码运行次数:0
运行
复制
org.apache.spark.SparkException: Task not serializable

该案例除了说明 mapmapPartitions 的区别外,更想表达的意思是。 rdd 会将数据进行分区,每个分区的计算逻辑或数据可能不在同一个节点上。即使是local模式,分区之间也是并行处理。


mapPartitions 与 map 的区别:

  1. map里面的函数是针对分区里面的每个元素进行计算,mapPartitions里面的函数是针对每个分区的所有数据的迭代器进行计算
  2. map里面的函数是计算一个元素返回一个结果,所以map生成的新的RDD里面的元素个数 = 原来RDD元素个数 mapPartitions里面的函数是计算一个分区的所有数据的迭代器然后返回一个新的迭代器,所以mapPartitions生成的新的RDD里面的元素的个数与原来RDD元素个数可能不相同
  3. map是针对单个元素操作,元素操作完成之后可以进行回收内存了 mapPartitions是针对一个迭代器操作,操作完成迭代器一个元素之后,该元素不能回收必须等到整个迭代器都处理完成之后才能回收。如果一个分区中数据量很大,可能导致内存溢出。如果出现内存溢出可以用map代替。【完成比完美更重要】

mapPartitions源码赏析

代码语言:javascript
代码运行次数:0
运行
复制
  // f:传入一个函数,参为迭代器
  // preservesPartitioning:是否保留分区,默认为false
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {
    // 清除闭包,保证数据可以进行序列化传输
    val cleanedF = sc.clean(f)
    // 创建 MapPartitionsRDD
    // this 绑定当前RDD,
    // iter 迭代器
    new MapPartitionsRDD(this,(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),preservesPartitioning)
  }

点击进入MapPartitionsRDD 它会执行一个compute函数

代码语言:javascript
代码运行次数:0
运行
复制
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

mapPartitionsWithIndex

和 mapPartitions类似,但是它可以指定分区号

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def mapPartitionsWithIndexTest(): Unit ={
    val list=List(1,2,3,4,5,6,7,8)
    val rdd1=sc.makeRDD(list,4)
    rdd1.mapPartitionsWithIndex((index,it)=>{
      while (it.hasNext) {
        println(s"index=$index, ${it.next()}")
      }
      it
    }).collect()
  }

结果

代码语言:javascript
代码运行次数:0
运行
复制
index=2, 5
index=1, 3
index=0, 1
index=1, 4
index=2, 6
index=0, 2
index=3, 7
index=3, 8

flatMap()

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。 区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def flatMapTest(): Unit ={
    val list=List(
      "hello,java,scala",
      "python,html,xml",
      "xpath,js,vue",
      "linux,windows"
    )
    // 创建本地集合RDD
    val rdd1= sc.parallelize(list, 4)
    // flatMap
    val rdd2= rdd1.flatMap(_.split(","))
    // 计算,汇总
    println(rdd2.collect.toList)
  }

结果

代码语言:javascript
代码运行次数:0
运行
复制
List(hello, java, scala, python, html, xml, xpath, js, vue, linux, windows)

glom()

该操作将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def glomTest(): Unit ={

    val list: Seq[Int] =List(1,2,3,4,5,6,7,8)
    val rdd1: RDD[Int] =sc.parallelize(list,4)
    val rdd2: RDD[Array[Int]] =rdd1.glom()
    // 获取
    val arrList: List[Array[Int]] = rdd2.collect.toList
    for (arr<- arrList){
      println(arr.toList)
    }

  }
代码语言:javascript
代码运行次数:0
运行
复制
List(1, 2)
List(3, 4)
List(5, 6)
List(7, 8)

groupBy

对数据进行分组

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def groupBy(): Unit ={
    // 生成一百个数
    val range = Range(0, 100)

    val rdd1: RDD[Int] = sc.parallelize(range, 4)


    // 将一百以内的数据按照 2的倍数和3的倍数 进行分类。
    val f=(i:Int)=>{
      if(i%2==0 && i%3==0){0}
      else if(i%2==0){1}
      else if(i%3==0){2}
      else -1
    }


    val rdd2: RDD[(Int, Iterable[Int])] = rdd1.groupBy(f)

    val result: List[(Int, Iterable[Int])] = rdd2.collect.toList
    for(r <- result){
        r match {
          case (k,v) => println(k,v)
        }
    }

  }

结果

代码语言:javascript
代码运行次数:0
运行
复制
(0,CompactBuffer(0, 6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96))
(1,CompactBuffer(2, 4, 8, 10, 14, 16, 20, 22, 26, 28, 32, 34, 38, 40, 44, 46, 50, 52, 56, 58, 62, 64, 68, 70, 74, 76, 80, 82, 86, 88, 92, 94, 98))
(2,CompactBuffer(3, 9, 15, 21, 27, 33, 39, 45, 51, 57, 63, 69, 75, 81, 87, 93, 99))
(-1,CompactBuffer(1, 5, 7, 11, 13, 17, 19, 23, 25, 29, 31, 35, 37, 41, 43, 47, 49, 53, 55, 59, 61, 65, 67, 71, 73, 77, 79, 83, 85, 89, 91, 95, 97))

使用groupby 完成worldCount作业

代码语言:javascript
代码运行次数:0
运行
复制
@Test
  def worldCount():Unit={
    //读取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)

    // 内容扁平化
    val worldList: RDD[String] = lines.flatMap(_.split(" "))

    // 内容分组
    val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)

    // 统计单词数量
    val result=groupList.map(x=>(x._1,x._2.size))

    println(result.collect().toList)

  }

数据结果

代码语言:javascript
代码运行次数:0
运行
复制
List((shell,4), (wahaha,1), (hello,2), (python,1), (java,5))

filter 过滤

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

案例:找到一百中所有的偶数

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def filterTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)

    //过滤奇数
    val value: RDD[Int] = rdd1.filter(_%2==0)
    println(value.collect.toList)
  }
代码语言:javascript
代码运行次数:0
运行
复制
List(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98)

sample 采样

代码语言:javascript
代码运行次数:0
运行
复制
  def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {...}
  • withReplacement: 是否放回[true-代表放回,意味着同一个数据可能被多次采样 false-不返回,意味着同一条数据最多被采样一次] 【工作中设置为false】
  • fraction: 【工作中一般设置为 0.1-0.2 】: 如果withReplacement=false, fraction代表每个元素被采样的概率[0,1] 如果withReplacement=true, fraction代表每个元素期望被采样的次数
  • seed: 随机数种子

对应 0-100的数字进行采样;100%

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def sampleTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=false ;fraction=1
    val value: RDD[Int] = rdd1.sample(false, 1)

    val list=value.collect.toList
    println(list)
  }
代码语言:javascript
代码运行次数:0
运行
复制
List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)

对应 0-100的数字进行采样;50%

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def sampleTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=false ;fraction=0.5
    val value: RDD[Int] = rdd1.sample(false, 0.5)

    val list=value.collect.toList
    println(list)
  }

随机抽查50个不重样数据

代码语言:javascript
代码运行次数:0
运行
复制
List(0, 1, 3, 4, 6, 7, 9, 11, 15, 16, 17, 20, 22, 23, 25, 27, 30, 32, 33, 34, 35, 36, 37, 38, 40, 42, 44, 45, 46, 48, 50, 54, 56, 60, 63, 65, 66, 67, 70, 71, 73, 75, 77, 79, 80, 81, 82, 83, 85, 89, 91, 92, 97)

对应 0-100的数字进行采样;10%

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def sampleTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=false ;fraction=0.1
    val value: RDD[Int] = rdd1.sample(false, 0.1)

    val list=value.collect.toList
    println(list)
  }

这样看起方便点

代码语言:javascript
代码运行次数:0
运行
复制
List(15, 23, 44, 49, 50, 83, 85, 86, 96)

重复采样,单个元素可能被采样多次

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def sampleTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=true ;fraction=0.2 
    val value: RDD[Int] = rdd1.sample(true, 0.2)

    val list=value.collect.toList
    println(list)
  }

withReplacement 设置为true,有些数据可能会被多次采样(如88,95)。

代码语言:javascript
代码运行次数:0
运行
复制
List(3, 19, 25, 26, 27, 30, 30, 33, 38, 55, 60, 62, 63, 66, 72, 77, 88, 88, 93, 94, 95, 95)

将withReplacement 依旧为true,fraction改为整数 这次采用0-10的数据

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def sampleTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,10)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=true ;fraction=2
    val value: RDD[Int] = rdd1.sample(true,2)

    val list=value.collect.toList
    println(list)
  }

如果withReplacement=true, fraction代表每个元素期望被采样的次数; 这个期望值是不确定的,如上我期望值是2但是有的却只有1个如(7);但也比2高的。

代码语言:javascript
代码运行次数:0
运行
复制
List(0, 0, 0, 0, 0, 1, 1, 1, 3, 3, 4, 4, 5, 5, 5, 5, 5, 5, 7, 8, 8, 8, 9, 9, 9)

最后再说说 seed;默认是一个伪随机数,用来决定采样的随机率。 一般不会更改

代码语言:javascript
代码运行次数:0
运行
复制
Long = Utils.random.nextLong

seed 固定多次采用的结果也是一样

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def sampleTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,10)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=true ;fraction=2;seed=10
    val value: RDD[Int] = rdd1.sample(true,2,10)

    val list=value.collect.toList
    println(list)
  }

第一次运行

代码语言:javascript
代码运行次数:0
运行
复制
List(1, 2, 2, 2, 4, 4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 7, 7, 7, 8, 9, 9, 9, 9)

第二次运行

代码语言:javascript
代码运行次数:0
运行
复制
List(1, 2, 2, 2, 4, 4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 7, 7, 7, 8, 9, 9, 9, 9)

distinct 去重

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def distinctTest(): Unit ={
    // 设置一些重复的元素
    val list=List(1,2,2,3,3,4,3,5,6,7,9,8,9,4,7)

    // 创建本地集合RDD
    val rdd1: RDD[Int] = sc.parallelize(list, 4)

    // 去重
    val value: RDD[Int] = rdd1.distinct()

    // 调用,打印
    println(value.collect.toList)
  }

结果

代码语言:javascript
代码运行次数:0
运行
复制
List(4, 8, 1, 9, 5, 6, 2, 3, 7)

除了使用distinct也可使用groupBy 实现去重功能

代码语言:javascript
代码运行次数:0
运行
复制
 @Test
  def distinctTest(): Unit ={
    // 设置一些重复的元素
    val list=List(1,2,2,3,3,4,3,5,6,7,9,8,9,4,7)

    // 创建本地集合RDD
    val rdd1: RDD[Int] = sc.parallelize(list, 4)

    // 去重
    val value: RDD[Int] = rdd1.groupBy(x=>x).map(_._1)

    // 调用,打印
    println(value.collect.toList)
  }

结果

代码语言:javascript
代码运行次数:0
运行
复制
List(4, 8, 1, 9, 5, 6, 2, 3, 7)

coalesce 合并分区

代码语言:javascript
代码运行次数:0
运行
复制
  @Test
  def coalesceTest(): Unit ={
    // 生成0-100的数
    val range=Range(0,100)

    // 创建本地集合RDD
    val rdd1: RDD[Int] = sc.parallelize(range, 4)    
  }

查看分区情况

代码语言:javascript
代码运行次数:0
运行
复制
rdd1.mapPartitionsWithIndex((index,it)=>{
  println(s"index=$index;it=${it.toList}")
  it
}).collect

各个分区情况

代码语言:javascript
代码运行次数:0
运行
复制
index=0;it=List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24)
index=1;it=List(25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49)
index=3;it=List(75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
index=2;it=List(50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74)

合并分区 numPartitions:合并分区的个数 shuffle:默认为false,并不会触发shuffle,设置为true,可以重新扩大分区,但是会进行shuffle操作。

代码语言:javascript
代码运行次数:0
运行
复制
 def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null): RDD[T] = withScope {...}

合并成两个分区,并查看合并分区后的数据情况

代码语言:javascript
代码运行次数:0
运行
复制
    //合并成两个分区
    val value: RDD[Int] = rdd1.coalesce(2)
    println(s"分区数${value.getNumPartitions}")
    //查看合并之后的分区情况
    value.mapPartitionsWithIndex((index,it)=>{
      println(s"index=$index;it=${it.toList}")
      it
    }).collect

结果

代码语言:javascript
代码运行次数:0
运行
复制
分区数2
index=0;it=List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49)
index=1;it=List(50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)

默认情况下,无法增加分区

代码语言:javascript
代码运行次数:0
运行
复制
    //合并成两个分区
    val value: RDD[Int] = rdd1.coalesce(5)
    println(s"分区数${value.getNumPartitions}")
    //查看合并之后的分区情况
    value.mapPartitionsWithIndex((index,it)=>{
      println(s"index=$index;it=${it.toList}")
      it
    }).collect

就打印分区数就行了,无法扩展分区。

代码语言:javascript
代码运行次数:0
运行
复制
分区数2

coalesce默认情况下只能合并分区,如果想要增大分区数,需要设置shuffle=true

代码语言:javascript
代码运行次数:0
运行
复制
    //合并成两个分区
    val value: RDD[Int] = rdd1.coalesce(5,true)
    println(s"分区数${value.getNumPartitions}")

    //查看合并之后的分区情况
    value.mapPartitionsWithIndex((index,it)=>{
      println(s"index=$index;it=${it.toList}")
      it
    }).collect

数据结果

代码语言:javascript
代码运行次数:0
运行
复制
分区数5
index=0;it=List(4, 9, 14, 19, 24, 27, 32, 37, 42, 47, 53, 58, 63, 68, 73, 78, 83, 88, 93, 98)
index=1;it=List(0, 5, 10, 15, 20, 28, 33, 38, 43, 48, 54, 59, 64, 69, 74, 79, 84, 89, 94, 99)
index=3;it=List(2, 7, 12, 17, 22, 25, 30, 35, 40, 45, 51, 56, 61, 66, 71, 76, 81, 86, 91, 96)
index=2;it=List(1, 6, 11, 16, 21, 29, 34, 39, 44, 49, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95)
index=4;it=List(3, 8, 13, 18, 23, 26, 31, 36, 41, 46, 52, 57, 62, 67, 72, 77, 82, 87, 92, 97)

源码解析

代码语言:javascript
代码运行次数:0
运行
复制
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null): RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // 判断 shuffle 是否为true
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](
          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
          new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
}

先分析shuffle=false情况

代码语言:javascript
代码运行次数:0
运行
复制
// this 当前调用coalesce的rdd
// numPartitions 分区数 ;上面设置的是5
new CoalescedRDD(this, numPartitions, partitionCoalescer)

partitionCoalescer 我们并没有指定,所以使用的是默认的DefaultPartitionCoalescer

代码语言:javascript
代码运行次数:0
运行
复制
  override def getPartitions: Array[Partition] = {
    val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())

    pc.coalesce(maxPartitions, prev).zipWithIndex.map {
      case (pg, i) =>
        val ids = pg.partitions.map(_.index).toArray
        CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
    }
  }

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 定义SparkContext
  • Map算子
  • mapPartitions 算子
  • mapPartitionsWithIndex
  • flatMap()
  • glom()
  • groupBy
  • filter 过滤
  • sample 采样
  • distinct 去重
  • coalesce 合并分区
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档