首页
关于
Search
1
SteamOS安装paru和yay
172 阅读
2
欢迎使用 Typecho
127 阅读
3
徕卡X | 冬日漫步
127 阅读
4
https自动跳转问题终极解决方案
116 阅读
5
Spark3中的Catalog组件设计
104 阅读
默认
摄影
后端开发
大数据
Spark
Flink
登录
Search
标签搜索
转载
ArlenDu
累计撰写
18
篇文章
累计收到
10
条评论
首页
栏目
默认
摄影
后端开发
大数据
Spark
Flink
页面
关于
搜索到
18
篇与
的结果
2024-10-30
Flink的状态
状态种类 Keyed State:Keyed State 总是和 Key 相关联,只能在 KeyedStream 的函数或算子中使用,因为 Flink 中的 keyBy 操作保证了每一个键相关联的所有消息都会送给下游算子的同一个并行实例处理,因此 Keyed State 也可以看作是 Operator State 的一种分区(partitioned)形式,每一个键都关联一个状态分片(state-partition)。Operator State:Operator State 绑定到算子的每一个并行实例(sub-task) 中。状态后端 (StateBackend)MemoryStateBackend: 将状态存储在内存中,通常只在调试和开发环境中用FsStateBackend: FsStateBackend 会将状态存储到一个持久化的存储中,如 HDFS,只在 JobManager 的内存中存储一些 metadata。RocksDBStateBackend:状态存储在RockDB中,并且支持增量快照。在最新版的 Flink 中,这三种形式的存储后端都支持异步快照模式。四种类型:ValueStateListStateReducingStateMapState要获取状态,首先需要定义状态描述符(StateDescriptor)。状态描述符状态的名字(保证唯一性),状态的类型,以及部分状态需要的自定义函数。根据想要获取的状态的不同,状态描述符也分为 ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, MapStateDescriptor。自定义状态CheckpointedFunction接口ListCheckpointed接口RuntimeContext对于 Keyed State,通常都是通过 RuntimeContext 实例来获取,这通常需要在 rich functions 中才可以做到。 RuntimeContext 提供的获取状态的方法包括:ValueState<T> getState(ValueStateDescriptor<T>) ReducingState<T> getReducingState(ReducingStateDescriptor<T>) ListState<T> getListState(ListStateDescriptor<T>) MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)检查点(CheckPoint)检查点机制还需要依赖于 1.支持数据重播的数据源 2.状态的持久存储终端。Flink 实现了一个轻量级的分布式快照机制,其核心点在于 Barrier。 Coordinator 在需要触发检查点的时候要求数据源注入向数据流中注入 barrier, barrier 和正常的数据流中的消息一起向前流动,相当于将数据流中的消息切分到了不同的检查点中。当一个 operator 从它所有的 input channel 中都收到了 barrier,则会触发当前 operator 的快照操作,并向其下游 channel 中发射 barrier。当所有的 sink 都反馈收到了 barrier 后,则当前检查点创建完毕。Barrier对齐一个关键的问题在于,一些 operator 拥有多个 input channel,它往往不会同时从这些 channel 中接收到 barrier。如果 Operator 继续处理 barrier 先到达的 channel 中的消息,那么在所有 channel 的 barrier 都到达时,operator 就会处于一种混杂的状态。在这种情况下,Flink 采用对齐操作来保证 Exactly Once 特性。Operator 会阻塞 barrier 先到达的 channel,通常是将其流入的消息放入缓冲区中,待收到所有 input channel 的 barrier 后,进行快照操作,释放被阻塞的 channel,并向下游发射 barrier。 对齐操作会对流处理造成延时,但通常不会特别明显。如果应用对一致性要求比较宽泛的话,那么也可以选择跳过对齐操作。这意味着快照中会包含一些属于下一个检查点的数据,这样就不能保证 Exactly Once 特性,而只能降级为 At Least Once。异步快照前述的检查点创建流程中,在 operator 进行快照操作时,不再处理数据流中的消息。这种同步的方式会在每一次进行快照操作的时候引入延时。实际上,Flink 也支持采用异步的方式创建快照,这就要求 operator 在触发快照操作的时候创建一个不受后续操作影响的状态对象,通常选用 copy-on-write 的数据结构。Flink 中基于 RocketDB 的状态存储后端就可以支持异步操作。保存点(Savepoint)所谓的保存点,其实是用户手动触发的一种特殊的检查点。其本质就是检查点,但它相比检查点有两点不同:1.用户自行触发 2.当有新的已完成的检查点产生的时候,不会自动失效。可查询状态(Queryable State)Flink 自1.2起新增了一个 Queryable State 特性,允许从 Flink 系统外直接查询作业流水中的状态。这主要是来自于两方面的诉求:1)很多应用都有直接获取应用实时状态的需求,2)将状态频繁写入外部系统中可能是应用的瓶颈。有两种方式来使用 Queryable State :QueryableStateStream, 将 KeyedStream 转换为 QueryableStateStream,类似于 Sink,后续不能进行任何转换操作StateDescriptor#setQueryable(String queryableStateName),将 Keyed State 设置为可查询的 (不支持 Operator State)外部应用在查询 Flink 作业内部状态的时候要使用 QueryableStateClient, 提交异步查询请求来获取状态。下面的两张图大致给出了 Queryable State 的工作机制: 目前还存在的一些局限:只能支持 Keyed Operator对状态大小是否有限制(不支持 ListState)作业失败后如何保证可用性Clent API 的易用性参考Working with StateA Deep Dive into Rescalable State in Apache FlinkQueryable States in ApacheFlink - How it works
2024年10月30日
21 阅读
0 评论
0 点赞
2024-10-22
<转载>网易Spark Kyuubi核心架构设计与源码实现剖析
版权声明:本文为 xpleaf(香飘叶子)博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本文较为系统、全面并且由浅入深地介绍了网易Spark Kyuubi出现的背景、核心架构设计与关键源码实现,是学习、应用和对Kyuubi进行二次开发不可多得的技术干货,但由于作者认知水平有限,文中难免会出现描述不准确的措辞,还请多多包容和指出。1 概述 Kyuubi是网易数帆旗下易数大数据团队开源的一个高性能的通用JDBC和SQL执行引擎,建立在Apache Spark之上,Kyuubi的出现,较好的弥补了Spark ThriftServer在多租户、资源隔离和高可用等方面的不足,是一个真正可以满足大多数生产环境场景的开源项目。通过分析Spark ThriftServer的设计与不足,本文会逐渐带你深入理解Kyuubi的核心设计与实现,同时会选取多个关键场景来剖析其源码,通过本文的阅读,希望能让读者对网易Kyuubi的整体架构设计有一个较为清晰的理解,并能够用在自己的生产环境中解决更多实际应用问题。本文主要主要选取 Kyuubi 1.1.0版本来对其设计与实现进行分析,后续的版本迭代社区加入了数据湖等概念和实现,本文不会对这方面的内容进行探讨。2 Spark ThriftServer的设计、实现与不足2.1 产生背景在最初使用Spark时,只有理解了Spark RDD模型和其提供的各种算子时,才能比较好地使用Spark进行数据处理和分析,显然由于向上层暴露了过多底层实现细节,Spark有一定的高使用门槛,在易用性上对许多初入门用户来说并不太友好。SparkSQL的出现则较好地解决了这一问题,通过使用SparkSQL提供的简易API,用户只需要有基本的编程基础并且会使用SQL,就可以借助Spark强大的快速分布式计算能力来处理和分析他们的大规模数据集。而Spark ThriftServer的出现使Spark的易用性又向前迈进了一步,通过提供标准的JDBC接口和命令行终端的方式,平台开发者可以基于其提供的服务来快速构建它们的数据分析应用,普通用户甚至不需要有编程基础即可借助其强大的能力来进行交互式数据分析。2.2 核心设计顾名思义,本质上,Spark ThriftServer是一个基于Apache Thrift框架构建并且封装了SparkContext的RPC服务端,或者从Spark的层面来讲,我们也可以说,Spark ThriftServer是一个提供了各种RPC服务的Spark Driver。但不管从哪个角度去看Spark ThriftServer,有一点可以肯定的是,它是一个Server,是需要对外提供服务的,因此其是常驻的进程,并不会像一般我们构建的Spark Application在完成数据处理的工作逻辑后就退出。其整体架构图如下所示: Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。当用户通过JDBC或beeline方式执行一条SQL语句时,TThreadPoolServer会接收到该SQL,通过一系列的Session和Operation的管理,最终会使用在启动Spark ThriftServer时已经构建好的SparkContext来执行该SQL,并获取最后的结果集。从上面的基本分析中我们可以看到,在不考虑Spark ThrfitServer的底层RPC通信框架和业务细节时,其整体实现思路是比较清晰和简单的。当然实际上要构建一个对外提供SQL能力的RPC服务时,是有许多细节需要考虑的,并且工作量也会非常巨大,Spark ThriftServer在实现时实际上也没有自己重复造轮子,它复用了hiveserver2的许多组件和逻辑,并根据自身的业务需求来对其进行特定改造;同样的,后面当我们去看Kyuubi时,也会发现它复用了hiveserver2和Spark ThriftServer的一些组件和逻辑,并在此基础上创新性地设计自己的一套架构。2.3 基本实现这里列举的代码是基于Spark 2.1的源码,新版本在结构上可能有所有区别,但不影响我们对其本质实现原理的理解。前面提到的TThreadPoolServer是Apache Thrift提供的用于构建RPC Server的一个工作线程池类,在Spark ThriftServer的Service体系结构中,ThriftBinaryService正是使用TThreadPoolServer来构建RPC服务端并对外提供一系列RPC服务接口:Spark ThriftServer Service体系ThriftBinaryService基于TThreadPoolServer构建RPC服务端ThriftBinaryService提供的RPC服务接口可以看到,其提供的相当一部分接口都是提供SQL服务时所必要的能力。当然,不管是使用标准的JDBC接口还是通过beeline的方式来访问Spark ThriftServer,必然都是通过Spark基于Apache Thrift构建的RPC客户端来访问这些RPC服务接口的,因此我们去看Spark ThriftServer提供的RPC客户端,其提供的方法接口与RPC服务端提供的是对应的,可以参考 org.apache.hive.service.cli.thrift.TCLIService.Client。如果比较难以理解,建议可以先研究一下RPC框架的本质,然后再简单使用一下Apache Thrift来构建RPC服务端和客户端,这样就会有一个比较清晰的理解,这里不对其底层框架和原理做更多深入的分析。个人觉得,要理解Spark ThriftServer,或是后面要介绍的Kyubbi,本质上是理解其通信框架,也就是其是怎么使用Apache Thrift来进行通信的,因为其它的细节都是业务实现。2.4 主要不足Spark ThriftServer在带来各种便利性的同时,其不足也是显而易见的。首先,Spark ThriftServer难以满足生产环境下多租户与资源隔离的场景需求。由于一个Spark ThriftServer全局只有一个SparkContext,也即只有一个Spark Application,其在启动时就确定了全局唯一的用户名,因此在Spark ThriftServer的维护人员看来,所有通过Spark ThriftServer下发的SQL都是来自同一用户(也就是启动时确定的全局唯一的用户名),尽管其背后实际上是由使用Spark ThriftServer服务的不同用户下发的,但所有背后的这些用户都共享使用了Spark ThriftServer的资源、权限和数据,因此我们难以单独对某个用户做资源和权限上的控制,操作审计和其它安全策略。在Spark ThriftServer执行的一条SQL实际上会被转换为一个job执行,如果用户A下发的SQL的job执行时间较长,必然也会阻塞后续用户B下发的SQL的执行。其次,单个Spark ThriftServer也容易带来单点故障问题。从Spark ThriftServer接受的客户端请求和其与Executor的通信来考虑,Spark ThriftServer本身的可靠性也难以满足生产环境下的需求。因此,在将Spark ThriftServer应用于生产环境当中,上面提及的问题和局限性都会不可避免,那业界有没有比较好的解决方案呢?网易开源的Spark Kyuubi就给出了比较好的答案。3 Kyuubi核心架构设计与源码实现剖析3.1 Kyuubi核心架构设计Kyuubi的整体架构设计如下:Kyuubi从整体上可以分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其整体概述如下:用户层指通过不同方式使用Kyuubi的用户,比如通过JDBC或beeline方式使用Kyuubi的用户。服务发现层服务发现层依赖于Zookeeper实现,其又分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。Kyuubi Server层由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。Kyuubi Engine层由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。下面将会对每一层以及它们的协作与交互展开较为详细的分析。3.1.1 用户层用户层就是指实际需要使用Kyuubi服务的用户,它们通过不过的用户名进行标识,以JDBC或beeline方式进行连接。比如我们可以在beeline中指定以不同用户名进行登录:使用xpleaf用户名进行登录使用yyh用户名进行登录使用leaf用户名进行登录当然,这里的用户名或登录标识并不是可以随意指定或使用的,它应该根据实际使用场景由运维系统管理人员进行分配,并且其背后应当有一整套完整的认证、授权和审计机制,以确保整体系统的安全。3.1.2 服务发现层服务发现层主要是指Zookeepr服务以及Kyuubi Server层的KyuubiServer实例和Kyuubi Engine层的SparkSQLEngine在上面注册的命名空间(即node节点),以提供负载均衡和高可用等特性,因此它分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。Kyuubi Server层的服务发现Kyuubi Server层的服务发现是需要用户感知的。KyuubiServer实例在启动之后都会向Zookeeper的/kyuubi节点下面创建关于自己实例信息的节点,主要是包含KyuubiServer实例监听的host和port这两个关键信息,这样用户在连接KyuubiServer时,只需要到Zookeeper的/kyuubi节点下面获取对应的服务信息即可,当有多个KyuubiServer实例时,选取哪一个实例进行登录,这个是由用户自行决定的,Kyuubi本身并不会进行干预。在实际应用时也可以封装接口实现随机返回实例给用户,以避免直接暴露Kyuubi的底层实现给用户。另外,KyuubiServer实例是对所有用户共享,并不会存在特定KyuubiServer实例只对特定用户服务的问题。当然在实际应用时你也可以这么做,比如你可以不对用户暴露服务发现,也就是不对用户暴露Zookeeper,对于不同用户,直接告诉他们相应的KyuubiServer实例连接信息即可。不过这样一来,Kyuubi Server层的高可用就难以保证了。比如有多个在不同节点上启动的KyuubiServer实例,其在Zookeeper上面注册的信息如下:Kyuubi Engine层的服务发现Kyuubi Engine层的服务发现是不需要用户感知的,其属于Kyuubi内部不同组件之间的一种通信协作方式。SparkSQLEngine实例在启动之后都会向Zookeeper的/kyuubi_USER节点下面创建关于自己实例信息的节点,主要是包含该实例监听的host和port以及其所属user的相关信息,也就是说SparkSQLEngine实例并不是所有用户共享的,它是由用户独享的。比如Kyuubi系统中有多个不同用户使用了Kyuubi服务,启动了多个SparkSQLEngine实例,其在Zookeeper上面注册的信息如下:3.1.3 Kyuubi Server层Kyuubi Server层由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。整个Kyuubi系统中需要存在多少个KyuubiServer实例是由Kyuubi系统管理员决定的,根据实际使用Kyuubi服务的用户数和并发数,可以部署一个或多个KyuubiServer实例,以满足SLA要求。当然后续发现KyuubiServer实例不够时,可以横向动态扩容,只需要在Kyuubi中系统配置好host和port,启动新的KyuubiServer实例即可。3.1.4 Kyuubi Engine层Kyuubi Engine层由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。SparkSQLEngine实例是针对不同的用户按需启动的。在Kyuubi整体系统启动之后,如果没有用户访问Kyuubi服务,实际上在整个系统中只有一个或多个KyuubiServer实例,当有用户通过JDBC或beeline的方式连接KyuubiServer实例时,其会在Zookeeper上去查找是否存在用户所属的SparkSQLEngine实例,如果没有,则通过spark-submit提交一个Spark应用,而这个Spark应用本身就是SparkSQLEngine,启动后,基于其内部构建的SparkSession实例,即可为特定用户执行相关SQL操作。3.1.5 整体协作流程通过前面对各层的介绍,结合KyubbiServer架构图,以用户xpleaf访问Kyuubi服务为例来描述整个流程。1.Kyuubi系统管理员在大数据集群中启动了3个KyuubiServer实例和1个Zookeeper集群,其中3个KyuubiServer实例的连接信息分别为10.2.10.1:10009、10.2.10.1:10010和10.2.10.2:1009;2.用户xpleaf通过beeline终端的方式连接了其中一个KyuubiServer实例;在这里我们假设用户xpleaf事先已经通过管理员告知的方式知道了该KyuubiServer实例的连接信息。3.KyuubiServer\_instance1接收到xpleaf的连接请求,会为该用户创建session会话,同时会去Zookeeper上检查是否已经存在xpleaf所属的SparkSQLEngine实例;3.1 如果已经存在,则获取其连接信息;3.2 如果不存在,则通过spark-submit的方式提交一个Spark应用,启动一个SparkSQLEngine实例;4.KyuubiServer\_instance1在Zookeeper上没有找到xpleaf所属的SparkSQLEngine实例信息,其通过spark-submit的方式启动了一个SparkSQLEngine实例;5.属于xpleaf用户的新的SparkSQLEngine\_instance1实例在10.2.10.1节点上进行启动,并且监听的52463端口,启动后,其向Zookeeper注册自己的连接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463;6.KyuubiServer\_instance1在检测到SparkSQLEngine\_instance1启动成功后,会向其发送创建session会话的连接请求;7.SparkSQLEngine\_instance1收到KyuubiServer\_instance1创建session会话的连接请求,则创建一个新的session会话;8.用户启动beeleine完成并成功创建会话,接着用户执行SQL查询;9.KyuubiServer\_instance1接收到xpleaf的执行SQL查询的请求,会先检查是否存在xpleaf所属的SparkSQLEngine实例;10.KyuubiServer\_instance1找到xpleaf所属的SparkSQLEngine\_instance1实例,接着会为这次执行SQL的操作创建一个Operation;11.KyuubiServer\_instance1根据连接信息创建了一个RPC Client,并且构建SQL执行的RPC请求,发到对应的SparkSQLEngine\_instance1实例上;12.SparkSQLEngine\_instance1接收到该请求后,会创建一个该SQL操作的Operation,并且使用其内部的SparkSession实例来进行执行,最后将执行结果返回给KyuubiServer\_instance1;13.KyuubiServer\_instance1接收到SparkSQLEngine\_instance1的执行结果,返回给用户,这样一次SQL查询操作就完成了。透过整体协作流程我们可以看到:站在用户层视角来看,其为RPC客户端,而为其提供RPC服务的是Kyuubi Server层,在这里,Kyuubi Server是RPC服务端;站在Kyuubi Server层视角来看,其既是为用户层提供RPC服务的RPC服务端,同时也是使用Kyuubi Engine层RPC服务的RPC客户端;站在Kyuubi Engine层视角来看,其为RPC服务端,其为Kyuubi Server层提供RPC服务;Kyuubi在整体Server端和Client端以及其实现功能的设计上,是十分清晰的。3.2 Kyuubi源码实现剖析通过前面对Kyuubi各层以及整体协作流程的描述,相信对Kyuubi的核心架构设计会有一个比较清晰的理解,这样再去分析Kyuubi的源码时就会简单很多。首先我们会来介绍Kyuubi整体的Service体系与组合关系,以对Kyuubi整体核心代码有一个概览性的理解,接着会选取多个关键场景来对Kyuubi的源码进行分析,并且给出每个场景的代码执行流程图。确实没有办法在较为简短的篇幅里为大家介绍Kyuubi源码的方方面面,但我个人认为不管对于哪个大数据组件,在理解了其底层通信框架的基础上,再选取关于该组件的几个或多个关键场景来分析其源码,基本上对其整体设计就会有概览性的理解,这样后面对于该组件可能出现的Bug进行排查与修复,或是对该组件进行深度定制以满足业务的实际需求,我相信问题都不大——这也就达到了我们的目的,就是去解决实际问题。当然,在这个过程当中你也可以欣赏到漂亮的代码,这本身也是一种享受。3.2.1 RPC与Apache Thrift基本概述RPCRPC(Remote Procedure Call)远程过程调用,如果按照百度百科的解释会非常羞涩难懂(上面提供的图应该还是《TCP/IP详解卷1:协议》上面的一个图),但实际上我们就可以简单地把它理解为,一个进程调用另外一个进程的服务即可,不管是通过Socket、内存共享或是网络的方式,只要其调用的服务的具体实现不是在调用方的进程内完成的就可以,目前我们见得比较多的是通过网络通信调用服务的方式。在Java语言层面上比较普遍的RPC实现方式是,反射+网络通信+动态代理的方式来实现RPC,而网络通信由于需要考虑各种性能指标,主要用的Netty或者原生的NIO比较多,Socket一般比较少用,比如可以看一下阿里Doubbo的实现。如果想加深这方面的理解,可以参考我的一个开源RPC框架,其实就是非常mini版的Doubbo实现: https://github.com/xpleaf/minidubbo,建议有时间可以看下,实际上这会非常有用,因为几乎所有的大数据组件都会用到相关的RPC框架,不管是开源三方的还是其自己实现的(比如Hadoop的就是使用自己实现的一套RPC框架)。Apache Thrift Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。3.2.2 Kyuubi Service体系与组合关系在看Kyuubi的源码时,我们可以把较多精力放在某几种较重要的类和其体系上,这样有助于我们抓住重点,理解Kyuubi最核心的部分。仅考虑Kyuubi整体的架构设计和实现,比较重要的是Service、Session和Operation等相关的类和体系。Service体系Service,顾名思义就是服务,在Kyuubi中,各种不同核心功能的提供都是通过其Service体系下各个实现类来进行提供的。我们前面提到的服务发现层、Kyuubi Server层和Kyuubi Engine层,在代码实现上绝大部分核心功能都是由Kyuubi源码项目的Server类体系来完成的,可以这么说,理解了Service体系涉及类的相关功能,就基本上从源码级别上理解了整个Kyuubi的体系架构设计和实现。当然这些Service的实现类并不一定使用Service结尾,比如SessionManager、OperationManager等,但基本上从名字我们就能对其功能窥探一二。其完整的继承关系如下:基于Kyuubi提供的核心功能,我们可以大致按Kyuubi Server层和Kyuubi Engine层来将整个体系中的Service类进行一个划分:Kyuubi Server层功能入口 - KyuubiServer:提供main方法,是Kyuubi Server层KyuubiServer实例初始化和启动的入口;服务发现 - KyuubiServiceDiscovery:封装了zkClient,用来与Zookeeper服务进行交互;核心功能 - FrontendService:封装了Apache Thrift的TThreadPoolServer,在Kyuubi Server层,其主要用于向用户层提供RPC服务; - KyuubiBackendService:封装了来自用户层不同RPC请求的处理逻辑,比如openSession、executeStatement、fetchResults等;Session管理 - KyuubiSessionManager:提供对用户层的请求会话(session)管理;Operation管理 - KyuubiOperationManager:提供对用户层的请求操作(operation)管理;Kyuubi Engine层功能入口 - SparkSQLEngine:提供main方法,是Kyuubi Engine层SparkSQLEngine实例初始化和启动的入口;服务发现 - EngineServiceDiscovery:封装了zkClient,用来与Zookeeper服务进行交互;核心功能 - FrontendService:封装了Apache Thrift的TThreadPoolServer,在Kyuubi Engine层,其主要用于向Kyuubi Server层提供RPC服务; - SparkSQLBackendService:封装了来自Kyuubi Server层不同RPC请求的处理逻辑,比如openSession、executeStatement、fetchResults等;Session管理 - SparkSQLSessionManager:提供对Kyuubi Server层的请求会话(session)管理;Operation管理 - SparkSQLOperationManager:提供对Kyuubi Server层的请求操作(operation)管理;这里我们只对具体实现类进行归类,因为中间抽象类只是提取多个子类的公共方法,不影响我们对其体系功能的说明和讲解;而以Noop开头的实际上是Kyuubi的测试实现类,因此我们也不展开说明;KinitAuxiliaryService是Kyuubi中用于认证的类,这里我们不对其认证功能实现进行说明。通过对Service体系各个具体实现类的介绍,再回顾前面对Kyuubi整体架构和协作流程的介绍,其抽象的功能在源码实现类上面就有了一个相对比较清晰的体现,并且基本上也是可以一一对应上的。Service组合关系为了理解Kyuubi在源码层面上是如何进行整体协作的,除了前面介绍的Service体系外,我们还有必要理清其各个Service之间的组合关系。在整个Service体系中,CompositeService这个中间抽象类在设计上是需要额外关注的,它表示的是在它之下的实现类都至少有一个成员为其它Service服务类对象,比如对于KyuubiServer,它的成员则包含有KyuubiBackdService、KyuubiServiceDiscovery等多个Service实现类,SparkSQLEngine也是如此。我们将一些关键的Service类及其组合关系梳理如下,这对后面我们分析关键场景的代码执行流程时会提供很清晰的思路参考:Session与SessionHandleSession当我们使用通过JDBC或beeline的方式连接Kyuubi时,实际上在Kyuubi内部就为我们创建了一个Session,用以标识本次会话的所有相关信息,后续的所有操作都是基于这次会话来完成的,我们可以在一次会话下执行多个操作(比如多次执行某次SQL,我们只需要建立一次会话连接即可)。Session在Kyuubi中又分为Kyuubi Server层的Session和Kyuubi Engine层的Session。Kyuubi Server层的Session实现类为KyuubiSessionImpl,用来标识来自用户层的会话连接信息;Kyuubi Engine层的Session实现类为SparkSessionImpl,用来标识来自Kyuubi Server层的会话连接信息。两个Session实现类都有一个共同的抽象父类AbstractSession,用于Session操作的主要功能逻辑都是在该类实现的。SessionHandleSession对象的存储实际上由SessionManager来完成,在SessionManager内部其通过一个Map来存储Session的详细信息,其中key为SessionHandle,value为Session对象本身。SessionHandle可以理解为就是封装了一个唯一标识一个用户会话的字符串,这样用户在会话建立后进行通信时只需要携带该字符串标识即可,并不需要传输完整的会话信息,以避免网络传输带来的开销。Operation与OperationHandleOperation用户在建立会话后执行的相关语句在Kyuubi内部都会抽象为一个个的Operation,比如执行一条SQL语句对应的Operation实现类为Executement,不过需要注意,Operation又分为Kyuubi Server层的KyuubiOperation和Kyuubi Engine层的SparkOperation。Kyuubi Server层的Operation并不会执行真正的操作,它只是一个代理,它会通过RPC Client请求Kyuubi Engine层来执行该Operation,因此所有Operation的真正执行都是在Kyuubi Engine层来完成的。由于Operation都是建立在Session之下的,所以我们在看前面的组合关系时可以看到,用于管理Operation的OperationManager为SessionManager的成员属性。OperationHandleOperation对象的存储实际上由OprationManager来完成,在SessioOprationManagerManager内部其通过一个Map来存储Session的详细信息,其中key为OperationHandle,value为Operation对象本身。OperationHandle可以理解为就是封装了一个唯一标识一个用户操作的字符串,这样用户基于会话的操作时只需要携带该字符串标识即可,并不需要传输完整的操作信息,以避免网络传输带来的开销。第一次提交Operation时还是需要完整信息,后续只需要提供OperationHandle即可,实际上SQL语句的执行在Kyuubi内部是异步执行的,用户端在提交Opeation后即可获得OperationHandle,后续只需要持着该OperationHandle去获取结果即可,我们在分析SQL执行的代码时就可以看到这一点。3.2.3 Kyuubi启动流程Kyuubi的启动实际上包含两部分,分别是KyuubiServer的启动和SparkSQLEngine的启动。KyuubiServer实例的启动发生在系统管理员根据实际业务需要启动KyuubiServer实例,这个是手动操作完成的;而SparkSQLEngine实例的启动则是在为用户建立会话时为由KyuubiServer实例通过spark-submit的方式去提交一个Spark应用来完成的。KyuubiServer启动流程当我们在Kyuubi的bin目录下去执行./kyuubi run命令去启动KyuubiServer时,就会去执行KyuubiServer的main方法:在加载完配置信息后,通过调用startServer(conf)方法,就开始了KyuubiServer的启动流程:可以看到,实际上KyuubiServer的启动包括两部分:初始化和启动。KyuubiServer的初始化和启动实际上是一个递归初始化和启动的过程。我们前面提到,KyuubiServer为Service体系下的一个CompositeService,参考前面给出的组合关系图,它本身的成员又包含了多个Service对象,它们都保存在保存在serviceList这个成员当中,因此初始化和启动KyuubiServer实际上就是初始化和启动serviceList中所包含的各个Service对象。而这些Service对象本身又可能是CompositeService,因此KyuubiServer的启动和初始化实际上就是一个递归初始化和启动的过程。这样一来,整个KyuubiServer的启动流程就比较清晰了,这也是我们在最开始就列出其Service体系和组合关系的原因,由于整体的启动流程和细节所包含的代码比较多,我们就没有必要贴代码了,这里我把整个初始化和启动流程步骤的流程图梳理了出来,待会再对其中一些需要重点关注的点进行说明,如下:我们重点关注一下FontendService和ServiceDiscoveryService的初始化和启动流程。FrontendService的初始化和启动我们需要重点关注一下FrontendService,因为KyuubiServer实例对外提供RPC服务都是由其作为入口来完成的。其初始化时主要是获取和设置了Apache Thrift内置的用于构建RPC服务端的TThreadPoolServer的相关参数:可以看到主要是host、port、minThreads、maxThreads、maxMessageSize、requestTimeout等,这些参数都是可配置的,关于其详细作用可以参考KyuubiConf这个类的说明。其启动比较简单,主要是调用TThreadPoolServer的server()方法来完成:ServiceDiscoveryService的初始化和启动初始化时主要是创建一个用于后续连接ZooKeeper的zkClient:当然这里还看到其获取了一个HA_ZK_NAMESPACE的配置值,其默认值为kyuubi:在ServiceDiscoveryService进行启动的时候,就会基于该namesapce来构建在Kyuubi Server层进行服务发现所需要的KyuubiServer实例信息:在这里,就会在Zookeeper的/kyuubi节点下面创建一个包含KyuubiServer实例详细连接信息的节点,假设KyuubiServer实例所配置的host和post分别为10.2.10.1和10009,那么其所创建的zk节点为:KyuubiSessionManager的初始化和启动我们主要关注一下其启动过程:在这里主要完成的事情:1.获取session check interval;2.获取session timout;3.起一个schedule的调度线程;4.根据interval和timeout对handleToSession的session进行检查;5.如果session超时(超过timeout没有access),则closesession;那么对于KyuubiServer的启动过程我们就分析到这里,更多细节部分大家可以结合我的流程图来自行阅读代码即可,实际上当我们把Kyuubi的Service体系和组合关系整理下来之后,再去分析它的启动流程时就会发现简单很多,这个过程中无非就是要关注它的一些相关参数获取和设置是在哪里完成的,它是怎么侦听服务的(真正用于侦听host和port的server的启动)。SparkSQLEngine启动流程在KyuubiServer为用户建立会话时会去通过服务发现层去Zookeeper查找该用户是否存在对应的SparkSQLEngine实例,如果没有则通过spark-submit的启动一个属于该用户的SparkSQLEngine实例。后面在分析KyuubiServer Session建立过程会提到,实际上KyuubiServer是通过调用外部进程命令的方式来提交一个Spark应用的,为了方便分析SparkSQLEngine的启动流程,这里我先将其大致的命令贴出来:kyuubi-spark-sql-engine-1.1.0.jar是Kyuubi发布版本里面的一个jar包,里面就包含了SparkSQLEngine这个类,通过-class参数我们可以知道,实际上就是要运行SparkSQLEngine的main方法,由于开启了SparkSQLEngine的启动流程。需要说明的是,提交Sparkk App的这些参数在SparkSQLEngine启动之前都会被设置到SparkSQLEngine的成员变量kyuubiConf当中,获取方法比较简单,通过scala提供的sys.props就可以获取,这些参数在SparkSQLEngine的初始化和启动中都会起到十分关键的作用。接下来我们看一下SparkSQLEngine的main方法:首先会通过createSpark()创建一个SparkSession对象,后续SQL的真正执行都会交由其去执行,其创建方法如下:这里主要是设置了一些在创建SparkSession时需要的参数,包括appName、spark运行方式、spark ui的端口等。另外这里还特别对frontend.bind.port参数设置为0,关于该参数本身的定义如下:可以看到其默认值为10009,前面KyuubiServer在构建TThreadPoolServer时就直接使用了默认值,这也是我们启动的KyuubiServer实例侦听10009端口的原因,而在这里,也就是SparkSQLEngine启动时将其设置为0是有原因,我们将在下面继续说明。创建完成SparkSession后才调用startEngine(spark)方法启动SparkSQLEngine本身:可以看到也是先进行初始化,然后再启动,SparkSQLEngine本身是CompositeService,所以初始化和启动过程跟KyuubiServer是一模一样的(当然其包含的成员会有所差别),都是递归对serviceList中所包含的各个Service对象进行初始化和启动:FrontendService的初始化和启动FrontendService在SparkSQLEngine中的启动流程与在KyuubiServer中的启动流程是基本一样的,可以参考前面的说明,这里主要说明一些比较细微的差别点。前面已经设置了frontend.bind.port参数的值为0,在FrontendService这个类当中,它会赋值给portNum这个变量,用以构建TThreadPoolServer所需要的参数ServerSocket对象:所以实际上,不管是KyuubiServer还是SparkSQLEngine,其所侦听的端口是在这里构建ServerSocket对象的时候确定下来的,对ServerSocket对象,如果传入一个为0的portNum,则表示使用系统随机分配的端口号,所以这也就是我们在启动了SparkSQLEngine之后看到其侦听的端口号都是随机端口号的原因。ServiceDiscoveryService的初始化和启动与KyuubiServer类似,这里分析一下其差别点。前面在通过spark-submit提交应用时传入了--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf的参数,实际上在SparkSQLEngine初始化KyuubiConfig对象时会设置到KyuubiConfig.HA_ZK_NAMESPACE属性上,因此在ServiceDiscoveryService初始化时获取的namespace实际上就为/kyuubi_USER/xpleaf,而不是默认的kyuubi,这点是需要注意的:因此在启动调用start()方法时,其在Zookeeper上构建的znode节点也就不同:比如其创建的znode节点为:SparkSQLSessionManager的初始化和启动SparkSQLSessionManager也是继承自SessionManager,因此与KyuubiServer的KyuubiSessionManager一样,其也启动了一个用于检查Session是否超时的checker。此外,还启动了另外一个checker,如下:实际上这个checker是在SparkSQLEngine递归初始化和启动其serviceList之前就已经启动,从它的实现当中我们可以看到,当超过一定时时并且SparkSQLEngine维护的Session为0时,整个SparkSQLEngine实例就会退出,这样做的好处就是,如果一个用户的SparkSQLEngine实例长期没有被使用,我们就可以将其占用的资源释放出来,达到节省资源的目的。3.2.4 Kyuubi Session建立过程Kyuubi Session的建立实际上包含两部分,分别是KyuubiServer Session建立和SparkSQLEngine Session建立,这两个过程不是独立进行的,KyuubiServer Session的建立伴随着SparkSQLEngine Session的建立,KyuubiServer Session和SparkSQLEngine Session才完整构成了Kyuubi中可用于执行特定Operation操作的Session。KyuubiServer Session建立过程当用户通过JDBC或beeline的方式连接Kyuubi时,实际上就开启了KyuubiServer Session的一个建立过程,此时KyuubiServer中FrontedService的OpenSession方法就会被执行:进而开启了KyuubiServer Session建立以及后续SparkSQLEngine实例启动(这部分前面已经单独介绍)、SparkSQLEngine Session建立的过程:整体流程并不复杂,在执行FrontendService#OpenSession方法时,最终会调用到KyuubiSessionImpl#open方法,这是整个KyuubiServer Session建立最复杂也是最为关键的一个过程,为此我们单独将其流程整理出来进行说明:流程中其实已经可以比较清晰地说明其过程,这里我们再详细展开说下,其主要分为下面的过程:服务发现与SparkSQLEngine实例启动第一次建立特定user的session时,在zk的/kyuubi\_USER path下是没有相关user的节点的,比如/kyuubi\_USER/xpleaf,因此在代码执行流程中,其获取的值会为None,这就触发了其调用外部命令来启动一个SparkSQLEngine实例:而调用的外部命令实际上就是我们在前面讲解SparkSQLEngine实例中提到的spark-submit命令:之后就是SparkSQLEngine实例的启动过程,其启动完成之后,就会在Zookeeper上面注册自己的节点信息。对于KyuubiSessionImpl#open方法,在不超时的情况下,循环会一直执行,直到其获取到用户的SparkSQLEngine实例信息,循环结束,进入下面跟SparkSQLEngine实例建立会话的过程。建立与SparkSQLEngine实例的会话SparkSQLEngine本质上也是一个RPC服务端,为了与其进行通信以建立会话,就需要构建RPC客户端,这里KyuubiSessionImpl#openSession方法中构建RPC客户端的方法主要是Apache Thrift的一些模板代码,如下:在发送请求给SparkSQLEngine的时候,又会触发SparkSQLEngine Session建立的过程(这个接下来说明),在跟其建立完Session之后,KyuubiSessionImpl会将其用于标识用户端会话的sessionHandle、用于跟SparkSQLEngine进行通信的RPC客户端和在SparkSQLEngine实例中进行Session标识的remoteSessionHandle缓存下来,这样在整个Kyuubi体系中,就构建了一个完整的Session映射关系:userSessionInKyuubiServer-RPCClient-KyuubiServerSessionInSparkSQLEngine,后续的Operation都是建立在这样一个体系之下。KyuubiServer在Session建立完成后会给客户端返回一个SessionHandle,后续客户端在与KyuubiServer进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。SparkSQLEngine Session建立过程在接收到来自KyuubiServer的建立会话的RPC请求之后,SparkSQLEngine中FrontedService的OpenSession方法就会被执行,其整体流程与KyuubiServer Session的建立过程是类似的,主要不同在于SparkSQLSessionManager#openSession方法执行上面,如下:其对应的关键代码如下:sessionImpl.open()实际上只是做了日志记录的一些操作,所以其实这里的核心是将创建的Session记录下来。SparkSQLEngine在Session建立完成后会给KyuubiServer返回一个SessionHandle,后续KyuubiServer在与SparkSQLEngine进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。3.2.5 Kyuubi SQL执行流程Kyuubi SQL的执行流程实际上包含两部分,分别是KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程,其结合起来才是一个完整的SQL执行流程,KyuubiServer只是一个代理,真正的SQL执行是在SparkSQLEngine中完成。另外由于在Kyuubi中,SQL的执行是异步的,也就是可以先提交一个SQL让其去执行,后续再通过其返回的operationHandle去获取结果,所以在KyuubiServer和SparkSQLEngine内部,SQL的执行流程又可以再细分为提交Statement和FetchResults两个过程,在分别分析KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程时,我们就是对提交Statment和FetchResults这两个过程来展开详细的分析,整体会有些繁多,但并不复杂。KyuubiServer SQL执行流程1.提交Statement当用户通过JDBC或beeline的方式执行一条SQL语句时,就开启了SQL语句在Kyuubi中的执行流程,此时KyuubiServer中FrontedService的ExecuteStatement方法就会被执行:runAsync值为true,因此会通过异步的方式来执行SQL,也就是会执行BackendService的executeStatementAsync方法,开启了异步执行SQL的流程:首先会通过KyuubiOperationManager去创建一个表示执行SQL的ExecuteStatement:client实际上就是我们前面在KyuubiServer Session建立过程中建立的用于与SparkSQLEngine通信的RPC客户端,ExecuteStatement需要client来发送执行SQL语句的请求给SparkSQLEngine实例,不过需要注意的是,这里的ExecuteStatement是KyuubiServer体系下的,其类全路径为org.apache.kyuubi.operation.ExecuteStatement,因为后面在分析SparkSQLEngine SQL执行流程时,在SparkSQLEngine体系下也有一个ExecuteStatement,但其类全路径为org.apache.kyuubi.engine.spark.operation.ExecuteStatement。这里的整个流程关键在于后面执行operation.run()方法,进而执行runInternal()方法:这里会通过异步的方式来执行,其先同步执行executeStatement()方法,然后再提交一个异步线程来执行asyncOperation(sessionManager.submitBackgroundOperation(asyncOperation)实际上就是通过线程池来提交一个线程线程),我们先看一下其executeStatement()方法:这里statement实际上就是要执行的SQL语句,所以本质上就是向SparkSQLEngine发送了一个用于执行SQL语句的RPC请求,这样就会触发SparkSQLEngine执行提交Statement的一个过程(这个接下来会分析),请求成功后,KyuubiServer会将SparkSQLEngine实例用于记录该操作的operationHandle记录下来,就是赋值给成员变量_remoteOpHandle,_remoteOpHandle用后续用于查询statement在SparkSQLEngine实例中的执行状态和FetchResults。执行完executeStatement()方法后,我们再看一下其提交异步线程时所执行的操作,也就是waitStatementComplete()方法:可以看到其主要操作是构建用于查询SparkSQLEngine实例中Operation的执行状态。再回过来看一下runInternal()方法:这里提交一个线程后的返回结果backgroundOperation实际上为一个FutureTask对象,后续在FetchResults过程中通过该对象就可以知道Operation在SparkSQLEngine实例中的执行状态。在提交完Statement之后,KyuubiServer会将operationHandle返回给用户端,用于后续获取执行结果。2.FetchResults提交完Statement后,用户层的RPC客户端就会去获取结果,此时KyuubiServer中FrontedService的FetchResults方法就会被执行:在获取真正执行结果之前,会有多次获取操作日志的请求,也就是req.getFetchType == 1的情况,这里我们只关注fetchLog为false的情况:获取执行结果的过程就比较简单,主要是调用RPC客户端的FetchResults方法,这样就会触发SparkSQLEngine FetchResults的一个过程(这个接下来会分析),不过在获取执行结果前会检查其执行状态,前面在分析在提交Statement时,异步线程waitStatementComplete()就会请求SparkSQLEngine更新其状态为FINISHED,因此这里可以正常获取执行结果。SparkSQLEngine SQL执行流程1.提交Statement接收到KyuubiServer提交Statement的RPC请求时,此时SparkSQLEngine中FrontedService的ExecuteStatement方法就会被执行,进而触发接下来提交Statement的整个流程:其整体流程与KyuubiServer是十分相似的,主要区别在于:1.其创建的Statement为SparkSQLEngine体系下的ExecuteStatement;2.其异步线程是通过SparkSession来执行SQL语句;因此我们来看一下其runInternal()方法和异步线程执行的executeStatement()方法:可以看到其执行非常简单,就是直接调用SparkSession的sql()方法来执行SQL语句,最后再将结果保存到迭代器iter,并设置执行状态为完成。在提交完Statement之后,SparkSQLEngine会将operationHandle返回给KyuubiServer,用于后续获取执行结果。2.FetchResults接收到KyuubiServer获取结果的RPC请求时,此时SparkSQLEngine中FrontedService的FetchResults方法就会被执行,进而触发接下来FetchResults的整个流程:整个过程比较简单,就是将iter的结果转换为rowSet的对象格式,最后返回给KyuubiServer。参考资料《Spark SQL内核剖析》《网易数帆开源Kyuubi:基于Spark的高性能JDBC和SQL执行引擎》《大数据实战:Kyuubi 与 Spark ThriftServer 的全面对比分析》Apache Thriftminidubbo:A Full RPC Framework Based on Netty.
2024年10月22日
41 阅读
0 评论
0 点赞
2024-10-22
Redis学习笔记
Redis核心原理与实战Redis执行流程:当用户输入一条命令之后,客户端会以 socket 的方式把数据转换成 Redis 协议,并发送至服务器端,服务器端在接受到数据之后,会先将协议转换为真正的执行命令,在经过各种验证以保证命令能够正确并安全的执行,但验证处理完之后,会调用具体的方法执行此条命令,执行完成之后会进行相关的统计和记录,然后再把执行结果返回给客户端,整个执行流程,1.输入命令2.将命令转换成成Redis的协议,通过socket发送到服务器3.服务器接收到后对Redis协议进行解析,变成可以执行的redis命令4.服务端鉴权5.执行最终命令,调用 redisCommand 中的 proc 函数执行命令。6.执行完进行相关记录和统计7.将执行的结果通过socket返回给客户端2.Redis的容错RDB: 二进制文件,性能好,但是可能会丢数据AOF:日志文件,可以设置always, everyseconds, no;能最大限度的保证数据不丢失,但是恢复的性能比较慢,虽然AOF也提供了优化机制:AOF重写RDB和AOF混部: 其实还是读的AOF文件,因为当AOF开启时,不管有没有开启RDB,都会使用AOF。混合部署时,其实是将RBD文件写入到了AOF文件的开头。这样恢复时,就可以通过RDB+AOF的方式进行恢复了。混合持久化的加载流程如下:判断是否开启 AOF 持久化,开启继续执行后续流程,未开启执行加载 RDB 文件的流程;判断 appendonly.aof 文件是否存在,文件存在则执行后续流程;判断 AOF 文件开头是 RDB 的格式, 先加载 RDB 内容再加载剩余的 AOF 内容;判断 AOF 文件开头不是 RDB 的格式,直接以 AOF 格式加载整个文件。3.Redis基本类型和原理 1.字符串 实现原理:SDS,包含了三种不同的数据类型:int、embstr 和 raw。int 类型很好理解,整数类型对应的就是 int 类型,而字符串则对应是 embstr 类型,当字符串长度大于 44 字节时,会变为 raw 类型存储。 2.Hash 字典类型本质上是由数组和链表结构组成的。来看字典类型的源码实现:typedef struct dictEntry { // dict.h void *key; union { void *val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; // 下一个 entry } dictEntry;通常情况下字典类型会使用数组的方式来存储相关的数据,但发生哈希冲突时才会使用链表的结构来存储数据。Redis 为了保证应用的高性能运行,提供了一个重要的机制——渐进式 rehash。 渐进式 rehash 是用来保证字典缩放效率的,也就是说在字典进行扩容或者缩容是会采取渐进式 rehash 的机制。 3.List 列表类型 (List) 是一个使用链表结构存储的有序结构,它的元素插入会按照先后顺序存储到链表结构中,因此它的元素操作 (插入\删除) 时间复杂度为 O(1),所以相对来说速度还是比较快的,但它的查询时间复杂度为 O(n),因此查询可能会比较慢。底层:列表类型并不是简单的双向链表,而是采用了 quicklist 的数据结构对数据进行存取,quicklist 是 Redis 3.2 新增的数据类型,它的底层采取的是压缩列表加双向链表的存储结构,quicklist 为了存储更多的数据,会对每个 quicklistNode 节点进行压缩,这样就可以有效的存储更多的消息队列或者文章的数据 4.Set 集合类型是由整数集合 (intset) 或者是哈希表 (hashtable) 组成的,集合类型比较适合用来数据去重和保障数据的唯一性,除此之外,集合类型还可以用来统计多个集合的交集、错集和并集 (见附录)。当我们存储的数据是无序并且需要去重的情况下,比较适合使用集合类型进行存储。当元素都为整数并且元素的个数没有到达设置的最大值时,键值的存储使用的是 intset 的数据结构,反之到元素超过了一定的范围,又或者是存储的元素为非整数时,集合会选择使用 hashtable 的数据结构进行存储。 5.Sorted Set 有序集合是由 ziplist (压缩列表) 或 skiplist (跳跃表) 组成的。 为什么是跳跃表?而非红黑树? 因为跳跃表的性能和红黑树基本相近,但却比红黑树更好实现,所有 Redis 的有序集合会选用跳跃表来实现存储。4.Redis事务Redis 中的事务从开始到结束也是要经历三个阶段:开启事务命令入列执行事务/放弃事务其中,开启事务使用 multi 命令,事务执行使用 exec 命令,放弃事务使用 discard 命令。5.Redis键值过期的操作Redis 中设置过期时间主要通过以下四种方式:expire key seconds:设置 key 在 n 秒后过期;pexpire key milliseconds:设置 key 在 n 毫秒后过期;expireat key timestamp:设置 key 在某个时间戳(精确到秒)之后过期;pexpireat key millisecondsTimestamp:设置 key 在某个时间戳(精确到毫秒)之后过期;字符串中的过期操作字符串中几个直接操作过期时间的方法,如下列表:set key value ex seconds:设置键值对的同时指定过期时间(精确到秒);set key value px milliseconds:设置键值对的同时指定过期时间(精确到毫秒);setex key seconds valule:设置键值对的同时指定过期时间(精确到秒)。6.过期策略Redis 会删除已过期的键值,以此来减少 Redis 的空间占用,但因为 Redis 本身是单线的,如果因为删除操作而影响主业务的执行就得不偿失了,为此 Redis 需要制定多个(过期)删除策略来保证糟糕的事情不会发生。常见的过期策略有以下三种:定时删除惰性删除定期删除Redis 使用的是惰性删除加定期删除的过期策略。7.Redis 管道技术—Pipeline管道技术是将任务一次性批量发送到服务端处理,然后批量返回结果的一种的技术,主要是为了解决每条命令执行后需要等待的情况,从而有效的提高了程序的执行效率。但是使用管道技术也要注意避免发送的命令过大,或者管道内的数据太多而导致的网络阻塞。8.游标迭代器(过滤器)——ScanScan 是一个系列指令,除了 Scan 之外,还有以下 3 个命令:HScan 遍历字典游标迭代器SScan 遍历集合的游标迭代器ZScan 遍历有序集合的游标迭代器Scan 具备以下几个特点:Scan 可以实现 keys 的匹配功能;Scan 是通过游标进行查询的不会导致 Redis 假死;Scan 提供了 count 参数,可以规定遍历的数量;Scan 会把游标返回给客户端,用户客户端继续遍历查询;Scan 返回的结果可能会有重复数据,需要客户端去重;单次返回空值且游标不为 0,说明遍历还没结束;Scan 可以保证在开始检索之前,被删除的元素一定不会被查询出来;在迭代过程中如果有元素被修改, Scan 不保证能查询出相关的元素。9.HyperLogLog类似集合的效果,用于快速的统计count的算法,可以用极小的空间占用实现,但是由于是基于hash,可能会有一定的误差(0.83%)。HyperLogLog的算法:相当于把存储的值经过 hash 之后,再将 hash 值转换为二进制,存入到不同的桶中,这样就可以用很小的空间存储很多的数据,统计时再去相应的位置进行对比很快就能得出结论,这就是 HLL 算法的基本原理,想要更深入的了解算法及其推理过程,可以看去原版的论文,链接地址在文末。10.内存淘汰机制算法Redis 内存淘汰策略和过期回收策略是完全不同的概念,内存淘汰策略是解决 Redis 运行内存过大的问题的,通过与 maxmemory 比较,决定要不要淘汰数据,根据 maxmemory-policy 参数,决定使用何种淘汰策略,在 Redis 4.0 之后已经有 8 种淘汰策略了,默认的策略是 noeviction 当内存超出时不淘汰任何键值,只是新增操作会报错。
2024年10月22日
40 阅读
0 评论
0 点赞
2024-09-23
《转载》Flink之Akka RPC通信
1、简说AkkaFlink 内部节点之间的通信是用 Akka,比如 JobManager 和 TaskManager 之间的通信。 而 operator 之间的数据传输是利用 Netty,所以是不是有必要说一下Akka ?Akka和Actor并发问题的核心就是存在数据共享,同时有多个线程对一份数据进行修改,就会出现错乱的情况解决该问题一般有两种方式 :1、基于JVM内存模型的设计,通常需要通过加锁等同步机制保证共享数据的一致性。但是加锁在高并发的场景下,往往性能不是很好2、使用消息传递的方式Actor的基础就是消息传递,一个Actor可以认为是一个基本的计算单元,它能接收消息并基于其执行运算,它也可以发送消息给其他Actor。Actors 之间相互隔离,它们之间并不共享内存Actor 本身封装了状态和行为,在进行并发编程时,Actor只需要关注消息和它本身。而消息是一个不可变对象,所以 Actor 不需要去关注锁和内存原子性等一系列多线程常见的问题。所以Actor是由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成状态:Actor 中的状态指Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题行为:Actor 中的计算逻辑,通过Actor接收到的消息来改变Actor的状态邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱内部通过 FIFO(先入先出)消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息任意一个Actor即可发送消息,也可以接受消息Akka是一个构建在JVM上,基于Actor模型的的并发框架,支持Java和Scala两种API2、Akka详解使用 Akka 可以让你从为 Actor 系统创建基础设施和编写控制基本行为所需的初级代码中解脱出来。为了理解这一点,让我们看看在代码中创建的Actor与Akka在内部创建和管理的Actor之间的关系,Actor的生命周期和失败处理Akka的Actor层级Akka的Actor总是属于父Actor。通常,你可以通过调用 getContext().actorOf() 来创建 Actor。与创建一个“独立的”Actor不同,这会将新Actor作为一个子节点注入到已经存在的树中,创建Actor的Actor成为新创建的子Actor的父级。你可能会问,你创造的第一个Actor的父节点是谁?如下图所示,所有的 Actor 都有一个共同的父节点,即用户守护者。可以使用 system.actorOf() 在当前Actor下创建新的Actor实例。创建 Actor 将返回一个有效的 URL 引用。例如,如果我们用 system.actorOf(..., "someActor") 创建一个名为 someActor 的 Actor,它的引用将包括路径 /user/someActor事实上,在你在代码中创建 Actor 之前,Akka 已经在系统中创建了三个 Actor 。这些内置的 Actor 的名字包含 guardian ,因为他们守护他们所在路径下的每一个子 Actor。守护者 Actor 包括 :/ ,根守护者( root guardian )。这是系统中所有Actor的父Actor,也是系统本身终止时要停止的最后一个 Actor/user ,守护者( guardian )。这是用户创建的所有Actor的父 Actor。不要让用户名混淆,它与最终用户和用户处理无关。使用Akka库创建的每个Actor都将有一个事先准备的固定路径/user//system ,系统守护者( system guardian )pom.xml :<properties> <akka.version>2.6.9</akka.version> </properties> <dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>${akka.version}</version> </dependency> </dependencies>示例:package com.journey.flink.akka.task2; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class HierarchyActorTest { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstActor = system.actorOf(MyPrintActor.props(), "firstActor"); System.out.println("firstActor : " + firstActor); firstActor.tell("print_info", ActorRef.noSender()); System.out.println(">>> Press ENTER to exit <<<"); try { System.in.read(); } finally { system.terminate(); } } // 创建Actor static class MyPrintActor extends AbstractActor { static Props props() { return Props.create(MyPrintActor.class, MyPrintActor::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("print_info", p -> { ActorRef secondActorRef = getContext().actorOf(Props.empty()); System.out.println("secondActorRef : " + secondActorRef); }).build(); } } }输出结果 :firstActor : Actor[akka://testSystem/user/firstActor#-1802697549] >>> Press ENTER to exit <<< secondActorRef : Actor[akka://testSystem/user/firstActor/$a#-1282757800]两条路径都以akka://testSystem/开头。因为所有 Actor的引用都是有效的URL, akka://是协议字段的值ActorSystem名为testSystem ,但它可以是任何其他名称。如果启用了多个系统之间的远程通信,则URL的这一部分包括主机名和端口,以便其他系统可以在网络上找到它,下面会有案例因为第二个 Actor的引用包含路径 /firstActor/ ,这个标识它为第一个Actor的子ActorActor引用的最后一部分,即#-1802697549和#-1282757800是唯一标识符Actor的生命周期既然了解了Actor层次结构的样子,你可能会想 : 为什么我们需要这个层次结构?它是用来干什么的?层次结构的一个重要作用是安全地管理Actor的生命周期。接下来,我们来考虑一下,这些知识如何帮助我们编写更好的代码Actor在被创建时就会出现,然后在用户请求时被停止。每当一个Actor被停止时,它的所有子 Actor也会被递归地停止。这种行为大大简化了资源清理,并有助于避免诸如由打开的套接字和文件引起的资源泄漏要停止Actor,建议的模式是调用Actor内部的 getContext().stop(getSelf()) 来停止自身,通常是对某些用户定义的停止消息的响应,或者当Actor完成其任务时Akka Actor的API暴露了许多生命周期的钩子,你可以在 Actor 的实现中覆盖这些钩子。最常用的是 preStart() 和 postStop() 方法preStart() 在 Actor 启动之后但在处理其第一条消息之前调用postStop() 在 Actor 停止之前调用,在此时之后将不再处理任何消息示例:package com.journey.flink.akka.task3; import akka.actor.AbstractActor; import akka.actor.Props; public class Actor1 extends AbstractActor { static Props props() { return Props.create(Actor1.class, Actor1:: new); } @Override public void preStart() throws Exception { System.out.println("first actor started"); getContext().actorOf(Actor2.props(), "second"); } @Override public void postStop() throws Exception { System.out.println("first actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("stop", s -> { getContext().stop(getSelf()); }).build(); } } package com.journey.flink.akka.task3; import akka.actor.AbstractActor; import akka.actor.Props; public class Actor2 extends AbstractActor { static Props props() { return Props.create(Actor2.class, Actor2::new); } @Override public void preStart() throws Exception { System.out.println("second actor started"); } @Override public void postStop() throws Exception { System.out.println("second actor stopped"); } @Override public Receive createReceive() { return receiveBuilder().build(); } } package com.journey.flink.akka.task3; import akka.actor.ActorRef; import akka.actor.ActorSystem; public class LifeCycleMain { public static void main(String[] args) { ActorSystem system = ActorSystem.create("testSystem"); ActorRef first = system.actorOf(Actor1.props(), "first"); first.tell("stop", ActorRef.noSender()); } }打印输出 :first actor started second actor started second actor stopped first actor stopped当我们停止 first Actor 时,它会在停止自身之前,先停止了它的子 Actor second 。这个顺序是严格的,在调用父Actor的 postStop() 钩子之前,会先调用所有子Actor的 postStop() 钩子失败处理父 Actor 和子 Actor 在他们的生命周期中是相互联系的。当一个 Actor 失败(抛出一个异常或从接收中冒出一个未处理的异常)时,它将暂时挂起。如前所述,失败信息被传播到父 Actor,然后父 Actor 决定如何处理由子 Actor 引起的异常。这样,父 Actor 就可以作为子 Actor 的监督者( supervisors )。默认的监督策略是停止并重新启动子 Actor。如果不更改默认策略,所有失败都会导致重新启动示例:package com.journey.flink.akka.task4; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class SupervisingActor extends AbstractActor { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor"); supervisingActor.tell("failChild", ActorRef.noSender()); } static Props props() { return Props.create(SupervisingActor.class, SupervisingActor :: new); } ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor"); @Override public Receive createReceive() { return receiveBuilder() .matchEquals("failChild", f -> { child.tell("fail", getSelf()); }).build(); } } class SupervisedActor extends AbstractActor { static Props props() { return Props.create(SupervisedActor.class, SupervisedActor::new); } @Override public void preStart() throws Exception { System.out.println("supervised actor started"); } @Override public void postStop() throws Exception { System.out.println("supervised actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("fail", f -> { System.out.println("supervised actor fails now"); throw new Exception("I failed"); }).build(); } }打印如下 :supervised actor started supervised actor fails now supervised actor stopped supervised actor started [ERROR] [04/04/2023 10:37:51.084] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/supervising-actor/supervised-actor] I failed java.lang.Exception: I failed我们看到失败后,被监督的 Actor 停止并立即重新启动,我们使用了preStart()和postStop()钩子,这是重启后和重启前 默认调用的钩子,因此我们无法区分 Actor 内部是第一次启动还是重启。实际上,在重新启动时,调用 的是preRestart()和postRestart()方法,但如果不重写这两个方法,则默认委托给preStart()Actor剖析由于 Akka 实施父级监督,每个 Actor 都受到其父级的监督并且监督其子级定义 Actor 类Actor 类是通过继承 AbstractActor 类并在 createReceive 方法中设置“初始行为”来实现 的createReceive方法没有参数,并返回AbstractActor.Receive。它定义了 Actor 可以处理哪些消 息,以及如何处理消息的实现。可以使用名为ReceiveBuilder的生成器来构建此类行为。在 AbstractActor中,有一个名为receiveBuilder的方便的工厂方法 package com.journey.flink.akka.task5; import akka.actor.AbstractActor; public class MyActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match( String.class, info -> { System.out.println("Received String message " + info); } ) .matchAny( e -> { System.out.println("received unknown message"); } ) .build(); } }PropsProps 是一个配置类,用于指定创建 Actor 的选项,将其视为不可变的,因此可以自由共享,用于创建 Actor 的方法,包括关联的部署信息Props.create(Actor1.class, Actor1::new)Actor APIAbstractActor类定义了一个名为createReceive的方法,该方法用于设置 Actor 的“初始行为”此外,它还提供 :getSelf(),对 Actor 的ActorRef的引用getSender(),前一次接收到的消息的发送方 Actor 的引用supervisorStrategy(),用户可重写定义用于监视子 Actor 的策略getContext() ,公开 Actor 和当前消息的上下文信息,例如: 创建子 Actor 的工厂方法(actorOf)Actor核心概念发送消息tell 的意思是“发送并忘记( fire-and-forget )”,例如异步发送消息并立即返回,target.tell(message, getSelf())ask 异步发送消息,并返回一个表示可能的答复,ask模式涉及 Actor 和Future回复消息如果你想有一个回复消息的句柄,可以使用 getSender() ,它会给你一个 ActorRef 。你可 以通过使用 getSender().tell(replyMsg, getSelf()) 发送 ActorRef 来进行回复。你还 可以存储 ActorRef 以供稍后回复或传递给其他 Actorpublic class ActionCommunication { static class Actor1 extends AbstractActor { private ActorRef actor2 = getContext().actorOf(Actor2.props(), "actor2"); static Props props() { return Props.create(Actor1.class, Actor1::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("hello", f -> { actor2.tell("hello", getSelf()); }) .matchEquals("ack", f -> { System.out.println("nice to meet you too."); }) .build(); } } static class Actor2 extends AbstractActor { static Props props() { return Props.create(Actor2.class, Actor2::new); } @Override public Receive createReceive() { return receiveBuilder().matchEquals("hello", f -> { System.out.println("nice to meet you."); getSender().tell("ack", getSelf()); }).build(); } } public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstActor = system.actorOf(Actor1.props(), "first"); firstActor.tell("hello", ActorRef.noSender()); } }接受超时package com.journey.flink.akka.task5; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.ReceiveTimeout; import java.time.Duration; public class TimeoutActor extends AbstractActor { public TimeoutActor() { // 设置初始化延迟 getContext().setReceiveTimeout(Duration.ofMillis(100)); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("hello", s -> { // 其实就是如果超过100ms没有接收消息,就会调用 ReceiveTimeout getContext().setReceiveTimeout(Duration.ofMillis(100)); }) .match(ReceiveTimeout.class, r -> { System.out.println("超时"); getContext().cancelReceiveTimeout(); getContext().stop(getSelf()); }) .build(); } public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef timeoutActor2 = system.actorOf(Props.create(TimeoutActor.class, TimeoutActor::new)); int num = 0; while (num <= 10) { System.out.println("send...." + num); timeoutActor2.tell("hello", ActorRef.noSender()); Thread.sleep(10); num++; } } }Actor的调度器调度器(Dispatchers)是 Akka 核心的一部分,这意味着它们也是 akka-actor 依赖的一部分默认调度器每个 ActorSystem 都将有一个默认的调度器,在没有为 Actor 配置其他内容的情况下使用该调度器。如果ActorSystem是通过ExecutionContext传入来创建的,则此ExecutionContext将用作此ActorSystem中所有程序的执行位置。如果没有给出ExecutionContext,它将回退到akka.actor.default-dispatcher.default-executor.fallback作为执行上下文。默认情况下,这是一个“fork-join-executor”,它在大多数情况下都有出色的性能为 Actor 设置调度器# 配置一个调度器 my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # 使用哪种ExecutionService executor = "fork-join-executor" # 配置fork join池 fork-join-executor { # 最小线程数 parallelism-min = 2 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 2.0 # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } # 吞吐量定义了最大的消息数量 # 一个actor最多只能占用某线程100个消息的时间 # 设置为1可以尽可能公平地 throughput = 100 }代码中定义 :ActorRef myActor = system.actorOf(Props.create(MyActor.class).withDispatcher("my-dispatcher"), "myactor3");调度器类型有三种不同类型的消息调度器Dispatcher : 这是一个基于事件的调度程序,它将一组 Actor 绑定到线程池。如果未指定调度器,则使用默认调度器PinnedDispatcher : 这个调度器为每个使用它的演员分配一个独特的线程;即每个Actor将拥有自己的线程池,池中只有一个线程CallingThreadDispatcher : 该调度程序仅在当前线程上运行调用。这个调度程序不会创建任何新的线程,但可以同时为不同的线程使用同一个actor,主要用于测试3、Flink中基于Akka的RPCFLink RPC核心组件Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个RpcGateway网关,都是 RpcGateWay 的子类接口是用于远程调用的代理接口。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法在实现一个提供 RPC 调用的组件时,通常需要先定一个接口,该接口继承 RpcGateway 并约定好提供的远程调用的方法,getAddress() & getHostname()RpcServiceRPC服务的接口,用于连接到一个远程的RPC server,或者启动一个rpc server来转发远程调用到rpcEndpoint是 RpcEndpoint 的运行时环境, RpcService 提供了启动 RpcEndpoint , 连接到远端 RpcEndpoint 并返回远端 RpcEndpoint 的代理对象等方法。此外, RpcService 还提供了某些异步任务或者周期性调度任务的方法RpcServer相当于 RpcEndpoint 自身的的代理对象(self gateway)。 RpcServer 是 RpcService 在启动了 RpcEndpoint 之后返回的对象,每一个 RpcEndpoint 对象内部都有一个 RpcServer 的成员变量,通过 getSelfGateway 方法就可以获得自身的代理,然后调用该Endpoint 提供的服务RpcEndpoint是对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需要继承 该抽象类类似于Acotr,封装了传输的业务逻辑源码分析RpcGateway :/** * TODO 定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为对方的客户端代理 */ public interface RpcGateway { String getAddress(); String getHostname(); }RpcEndpoint : 启动rpcServerpublic abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { protected final Logger log = LoggerFactory.getLogger(getClass()); /** * 1、根据提供的RpcEndpoint来启动和停止RpcServer(Actor) * 2、根据提供的地址连接到(对方)RpcServer,并返回一个RpcGateway * 3、延迟、立刻调度Runnable、Callable */ private final RpcService rpcService; /** / protected final RpcServer rpcServer; ..... protected RpcEndpoint(final RpcService rpcService, final String endpointId) { // 保存rpcService和endpointId this.rpcService = checkNotNull(rpcService, "rpcService"); this.endpointId = checkNotNull(endpointId, "endpointId"); // 通过 RpcService 启动RpcServer /** * 构造的时候调用 rpcService.startServer()启动RpcServer,进入可以接受处理请求的状态,最后将RpcServer绑定到主线程上 * 真正执行起来 * 在RpcEndpoint中还定义了一些放入如 runAsync(Runnable)、callAsync(Callable,Time)方法来执行Rpc调用,值得注意的是在Flink * 的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint进行Rpc调用时,其会委托RpcServer进行处理 */ this.rpcServer = rpcService.startServer(this); // 主线程执行器,所有调用在主线程中串行执行 this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread); } .... @Override public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); /** * 根据RpcEndpoint的类型来创建对应的Actor,目前支持两种Actor的创建 * 1、AkkaRpcActor * 2、FencedAkkaRpcActor,对AkkaRpcActor进行扩展,能够过滤到与指定token无关的消息 */ final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint); // 这里拿到的是AkkaRpcActor的引用 final ActorRef actorRef = actorRegistration.getActorRef(); final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture(); LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef); final String hostname; Option<String> host = actorRef.path().address().host(); if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); } // 提取集成RpcEndpoint的所有子类 Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class); // 对上述指定的类集合进行代理 final InvocationHandler akkaInvocationHandler; if (rpcEndpoint instanceof FencedRpcEndpoint) { // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken, captureAskCallstacks); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks); } // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); // 针对RpcServer生成一个动态代理 @SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler); return server; }AkkaRpcActor :@Override public Receive createReceive() { return ReceiveBuilder.create() // TODO 重点 // TODO 如果是 RemoteHandshakeMessage 信息,执行 handleHandshakeMessage,处理握手消息 .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) // TODO 如果是 ControlMessages,执行 handleControlMessage,如启动、停止、中断 .match(ControlMessages.class, this::handleControlMessage) // TODO 其他情况,handleMessage .matchAny(this::handleMessage) .build(); } private void handleMessage(final Object message) { if (state.isRunning()) { mainThreadValidator.enterMainThread(); try { // 处理消息 handleRpcMessage(message); } finally { mainThreadValidator.exitMainThread(); } } else { log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", rpcEndpoint.getClass().getName(), message.getClass().getName()); sendErrorIfSender(new AkkaRpcException( String.format("Discard message, because the rpc endpoint %s has not been started yet.", rpcEndpoint.getAddress()))); } }connect :@Override public <C extends RpcGateway> CompletableFuture<C> connect( final String address, final Class<C> clazz) { // 连接远程Rpc Server,返回的是代理对象 return connectInternal( address, clazz, (ActorRef actorRef) -> { Tuple2<String, String> addressHostname = extractAddressHostname(actorRef); return new AkkaInvocationHandler( addressHostname.f0, addressHostname.f1, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), null, captureAskCallstacks); }); } private <C extends RpcGateway> CompletableFuture<C> connectInternal( final String address, final Class<C> clazz, Function<ActorRef, InvocationHandler> invocationHandlerFactory) { checkState(!stopped, "RpcService is stopped"); LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", address, clazz.getName()); // 根据Akka Actor地址获取ActorRef final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address); // 发送一个握手成功的消息给远程Actor final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose( (ActorRef actorRef) -> FutureUtils.toJava( Patterns .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds()) .<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class)))); // 创建动态代理,并返回 return actorRefFuture.thenCombineAsync( handshakeFuture, (ActorRef actorRef, HandshakeSuccessMessage ignored) -> { // AkkaInvocationHandler,针对客户端会调用 invokeRpc InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); // 创建动态代理 @SuppressWarnings("unchecked") C proxy = (C) Proxy.newProxyInstance( classLoader, new Class<?>[]{clazz}, invocationHandler); return proxy; }, actorSystem.dispatcher()); }AkkaInvocationHandler : 同时服务端和客户端同时使用@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); Object result; // 判断方法的类是否为指定的类,符合如下的类,执行本地调用,否则实行远程调用 if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) { throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " + "retrieve a properly FencedRpcGateway."); } else { // TODO 客户端会进行这里,进行远程的调用 result = invokeRpc(method, args); } return result; }4、Flink RPC实例<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>2.6.9</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.13</artifactId> <version>2.6.9</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-rpc-akka</artifactId> <version>1.17.0</version> </dependency>定义TaskGateway接口,继承RpcGateway接口 :package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.RpcGateway; public interface TaskGateway extends RpcGateway { String sayHello(String name); }创建一个TaskEndpoint,继承RpcEndpoint同时实现TaskGateway接口package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; public class TaskEndpoint extends RpcEndpoint implements TaskGateway { public TaskEndpoint(RpcService rpcService) { super(rpcService); } @Override public String sayHello(String name) { return "hello " + name; } }Server端 :package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; public class FlinkRpcServer { public static void main(String[] args) throws Exception { // 1. 创建RPC服务 ActorSystem defaultActorSystem = AkkaUtils.createDefaultActorSystem(); AkkaRpcService akkaRpcService = new AkkaRpcService(defaultActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration()); // 2. 创建TaskEndpoint实例 TaskEndpoint endpoint = new TaskEndpoint(akkaRpcService); System.out.println("address : "+endpoint.getAddress()); // 3. 启动Endpoint endpoint.start(); } }client端 :package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; import java.util.concurrent.CompletableFuture; public class FlinkRpcClient { public static void main(String[] args) throws Exception { // 1. 创建RPC服务 ActorSystem defaultActorSystem = AkkaUtils.createDefaultActorSystem(); AkkaRpcService akkaRpcService = new AkkaRpcService(defaultActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration()); // 2. 连接远程RPC服务,注意:连接地址是服务端程序打印的地址 CompletableFuture<TaskGateway> gatewayFuture = akkaRpcService .connect("akka.tcp://flink@192.168.31.162:53465/user/rpc/c38b60e7-c87c-4b7f-b948-3099dfb12999", TaskGateway.class); // 3. 远程调用 TaskGateway gateway = gatewayFuture.get(); System.out.println(gateway.sayHello("flink-akka")); } }注意:必须要在 org.apache.flink.runtime.rpc.akka 这个包下,因为AkkaUtils是这个包的可见如感兴趣,点赞加关注,谢谢
2024年09月23日
18 阅读
0 评论
0 点赞
2024-09-12
Spark3中的Catalog组件设计
接口定义 CatalogPlugin:catalog的顶级接口,该接口主要实现了catalog的name和defaultNamespace。比如V2SessionCatalog的默认实现就是name="spark_catalog",defaultNamespace="defualt"TableCatalog:定义Table的相关接口,包括listTable, loadTable, CreateTable, DropTable,AlterTable等。CatalogExtension:继承自TableCatalog,通过setDelegateCatalog()方法把上面的V2SessionCatalog 实例进行代理。经过实际测试发现,无法改变catalog的name,所以只能对原始的catalog进行功能扩展。DelegatingCatalogExtension:抽象类,包含了对CatalogExtendsion的所有方法实现。总结:如果需要自定义catalog,则需实现TableCatalog即可。如果需要对当前的catalog进行扩展,则实现CatalogExtension或者重写DelegatingCatalogExtension的方法。Catalog初始化过程 Spark通过CatalogManager管理多个catalog,通过spark.sql.catalog.${name} 可以注册多个catalog,Spark的默认实现则是spark.sql.catalog.spark_catalog。1.SparkSession在创建时,可以选择是否enableHiveSupport()来指定catalog类型,如果指定了则会实例化HiveExternalCatalog,不指定则实例InMemoryCatalog。2.在BaseSessionStateBuilder/HiveSessionStateBuilder则会使用上面实例化的externalCatalog创建catalog对象,再根据catalog对象创建V2SessionCatalog对象(V2SessionCatalog就是对SessionCatalog的封装)3.根据catalog和v2SessionCatalog创建CatalogManager实例,CatalogManager中内置一个catalogs的HashMap<String,CatalogPlugin>来管理catalog。4.再SQL中调用catalog时候,会通过CatalogManger.catalog(String name)返回对应名称的catalog实例,如果没有则通过Catalog.load(name, conf)来进行实例化5.对于默认的spark_catalog,如果需要进行扩展,则可以通过spark.sql.catalog.spark_catalog来指定对应的实现类,该类可以通过实现CatalogExtension,然后会自动调用其setDelegateCatalog()来设置默认的spark_catalog为代理对象,然后进行扩展即可。总结:在DQC SQL升级引擎的过程中,使用了Spark3的Catalog特性来支持原生的multi catalog关联查询。对于JDBC协议的数据库来说,则使用JDBCTableCatalog来进行扩展catalog即可;对于Hive来说,第一种方案是实现CatalogExtension来扩展,但是发现这种方案无法修改其catalog name来适配元数据中心的hive catalog名称;第二种方案而通过完全自定义TableCatalog则需要完全初始化一套SparkSessionCatalog,在实践的过程中发现,可以进行元数据的拉取,但是在loadTable读取数据的时候报“表不支持batch read"的问题;第三种方案是采用修改DQC SQL的hvie catalog名称统一为spark_catalog的方式来实现Hive Catalog的使用,算是一个取巧的办法,后续有时间则可以继续扩展SparkSessionCatalog来实现hive的适配。
2024年09月12日
104 阅读
2 评论
0 点赞
1
2
3
4