Spark

MaG1ciaN 2025-1-1
Post on:2025-1-1|Last edited: 2025-3-5|
type
status
date
slug
summary
tags
category
icon
password

Overview

大数据分析计算引擎
spark core
spark sql: HiveQL,每一个数据库表被当做一个RDD
spark streaming:实时分析
MLlib
GraphX

架构

主从架构,具体的“主”可以是YARN,关键是“主”的责任
Task :Spark 作业执行的最小单位,是对数据分区进行具体计算的工作单元。
Executor :执行多个Task
Worker :守护进程负责启动 Executor,与Master通信
Master: 控制中心,负责接收 Worker 节点的注册信息,了解各节点的资源状况;为提交的 Spark 应用程序分配资源,决定在哪些 Worker 节点上启动 Executor 进程以及分配多少资源;对任务进行调度,将任务合理分配到合适的 Executor 中执行;同时实时监控 Worker 节点和 Executor 的状态,处理故障和异常情况。
Driver 是运行用户编写的 Spark 应用程序的主程序,它是 Spark 应用程序的入口点,负责整个应用程序的调度和管理。在提交 Spark 作业时,Driver 程序会被启动,它会与集群管理器(如 Standalone 模式下的 Master、YARN 中的 ResourceManager 等)进行交互,为应用程序申请资源,并协调各个 Executor 执行具体的任务。
BlockManager 是 Spark 用于管理分布式数据块的模块,它在每个节点(包括 DriverExecutor)上都有一个实例。每个 BlockManager 负责管理所在节点上的数据块,包括数据块的存储、读取、删除等操作,以及与其他节点上的 BlockManager 进行数据块的传输。

RDD

RDD 是一个不可变的、可分区的、分布式的集合对象。它代表了一个可并行操作的元素集合,这些元素被分割成多个分区,分布在集群的不同节点上。RDD 可以从外部数据源(如 HDFS、数据库等)创建,也可以通过对其他 RDD 进行转换操作得到。
Resilient(弹性):RDD之间会形成有向无环图(DAG),如果RDD丢失了或者失效了,可以从父RDD重新计算得到。即容错性。
Distributed(分布式):RDD的数据是以逻辑分区的形式分布在集群的不同节点的。
Dataset(数据集):即RDD存储的数据记录,可以从外部数据生成RDD,例如Json文件,CSV文件,文本文件,数据库等。
transformations
actions
count,collect,reduce(f),foreach(f),take(n),first()

窄依赖

单对单,无需shuffle

宽依赖

rdd与父的每个分区都有关(多对多),groupByKey,reduceByKey,join,repartition,coalesce(shuffle=true)

checkpoint

持久化切断之前的血缘关系。

本地化

计算调度到离数据近的地方

缺陷

没有内置优化器
受jvm,java序列化,gc的影响
会发生溢写

DataFrame

DataFrame 是一种分布式的、具有命名列的数据集合,类似于传统关系数据库中的表。它可以看作是由行(Row)组成的分布式集合,每一行包含多个列,每个列都有一个名称和特定的数据类型。DataFrame 提供了一种高级的、声明式的编程接口,允许用户以类似于 SQL的方式对数据进行操作。

特点

  • 结构化数据处理:DataFrame 提供了结构化的数据表示,每列都有明确的名称和数据类型。这使得数据处理更加直观和方便,尤其适合处理具有固定结构的数据,如日志文件、数据库表等。
  • 优化的执行计划:Spark 的 Catalyst 优化器可以对 DataFrame 的操作进行优化,生成高效的执行计划。Catalyst 会对查询进行语义分析、逻辑优化和物理优化,从而提高查询的执行效率。
  • 多语言支持:DataFrame 支持多种编程语言,包括 Python(PySpark)、Scala、Java 和 R。这使得不同背景的开发者都可以方便地使用 DataFrame 进行数据处理。
  • 可扩展性:DataFrame 可以处理大规模的数据,通过分布式计算的方式将数据分布在集群的多个节点上进行并行处理。同时,它可以与 Spark 的其他组件(如 Spark SQL、Spark MLlib 等)无缝集成,实现更复杂的数据分析和机器学习任务。

DataSet

Dataset 是具有强类型的数据集合,它是对分布式数据的一种抽象表示。每个 Dataset 由一系列特定类型的对象组成,这些对象可以是自定义的 Java 或 Scala 类实例。Dataset 结合了面向对象编程的便利性和 Spark 的分布式计算能力,允许开发者以类型安全的方式对数据进行操作。

特点

  • 强类型检查:在编译时进行类型检查,能够提前发现类型不匹配的错误,减少运行时错误。例如,在 Scala 中定义一个包含自定义类对象的 Dataset,编译器会确保所有操作都符合该类的类型定义。
  • 优化的执行计划:和 DataFrame 一样,Dataset 可以利用 Catalyst 优化器对查询计划进行优化。Catalyst 会对查询进行语义分析、逻辑优化和物理优化,提高查询的执行效率。
  • 支持多种数据源:可以从多种数据源创建 Dataset,如文件系统(如 HDFS、本地文件系统)、关系型数据库、NoSQL 数据库等。同时,也能将 Dataset 保存到这些数据源中。
  • 统一的编程接口:提供了统一的编程接口,无论是处理结构化数据还是非结构化数据,都可以使用相同的 API 进行操作,方便开发者进行数据处理和分析。

