首页
关于
Search
1
SteamOS安装paru和yay
170 阅读
2
欢迎使用 Typecho
127 阅读
3
徕卡X | 冬日漫步
127 阅读
4
https自动跳转问题终极解决方案
116 阅读
5
Spark3中的Catalog组件设计
104 阅读
默认
摄影
后端开发
大数据
Spark
Flink
登录
Search
标签搜索
转载
ArlenDu
累计撰写
18
篇文章
累计收到
10
条评论
首页
栏目
默认
摄影
后端开发
大数据
Spark
Flink
页面
关于
搜索到
5
篇与
的结果
2024-12-12
Spark调优
Spark调优你的SQL是如何执行的?Catalyst优化器:逻辑计划解析:Catalyst就是要结合DataFrame的Schema信息,来确认计划中的表名、字段名、字段类型与实际数据是否一致逻辑计划优化:通过启发式规则,将Analyzed Logical Plan转换为Optimized Logical Plan。启发式规则包括以下几类:谓词下推,列裁剪,常量折叠。优化SparkPlan:主要是通过规则,将逻辑计划(要做什么)转换为(怎么做)的过程,其中最重要的则是JoinSelection。生成物理执行计划:主要是确认操作的输入要求,添加Shuffle,Sort等, 存储复用,子查询复用,UDF分发,最终通过Tungsten计划优化(全阶段代码生成)转化成可以执行的RDD分布式任务。Spark的五种Join策略。Tungsten(钨丝计划):数据结构设计: 紧凑的UnsafeRow的二进制数据格式(更低的存储开销,一个对象封装一条记录), 基于内存页的数据管理。全阶段代码生成(WSCG):基于同一Stage内操作符之间的调用关系,生成一份“手写代码”,来把所有计算融合为一个统一的函数。Spark核心概念如何理解弹性分布式数据集?属性名成员类型属性含义RDD特性partitions变量RDD的所有数据分片实体分布式partitioner方法划分数据分区的规则分布式dependencies变量生成该RDD所依赖的父RDD容错性compute方法生成该RDD的计算接口容错性RDD,DataFrame, DataSet的关系RDD,DataFrame, DataSet对比 RDDDataFrameDataSet不可变性✅✅✅分区✅✅✅Schema❌✅✅查询优化器❌✅✅API级别低高 (基于RDD实现)高(DataFrame的扩展)是否存储类型✅❌✅何时检测语法错误编译时编译时编译时何时检测分析错误编译时运行时编译时Spark的内存计算分布式数据缓存: Spark允许将分布式数据集缓存到计算节点的内存中,从而对其进行高效的访问。但是需要注意的是,只有需要频繁访问的数据集才有必要cache,对于一次性访问的数据集,cache不但不能提升执行效率,反而会产生额外的性能开销,让结果适得其反。DAG内部的流水式计算模式:对于我们经常说的Spark比Hive快,是因为Spark是基于内存计算的这种说法,其实是错误的,因为无论是Spark还是Hive,计算都是发生在内存中,真正的说法应该是“在同一个STAGE内,Spark会尽可能的使用内存计算,减少中间结果的落盘,并且通过融合计算来提升数据在内存中的转换效率,从而提升应用的整体性能”。DAG的划分过程:以Actions算子为起点,从后向前回溯DAG,以Shuffle操作为边界去划分Stages。意义:1.根据DAG可以进行任务的调度,无依赖的STAGE可以并行计算;2.构建DAG后当前STAGE发生故障可以根据DAG的血缘的父依赖计算,不用从头重新计算一次;那么,Spark调度如何进行任务的调度呢?工作流程如下:序号流程步骤调度系统组件1将DAG拆分为不同的运行阶段StagesDAGScheduler2创建分布式任务Tasks和任务组TaskSetDAGScheduler3获取集群内可用的硬件资源情况SchedulerBackend4按照调度规则决定优先调度哪些任务/组TaskScheduler5依序将分布式任务分发到执行器ExecutorTaskScheduler对于第5步来说,分发的原则是:Spark调度系统的原则是尽可能地让数据呆在原地(本地性级别)、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。Spark调优CPU视角:并行并行指的是为了实现数据的分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。由两个参数控制:spark.default.parallelism:设置RDD的默认并行度spark.sql.shuffle.partitions:Spark SQL开发框架下,指定了Shuffle Reduce阶段默认的并行度并发基本由下列参数控制:spark.executor.cores:Executor的线程池大小由参数spark.executor.cores (默认为1且基本不用改变) 决定,每个任务在执行期间需要消耗的线程数由spark.task.cpus配置项给定配置建议:首先,在一个Executor中,每个CPU线程能够申请到的内存比例是有上下限的,最高不超过1/N,最低不少于1/N/2,其中N代表线程池大小。其次,在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致CPU线程挂起,线程频繁挂起不利于提升CPU利用率,而并行度过高、数据过于分散会让调度开销更显著,也不利于提升CPU利用率。最后,在给定执行内存M、线程池大小N和数据总量D的时候,想要有效地提升CPU利用率,我们就要计算出最佳并行度P,计算方法是让数据分片的平均大小D/P坐落在(M/N/2, M/N)区间。这样,在运行时,我们的CPU利用率往往不会太差。内存视角:Spark内存模型:Execution Memory:执行内存,堆外内存用于执行分布式任务,如Shuffle、Sort和Aggregate等操作,堆内内存用于存储任务执行过程中产生的临时数据和广播变量;堆外内存则用于执行Shuffle,堆外排序,Netty网络通信等。Storage Memory:用于存储RDD或DataFrame中缓存的数据。对于需要高效序列化和反序列化的数据,可以通过使用堆外内存来减少JVM的GC压力。同时堆外内存还可以存储UnSafe紧凑结构的数据,可以减少内存开销和提高CPU缓存的利用率。Reserved Memory: Spark保留内存,用于存储Spark内部对象User Memeory:用于存储用户定义的数据结构内存分配:调参建议: 对于日常调优来说,使用默认的参数4G即可,如果不够可以增加executor的数量。这里要注意的一点是spark.executor.cores对于参数的影响,同一个executor的内存会根据这个参数进行切分,避免切的太碎导致OOM。硬盘视角:磁盘的作用: 溢出临时文件 存储Shuffle中间文件 缓存分布式数据集调参建议: 一般来说,磁盘是不需要调整的,使用默认的即可。但是如果可以,尽量使用SSD磁盘来提升,通过spark.local.dir指定。Spark性能杀手-ShuffleShuffle介绍1.普通Shuffle2.bypass 运行机制3.Tungsten Sort Shuffle 运行机制Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。Shuffle慢的原因Shuffle需要消耗所有的硬件资源。数据的分发需要写入内存,内存不够时又需要溢写到磁盘,跨节点的网络分发,也消耗了大量的网络IO。其次,Shuffle消耗的不同硬件资源之间很难达到平衡。资源消耗对比:网络IO>磁盘IO>内存IO如何避免Shuffle?尽可能的延迟shuffle的操作,比如先执行其他的过滤,聚合操作,再去做join。使用广播变量广播变量广播变量是一种分发机制,它一次性封装目标数据结构,以Executors为粒度去做数据分发。使用广播变量,由于本地节点就有全量数据,所以可以再在本地直接进行关联,避免了分布式数据集需要shuffle后再进行join的难题。如何使用?代码中显示指定。Spark代 API中使用broadcast方法进行广播。SparkSQL中使用SQL Hints 指定 /+ broadcast/使用参数自动调参:spark.sql.autoBroadcastJoinThreshold(默认值为10MB),也就是说,低于10M表在使用时,会自动优先选择broadcast join。需要注意的是,这个参数不是越大越好,要考虑到内存中是否放得下这么大的数据避免导致OOM。Spark3新特性介绍AQESparkSQL的优化历程经历了三个阶段:RBO(Rule Based Optimization,基于规则的优化) —> CBO(Cost Based Optimization,基于成本的优化,2.2版本中加入)-> AQE(Adaptive Query Execution,自适应查询执行)RBO和CBO都是静态的优化计划,虽然CBO可以根据表的真实情况进行一定的优化,但是一旦执行计划被提交,就不会再改变,而AQE的出现,则是解决了这个问题。AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。AQE的三大特性:Join策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从Shuffle Sort Merge Join降级(Demote)为执行效率更高的Broadcast Hash Join。自动分区合并:在Shuffle过后,Reduce Task数据分布参差不齐,AQE将自动合并过小的数据分区。自动倾斜处理:结合配置项,AQE自动拆分Reduce阶段过大的数据分区,降低单个Reduce Task的工作负载。DPP原理:DPP是基于分区裁剪实现的,如果过滤谓词中包含分区键,那么Spark SQL对分区表做扫描的时候,是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁,而动态分区裁剪可以根据Join Key中的数据去做分区裁剪。其中,触发DPP的三个条件:事实表必须是分区表,而且分区字段(可以是多个)必须包含Join Key。DPP仅支持等值Joins,不支持大于、小于这种不等值关联关系。维度表过滤之后的数据集要小于广播阈值。Join HintsSpark SQL在生成物理执行计划的时候,会根据规则去选择最优的join selection,但是所谓计划赶不上变化,预置的规则自然很难覆盖多样且变化无常的计算场景。因此,当我们掌握了不同Join策略的工作原理,结合我们对于业务和数据的深刻理解,完全可以自行决定应该选择哪种Join策略。使用方法: 在SQL中使用 /*+ Join Hints /
2024年12月12日
28 阅读
0 评论
0 点赞
2024-10-30
Flink的状态
状态种类 Keyed State:Keyed State 总是和 Key 相关联,只能在 KeyedStream 的函数或算子中使用,因为 Flink 中的 keyBy 操作保证了每一个键相关联的所有消息都会送给下游算子的同一个并行实例处理,因此 Keyed State 也可以看作是 Operator State 的一种分区(partitioned)形式,每一个键都关联一个状态分片(state-partition)。Operator State:Operator State 绑定到算子的每一个并行实例(sub-task) 中。状态后端 (StateBackend)MemoryStateBackend: 将状态存储在内存中,通常只在调试和开发环境中用FsStateBackend: FsStateBackend 会将状态存储到一个持久化的存储中,如 HDFS,只在 JobManager 的内存中存储一些 metadata。RocksDBStateBackend:状态存储在RockDB中,并且支持增量快照。在最新版的 Flink 中,这三种形式的存储后端都支持异步快照模式。四种类型:ValueStateListStateReducingStateMapState要获取状态,首先需要定义状态描述符(StateDescriptor)。状态描述符状态的名字(保证唯一性),状态的类型,以及部分状态需要的自定义函数。根据想要获取的状态的不同,状态描述符也分为 ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, MapStateDescriptor。自定义状态CheckpointedFunction接口ListCheckpointed接口RuntimeContext对于 Keyed State,通常都是通过 RuntimeContext 实例来获取,这通常需要在 rich functions 中才可以做到。 RuntimeContext 提供的获取状态的方法包括:ValueState<T> getState(ValueStateDescriptor<T>) ReducingState<T> getReducingState(ReducingStateDescriptor<T>) ListState<T> getListState(ListStateDescriptor<T>) MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)检查点(CheckPoint)检查点机制还需要依赖于 1.支持数据重播的数据源 2.状态的持久存储终端。Flink 实现了一个轻量级的分布式快照机制,其核心点在于 Barrier。 Coordinator 在需要触发检查点的时候要求数据源注入向数据流中注入 barrier, barrier 和正常的数据流中的消息一起向前流动,相当于将数据流中的消息切分到了不同的检查点中。当一个 operator 从它所有的 input channel 中都收到了 barrier,则会触发当前 operator 的快照操作,并向其下游 channel 中发射 barrier。当所有的 sink 都反馈收到了 barrier 后,则当前检查点创建完毕。Barrier对齐一个关键的问题在于,一些 operator 拥有多个 input channel,它往往不会同时从这些 channel 中接收到 barrier。如果 Operator 继续处理 barrier 先到达的 channel 中的消息,那么在所有 channel 的 barrier 都到达时,operator 就会处于一种混杂的状态。在这种情况下,Flink 采用对齐操作来保证 Exactly Once 特性。Operator 会阻塞 barrier 先到达的 channel,通常是将其流入的消息放入缓冲区中,待收到所有 input channel 的 barrier 后,进行快照操作,释放被阻塞的 channel,并向下游发射 barrier。 对齐操作会对流处理造成延时,但通常不会特别明显。如果应用对一致性要求比较宽泛的话,那么也可以选择跳过对齐操作。这意味着快照中会包含一些属于下一个检查点的数据,这样就不能保证 Exactly Once 特性,而只能降级为 At Least Once。异步快照前述的检查点创建流程中,在 operator 进行快照操作时,不再处理数据流中的消息。这种同步的方式会在每一次进行快照操作的时候引入延时。实际上,Flink 也支持采用异步的方式创建快照,这就要求 operator 在触发快照操作的时候创建一个不受后续操作影响的状态对象,通常选用 copy-on-write 的数据结构。Flink 中基于 RocketDB 的状态存储后端就可以支持异步操作。保存点(Savepoint)所谓的保存点,其实是用户手动触发的一种特殊的检查点。其本质就是检查点,但它相比检查点有两点不同:1.用户自行触发 2.当有新的已完成的检查点产生的时候,不会自动失效。可查询状态(Queryable State)Flink 自1.2起新增了一个 Queryable State 特性,允许从 Flink 系统外直接查询作业流水中的状态。这主要是来自于两方面的诉求:1)很多应用都有直接获取应用实时状态的需求,2)将状态频繁写入外部系统中可能是应用的瓶颈。有两种方式来使用 Queryable State :QueryableStateStream, 将 KeyedStream 转换为 QueryableStateStream,类似于 Sink,后续不能进行任何转换操作StateDescriptor#setQueryable(String queryableStateName),将 Keyed State 设置为可查询的 (不支持 Operator State)外部应用在查询 Flink 作业内部状态的时候要使用 QueryableStateClient, 提交异步查询请求来获取状态。下面的两张图大致给出了 Queryable State 的工作机制: 目前还存在的一些局限:只能支持 Keyed Operator对状态大小是否有限制(不支持 ListState)作业失败后如何保证可用性Clent API 的易用性参考Working with StateA Deep Dive into Rescalable State in Apache FlinkQueryable States in ApacheFlink - How it works
2024年10月30日
21 阅读
0 评论
0 点赞
2024-09-23
《转载》Flink之Akka RPC通信
1、简说AkkaFlink 内部节点之间的通信是用 Akka,比如 JobManager 和 TaskManager 之间的通信。 而 operator 之间的数据传输是利用 Netty,所以是不是有必要说一下Akka ?Akka和Actor并发问题的核心就是存在数据共享,同时有多个线程对一份数据进行修改,就会出现错乱的情况解决该问题一般有两种方式 :1、基于JVM内存模型的设计,通常需要通过加锁等同步机制保证共享数据的一致性。但是加锁在高并发的场景下,往往性能不是很好2、使用消息传递的方式Actor的基础就是消息传递,一个Actor可以认为是一个基本的计算单元,它能接收消息并基于其执行运算,它也可以发送消息给其他Actor。Actors 之间相互隔离,它们之间并不共享内存Actor 本身封装了状态和行为,在进行并发编程时,Actor只需要关注消息和它本身。而消息是一个不可变对象,所以 Actor 不需要去关注锁和内存原子性等一系列多线程常见的问题。所以Actor是由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成状态:Actor 中的状态指Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题行为:Actor 中的计算逻辑,通过Actor接收到的消息来改变Actor的状态邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱内部通过 FIFO(先入先出)消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息任意一个Actor即可发送消息,也可以接受消息Akka是一个构建在JVM上,基于Actor模型的的并发框架,支持Java和Scala两种API2、Akka详解使用 Akka 可以让你从为 Actor 系统创建基础设施和编写控制基本行为所需的初级代码中解脱出来。为了理解这一点,让我们看看在代码中创建的Actor与Akka在内部创建和管理的Actor之间的关系,Actor的生命周期和失败处理Akka的Actor层级Akka的Actor总是属于父Actor。通常,你可以通过调用 getContext().actorOf() 来创建 Actor。与创建一个“独立的”Actor不同,这会将新Actor作为一个子节点注入到已经存在的树中,创建Actor的Actor成为新创建的子Actor的父级。你可能会问,你创造的第一个Actor的父节点是谁?如下图所示,所有的 Actor 都有一个共同的父节点,即用户守护者。可以使用 system.actorOf() 在当前Actor下创建新的Actor实例。创建 Actor 将返回一个有效的 URL 引用。例如,如果我们用 system.actorOf(..., "someActor") 创建一个名为 someActor 的 Actor,它的引用将包括路径 /user/someActor事实上,在你在代码中创建 Actor 之前,Akka 已经在系统中创建了三个 Actor 。这些内置的 Actor 的名字包含 guardian ,因为他们守护他们所在路径下的每一个子 Actor。守护者 Actor 包括 :/ ,根守护者( root guardian )。这是系统中所有Actor的父Actor,也是系统本身终止时要停止的最后一个 Actor/user ,守护者( guardian )。这是用户创建的所有Actor的父 Actor。不要让用户名混淆,它与最终用户和用户处理无关。使用Akka库创建的每个Actor都将有一个事先准备的固定路径/user//system ,系统守护者( system guardian )pom.xml :<properties> <akka.version>2.6.9</akka.version> </properties> <dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>${akka.version}</version> </dependency> </dependencies>示例:package com.journey.flink.akka.task2; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class HierarchyActorTest { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstActor = system.actorOf(MyPrintActor.props(), "firstActor"); System.out.println("firstActor : " + firstActor); firstActor.tell("print_info", ActorRef.noSender()); System.out.println(">>> Press ENTER to exit <<<"); try { System.in.read(); } finally { system.terminate(); } } // 创建Actor static class MyPrintActor extends AbstractActor { static Props props() { return Props.create(MyPrintActor.class, MyPrintActor::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("print_info", p -> { ActorRef secondActorRef = getContext().actorOf(Props.empty()); System.out.println("secondActorRef : " + secondActorRef); }).build(); } } }输出结果 :firstActor : Actor[akka://testSystem/user/firstActor#-1802697549] >>> Press ENTER to exit <<< secondActorRef : Actor[akka://testSystem/user/firstActor/$a#-1282757800]两条路径都以akka://testSystem/开头。因为所有 Actor的引用都是有效的URL, akka://是协议字段的值ActorSystem名为testSystem ,但它可以是任何其他名称。如果启用了多个系统之间的远程通信,则URL的这一部分包括主机名和端口,以便其他系统可以在网络上找到它,下面会有案例因为第二个 Actor的引用包含路径 /firstActor/ ,这个标识它为第一个Actor的子ActorActor引用的最后一部分,即#-1802697549和#-1282757800是唯一标识符Actor的生命周期既然了解了Actor层次结构的样子,你可能会想 : 为什么我们需要这个层次结构?它是用来干什么的?层次结构的一个重要作用是安全地管理Actor的生命周期。接下来,我们来考虑一下,这些知识如何帮助我们编写更好的代码Actor在被创建时就会出现,然后在用户请求时被停止。每当一个Actor被停止时,它的所有子 Actor也会被递归地停止。这种行为大大简化了资源清理,并有助于避免诸如由打开的套接字和文件引起的资源泄漏要停止Actor,建议的模式是调用Actor内部的 getContext().stop(getSelf()) 来停止自身,通常是对某些用户定义的停止消息的响应,或者当Actor完成其任务时Akka Actor的API暴露了许多生命周期的钩子,你可以在 Actor 的实现中覆盖这些钩子。最常用的是 preStart() 和 postStop() 方法preStart() 在 Actor 启动之后但在处理其第一条消息之前调用postStop() 在 Actor 停止之前调用,在此时之后将不再处理任何消息示例:package com.journey.flink.akka.task3; import akka.actor.AbstractActor; import akka.actor.Props; public class Actor1 extends AbstractActor { static Props props() { return Props.create(Actor1.class, Actor1:: new); } @Override public void preStart() throws Exception { System.out.println("first actor started"); getContext().actorOf(Actor2.props(), "second"); } @Override public void postStop() throws Exception { System.out.println("first actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("stop", s -> { getContext().stop(getSelf()); }).build(); } } package com.journey.flink.akka.task3; import akka.actor.AbstractActor; import akka.actor.Props; public class Actor2 extends AbstractActor { static Props props() { return Props.create(Actor2.class, Actor2::new); } @Override public void preStart() throws Exception { System.out.println("second actor started"); } @Override public void postStop() throws Exception { System.out.println("second actor stopped"); } @Override public Receive createReceive() { return receiveBuilder().build(); } } package com.journey.flink.akka.task3; import akka.actor.ActorRef; import akka.actor.ActorSystem; public class LifeCycleMain { public static void main(String[] args) { ActorSystem system = ActorSystem.create("testSystem"); ActorRef first = system.actorOf(Actor1.props(), "first"); first.tell("stop", ActorRef.noSender()); } }打印输出 :first actor started second actor started second actor stopped first actor stopped当我们停止 first Actor 时,它会在停止自身之前,先停止了它的子 Actor second 。这个顺序是严格的,在调用父Actor的 postStop() 钩子之前,会先调用所有子Actor的 postStop() 钩子失败处理父 Actor 和子 Actor 在他们的生命周期中是相互联系的。当一个 Actor 失败(抛出一个异常或从接收中冒出一个未处理的异常)时,它将暂时挂起。如前所述,失败信息被传播到父 Actor,然后父 Actor 决定如何处理由子 Actor 引起的异常。这样,父 Actor 就可以作为子 Actor 的监督者( supervisors )。默认的监督策略是停止并重新启动子 Actor。如果不更改默认策略,所有失败都会导致重新启动示例:package com.journey.flink.akka.task4; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class SupervisingActor extends AbstractActor { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor"); supervisingActor.tell("failChild", ActorRef.noSender()); } static Props props() { return Props.create(SupervisingActor.class, SupervisingActor :: new); } ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor"); @Override public Receive createReceive() { return receiveBuilder() .matchEquals("failChild", f -> { child.tell("fail", getSelf()); }).build(); } } class SupervisedActor extends AbstractActor { static Props props() { return Props.create(SupervisedActor.class, SupervisedActor::new); } @Override public void preStart() throws Exception { System.out.println("supervised actor started"); } @Override public void postStop() throws Exception { System.out.println("supervised actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("fail", f -> { System.out.println("supervised actor fails now"); throw new Exception("I failed"); }).build(); } }打印如下 :supervised actor started supervised actor fails now supervised actor stopped supervised actor started [ERROR] [04/04/2023 10:37:51.084] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/supervising-actor/supervised-actor] I failed java.lang.Exception: I failed我们看到失败后,被监督的 Actor 停止并立即重新启动,我们使用了preStart()和postStop()钩子,这是重启后和重启前 默认调用的钩子,因此我们无法区分 Actor 内部是第一次启动还是重启。实际上,在重新启动时,调用 的是preRestart()和postRestart()方法,但如果不重写这两个方法,则默认委托给preStart()Actor剖析由于 Akka 实施父级监督,每个 Actor 都受到其父级的监督并且监督其子级定义 Actor 类Actor 类是通过继承 AbstractActor 类并在 createReceive 方法中设置“初始行为”来实现 的createReceive方法没有参数,并返回AbstractActor.Receive。它定义了 Actor 可以处理哪些消 息,以及如何处理消息的实现。可以使用名为ReceiveBuilder的生成器来构建此类行为。在 AbstractActor中,有一个名为receiveBuilder的方便的工厂方法 package com.journey.flink.akka.task5; import akka.actor.AbstractActor; public class MyActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match( String.class, info -> { System.out.println("Received String message " + info); } ) .matchAny( e -> { System.out.println("received unknown message"); } ) .build(); } }PropsProps 是一个配置类,用于指定创建 Actor 的选项,将其视为不可变的,因此可以自由共享,用于创建 Actor 的方法,包括关联的部署信息Props.create(Actor1.class, Actor1::new)Actor APIAbstractActor类定义了一个名为createReceive的方法,该方法用于设置 Actor 的“初始行为”此外,它还提供 :getSelf(),对 Actor 的ActorRef的引用getSender(),前一次接收到的消息的发送方 Actor 的引用supervisorStrategy(),用户可重写定义用于监视子 Actor 的策略getContext() ,公开 Actor 和当前消息的上下文信息,例如: 创建子 Actor 的工厂方法(actorOf)Actor核心概念发送消息tell 的意思是“发送并忘记( fire-and-forget )”,例如异步发送消息并立即返回,target.tell(message, getSelf())ask 异步发送消息,并返回一个表示可能的答复,ask模式涉及 Actor 和Future回复消息如果你想有一个回复消息的句柄,可以使用 getSender() ,它会给你一个 ActorRef 。你可 以通过使用 getSender().tell(replyMsg, getSelf()) 发送 ActorRef 来进行回复。你还 可以存储 ActorRef 以供稍后回复或传递给其他 Actorpublic class ActionCommunication { static class Actor1 extends AbstractActor { private ActorRef actor2 = getContext().actorOf(Actor2.props(), "actor2"); static Props props() { return Props.create(Actor1.class, Actor1::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("hello", f -> { actor2.tell("hello", getSelf()); }) .matchEquals("ack", f -> { System.out.println("nice to meet you too."); }) .build(); } } static class Actor2 extends AbstractActor { static Props props() { return Props.create(Actor2.class, Actor2::new); } @Override public Receive createReceive() { return receiveBuilder().matchEquals("hello", f -> { System.out.println("nice to meet you."); getSender().tell("ack", getSelf()); }).build(); } } public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstActor = system.actorOf(Actor1.props(), "first"); firstActor.tell("hello", ActorRef.noSender()); } }接受超时package com.journey.flink.akka.task5; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.ReceiveTimeout; import java.time.Duration; public class TimeoutActor extends AbstractActor { public TimeoutActor() { // 设置初始化延迟 getContext().setReceiveTimeout(Duration.ofMillis(100)); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("hello", s -> { // 其实就是如果超过100ms没有接收消息,就会调用 ReceiveTimeout getContext().setReceiveTimeout(Duration.ofMillis(100)); }) .match(ReceiveTimeout.class, r -> { System.out.println("超时"); getContext().cancelReceiveTimeout(); getContext().stop(getSelf()); }) .build(); } public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef timeoutActor2 = system.actorOf(Props.create(TimeoutActor.class, TimeoutActor::new)); int num = 0; while (num <= 10) { System.out.println("send...." + num); timeoutActor2.tell("hello", ActorRef.noSender()); Thread.sleep(10); num++; } } }Actor的调度器调度器(Dispatchers)是 Akka 核心的一部分,这意味着它们也是 akka-actor 依赖的一部分默认调度器每个 ActorSystem 都将有一个默认的调度器,在没有为 Actor 配置其他内容的情况下使用该调度器。如果ActorSystem是通过ExecutionContext传入来创建的,则此ExecutionContext将用作此ActorSystem中所有程序的执行位置。如果没有给出ExecutionContext,它将回退到akka.actor.default-dispatcher.default-executor.fallback作为执行上下文。默认情况下,这是一个“fork-join-executor”,它在大多数情况下都有出色的性能为 Actor 设置调度器# 配置一个调度器 my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # 使用哪种ExecutionService executor = "fork-join-executor" # 配置fork join池 fork-join-executor { # 最小线程数 parallelism-min = 2 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 2.0 # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } # 吞吐量定义了最大的消息数量 # 一个actor最多只能占用某线程100个消息的时间 # 设置为1可以尽可能公平地 throughput = 100 }代码中定义 :ActorRef myActor = system.actorOf(Props.create(MyActor.class).withDispatcher("my-dispatcher"), "myactor3");调度器类型有三种不同类型的消息调度器Dispatcher : 这是一个基于事件的调度程序,它将一组 Actor 绑定到线程池。如果未指定调度器,则使用默认调度器PinnedDispatcher : 这个调度器为每个使用它的演员分配一个独特的线程;即每个Actor将拥有自己的线程池,池中只有一个线程CallingThreadDispatcher : 该调度程序仅在当前线程上运行调用。这个调度程序不会创建任何新的线程,但可以同时为不同的线程使用同一个actor,主要用于测试3、Flink中基于Akka的RPCFLink RPC核心组件Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个RpcGateway网关,都是 RpcGateWay 的子类接口是用于远程调用的代理接口。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法在实现一个提供 RPC 调用的组件时,通常需要先定一个接口,该接口继承 RpcGateway 并约定好提供的远程调用的方法,getAddress() & getHostname()RpcServiceRPC服务的接口,用于连接到一个远程的RPC server,或者启动一个rpc server来转发远程调用到rpcEndpoint是 RpcEndpoint 的运行时环境, RpcService 提供了启动 RpcEndpoint , 连接到远端 RpcEndpoint 并返回远端 RpcEndpoint 的代理对象等方法。此外, RpcService 还提供了某些异步任务或者周期性调度任务的方法RpcServer相当于 RpcEndpoint 自身的的代理对象(self gateway)。 RpcServer 是 RpcService 在启动了 RpcEndpoint 之后返回的对象,每一个 RpcEndpoint 对象内部都有一个 RpcServer 的成员变量,通过 getSelfGateway 方法就可以获得自身的代理,然后调用该Endpoint 提供的服务RpcEndpoint是对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需要继承 该抽象类类似于Acotr,封装了传输的业务逻辑源码分析RpcGateway :/** * TODO 定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为对方的客户端代理 */ public interface RpcGateway { String getAddress(); String getHostname(); }RpcEndpoint : 启动rpcServerpublic abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { protected final Logger log = LoggerFactory.getLogger(getClass()); /** * 1、根据提供的RpcEndpoint来启动和停止RpcServer(Actor) * 2、根据提供的地址连接到(对方)RpcServer,并返回一个RpcGateway * 3、延迟、立刻调度Runnable、Callable */ private final RpcService rpcService; /** / protected final RpcServer rpcServer; ..... protected RpcEndpoint(final RpcService rpcService, final String endpointId) { // 保存rpcService和endpointId this.rpcService = checkNotNull(rpcService, "rpcService"); this.endpointId = checkNotNull(endpointId, "endpointId"); // 通过 RpcService 启动RpcServer /** * 构造的时候调用 rpcService.startServer()启动RpcServer,进入可以接受处理请求的状态,最后将RpcServer绑定到主线程上 * 真正执行起来 * 在RpcEndpoint中还定义了一些放入如 runAsync(Runnable)、callAsync(Callable,Time)方法来执行Rpc调用,值得注意的是在Flink * 的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint进行Rpc调用时,其会委托RpcServer进行处理 */ this.rpcServer = rpcService.startServer(this); // 主线程执行器,所有调用在主线程中串行执行 this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread); } .... @Override public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); /** * 根据RpcEndpoint的类型来创建对应的Actor,目前支持两种Actor的创建 * 1、AkkaRpcActor * 2、FencedAkkaRpcActor,对AkkaRpcActor进行扩展,能够过滤到与指定token无关的消息 */ final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint); // 这里拿到的是AkkaRpcActor的引用 final ActorRef actorRef = actorRegistration.getActorRef(); final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture(); LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef); final String hostname; Option<String> host = actorRef.path().address().host(); if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); } // 提取集成RpcEndpoint的所有子类 Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class); // 对上述指定的类集合进行代理 final InvocationHandler akkaInvocationHandler; if (rpcEndpoint instanceof FencedRpcEndpoint) { // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken, captureAskCallstacks); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks); } // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); // 针对RpcServer生成一个动态代理 @SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler); return server; }AkkaRpcActor :@Override public Receive createReceive() { return ReceiveBuilder.create() // TODO 重点 // TODO 如果是 RemoteHandshakeMessage 信息,执行 handleHandshakeMessage,处理握手消息 .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) // TODO 如果是 ControlMessages,执行 handleControlMessage,如启动、停止、中断 .match(ControlMessages.class, this::handleControlMessage) // TODO 其他情况,handleMessage .matchAny(this::handleMessage) .build(); } private void handleMessage(final Object message) { if (state.isRunning()) { mainThreadValidator.enterMainThread(); try { // 处理消息 handleRpcMessage(message); } finally { mainThreadValidator.exitMainThread(); } } else { log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", rpcEndpoint.getClass().getName(), message.getClass().getName()); sendErrorIfSender(new AkkaRpcException( String.format("Discard message, because the rpc endpoint %s has not been started yet.", rpcEndpoint.getAddress()))); } }connect :@Override public <C extends RpcGateway> CompletableFuture<C> connect( final String address, final Class<C> clazz) { // 连接远程Rpc Server,返回的是代理对象 return connectInternal( address, clazz, (ActorRef actorRef) -> { Tuple2<String, String> addressHostname = extractAddressHostname(actorRef); return new AkkaInvocationHandler( addressHostname.f0, addressHostname.f1, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), null, captureAskCallstacks); }); } private <C extends RpcGateway> CompletableFuture<C> connectInternal( final String address, final Class<C> clazz, Function<ActorRef, InvocationHandler> invocationHandlerFactory) { checkState(!stopped, "RpcService is stopped"); LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", address, clazz.getName()); // 根据Akka Actor地址获取ActorRef final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address); // 发送一个握手成功的消息给远程Actor final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose( (ActorRef actorRef) -> FutureUtils.toJava( Patterns .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds()) .<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class)))); // 创建动态代理,并返回 return actorRefFuture.thenCombineAsync( handshakeFuture, (ActorRef actorRef, HandshakeSuccessMessage ignored) -> { // AkkaInvocationHandler,针对客户端会调用 invokeRpc InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); // 创建动态代理 @SuppressWarnings("unchecked") C proxy = (C) Proxy.newProxyInstance( classLoader, new Class<?>[]{clazz}, invocationHandler); return proxy; }, actorSystem.dispatcher()); }AkkaInvocationHandler : 同时服务端和客户端同时使用@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); Object result; // 判断方法的类是否为指定的类,符合如下的类,执行本地调用,否则实行远程调用 if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) { throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " + "retrieve a properly FencedRpcGateway."); } else { // TODO 客户端会进行这里,进行远程的调用 result = invokeRpc(method, args); } return result; }4、Flink RPC实例<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>2.6.9</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.13</artifactId> <version>2.6.9</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-rpc-akka</artifactId> <version>1.17.0</version> </dependency>定义TaskGateway接口,继承RpcGateway接口 :package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.RpcGateway; public interface TaskGateway extends RpcGateway { String sayHello(String name); }创建一个TaskEndpoint,继承RpcEndpoint同时实现TaskGateway接口package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; public class TaskEndpoint extends RpcEndpoint implements TaskGateway { public TaskEndpoint(RpcService rpcService) { super(rpcService); } @Override public String sayHello(String name) { return "hello " + name; } }Server端 :package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; public class FlinkRpcServer { public static void main(String[] args) throws Exception { // 1. 创建RPC服务 ActorSystem defaultActorSystem = AkkaUtils.createDefaultActorSystem(); AkkaRpcService akkaRpcService = new AkkaRpcService(defaultActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration()); // 2. 创建TaskEndpoint实例 TaskEndpoint endpoint = new TaskEndpoint(akkaRpcService); System.out.println("address : "+endpoint.getAddress()); // 3. 启动Endpoint endpoint.start(); } }client端 :package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; import java.util.concurrent.CompletableFuture; public class FlinkRpcClient { public static void main(String[] args) throws Exception { // 1. 创建RPC服务 ActorSystem defaultActorSystem = AkkaUtils.createDefaultActorSystem(); AkkaRpcService akkaRpcService = new AkkaRpcService(defaultActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration()); // 2. 连接远程RPC服务,注意:连接地址是服务端程序打印的地址 CompletableFuture<TaskGateway> gatewayFuture = akkaRpcService .connect("akka.tcp://flink@192.168.31.162:53465/user/rpc/c38b60e7-c87c-4b7f-b948-3099dfb12999", TaskGateway.class); // 3. 远程调用 TaskGateway gateway = gatewayFuture.get(); System.out.println(gateway.sayHello("flink-akka")); } }注意:必须要在 org.apache.flink.runtime.rpc.akka 这个包下,因为AkkaUtils是这个包的可见如感兴趣,点赞加关注,谢谢
2024年09月23日
18 阅读
0 评论
0 点赞
2024-09-12
Spark3中的Catalog组件设计
接口定义 CatalogPlugin:catalog的顶级接口,该接口主要实现了catalog的name和defaultNamespace。比如V2SessionCatalog的默认实现就是name="spark_catalog",defaultNamespace="defualt"TableCatalog:定义Table的相关接口,包括listTable, loadTable, CreateTable, DropTable,AlterTable等。CatalogExtension:继承自TableCatalog,通过setDelegateCatalog()方法把上面的V2SessionCatalog 实例进行代理。经过实际测试发现,无法改变catalog的name,所以只能对原始的catalog进行功能扩展。DelegatingCatalogExtension:抽象类,包含了对CatalogExtendsion的所有方法实现。总结:如果需要自定义catalog,则需实现TableCatalog即可。如果需要对当前的catalog进行扩展,则实现CatalogExtension或者重写DelegatingCatalogExtension的方法。Catalog初始化过程 Spark通过CatalogManager管理多个catalog,通过spark.sql.catalog.${name} 可以注册多个catalog,Spark的默认实现则是spark.sql.catalog.spark_catalog。1.SparkSession在创建时,可以选择是否enableHiveSupport()来指定catalog类型,如果指定了则会实例化HiveExternalCatalog,不指定则实例InMemoryCatalog。2.在BaseSessionStateBuilder/HiveSessionStateBuilder则会使用上面实例化的externalCatalog创建catalog对象,再根据catalog对象创建V2SessionCatalog对象(V2SessionCatalog就是对SessionCatalog的封装)3.根据catalog和v2SessionCatalog创建CatalogManager实例,CatalogManager中内置一个catalogs的HashMap<String,CatalogPlugin>来管理catalog。4.再SQL中调用catalog时候,会通过CatalogManger.catalog(String name)返回对应名称的catalog实例,如果没有则通过Catalog.load(name, conf)来进行实例化5.对于默认的spark_catalog,如果需要进行扩展,则可以通过spark.sql.catalog.spark_catalog来指定对应的实现类,该类可以通过实现CatalogExtension,然后会自动调用其setDelegateCatalog()来设置默认的spark_catalog为代理对象,然后进行扩展即可。总结:在DQC SQL升级引擎的过程中,使用了Spark3的Catalog特性来支持原生的multi catalog关联查询。对于JDBC协议的数据库来说,则使用JDBCTableCatalog来进行扩展catalog即可;对于Hive来说,第一种方案是实现CatalogExtension来扩展,但是发现这种方案无法修改其catalog name来适配元数据中心的hive catalog名称;第二种方案而通过完全自定义TableCatalog则需要完全初始化一套SparkSessionCatalog,在实践的过程中发现,可以进行元数据的拉取,但是在loadTable读取数据的时候报“表不支持batch read"的问题;第三种方案是采用修改DQC SQL的hvie catalog名称统一为spark_catalog的方式来实现Hive Catalog的使用,算是一个取巧的办法,后续有时间则可以继续扩展SparkSessionCatalog来实现hive的适配。
2024年09月12日
104 阅读
2 评论
0 点赞
2024-01-17
LSM索引原理
LSM作为一种设计思想,它把数据拆分为两个部分,一部分放在内存,一部分放在磁盘。内存中的数据检索可以使用红黑树,调表等时间复杂度比较低的结构进行检索。当数据到达一定阈值的时候则会将数据写到磁盘文件中,此时的写入的方式是顺序写,所以LSM写入性能很高。并发读写问题内存在写入磁盘过程中,如果有新的数据插入,则会带来并发读写的问题,所以就需要对这部分内存区域进行加锁。加锁的话又会导致写入过程阻塞,所以业界一般是当内存到达某个阈值之后,将这片区域标记为可读,然后新的数据将插入到新的内存区域,而旧的内存区域是只读的,所以可以不加锁的进行同步到磁盘的过程。小文件问题众所周知由于内存的容量有限,并且进行了分区,导致每次生成文件必然不会很大,这样就会造成检索效率很慢的问题。LSM是这样解决问题的:查找数据时候从多个磁盘文件中读取数据,然后进行合并,取最新的数据(Merge On Read)。由于写入的数据在内存是有序的,所以磁盘的小文件也是有序的(sstable)。这样可以保证单个文件中的检索是非常快的,但是存在一个问题:如果查找一个值的时候,在多个文件的索引有重叠的话就需要在多个sstable中查找数据(最坏的可能需要检索所有的文件),所以需要将小文件进行合并,让索引不再有重叠,就可以解决很好的剪枝文件。这也是Hudi点查性能不好的原因,没有保证索引不重叠。文件合并虽然文件合并带来的好处很多,但是合并的时机非常重要,如果新增一个就去进行合并全部文件,就会造成磁盘IO一直处于一个很高的水平,这样性能反而不好。所以LSM采用的是多层合并的方法,每一层的容量是上一层的10倍。level0层是内存直接写入的文件,当写满这一层的个数上限之后,再进行合并然后存入下一层,然后当下一层写满之后再继续合并到下一层,直到合并到最大层数则不再合并。这样就只存在level0层的索引是有重合的,其他的层的索引数据都是不重合的,可以很好的进行File Skiping,并且由于这种设计,将文件合并的时机分摊到了多次,缓解了写放大的问题。总结LSM在数据写入方面,使用了内存分区标记解决了读写并发问题,并且使用多层合并的机制解决了写放大的问题,提供了非常好的写入性能和小文件合并的机制。在读方面,可以先从内存读取,找不到再从level0一直往高层找,并且由于level0后的数据都是有序且不重合的,通过二分查找,能够很好的进行File Skiping,再配合布隆过滤器来快速判断元素是否存在与文件中。在最差的情况下,可能要遍历所有的文件,所以LSM适合写多读少的场景。
2024年01月17日
46 阅读
0 评论
0 点赞