标签搜索
侧边栏壁纸
  • 累计撰写 18 篇文章
  • 累计收到 10 条评论

Spark调优

Arlen
2024-12-12 / 0 评论 / 28 阅读 / 正在检测是否收录...

Spark调优

你的SQL是如何执行的?

d274c70c50994ce299c64edf25660c39.jpg

Catalyst优化器

  • 逻辑计划解析:Catalyst就是要结合DataFrame的Schema信息,来确认计划中的表名、字段名、字段类型与实际数据是否一致
  • 逻辑计划优化:通过启发式规则,将Analyzed Logical Plan转换为Optimized Logical Plan。启发式规则包括以下几类:谓词下推,列裁剪,常量折叠。
  • 优化SparkPlan:主要是通过规则,将逻辑计划(要做什么)转换为(怎么做)的过程,其中最重要的则是JoinSelection。
  • 生成物理执行计划:主要是确认操作的输入要求,添加Shuffle,Sort等, 存储复用,子查询复用,UDF分发,最终通过Tungsten计划优化(全阶段代码生成)转化成可以执行的RDD分布式任务。

Spark的五种Join策略。

19233270a5f84830aad82f14bcc89c2d.jpg

Tungsten(钨丝计划)

  • 数据结构设计: 紧凑的UnsafeRow的二进制数据格式(更低的存储开销,一个对象封装一条记录), 基于内存页的数据管理。
  • 全阶段代码生成(WSCG):基于同一Stage内操作符之间的调用关系,生成一份“手写代码”,来把所有计算融合为一个统一的函数。

Spark核心概念

如何理解弹性分布式数据集?

属性名成员类型属性含义RDD特性
partitions变量RDD的所有数据分片实体分布式
partitioner方法划分数据分区的规则分布式
dependencies变量生成该RDD所依赖的父RDD容错性
compute方法生成该RDD的计算接口容错性

RDD,DataFrame, DataSet的关系

Spark中RDD、DataFrame和DataSet的区别与联系-CSDN博客

RDD,DataFrame, DataSet对比

RDDDataFrameDataSet
不可变性
分区
Schema
查询优化器
API级别高 (基于RDD实现)高(DataFrame的扩展)
是否存储类型
何时检测语法错误编译时编译时编译时
何时检测分析错误编译时运行时编译时

Spark的内存计算

分布式数据缓存: Spark允许将分布式数据集缓存到计算节点的内存中,从而对其进行高效的访问。但是需要注意的是,只有需要频繁访问的数据集才有必要cache,对于一次性访问的数据集,cache不但不能提升执行效率,反而会产生额外的性能开销,让结果适得其反。

DAG内部的流水式计算模式:对于我们经常说的Spark比Hive快,是因为Spark是基于内存计算的这种说法,其实是错误的,因为无论是Spark还是Hive,计算都是发生在内存中,真正的说法应该是“在同一个STAGE内,Spark会尽可能的使用内存计算,减少中间结果的落盘,并且通过融合计算来提升数据在内存中的转换效率,从而提升应用的整体性能”。

DAG的划分

过程:以Actions算子为起点,从后向前回溯DAG,以Shuffle操作为边界去划分Stages

意义:1.根据DAG可以进行任务的调度,无依赖的STAGE可以并行计算;2.构建DAG后当前STAGE发生故障可以根据DAG的血缘的父依赖计算,不用从头重新计算一次;

那么,Spark调度如何进行任务的调度呢?

工作流程如下:

序号流程步骤调度系统组件
1将DAG拆分为不同的运行阶段StagesDAGScheduler
2创建分布式任务Tasks和任务组TaskSetDAGScheduler
3获取集群内可用的硬件资源情况SchedulerBackend
4按照调度规则决定优先调度哪些任务/组TaskScheduler
5依序将分布式任务分发到执行器ExecutorTaskScheduler

对于第5步来说,分发的原则是:Spark调度系统的原则是尽可能地让数据呆在原地(本地性级别)、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销

Spark调优

CPU视角:

并行

并行指的是为了实现数据的分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。

由两个参数控制:

spark.default.parallelism:设置RDD的默认并行度

spark.sql.shuffle.partitions:Spark SQL开发框架下,指定了Shuffle Reduce阶段默认的并行度

并发

基本由下列参数控制:

spark.executor.cores:Executor的线程池大小由参数spark.executor.cores (默认为1且基本不用改变) 决定,每个任务在执行期间需要消耗的线程数由spark.task.cpus配置项给定

配置建议:

首先,在一个Executor中,每个CPU线程能够申请到的内存比例是有上下限的,最高不超过1/N,最低不少于1/N/2,其中N代表线程池大小。

其次,在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致CPU线程挂起,线程频繁挂起不利于提升CPU利用率,而并行度过高、数据过于分散会让调度开销更显著,也不利于提升CPU利用率。