分区

分区是 RDD、DataFrame 和 Dataset 等数据抽象的基本组成单元。一个 RDD 或其他数据结构可以被划分为多个分区,每个分区包含了数据的一部分子集,这些分区分布在集群的不同节点上,从而实现数据的分布式存储和处理。通常情况下一个分区对应一个Task

哈希

范围

spark中可以不指定范围,会进行自动采样

Spark的任务运行流程

Standalone

总结: 申请资源→提交代码→分解为Job→生成DAG→分解Stage→Task→Executor执行
1) 首先,SparkContext连接到Master,向Master注册并申请资源。 2) Worker定期发送心跳信息给Master并报告Executor状态。 3) Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪Worker上分配资源,然后在该Worker上获取资源,启动StandaloneExecutorBackend。 4) StandaloneExecutorBackend向SparkContext注册。 5) SparkContext将Application代码发送给StandaloneExecutorBackend,并且SparkContext解析Application代码,构建DAG图,并提交DAG Scheduler,分解成Stage(当碰到Action操作时),就会催生Job,每个Job中含有一个或多个Stage,然后分配到相应的Worker,最后提交给StandaloneExecutorBackend执行。 6) StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成。 7)所有Task完成后,SparkContext向Master注销,释放资源。

YARN

TODO

Shuffle

map(shuffle write) → shuffle read → reduce → final output

hash shuffle

file consolidation ,多个mapper 任务共享一个输出文件,如果有 E 个 Executor,每个 Executor 运行 C 个 mapper 任务,则文件数量从 E * C * R 减少到 E * R

Sort Shuffle


Map 阶段(Shuffle Write)
数据分区
  • 每个 mapper 任务会处理一个输入分区的数据。
  • 数据会根据 key 的哈希值分配到对应的输出分区中。
    • 分区号计算公式:partitionId = key.hashCode() % numPartitions
    • 其中,numPartitions 是 Shuffle 输出的分区数。
 写入内存缓冲区
  • 数据会被写入一个内存缓冲区(MemoryBuffer)。
  • 缓冲区的大小由 spark.shuffle.spill.initialMemoryThreshold 参数控制(默认值为 5MB)。
  • 缓冲区是一个数组,存储 (partitionId, key, value) 的三元组。
排序
  • 当缓冲区满时,数据会按照 分区号 和 key 进行排序。
    • 排序规则:
        1. 先按 partitionId 排序。
        1. 如果 partitionId 相同,则按 key 排序。
  • 排序后的数据会被溢写到磁盘文件。
溢写磁盘
  • 每次溢写会生成一个临时文件(spill file)。
  • 溢写文件的数量取决于内存缓冲区的使用情况。
  • 溢写文件中的数据是按分区号和 key 排序的。
文件合并
  • 当所有数据都处理完毕后,所有溢写文件会被合并成一个有序的文件。
  • 合并后的文件包含所有分区的数据,并且每个分区的数据是有序的。
  • 最终,每个 mapper 任务只会生成一个输出文件(data file)和一个索引文件(index file)。
    • data file:存储所有分区的数据。
    • index file:存储每个分区的偏移量,用于快速定位数据。

Shuffle 阶段(Shuffle Read)
拉取数据
  • 每个 reducer 任务会从所有 mapper 任务的输出文件中拉取自己负责的分区数据。
  • 通过索引文件,reducer 任务可以快速定位到所需的数据。
数据合并
  • 如果拉取的数据来自多个 mapper 任务,reducer 任务会将这些数据合并成一个有序的迭代器。
  • 合并过程中,数据会按 key 进行排序。

Reduce 阶段
 数据处理
  • 每个 reducer 任务会处理拉取到的数据,并生成最终结果。
  • 例如,在 reduceByKey 操作中,reducer 任务会对相同 key 的 value 进行聚合。
 输出结果
  • 最终结果会被写入到输出 RDD 中。
优势
文件数量减少到M

ByPass

非聚合,不会排序,为每个分区创建一个单独的文件但是最后会合并为一个文件,会有一个索引文件。

Tungsten Sort Shuffle

要实现 Tungsten Sort Shuffle 机制需要满足以下条件:
• Shuffle 依赖中不带聚合操作没有对输出进行排序的要求。
• Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。
• Shuffle 过程中的输出分区个数少于 16777216个。实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB(具体内存模型可以参考PackedRecordPointer类)。另外,分区个数的限制也是该内存模型导致的。

