Spark调优
你的SQL是如何执行的?
Catalyst优化器:
- 逻辑计划解析:Catalyst就是要结合DataFrame的Schema信息,来确认计划中的表名、字段名、字段类型与实际数据是否一致
- 逻辑计划优化:通过启发式规则,将Analyzed Logical Plan转换为Optimized Logical Plan。启发式规则包括以下几类:谓词下推,列裁剪,常量折叠。
- 优化SparkPlan:主要是通过规则,将逻辑计划(要做什么)转换为(怎么做)的过程,其中最重要的则是JoinSelection。
- 生成物理执行计划:主要是确认操作的输入要求,添加Shuffle,Sort等, 存储复用,子查询复用,UDF分发,最终通过Tungsten计划优化(全阶段代码生成)转化成可以执行的RDD分布式任务。
Spark的五种Join策略。
Tungsten(钨丝计划):
- 数据结构设计: 紧凑的UnsafeRow的二进制数据格式(更低的存储开销,一个对象封装一条记录), 基于内存页的数据管理。
- 全阶段代码生成(WSCG):基于同一Stage内操作符之间的调用关系,生成一份“手写代码”,来把所有计算融合为一个统一的函数。
Spark核心概念
如何理解弹性分布式数据集?
属性名 | 成员类型 | 属性含义 | RDD特性 |
---|---|---|---|
partitions | 变量 | RDD的所有数据分片实体 | 分布式 |
partitioner | 方法 | 划分数据分区的规则 | 分布式 |
dependencies | 变量 | 生成该RDD所依赖的父RDD | 容错性 |
compute | 方法 | 生成该RDD的计算接口 | 容错性 |
RDD,DataFrame, DataSet的关系
RDD,DataFrame, DataSet对比
RDD | DataFrame | DataSet | |
---|---|---|---|
不可变性 | ✅ | ✅ | ✅ |
分区 | ✅ | ✅ | ✅ |
Schema | ❌ | ✅ | ✅ |
查询优化器 | ❌ | ✅ | ✅ |
API级别 | 低 | 高 (基于RDD实现) | 高(DataFrame的扩展) |
是否存储类型 | ✅ | ❌ | ✅ |
何时检测语法错误 | 编译时 | 编译时 | 编译时 |
何时检测分析错误 | 编译时 | 运行时 | 编译时 |
Spark的内存计算
分布式数据缓存: Spark允许将分布式数据集缓存到计算节点的内存中,从而对其进行高效的访问。但是需要注意的是,只有需要频繁访问的数据集才有必要cache,对于一次性访问的数据集,cache不但不能提升执行效率,反而会产生额外的性能开销,让结果适得其反。
DAG内部的流水式计算模式:对于我们经常说的Spark比Hive快,是因为Spark是基于内存计算的这种说法,其实是错误的,因为无论是Spark还是Hive,计算都是发生在内存中,真正的说法应该是“在同一个STAGE内,Spark会尽可能的使用内存计算,减少中间结果的落盘,并且通过融合计算来提升数据在内存中的转换效率,从而提升应用的整体性能”。
DAG的划分
过程:以Actions算子为起点,从后向前回溯DAG,以Shuffle操作为边界去划分Stages。
意义:1.根据DAG可以进行任务的调度,无依赖的STAGE可以并行计算;2.构建DAG后当前STAGE发生故障可以根据DAG的血缘的父依赖计算,不用从头重新计算一次;
那么,Spark调度如何进行任务的调度呢?
工作流程如下:
序号 | 流程步骤 | 调度系统组件 |
---|---|---|
1 | 将DAG拆分为不同的运行阶段Stages | DAGScheduler |
2 | 创建分布式任务Tasks和任务组TaskSet | DAGScheduler |
3 | 获取集群内可用的硬件资源情况 | SchedulerBackend |
4 | 按照调度规则决定优先调度哪些任务/组 | TaskScheduler |
5 | 依序将分布式任务分发到执行器Executor | TaskScheduler |
对于第5步来说,分发的原则是:Spark调度系统的原则是尽可能地让数据呆在原地(本地性级别)、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。
Spark调优
CPU视角:
并行
并行指的是为了实现数据的分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。
由两个参数控制:
spark.default.parallelism:设置RDD的默认并行度
spark.sql.shuffle.partitions:Spark SQL开发框架下,指定了Shuffle Reduce阶段默认的并行度
并发
基本由下列参数控制:
spark.executor.cores:Executor的线程池大小由参数spark.executor.cores (默认为1且基本不用改变) 决定,每个任务在执行期间需要消耗的线程数由spark.task.cpus配置项给定
配置建议:
首先,在一个Executor中,每个CPU线程能够申请到的内存比例是有上下限的,最高不超过1/N,最低不少于1/N/2,其中N代表线程池大小。
其次,在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致CPU线程挂起,线程频繁挂起不利于提升CPU利用率,而并行度过高、数据过于分散会让调度开销更显著,也不利于提升CPU利用率。
最后,在给定执行内存M、线程池大小N和数据总量D的时候,想要有效地提升CPU利用率,我们就要计算出最佳并行度P,计算方法是让数据分片的平均大小D/P坐落在(M/N/2, M/N)区间。这样,在运行时,我们的CPU利用率往往不会太差。
内存视角:
Spark内存模型:
Execution Memory:执行内存,堆外内存用于执行分布式任务,如Shuffle、Sort和Aggregate等操作,堆内内存用于存储任务执行过程中产生的临时数据和广播变量;堆外内存则用于执行Shuffle,堆外排序,Netty网络通信等。
Storage Memory:用于存储RDD或DataFrame中缓存的数据。对于需要高效序列化和反序列化的数据,可以通过使用堆外内存来减少JVM的GC压力。同时堆外内存还可以存储UnSafe紧凑结构的数据,可以减少内存开销和提高CPU缓存的利用率。
Reserved Memory: Spark保留内存,用于存储Spark内部对象
User Memeory:用于存储用户定义的数据结构
内存分配:
调参建议:
对于日常调优来说,使用默认的参数4G即可,如果不够可以增加executor的数量。这里要注意的一点是spark.executor.cores对于参数的影响,同一个executor的内存会根据这个参数进行切分,避免切的太碎导致OOM。
硬盘视角:
磁盘的作用:
溢出临时文件
存储Shuffle中间文件
缓存分布式数据集
调参建议:
一般来说,磁盘是不需要调整的,使用默认的即可。但是如果可以,尽量使用SSD磁盘来提升,通过spark.local.dir指定。
Spark性能杀手-Shuffle
Shuffle介绍
1.普通Shuffle
2.bypass 运行机制
3.Tungsten Sort Shuffle 运行机制
Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。
Shuffle慢的原因
- Shuffle需要消耗所有的硬件资源。数据的分发需要写入内存,内存不够时又需要溢写到磁盘,跨节点的网络分发,也消耗了大量的网络IO。
- 其次,Shuffle消耗的不同硬件资源之间很难达到平衡。资源消耗对比:网络IO>磁盘IO>内存IO
如何避免Shuffle?
- 尽可能的延迟shuffle的操作,比如先执行其他的过滤,聚合操作,再去做join。
- 使用广播变量
广播变量
广播变量是一种分发机制,它一次性封装目标数据结构,以Executors为粒度去做数据分发。
使用广播变量,由于本地节点就有全量数据,所以可以再在本地直接进行关联,避免了分布式数据集需要shuffle后再进行join的难题。
如何使用?
- 代码中显示指定。Spark代 API中使用broadcast方法进行广播。SparkSQL中使用SQL Hints 指定 /+ broadcast/
- 使用参数自动调参:spark.sql.autoBroadcastJoinThreshold(默认值为10MB),也就是说,低于10M表在使用时,会自动优先选择broadcast join。需要注意的是,这个参数不是越大越好,要考虑到内存中是否放得下这么大的数据避免导致OOM。
Spark3新特性介绍
AQE
SparkSQL的优化历程经历了三个阶段:RBO(Rule Based Optimization,基于规则的优化) —> CBO(Cost Based Optimization,基于成本的优化,2.2版本中加入)-> AQE(Adaptive Query Execution,自适应查询执行)
RBO和CBO都是静态的优化计划,虽然CBO可以根据表的真实情况进行一定的优化,但是一旦执行计划被提交,就不会再改变,而AQE的出现,则是解决了这个问题。AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。
AQE的三大特性:
- Join策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从Shuffle Sort Merge Join降级(Demote)为执行效率更高的Broadcast Hash Join。
- 自动分区合并:在Shuffle过后,Reduce Task数据分布参差不齐,AQE将自动合并过小的数据分区。
- 自动倾斜处理:结合配置项,AQE自动拆分Reduce阶段过大的数据分区,降低单个Reduce Task的工作负载。
DPP
原理:DPP是基于分区裁剪实现的,如果过滤谓词中包含分区键,那么Spark SQL对分区表做扫描的时候,是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁,而动态分区裁剪可以根据Join Key中的数据去做分区裁剪。
其中,触发DPP的三个条件:
事实表必须是分区表,而且分区字段(可以是多个)必须包含Join Key。
DPP仅支持等值Joins,不支持大于、小于这种不等值关联关系。
维度表过滤之后的数据集要小于广播阈值。
Join Hints
Spark SQL在生成物理执行计划的时候,会根据规则去选择最优的join selection,但是所谓计划赶不上变化,预置的规则自然很难覆盖多样且变化无常的计算场景。因此,当我们掌握了不同Join策略的工作原理,结合我们对于业务和数据的深刻理解,完全可以自行决定应该选择哪种Join策略。
使用方法: 在SQL中使用 /*+ Join Hints /
评论 (0)