最后,在给定执行内存M、线程池大小N和数据总量D的时候,想要有效地提升CPU利用率,我们就要计算出最佳并行度P,计算方法是让数据分片的平均大小D/P坐落在(M/N/2, M/N)区间。这样,在运行时,我们的CPU利用率往往不会太差。

内存视角:

Spark内存模型:

img

Execution Memory:执行内存,堆外内存用于执行分布式任务,如Shuffle、Sort和Aggregate等操作,堆内内存用于存储任务执行过程中产生的临时数据和广播变量;堆外内存则用于执行Shuffle,堆外排序,Netty网络通信等。

Storage Memory:用于存储RDD或DataFrame中缓存的数据。对于需要高效序列化和反序列化的数据,可以通过使用堆外内存来减少JVM的GC压力。同时堆外内存还可以存储UnSafe紧凑结构的数据,可以减少内存开销和提高CPU缓存的利用率。

Reserved Memory: Spark保留内存,用于存储Spark内部对象

User Memeory:用于存储用户定义的数据结构

内存分配:

img

调参建议:

​ 对于日常调优来说,使用默认的参数4G即可,如果不够可以增加executor的数量。这里要注意的一点是spark.executor.cores对于参数的影响,同一个executor的内存会根据这个参数进行切分,避免切的太碎导致OOM。

硬盘视角:

磁盘的作用:

溢出临时文件

存储Shuffle中间文件

缓存分布式数据集

调参建议:

​ 一般来说,磁盘是不需要调整的,使用默认的即可。但是如果可以,尽量使用SSD磁盘来提升,通过spark.local.dir指定。

Spark性能杀手-Shuffle

Shuffle介绍

1.普通Shuffle

img

2.bypass 运行机制

img

3.Tungsten Sort Shuffle 运行机制

Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。

Shuffle慢的原因

  • Shuffle需要消耗所有的硬件资源。数据的分发需要写入内存,内存不够时又需要溢写到磁盘,跨节点的网络分发,也消耗了大量的网络IO。
  • 其次,Shuffle消耗的不同硬件资源之间很难达到平衡。资源消耗对比:网络IO>磁盘IO>内存IO

如何避免Shuffle?

  • 尽可能的延迟shuffle的操作,比如先执行其他的过滤,聚合操作,再去做join。
  • 使用广播变量

广播变量

广播变量是一种分发机制,它一次性封装目标数据结构,以Executors为粒度去做数据分发。

使用广播变量,由于本地节点就有全量数据,所以可以再在本地直接进行关联,避免了分布式数据集需要shuffle后再进行join的难题。img

如何使用?

  • 代码中显示指定。Spark代 API中使用broadcast方法进行广播。SparkSQL中使用SQL Hints 指定 /+ broadcast/
  • 使用参数自动调参:spark.sql.autoBroadcastJoinThreshold(默认值为10MB),也就是说,低于10M表在使用时,会自动优先选择broadcast join。需要注意的是,这个参数不是越大越好,要考虑到内存中是否放得下这么大的数据避免导致OOM。

Spark3新特性介绍

AQE

SparkSQL的优化历程经历了三个阶段:RBO(Rule Based Optimization,基于规则的优化) —> CBO(Cost Based Optimization,基于成本的优化,2.2版本中加入)-> AQE(Adaptive Query Execution,自适应查询执行)

RBO和CBO都是静态的优化计划,虽然CBO可以根据表的真实情况进行一定的优化,但是一旦执行计划被提交,就不会再改变,而AQE的出现,则是解决了这个问题。AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

AQE的三大特性:

  • Join策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从Shuffle Sort Merge Join降级(Demote)为执行效率更高的Broadcast Hash Join。
  • 自动分区合并:在Shuffle过后,Reduce Task数据分布参差不齐,AQE将自动合并过小的数据分区。
  • 自动倾斜处理:结合配置项,AQE自动拆分Reduce阶段过大的数据分区,降低单个Reduce Task的工作负载。

DPP

原理:DPP是基于分区裁剪实现的,如果过滤谓词中包含分区键,那么Spark SQL对分区表做扫描的时候,是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁,而动态分区裁剪可以根据Join Key中的数据去做分区裁剪。

其中,触发DPP的三个条件:

事实表必须是分区表,而且分区字段(可以是多个)必须包含Join Key

DPP仅支持等值Joins,不支持大于、小于这种不等值关联关系

维度表过滤之后的数据集要小于广播阈值。

img

Join Hints

Spark SQL在生成物理执行计划的时候,会根据规则去选择最优的join selection,但是所谓计划赶不上变化,预置的规则自然很难覆盖多样且变化无常的计算场景。因此,当我们掌握了不同Join策略的工作原理,结合我们对于业务和数据的深刻理解,完全可以自行决定应该选择哪种Join策略。

使用方法: 在SQL中使用 /*+ Join Hints /

img

0

评论 (0)

取消