Spark的参数调优

  1. 资源配置,num-executors,executor-memory,executor-cores
  1. RDD复用与持久化 rdd.cache() , rdd.persist()
  1. 提前过滤掉不必要的数据
  1. task数量应该为总CPU core的两到三倍
  1. 广播大变量(减少磁盘io)
  1. Kryo序列化
  1. 增大数据本地化等待时长
  1. mapPartitions,foreachPartition,repartition,coalesce,reduceByKey(预防OOM)
  1. shuffle调节map端缓冲区大小(减少溢写),调节reduce端数据缓冲区大小,reduce端数据重试次数(增大),reduce端数据等待间隔(增大),SortShuffle排序操作阈值(不排序)
  1. Jvm {内存,连接时长}
 
Stage的划分依据
Spark Stage划分依据主要是基于Shuffle。Shuffle是产生宽依赖RDD的算子,例如reduceByKey、repartition、sortByKey等算子。同一个Stage内的所有Transformation算子所操作的RDD都是具有相同的Partition数量的。Stage划分基于数据依赖关系的,一般分为两类:宽依赖与窄依赖。每个Stage里Task的数量由Stage最后一个RDD中的分区数决定。如果Stage要生成Result,则该Stage里的Task都是ResultTask,否则是ShuffleMapTask。
 
为什么要有Stage
并行执行和流水线,容错基本单位
 
Join
join key 相同,分区必然相同,对分区进行join
broadcast hash join
shuffle hash join
sort-merge join
Spark怎么解决小文件合并
coalesce,repartition,降低并行度,新增并行度=1 的任务?
 
Broadcast
广播变量会在每个worker节点上保留一份副本,而不是为每个Task保留份副本。这样有什么好处?可以想象,在一个worker有时同时会运行若干的Task,若把一个包含较大数据的变量为Task都复制一份,而且还需要通过网络传输,应用的处理效率一定会受到很大影响。
Spark会通过某种广播算法来进行广播变量的分发,这样可以减少通信成本。Spark使用了类似于BitTorrent协议的数据分发算法来进行广播变量的数据分发。
广播变量有一定的适用场景:当任务跨多个stage,且需要同样的数据时,或以反序列化的形式来缓存数据时。
 
Spark内存模型
堆内
Execution,Storage,User Memory,Reserved Memory
堆外
动态
Unroll
 
SQL 执行原理

DAG

当用户提交一个 Spark 作业时,Spark 会根据用户定义的操作序列构建 DAG。
最开始一个Stage遇到Shuffle就断开,不然归并到一个Stage。
具体步骤如下:
  1. 解析操作序列:Spark 会解析用户编写的代码,识别出其中的 RDD 操作,包括转换操作和行动操作。
  1. 构建 RDD 依赖关系:根据操作的顺序和逻辑,确定每个 RDD 之间的依赖关系,形成有向边。例如,当执行 map 操作时,会在输入 RDD 和输出 RDD 之间建立窄依赖边。
  1. 划分阶段(Stage):根据宽依赖将 DAG 划分为多个阶段(Stage)。每个阶段内的操作可以连续执行,不需要进行 Shuffle;而宽依赖会作为阶段的边界,因为 Shuffle 操作需要将数据在不同节点之间进行重新分布。
  1. 生成 DAG:将所有的 RDD 节点、操作节点和有向边组合起来,形成最终的 DAG。

作用

  • 优化执行计划:Spark 的 DAG 调度器可以根据 DAG 的结构对执行计划进行优化。例如,通过合并相邻的窄依赖操作,减少任务的创建和调度开销;对宽依赖操作进行优化,如使用广播变量减少数据传输等。
  • 实现并行计算:DAG 清晰地表示了 RDD 之间的依赖关系和数据流动方向,使得 Spark 可以并行地执行不同阶段的任务。不同阶段的任务可以在不同的节点上同时执行,从而充分利用集群的计算资源。
  • 容错处理:当某个任务失败时,Spark 可以根据 DAG 的依赖关系,只重新计算受影响的 RDD 分区,而不需要重新计算整个作业。这是因为 DAG 记录了每个 RDD 的创建过程和依赖关系,使得 Spark 能够准确地定位和恢复丢失的数据。
 

数据倾斜

慢,OOM
数据倾斜的表现:
1)Spark作业的大部分task都执行迅速,只有有限的几个task执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢
2)Spark作业的大部分task都执行迅速,但是有的task在运行过程中会突然报出OOM,反复执行几次都在某一个task报出0OM错误,此时可能出现了数据倾斜,作业无法正常运行
定位数据倾斜问题:
1) 查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根据代码逻辑判断此处是否会出现数据倾斜
2) 查看Spark作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个
解决
避免shaffle,hive中先聚合 (聚合元数据)
过滤
提高shuffle中reduce的并行度
加上随机前缀聚合,去除前缀再聚合
自定义分区器
broadcast join reduce join 变为 map join
单独拆分
随机数扩容,一个扩容一个稀释
采样
(1)对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key
(2) 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD
(3)接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD
(4) 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了
(5) 而另外两个普通的RDD就照常join即可
(6)最后将两次join的结果使用union算子合并起来即可,就是最终的join结果

Spark Streaming

TODO
 

Examples

 
 
关于Notion nextLakeSoul 实习
Loading...