首页
关于
Search
1
SteamOS安装paru和yay
170 阅读
2
欢迎使用 Typecho
127 阅读
3
徕卡X | 冬日漫步
127 阅读
4
https自动跳转问题终极解决方案
116 阅读
5
Spark3中的Catalog组件设计
104 阅读
默认
摄影
后端开发
大数据
Spark
Flink
登录
Search
标签搜索
转载
ArlenDu
累计撰写
18
篇文章
累计收到
10
条评论
首页
栏目
默认
摄影
后端开发
大数据
Spark
Flink
页面
关于
搜索到
2
篇与
的结果
2024-12-12
Spark调优
Spark调优你的SQL是如何执行的?Catalyst优化器:逻辑计划解析:Catalyst就是要结合DataFrame的Schema信息,来确认计划中的表名、字段名、字段类型与实际数据是否一致逻辑计划优化:通过启发式规则,将Analyzed Logical Plan转换为Optimized Logical Plan。启发式规则包括以下几类:谓词下推,列裁剪,常量折叠。优化SparkPlan:主要是通过规则,将逻辑计划(要做什么)转换为(怎么做)的过程,其中最重要的则是JoinSelection。生成物理执行计划:主要是确认操作的输入要求,添加Shuffle,Sort等, 存储复用,子查询复用,UDF分发,最终通过Tungsten计划优化(全阶段代码生成)转化成可以执行的RDD分布式任务。Spark的五种Join策略。Tungsten(钨丝计划):数据结构设计: 紧凑的UnsafeRow的二进制数据格式(更低的存储开销,一个对象封装一条记录), 基于内存页的数据管理。全阶段代码生成(WSCG):基于同一Stage内操作符之间的调用关系,生成一份“手写代码”,来把所有计算融合为一个统一的函数。Spark核心概念如何理解弹性分布式数据集?属性名成员类型属性含义RDD特性partitions变量RDD的所有数据分片实体分布式partitioner方法划分数据分区的规则分布式dependencies变量生成该RDD所依赖的父RDD容错性compute方法生成该RDD的计算接口容错性RDD,DataFrame, DataSet的关系RDD,DataFrame, DataSet对比 RDDDataFrameDataSet不可变性✅✅✅分区✅✅✅Schema❌✅✅查询优化器❌✅✅API级别低高 (基于RDD实现)高(DataFrame的扩展)是否存储类型✅❌✅何时检测语法错误编译时编译时编译时何时检测分析错误编译时运行时编译时Spark的内存计算分布式数据缓存: Spark允许将分布式数据集缓存到计算节点的内存中,从而对其进行高效的访问。但是需要注意的是,只有需要频繁访问的数据集才有必要cache,对于一次性访问的数据集,cache不但不能提升执行效率,反而会产生额外的性能开销,让结果适得其反。DAG内部的流水式计算模式:对于我们经常说的Spark比Hive快,是因为Spark是基于内存计算的这种说法,其实是错误的,因为无论是Spark还是Hive,计算都是发生在内存中,真正的说法应该是“在同一个STAGE内,Spark会尽可能的使用内存计算,减少中间结果的落盘,并且通过融合计算来提升数据在内存中的转换效率,从而提升应用的整体性能”。DAG的划分过程:以Actions算子为起点,从后向前回溯DAG,以Shuffle操作为边界去划分Stages。意义:1.根据DAG可以进行任务的调度,无依赖的STAGE可以并行计算;2.构建DAG后当前STAGE发生故障可以根据DAG的血缘的父依赖计算,不用从头重新计算一次;那么,Spark调度如何进行任务的调度呢?工作流程如下:序号流程步骤调度系统组件1将DAG拆分为不同的运行阶段StagesDAGScheduler2创建分布式任务Tasks和任务组TaskSetDAGScheduler3获取集群内可用的硬件资源情况SchedulerBackend4按照调度规则决定优先调度哪些任务/组TaskScheduler5依序将分布式任务分发到执行器ExecutorTaskScheduler对于第5步来说,分发的原则是:Spark调度系统的原则是尽可能地让数据呆在原地(本地性级别)、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。Spark调优CPU视角:并行并行指的是为了实现数据的分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。由两个参数控制:spark.default.parallelism:设置RDD的默认并行度spark.sql.shuffle.partitions:Spark SQL开发框架下,指定了Shuffle Reduce阶段默认的并行度并发基本由下列参数控制:spark.executor.cores:Executor的线程池大小由参数spark.executor.cores (默认为1且基本不用改变) 决定,每个任务在执行期间需要消耗的线程数由spark.task.cpus配置项给定配置建议:首先,在一个Executor中,每个CPU线程能够申请到的内存比例是有上下限的,最高不超过1/N,最低不少于1/N/2,其中N代表线程池大小。其次,在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致CPU线程挂起,线程频繁挂起不利于提升CPU利用率,而并行度过高、数据过于分散会让调度开销更显著,也不利于提升CPU利用率。最后,在给定执行内存M、线程池大小N和数据总量D的时候,想要有效地提升CPU利用率,我们就要计算出最佳并行度P,计算方法是让数据分片的平均大小D/P坐落在(M/N/2, M/N)区间。这样,在运行时,我们的CPU利用率往往不会太差。内存视角:Spark内存模型:Execution Memory:执行内存,堆外内存用于执行分布式任务,如Shuffle、Sort和Aggregate等操作,堆内内存用于存储任务执行过程中产生的临时数据和广播变量;堆外内存则用于执行Shuffle,堆外排序,Netty网络通信等。Storage Memory:用于存储RDD或DataFrame中缓存的数据。对于需要高效序列化和反序列化的数据,可以通过使用堆外内存来减少JVM的GC压力。同时堆外内存还可以存储UnSafe紧凑结构的数据,可以减少内存开销和提高CPU缓存的利用率。Reserved Memory: Spark保留内存,用于存储Spark内部对象User Memeory:用于存储用户定义的数据结构内存分配:调参建议: 对于日常调优来说,使用默认的参数4G即可,如果不够可以增加executor的数量。这里要注意的一点是spark.executor.cores对于参数的影响,同一个executor的内存会根据这个参数进行切分,避免切的太碎导致OOM。硬盘视角:磁盘的作用: 溢出临时文件 存储Shuffle中间文件 缓存分布式数据集调参建议: 一般来说,磁盘是不需要调整的,使用默认的即可。但是如果可以,尽量使用SSD磁盘来提升,通过spark.local.dir指定。Spark性能杀手-ShuffleShuffle介绍1.普通Shuffle2.bypass 运行机制3.Tungsten Sort Shuffle 运行机制Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。Shuffle慢的原因Shuffle需要消耗所有的硬件资源。数据的分发需要写入内存,内存不够时又需要溢写到磁盘,跨节点的网络分发,也消耗了大量的网络IO。其次,Shuffle消耗的不同硬件资源之间很难达到平衡。资源消耗对比:网络IO>磁盘IO>内存IO如何避免Shuffle?尽可能的延迟shuffle的操作,比如先执行其他的过滤,聚合操作,再去做join。使用广播变量广播变量广播变量是一种分发机制,它一次性封装目标数据结构,以Executors为粒度去做数据分发。使用广播变量,由于本地节点就有全量数据,所以可以再在本地直接进行关联,避免了分布式数据集需要shuffle后再进行join的难题。如何使用?代码中显示指定。Spark代 API中使用broadcast方法进行广播。SparkSQL中使用SQL Hints 指定 /+ broadcast/使用参数自动调参:spark.sql.autoBroadcastJoinThreshold(默认值为10MB),也就是说,低于10M表在使用时,会自动优先选择broadcast join。需要注意的是,这个参数不是越大越好,要考虑到内存中是否放得下这么大的数据避免导致OOM。Spark3新特性介绍AQESparkSQL的优化历程经历了三个阶段:RBO(Rule Based Optimization,基于规则的优化) —> CBO(Cost Based Optimization,基于成本的优化,2.2版本中加入)-> AQE(Adaptive Query Execution,自适应查询执行)RBO和CBO都是静态的优化计划,虽然CBO可以根据表的真实情况进行一定的优化,但是一旦执行计划被提交,就不会再改变,而AQE的出现,则是解决了这个问题。AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。AQE的三大特性:Join策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从Shuffle Sort Merge Join降级(Demote)为执行效率更高的Broadcast Hash Join。自动分区合并:在Shuffle过后,Reduce Task数据分布参差不齐,AQE将自动合并过小的数据分区。自动倾斜处理:结合配置项,AQE自动拆分Reduce阶段过大的数据分区,降低单个Reduce Task的工作负载。DPP原理:DPP是基于分区裁剪实现的,如果过滤谓词中包含分区键,那么Spark SQL对分区表做扫描的时候,是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁,而动态分区裁剪可以根据Join Key中的数据去做分区裁剪。其中,触发DPP的三个条件:事实表必须是分区表,而且分区字段(可以是多个)必须包含Join Key。DPP仅支持等值Joins,不支持大于、小于这种不等值关联关系。维度表过滤之后的数据集要小于广播阈值。Join HintsSpark SQL在生成物理执行计划的时候,会根据规则去选择最优的join selection,但是所谓计划赶不上变化,预置的规则自然很难覆盖多样且变化无常的计算场景。因此,当我们掌握了不同Join策略的工作原理,结合我们对于业务和数据的深刻理解,完全可以自行决定应该选择哪种Join策略。使用方法: 在SQL中使用 /*+ Join Hints /
2024年12月12日
28 阅读
0 评论
0 点赞
2024-09-12
Spark3中的Catalog组件设计
接口定义 CatalogPlugin:catalog的顶级接口,该接口主要实现了catalog的name和defaultNamespace。比如V2SessionCatalog的默认实现就是name="spark_catalog",defaultNamespace="defualt"TableCatalog:定义Table的相关接口,包括listTable, loadTable, CreateTable, DropTable,AlterTable等。CatalogExtension:继承自TableCatalog,通过setDelegateCatalog()方法把上面的V2SessionCatalog 实例进行代理。经过实际测试发现,无法改变catalog的name,所以只能对原始的catalog进行功能扩展。DelegatingCatalogExtension:抽象类,包含了对CatalogExtendsion的所有方法实现。总结:如果需要自定义catalog,则需实现TableCatalog即可。如果需要对当前的catalog进行扩展,则实现CatalogExtension或者重写DelegatingCatalogExtension的方法。Catalog初始化过程 Spark通过CatalogManager管理多个catalog,通过spark.sql.catalog.${name} 可以注册多个catalog,Spark的默认实现则是spark.sql.catalog.spark_catalog。1.SparkSession在创建时,可以选择是否enableHiveSupport()来指定catalog类型,如果指定了则会实例化HiveExternalCatalog,不指定则实例InMemoryCatalog。2.在BaseSessionStateBuilder/HiveSessionStateBuilder则会使用上面实例化的externalCatalog创建catalog对象,再根据catalog对象创建V2SessionCatalog对象(V2SessionCatalog就是对SessionCatalog的封装)3.根据catalog和v2SessionCatalog创建CatalogManager实例,CatalogManager中内置一个catalogs的HashMap<String,CatalogPlugin>来管理catalog。4.再SQL中调用catalog时候,会通过CatalogManger.catalog(String name)返回对应名称的catalog实例,如果没有则通过Catalog.load(name, conf)来进行实例化5.对于默认的spark_catalog,如果需要进行扩展,则可以通过spark.sql.catalog.spark_catalog来指定对应的实现类,该类可以通过实现CatalogExtension,然后会自动调用其setDelegateCatalog()来设置默认的spark_catalog为代理对象,然后进行扩展即可。总结:在DQC SQL升级引擎的过程中,使用了Spark3的Catalog特性来支持原生的multi catalog关联查询。对于JDBC协议的数据库来说,则使用JDBCTableCatalog来进行扩展catalog即可;对于Hive来说,第一种方案是实现CatalogExtension来扩展,但是发现这种方案无法修改其catalog name来适配元数据中心的hive catalog名称;第二种方案而通过完全自定义TableCatalog则需要完全初始化一套SparkSessionCatalog,在实践的过程中发现,可以进行元数据的拉取,但是在loadTable读取数据的时候报“表不支持batch read"的问题;第三种方案是采用修改DQC SQL的hvie catalog名称统一为spark_catalog的方式来实现Hive Catalog的使用,算是一个取巧的办法,后续有时间则可以继续扩展SparkSessionCatalog来实现hive的适配。
2024年09月12日
104 阅读
2 评论
0 点赞