Apache Spark是一种基于Hadoop和MapReduce技术的开源,支持各种计算技术,以实现快速高效的处理。Spark 以其内存中集群计算而闻名,这是提高 Spark 应用程序处理速度的主要贡献功能。Spark是Matei Zaharia于2009年在加州大学伯克利分校的AMPLab作为Hadoop子项目的一部分开发的。它后来在2010年以BSD许可证开源,然后在2013年捐赠给Apache软件基金会。从2014年开始,Spark在Apache基金会承担的所有项目中占据了顶级位置。
Apache Spark是一个开源框架引擎,以其在大数据处理和分析领域的速度,易于使用的性质而闻名。它还具有用于图形处理,机器学习,流式传输,SQL等的内置模块。Spark 执行引擎支持内存计算和循环数据流,它可以在集群模式或独立模式下运行,并且可以访问各种数据源,如 HBase、HDFS、Cassandra 等。
RDD 代表 弹性分布数据集。它是并行运行操作元素的容错集合。RDD的分区数据是分布式的,不可变的。有两种类型的数据集:
DAG 代表 没有有向循环的有向无环图。会有有限的顶点和边。从一个顶点的每个边都按顺序定向到另一个折点。顶点表示Spark的RDD,边缘表示要在这些RDD上执行的操作。
Spark 中有 2 种部署模式。它们是:
除了上述两种模式之外,如果我们必须在本地机器上运行应用程序进行单元测试和开发,则部署模式称为“本地模式”。在这里,作业在一台机器的单个 JVM 上运行,这使得它的效率非常低,因为在某些时候或另一个时候会出现资源短缺,从而导致作业失败。由于内存和空间有限,也无法在此模式下纵向扩展资源。
接收方是那些使用来自不同数据源的数据,然后将其移动到 Spark 进行处理的实体。它们是使用长时间运行的任务形式的流式处理上下文创建的,这些任务计划以循环方式运行。每个接收器配置为仅使用一个内核。接收器可以在各种执行器上运行,以完成数据流的任务。有两种类型的接收器,具体取决于数据发送到 Spark 的方式:
重新分区 | 合并 |
---|---|
使用情况重新分区可以增加/减少数据分区的数量。 | Spark 合并只能减少数据分区的数量。 |
重新分区创建新的数据分区,并对均匀分布的数据执行完全随机排序。 | Coalesce 利用现有的分区来减少不均匀的随机数据量。 |
内部重新分区调用使用随机参数合并,从而使其比合并慢。 | 合并比重新分区更快。但是,如果存在大小不等的数据分区,则速度可能会稍慢。 |
Spark 支持原始文件和结构化文件格式,以实现高效的读取和处理。Spark支持Parquet,JSON,XML,CSV,RC,Avro,TSV等文件格式。
在不同分区之间重新分发数据的过程可能会导致也可能不会导致数据在 JVM 进程或单独机器上的执行程序之间移动,这称为Shuffling/重新分区。分区只不过是数据的较小逻辑划分。
需要注意的是,Spark无法控制数据分布在哪个分区。
MapReduce | 阿帕奇Spark |
---|---|
MapReduce只对数据进行批量处理。 | Apache Spark可以实时和批量处理数据。 |
MapReduce确实会减慢处理大数据的速度。 | Apache Spark在大数据处理方面的运行速度比MapReduce快约100倍。 |
MapReduce将数据存储在HDFS(Hadoop分布式文件系统)中,这使得获取数据需要很长时间。 | Spark 将数据存储在内存 (RAM) 中,从而可以在需要时更轻松、更快速地检索数据。 |
MapReduce高度依赖于磁盘,这使得它成为一个高延迟的框架。 | Spark 支持内存数据存储和缓存,使其成为低延迟计算框架。 |
MapReduce需要一个外部调度程序来处理作业。 | 由于内存中的数据计算,Spark 有自己的作业调度程序。 |
Spark应用程序以独立进程的形式运行,这些进程由驱动程序通过SparkSession对象进行良好协调。Spark 的群集管理器或资源管理器实体根据每个分区一个任务原则将运行 Spark 作业的任务分配给工作器节点。有各种迭代算法重复应用于数据,以跨各种迭代缓存数据集。每个任务都会将其操作单元应用于其分区中的数据集,并生成新的分区数据集。这些结果将发送回主驱动程序应用程序进行进一步处理或将数据存储在磁盘上。
DAG 代表直接无环图,它具有一组有限的顶点和边。顶点表示RDD,边表示要按顺序对RDD执行的操作。创建的 DAG 将提交到 DAG 计划程序,该计划程序根据应用于数据的转换将图形拆分为任务阶段。阶段视图包含该阶段的 RDD 的详细信息。
DAG 在 Spark 中的工作按照下面的工作流图定义:
每个RDD都会跟踪指向一个或多个父RDD的指针及其与父RDD的关系。例如,考虑对RDD的操作,然后我们有RDD子B跟踪其父A,称为RDD谱系。val childB=parentA.map()
Spark Streaming是Spark提供的最重要的功能之一。它只不过是一个Spark API扩展,用于支持来自不同来源的数据的流处理。
```scala
import org.apache.spark.{SparkConf, SparkContext}
object KeywordSearch {
def main(args: Array[String]): Unit = {
// 创建Spark配置
val conf = new SparkConf().setAppName("Keyword Search").setMaster("local[*]")
val sc = new SparkContext(conf)
// 读取文本文件
val textFile = sc.textFile("path/to/textfile.txt")
// 指定要查找的关键字
val keyword = "example"
// 使用filter操作筛选包含关键字的行
val filteredLines = textFile.filter(line => line.contains(keyword))
// 输出包含关键字的行数
val count = filteredLines.count()
println(s"Number of lines containing the keyword '$keyword': $count")
// 关闭SparkContext
sc.stop()
}
}
```
在上面的代码中,我们首先创建了一个SparkConf对象,并设置了应用程序的名称和运行模式。然后,我们创建了一个SparkContext对象,它是与Spark集群通信的入口点。
接下来,我们使用`textFile`方法从文本文件中读取数据,并将其加载到RDD(弹性分布式数据集)中。
然后,我们指定要查找的关键字,并使用`filter`操作筛选包含关键字的行。这将返回一个新的RDD,其中只包含满足条件的行。
最后,我们使用`count`方法计算包含关键字的行数,并将结果打印出来。
请将上述代码中的"path/to/textfile.txt"替换为实际的文本文件路径,然后您就可以运行该程序来检查关键字是否存在于文本文件中了。
Spark 数据集是 SparkSQL 的数据结构,它为 JVM 对象提供了 RDD 的所有好处(例如使用 lambda 函数的数据操作)以及 Spark SQL 优化的执行引擎。这是从 1.6 版开始作为 Spark 的一部分引入的。
数据集具有以下功能:
Spark 数据帧是数据集的分布式集合,这些数据集组织成类似于 SQL 的列。它相当于关系数据库中的表,主要针对大数据操作进行优化。 数据帧可以从来自不同数据源(如外部数据库、现有 RDD、Hive 表等)的数据数组创建。以下是 Spark 数据帧的功能:
在 Spark 中开发的应用程序具有为 Spark 执行程序定义的相同固定核心计数和固定堆大小。堆大小是指通过使用属于标志的属性来控制的 Spark 执行程序的内存。每个 Spark 应用程序在其运行的每个工作器节点上都有一个分配的执行器。执行程序内存是应用程序使用的工作器节点消耗的内存的度量。spark.executor.memory-executor-memory
SparkCore是用于大规模分布式和并行数据处理的主引擎。Spark 核心由分布式执行引擎组成,该引擎提供 Java、Python 和 Scala 中的各种 API,用于开发分布式 ETL 应用程序。 Spark Core 执行重要的功能,如内存管理、作业监视、容错、存储系统交互、作业调度,以及为所有基本 I/O 功能提供支持。在Spark Core之上构建了各种附加库,允许SQL,流式处理和机器学习的各种工作负载。他们负责:
工作器节点是在群集中运行 Spark 应用程序的节点。Spark 驱动程序侦听传入连接,并接受来自执行程序的传入连接,将它们地址到工作器节点执行。工作节点就像一个从属节点,它从其主节点获取工作并实际执行它们。工作节点执行数据处理,并将使用的资源报告给主节点。主节点决定需要分配多少资源,然后根据其可用性,由主节点为工作节点安排任务。
尽管Spark是强大的数据处理引擎,但在应用程序中使用Apache Spark存在一定的缺点。其中一些是:
数据传输对应于Shuffling过程。最大限度地减少这些传输可以使 Spark 应用程序更快、更可靠地运行。有多种方法可以最大限度地减少这些情况。它们是:
SchemaRDD 是一个 RDD,由行对象组成,这些行对象是整数数组或字符串的包装器,这些数组或字符串具有有关每列数据类型的模式信息。它们旨在简化开发人员在调试代码和在 SparkSQL 模块上运行单元测试用例时的生活。它们表示RDD的描述,类似于关系数据库的模式。SchemaRDD还提供了常见RDD的基本功能以及SparkSQL的一些关系查询接口。
考虑一个例子。如果您有一个名为“人员”的 RDD 来表示人员的数据。然后 SchemaRDD 表示每行 Person RDD 所代表的数据。如果 Person 具有姓名和年龄等属性,则它们在 SchemaRDD 中表示。
Spark提供了一个强大的模块,称为SparkSQL,它执行关系数据处理,并结合Spark的函数式编程功能的强大功能。此模块还通过 SQL 或 Hive 查询语言支持。它还提供对不同数据源的支持,并帮助开发人员使用代码转换编写功能强大的 SQL 查询。 SparkSQL的四个主要库是:
Spark SQL 通过以下方式支持结构化和半结构化数据的使用:
Spark 会自动保留来自不同随机操作的中间数据。但建议在 RDD 上调用该方法。将RDD存储在内存或磁盘上或两者上具有不同复制级别的具有不同的持久性级别。Spark 中可用的持久性级别包括:persist()
MEMORY_ONLY_SER
MEMORY_ONLY_SER
在 persist() 方法中使用持久性级别的语法为:
df.persist(StorageLevel.<level_value>)
下表总结了持久性级别的详细信息:
持久性级别 | 占用的空间 | 中央处理器时间 | 内存中? | 在磁盘上? |
---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是的 | 不 |
MEMORY_ONLY_SER | 低 | 高 | 是的 | 不 |
MEMORY_AND_DISK | 高 | 中等 | 一些 | 一些 |
MEMORY_AND_DISK_SER | 低 | 高 | 一些 | 一些 |
DISK_ONLY | 低 | 高 | 不 | 是的 |
OFF_HEAP | 低 | 高 | 是(但堆外) | 不 |
假设您有以下有关群集的详细信息:
Number of nodes = 10
Number of cores in each node = 15 cores
RAM of each node = 61GB
为了确定内核的数量,我们遵循以下方法:
Number of Cores = number of concurrent tasks that can be run parallelly by the executor. The optimal value as part of a general rule of thumb is 5.
因此,要计算执行者的数量,我们遵循以下方法:
Number of executors = Number of cores/Concurrent Task
= 15/5
= 3
Number of executors = Number of nodes * Number of executor in each node
= 10 * 3
= 30 executors per Spark job
广播变量允许开发人员维护缓存在每台计算机上的只读变量,而不是随任务一起发送它的副本。它们用于有效地为大型输入数据集的每个节点提供副本。这些变量使用不同的算法广播到节点,以降低通信成本。
标准 | Spark数据集 | Spark数据帧 | SparkRDD |
---|---|---|---|
数据的表示 | Spark 数据集是数据帧和 RDD 的组合,具有静态类型安全和面向对象的接口等功能。 | Spark 数据帧是组织成命名列的分布式数据集合。 | Spark RDD 是没有架构的分布式数据集合。 |
优化 | 数据集利用催化剂优化器进行优化。 | 数据帧还利用催化剂优化器进行优化。 | 没有内置的优化引擎。 |
架构投影 | 数据集使用 SQL 引擎自动查找架构。 | 数据帧还会自动查找架构。 | 模式需要在RDD中手动定义。 |
聚合速度 | 数据集聚合比RDD快,但比数据帧慢。 | 由于提供了简单而强大的 API,数据帧中的聚合速度更快。 | RDD 比数据帧和数据集都慢,甚至可以执行数据分组等简单操作。 |
是的!Spark的主要特性是它与Hadoop的兼容性。这使得它成为一个强大的框架,因为使用这两者的组合有助于利用Hadoop的最佳YARN和HDFS功能来利用Spark的处理能力。
Hadoop可以通过以下方式与Spark集成:
稀疏向量由两个并行数组组成,其中一个数组用于存储索引,另一个用于存储值。这些向量用于存储非零值以节省空间。
val sparseVec: Vector = Vectors.sparse(5, Array(0, 4), Array(1.0, 2.0))
val denseVec = Vectors.dense(4405d,260100d,400d,5.0,4.0,198.0,9070d,1.0,1.0,2.0,0.0)
可以通过设置参数或对长时间运行的作业进行批量划分,然后将中间结果写入磁盘来自动触发清理任务。spark.cleaner.ttl
Spark Streaming涉及将数据流的数据分成X秒的批次,称为DStreams。这些DStreams允许开发人员将数据缓存到内存中,这在DStream的数据用于多个计算的情况下非常有用。数据的缓存可以使用 cache() 方法完成,也可以通过使用适当的持久性级别使用 persist() 方法来完成。通过Kafka、Flume等网络接收数据的输入流的默认持久化级别值设置为在2个节点上实现数据复制,实现容错。
val cacheDf = dframe.cache()
val persistDf = dframe.persist(StorageLevel.MEMORY_ONLY)
缓存的主要优点是:
Apache Spark提供了RDD的方法,它提供了编写职业的不同部分的机会,这些职业可以根据UNIX标准流使用任何语言。使用该方法,可以编写RDD转换,该转换可用于将RDD的每个元素读取为字符串。可以根据需要操作这些,结果可以显示为字符串。pipe()pipe()
Spark提供了一个名为GraphX的强大API,它扩展了Spark RDD以支持图形和基于图形的计算。Spark RDD的扩展属性称为弹性分布式属性图,它是一个具有多个并行边的有向多图。每条边和顶点都有关联的用户定义属性。平行边的存在表示同一组顶点之间存在多个关系。GraphX 有一组运算符,如子图、mapReduceTriplets、joinVertices 等,可以支持图计算。它还包括大量图形生成器和算法,用于简化与图形分析相关的任务。
Spark提供了一个非常强大,可扩展的基于机器学习的库,称为MLlib。该库旨在实现简单且可扩展的基于 ML 的常见算法,并具有分类、聚类、降维、回归过滤等功能。有关此库的更多信息可以从Spark的官方文档站点获得详细信息:https://spark.apache.org/docs/latest/ml-guide.html
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有