首页
关于
Search
1
SteamOS安装paru和yay
170 阅读
2
欢迎使用 Typecho
127 阅读
3
徕卡X | 冬日漫步
127 阅读
4
https自动跳转问题终极解决方案
116 阅读
5
Spark3中的Catalog组件设计
104 阅读
默认
摄影
后端开发
大数据
Spark
Flink
登录
Search
标签搜索
转载
ArlenDu
累计撰写
18
篇文章
累计收到
10
条评论
首页
栏目
默认
摄影
后端开发
大数据
Spark
Flink
页面
关于
搜索到
3
篇与
的结果
2024-10-30
Flink的状态
状态种类 Keyed State:Keyed State 总是和 Key 相关联,只能在 KeyedStream 的函数或算子中使用,因为 Flink 中的 keyBy 操作保证了每一个键相关联的所有消息都会送给下游算子的同一个并行实例处理,因此 Keyed State 也可以看作是 Operator State 的一种分区(partitioned)形式,每一个键都关联一个状态分片(state-partition)。Operator State:Operator State 绑定到算子的每一个并行实例(sub-task) 中。状态后端 (StateBackend)MemoryStateBackend: 将状态存储在内存中,通常只在调试和开发环境中用FsStateBackend: FsStateBackend 会将状态存储到一个持久化的存储中,如 HDFS,只在 JobManager 的内存中存储一些 metadata。RocksDBStateBackend:状态存储在RockDB中,并且支持增量快照。在最新版的 Flink 中,这三种形式的存储后端都支持异步快照模式。四种类型:ValueStateListStateReducingStateMapState要获取状态,首先需要定义状态描述符(StateDescriptor)。状态描述符状态的名字(保证唯一性),状态的类型,以及部分状态需要的自定义函数。根据想要获取的状态的不同,状态描述符也分为 ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, MapStateDescriptor。自定义状态CheckpointedFunction接口ListCheckpointed接口RuntimeContext对于 Keyed State,通常都是通过 RuntimeContext 实例来获取,这通常需要在 rich functions 中才可以做到。 RuntimeContext 提供的获取状态的方法包括:ValueState<T> getState(ValueStateDescriptor<T>) ReducingState<T> getReducingState(ReducingStateDescriptor<T>) ListState<T> getListState(ListStateDescriptor<T>) MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)检查点(CheckPoint)检查点机制还需要依赖于 1.支持数据重播的数据源 2.状态的持久存储终端。Flink 实现了一个轻量级的分布式快照机制,其核心点在于 Barrier。 Coordinator 在需要触发检查点的时候要求数据源注入向数据流中注入 barrier, barrier 和正常的数据流中的消息一起向前流动,相当于将数据流中的消息切分到了不同的检查点中。当一个 operator 从它所有的 input channel 中都收到了 barrier,则会触发当前 operator 的快照操作,并向其下游 channel 中发射 barrier。当所有的 sink 都反馈收到了 barrier 后,则当前检查点创建完毕。Barrier对齐一个关键的问题在于,一些 operator 拥有多个 input channel,它往往不会同时从这些 channel 中接收到 barrier。如果 Operator 继续处理 barrier 先到达的 channel 中的消息,那么在所有 channel 的 barrier 都到达时,operator 就会处于一种混杂的状态。在这种情况下,Flink 采用对齐操作来保证 Exactly Once 特性。Operator 会阻塞 barrier 先到达的 channel,通常是将其流入的消息放入缓冲区中,待收到所有 input channel 的 barrier 后,进行快照操作,释放被阻塞的 channel,并向下游发射 barrier。 对齐操作会对流处理造成延时,但通常不会特别明显。如果应用对一致性要求比较宽泛的话,那么也可以选择跳过对齐操作。这意味着快照中会包含一些属于下一个检查点的数据,这样就不能保证 Exactly Once 特性,而只能降级为 At Least Once。异步快照前述的检查点创建流程中,在 operator 进行快照操作时,不再处理数据流中的消息。这种同步的方式会在每一次进行快照操作的时候引入延时。实际上,Flink 也支持采用异步的方式创建快照,这就要求 operator 在触发快照操作的时候创建一个不受后续操作影响的状态对象,通常选用 copy-on-write 的数据结构。Flink 中基于 RocketDB 的状态存储后端就可以支持异步操作。保存点(Savepoint)所谓的保存点,其实是用户手动触发的一种特殊的检查点。其本质就是检查点,但它相比检查点有两点不同:1.用户自行触发 2.当有新的已完成的检查点产生的时候,不会自动失效。可查询状态(Queryable State)Flink 自1.2起新增了一个 Queryable State 特性,允许从 Flink 系统外直接查询作业流水中的状态。这主要是来自于两方面的诉求:1)很多应用都有直接获取应用实时状态的需求,2)将状态频繁写入外部系统中可能是应用的瓶颈。有两种方式来使用 Queryable State :QueryableStateStream, 将 KeyedStream 转换为 QueryableStateStream,类似于 Sink,后续不能进行任何转换操作StateDescriptor#setQueryable(String queryableStateName),将 Keyed State 设置为可查询的 (不支持 Operator State)外部应用在查询 Flink 作业内部状态的时候要使用 QueryableStateClient, 提交异步查询请求来获取状态。下面的两张图大致给出了 Queryable State 的工作机制: 目前还存在的一些局限:只能支持 Keyed Operator对状态大小是否有限制(不支持 ListState)作业失败后如何保证可用性Clent API 的易用性参考Working with StateA Deep Dive into Rescalable State in Apache FlinkQueryable States in ApacheFlink - How it works
2024年10月30日
21 阅读
0 评论
0 点赞
2024-09-23
《转载》Flink之Akka RPC通信
1、简说AkkaFlink 内部节点之间的通信是用 Akka,比如 JobManager 和 TaskManager 之间的通信。 而 operator 之间的数据传输是利用 Netty,所以是不是有必要说一下Akka ?Akka和Actor并发问题的核心就是存在数据共享,同时有多个线程对一份数据进行修改,就会出现错乱的情况解决该问题一般有两种方式 :1、基于JVM内存模型的设计,通常需要通过加锁等同步机制保证共享数据的一致性。但是加锁在高并发的场景下,往往性能不是很好2、使用消息传递的方式Actor的基础就是消息传递,一个Actor可以认为是一个基本的计算单元,它能接收消息并基于其执行运算,它也可以发送消息给其他Actor。Actors 之间相互隔离,它们之间并不共享内存Actor 本身封装了状态和行为,在进行并发编程时,Actor只需要关注消息和它本身。而消息是一个不可变对象,所以 Actor 不需要去关注锁和内存原子性等一系列多线程常见的问题。所以Actor是由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成状态:Actor 中的状态指Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题行为:Actor 中的计算逻辑,通过Actor接收到的消息来改变Actor的状态邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱内部通过 FIFO(先入先出)消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息任意一个Actor即可发送消息,也可以接受消息Akka是一个构建在JVM上,基于Actor模型的的并发框架,支持Java和Scala两种API2、Akka详解使用 Akka 可以让你从为 Actor 系统创建基础设施和编写控制基本行为所需的初级代码中解脱出来。为了理解这一点,让我们看看在代码中创建的Actor与Akka在内部创建和管理的Actor之间的关系,Actor的生命周期和失败处理Akka的Actor层级Akka的Actor总是属于父Actor。通常,你可以通过调用 getContext().actorOf() 来创建 Actor。与创建一个“独立的”Actor不同,这会将新Actor作为一个子节点注入到已经存在的树中,创建Actor的Actor成为新创建的子Actor的父级。你可能会问,你创造的第一个Actor的父节点是谁?如下图所示,所有的 Actor 都有一个共同的父节点,即用户守护者。可以使用 system.actorOf() 在当前Actor下创建新的Actor实例。创建 Actor 将返回一个有效的 URL 引用。例如,如果我们用 system.actorOf(..., "someActor") 创建一个名为 someActor 的 Actor,它的引用将包括路径 /user/someActor事实上,在你在代码中创建 Actor 之前,Akka 已经在系统中创建了三个 Actor 。这些内置的 Actor 的名字包含 guardian ,因为他们守护他们所在路径下的每一个子 Actor。守护者 Actor 包括 :/ ,根守护者( root guardian )。这是系统中所有Actor的父Actor,也是系统本身终止时要停止的最后一个 Actor/user ,守护者( guardian )。这是用户创建的所有Actor的父 Actor。不要让用户名混淆,它与最终用户和用户处理无关。使用Akka库创建的每个Actor都将有一个事先准备的固定路径/user//system ,系统守护者( system guardian )pom.xml :<properties> <akka.version>2.6.9</akka.version> </properties> <dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>${akka.version}</version> </dependency> </dependencies>示例:package com.journey.flink.akka.task2; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class HierarchyActorTest { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstActor = system.actorOf(MyPrintActor.props(), "firstActor"); System.out.println("firstActor : " + firstActor); firstActor.tell("print_info", ActorRef.noSender()); System.out.println(">>> Press ENTER to exit <<<"); try { System.in.read(); } finally { system.terminate(); } } // 创建Actor static class MyPrintActor extends AbstractActor { static Props props() { return Props.create(MyPrintActor.class, MyPrintActor::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("print_info", p -> { ActorRef secondActorRef = getContext().actorOf(Props.empty()); System.out.println("secondActorRef : " + secondActorRef); }).build(); } } }输出结果 :firstActor : Actor[akka://testSystem/user/firstActor#-1802697549] >>> Press ENTER to exit <<< secondActorRef : Actor[akka://testSystem/user/firstActor/$a#-1282757800]两条路径都以akka://testSystem/开头。因为所有 Actor的引用都是有效的URL, akka://是协议字段的值ActorSystem名为testSystem ,但它可以是任何其他名称。如果启用了多个系统之间的远程通信,则URL的这一部分包括主机名和端口,以便其他系统可以在网络上找到它,下面会有案例因为第二个 Actor的引用包含路径 /firstActor/ ,这个标识它为第一个Actor的子ActorActor引用的最后一部分,即#-1802697549和#-1282757800是唯一标识符Actor的生命周期既然了解了Actor层次结构的样子,你可能会想 : 为什么我们需要这个层次结构?它是用来干什么的?层次结构的一个重要作用是安全地管理Actor的生命周期。接下来,我们来考虑一下,这些知识如何帮助我们编写更好的代码Actor在被创建时就会出现,然后在用户请求时被停止。每当一个Actor被停止时,它的所有子 Actor也会被递归地停止。这种行为大大简化了资源清理,并有助于避免诸如由打开的套接字和文件引起的资源泄漏要停止Actor,建议的模式是调用Actor内部的 getContext().stop(getSelf()) 来停止自身,通常是对某些用户定义的停止消息的响应,或者当Actor完成其任务时Akka Actor的API暴露了许多生命周期的钩子,你可以在 Actor 的实现中覆盖这些钩子。最常用的是 preStart() 和 postStop() 方法preStart() 在 Actor 启动之后但在处理其第一条消息之前调用postStop() 在 Actor 停止之前调用,在此时之后将不再处理任何消息示例:package com.journey.flink.akka.task3; import akka.actor.AbstractActor; import akka.actor.Props; public class Actor1 extends AbstractActor { static Props props() { return Props.create(Actor1.class, Actor1:: new); } @Override public void preStart() throws Exception { System.out.println("first actor started"); getContext().actorOf(Actor2.props(), "second"); } @Override public void postStop() throws Exception { System.out.println("first actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("stop", s -> { getContext().stop(getSelf()); }).build(); } } package com.journey.flink.akka.task3; import akka.actor.AbstractActor; import akka.actor.Props; public class Actor2 extends AbstractActor { static Props props() { return Props.create(Actor2.class, Actor2::new); } @Override public void preStart() throws Exception { System.out.println("second actor started"); } @Override public void postStop() throws Exception { System.out.println("second actor stopped"); } @Override public Receive createReceive() { return receiveBuilder().build(); } } package com.journey.flink.akka.task3; import akka.actor.ActorRef; import akka.actor.ActorSystem; public class LifeCycleMain { public static void main(String[] args) { ActorSystem system = ActorSystem.create("testSystem"); ActorRef first = system.actorOf(Actor1.props(), "first"); first.tell("stop", ActorRef.noSender()); } }打印输出 :first actor started second actor started second actor stopped first actor stopped当我们停止 first Actor 时,它会在停止自身之前,先停止了它的子 Actor second 。这个顺序是严格的,在调用父Actor的 postStop() 钩子之前,会先调用所有子Actor的 postStop() 钩子失败处理父 Actor 和子 Actor 在他们的生命周期中是相互联系的。当一个 Actor 失败(抛出一个异常或从接收中冒出一个未处理的异常)时,它将暂时挂起。如前所述,失败信息被传播到父 Actor,然后父 Actor 决定如何处理由子 Actor 引起的异常。这样,父 Actor 就可以作为子 Actor 的监督者( supervisors )。默认的监督策略是停止并重新启动子 Actor。如果不更改默认策略,所有失败都会导致重新启动示例:package com.journey.flink.akka.task4; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class SupervisingActor extends AbstractActor { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor"); supervisingActor.tell("failChild", ActorRef.noSender()); } static Props props() { return Props.create(SupervisingActor.class, SupervisingActor :: new); } ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor"); @Override public Receive createReceive() { return receiveBuilder() .matchEquals("failChild", f -> { child.tell("fail", getSelf()); }).build(); } } class SupervisedActor extends AbstractActor { static Props props() { return Props.create(SupervisedActor.class, SupervisedActor::new); } @Override public void preStart() throws Exception { System.out.println("supervised actor started"); } @Override public void postStop() throws Exception { System.out.println("supervised actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("fail", f -> { System.out.println("supervised actor fails now"); throw new Exception("I failed"); }).build(); } }打印如下 :supervised actor started supervised actor fails now supervised actor stopped supervised actor started [ERROR] [04/04/2023 10:37:51.084] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/supervising-actor/supervised-actor] I failed java.lang.Exception: I failed我们看到失败后,被监督的 Actor 停止并立即重新启动,我们使用了preStart()和postStop()钩子,这是重启后和重启前 默认调用的钩子,因此我们无法区分 Actor 内部是第一次启动还是重启。实际上,在重新启动时,调用 的是preRestart()和postRestart()方法,但如果不重写这两个方法,则默认委托给preStart()Actor剖析由于 Akka 实施父级监督,每个 Actor 都受到其父级的监督并且监督其子级定义 Actor 类Actor 类是通过继承 AbstractActor 类并在 createReceive 方法中设置“初始行为”来实现 的createReceive方法没有参数,并返回AbstractActor.Receive。它定义了 Actor 可以处理哪些消 息,以及如何处理消息的实现。可以使用名为ReceiveBuilder的生成器来构建此类行为。在 AbstractActor中,有一个名为receiveBuilder的方便的工厂方法 package com.journey.flink.akka.task5; import akka.actor.AbstractActor; public class MyActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match( String.class, info -> { System.out.println("Received String message " + info); } ) .matchAny( e -> { System.out.println("received unknown message"); } ) .build(); } }PropsProps 是一个配置类,用于指定创建 Actor 的选项,将其视为不可变的,因此可以自由共享,用于创建 Actor 的方法,包括关联的部署信息Props.create(Actor1.class, Actor1::new)Actor APIAbstractActor类定义了一个名为createReceive的方法,该方法用于设置 Actor 的“初始行为”此外,它还提供 :getSelf(),对 Actor 的ActorRef的引用getSender(),前一次接收到的消息的发送方 Actor 的引用supervisorStrategy(),用户可重写定义用于监视子 Actor 的策略getContext() ,公开 Actor 和当前消息的上下文信息,例如: 创建子 Actor 的工厂方法(actorOf)Actor核心概念发送消息tell 的意思是“发送并忘记( fire-and-forget )”,例如异步发送消息并立即返回,target.tell(message, getSelf())ask 异步发送消息,并返回一个表示可能的答复,ask模式涉及 Actor 和Future回复消息如果你想有一个回复消息的句柄,可以使用 getSender() ,它会给你一个 ActorRef 。你可 以通过使用 getSender().tell(replyMsg, getSelf()) 发送 ActorRef 来进行回复。你还 可以存储 ActorRef 以供稍后回复或传递给其他 Actorpublic class ActionCommunication { static class Actor1 extends AbstractActor { private ActorRef actor2 = getContext().actorOf(Actor2.props(), "actor2"); static Props props() { return Props.create(Actor1.class, Actor1::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("hello", f -> { actor2.tell("hello", getSelf()); }) .matchEquals("ack", f -> { System.out.println("nice to meet you too."); }) .build(); } } static class Actor2 extends AbstractActor { static Props props() { return Props.create(Actor2.class, Actor2::new); } @Override public Receive createReceive() { return receiveBuilder().matchEquals("hello", f -> { System.out.println("nice to meet you."); getSender().tell("ack", getSelf()); }).build(); } } public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstActor = system.actorOf(Actor1.props(), "first"); firstActor.tell("hello", ActorRef.noSender()); } }接受超时package com.journey.flink.akka.task5; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.ReceiveTimeout; import java.time.Duration; public class TimeoutActor extends AbstractActor { public TimeoutActor() { // 设置初始化延迟 getContext().setReceiveTimeout(Duration.ofMillis(100)); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals("hello", s -> { // 其实就是如果超过100ms没有接收消息,就会调用 ReceiveTimeout getContext().setReceiveTimeout(Duration.ofMillis(100)); }) .match(ReceiveTimeout.class, r -> { System.out.println("超时"); getContext().cancelReceiveTimeout(); getContext().stop(getSelf()); }) .build(); } public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("testSystem"); ActorRef timeoutActor2 = system.actorOf(Props.create(TimeoutActor.class, TimeoutActor::new)); int num = 0; while (num <= 10) { System.out.println("send...." + num); timeoutActor2.tell("hello", ActorRef.noSender()); Thread.sleep(10); num++; } } }Actor的调度器调度器(Dispatchers)是 Akka 核心的一部分,这意味着它们也是 akka-actor 依赖的一部分默认调度器每个 ActorSystem 都将有一个默认的调度器,在没有为 Actor 配置其他内容的情况下使用该调度器。如果ActorSystem是通过ExecutionContext传入来创建的,则此ExecutionContext将用作此ActorSystem中所有程序的执行位置。如果没有给出ExecutionContext,它将回退到akka.actor.default-dispatcher.default-executor.fallback作为执行上下文。默认情况下,这是一个“fork-join-executor”,它在大多数情况下都有出色的性能为 Actor 设置调度器# 配置一个调度器 my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # 使用哪种ExecutionService executor = "fork-join-executor" # 配置fork join池 fork-join-executor { # 最小线程数 parallelism-min = 2 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 2.0 # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } # 吞吐量定义了最大的消息数量 # 一个actor最多只能占用某线程100个消息的时间 # 设置为1可以尽可能公平地 throughput = 100 }代码中定义 :ActorRef myActor = system.actorOf(Props.create(MyActor.class).withDispatcher("my-dispatcher"), "myactor3");调度器类型有三种不同类型的消息调度器Dispatcher : 这是一个基于事件的调度程序,它将一组 Actor 绑定到线程池。如果未指定调度器,则使用默认调度器PinnedDispatcher : 这个调度器为每个使用它的演员分配一个独特的线程;即每个Actor将拥有自己的线程池,池中只有一个线程CallingThreadDispatcher : 该调度程序仅在当前线程上运行调用。这个调度程序不会创建任何新的线程,但可以同时为不同的线程使用同一个actor,主要用于测试3、Flink中基于Akka的RPCFLink RPC核心组件Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个RpcGateway网关,都是 RpcGateWay 的子类接口是用于远程调用的代理接口。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法在实现一个提供 RPC 调用的组件时,通常需要先定一个接口,该接口继承 RpcGateway 并约定好提供的远程调用的方法,getAddress() & getHostname()RpcServiceRPC服务的接口,用于连接到一个远程的RPC server,或者启动一个rpc server来转发远程调用到rpcEndpoint是 RpcEndpoint 的运行时环境, RpcService 提供了启动 RpcEndpoint , 连接到远端 RpcEndpoint 并返回远端 RpcEndpoint 的代理对象等方法。此外, RpcService 还提供了某些异步任务或者周期性调度任务的方法RpcServer相当于 RpcEndpoint 自身的的代理对象(self gateway)。 RpcServer 是 RpcService 在启动了 RpcEndpoint 之后返回的对象,每一个 RpcEndpoint 对象内部都有一个 RpcServer 的成员变量,通过 getSelfGateway 方法就可以获得自身的代理,然后调用该Endpoint 提供的服务RpcEndpoint是对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需要继承 该抽象类类似于Acotr,封装了传输的业务逻辑源码分析RpcGateway :/** * TODO 定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为对方的客户端代理 */ public interface RpcGateway { String getAddress(); String getHostname(); }RpcEndpoint : 启动rpcServerpublic abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { protected final Logger log = LoggerFactory.getLogger(getClass()); /** * 1、根据提供的RpcEndpoint来启动和停止RpcServer(Actor) * 2、根据提供的地址连接到(对方)RpcServer,并返回一个RpcGateway * 3、延迟、立刻调度Runnable、Callable */ private final RpcService rpcService; /** / protected final RpcServer rpcServer; ..... protected RpcEndpoint(final RpcService rpcService, final String endpointId) { // 保存rpcService和endpointId this.rpcService = checkNotNull(rpcService, "rpcService"); this.endpointId = checkNotNull(endpointId, "endpointId"); // 通过 RpcService 启动RpcServer /** * 构造的时候调用 rpcService.startServer()启动RpcServer,进入可以接受处理请求的状态,最后将RpcServer绑定到主线程上 * 真正执行起来 * 在RpcEndpoint中还定义了一些放入如 runAsync(Runnable)、callAsync(Callable,Time)方法来执行Rpc调用,值得注意的是在Flink * 的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint进行Rpc调用时,其会委托RpcServer进行处理 */ this.rpcServer = rpcService.startServer(this); // 主线程执行器,所有调用在主线程中串行执行 this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread); } .... @Override public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); /** * 根据RpcEndpoint的类型来创建对应的Actor,目前支持两种Actor的创建 * 1、AkkaRpcActor * 2、FencedAkkaRpcActor,对AkkaRpcActor进行扩展,能够过滤到与指定token无关的消息 */ final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint); // 这里拿到的是AkkaRpcActor的引用 final ActorRef actorRef = actorRegistration.getActorRef(); final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture(); LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef); final String hostname; Option<String> host = actorRef.path().address().host(); if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); } // 提取集成RpcEndpoint的所有子类 Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class); // 对上述指定的类集合进行代理 final InvocationHandler akkaInvocationHandler; if (rpcEndpoint instanceof FencedRpcEndpoint) { // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken, captureAskCallstacks); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks); } // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); // 针对RpcServer生成一个动态代理 @SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler); return server; }AkkaRpcActor :@Override public Receive createReceive() { return ReceiveBuilder.create() // TODO 重点 // TODO 如果是 RemoteHandshakeMessage 信息,执行 handleHandshakeMessage,处理握手消息 .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) // TODO 如果是 ControlMessages,执行 handleControlMessage,如启动、停止、中断 .match(ControlMessages.class, this::handleControlMessage) // TODO 其他情况,handleMessage .matchAny(this::handleMessage) .build(); } private void handleMessage(final Object message) { if (state.isRunning()) { mainThreadValidator.enterMainThread(); try { // 处理消息 handleRpcMessage(message); } finally { mainThreadValidator.exitMainThread(); } } else { log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", rpcEndpoint.getClass().getName(), message.getClass().getName()); sendErrorIfSender(new AkkaRpcException( String.format("Discard message, because the rpc endpoint %s has not been started yet.", rpcEndpoint.getAddress()))); } }connect :@Override public <C extends RpcGateway> CompletableFuture<C> connect( final String address, final Class<C> clazz) { // 连接远程Rpc Server,返回的是代理对象 return connectInternal( address, clazz, (ActorRef actorRef) -> { Tuple2<String, String> addressHostname = extractAddressHostname(actorRef); return new AkkaInvocationHandler( addressHostname.f0, addressHostname.f1, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), null, captureAskCallstacks); }); } private <C extends RpcGateway> CompletableFuture<C> connectInternal( final String address, final Class<C> clazz, Function<ActorRef, InvocationHandler> invocationHandlerFactory) { checkState(!stopped, "RpcService is stopped"); LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", address, clazz.getName()); // 根据Akka Actor地址获取ActorRef final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address); // 发送一个握手成功的消息给远程Actor final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose( (ActorRef actorRef) -> FutureUtils.toJava( Patterns .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds()) .<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class)))); // 创建动态代理,并返回 return actorRefFuture.thenCombineAsync( handshakeFuture, (ActorRef actorRef, HandshakeSuccessMessage ignored) -> { // AkkaInvocationHandler,针对客户端会调用 invokeRpc InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); // 创建动态代理 @SuppressWarnings("unchecked") C proxy = (C) Proxy.newProxyInstance( classLoader, new Class<?>[]{clazz}, invocationHandler); return proxy; }, actorSystem.dispatcher()); }AkkaInvocationHandler : 同时服务端和客户端同时使用@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); Object result; // 判断方法的类是否为指定的类,符合如下的类,执行本地调用,否则实行远程调用 if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) { throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " + "retrieve a properly FencedRpcGateway."); } else { // TODO 客户端会进行这里,进行远程的调用 result = invokeRpc(method, args); } return result; }4、Flink RPC实例<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>2.6.9</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.13</artifactId> <version>2.6.9</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-rpc-akka</artifactId> <version>1.17.0</version> </dependency>定义TaskGateway接口,继承RpcGateway接口 :package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.RpcGateway; public interface TaskGateway extends RpcGateway { String sayHello(String name); }创建一个TaskEndpoint,继承RpcEndpoint同时实现TaskGateway接口package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; public class TaskEndpoint extends RpcEndpoint implements TaskGateway { public TaskEndpoint(RpcService rpcService) { super(rpcService); } @Override public String sayHello(String name) { return "hello " + name; } }Server端 :package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; public class FlinkRpcServer { public static void main(String[] args) throws Exception { // 1. 创建RPC服务 ActorSystem defaultActorSystem = AkkaUtils.createDefaultActorSystem(); AkkaRpcService akkaRpcService = new AkkaRpcService(defaultActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration()); // 2. 创建TaskEndpoint实例 TaskEndpoint endpoint = new TaskEndpoint(akkaRpcService); System.out.println("address : "+endpoint.getAddress()); // 3. 启动Endpoint endpoint.start(); } }client端 :package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; import java.util.concurrent.CompletableFuture; public class FlinkRpcClient { public static void main(String[] args) throws Exception { // 1. 创建RPC服务 ActorSystem defaultActorSystem = AkkaUtils.createDefaultActorSystem(); AkkaRpcService akkaRpcService = new AkkaRpcService(defaultActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration()); // 2. 连接远程RPC服务,注意:连接地址是服务端程序打印的地址 CompletableFuture<TaskGateway> gatewayFuture = akkaRpcService .connect("akka.tcp://flink@192.168.31.162:53465/user/rpc/c38b60e7-c87c-4b7f-b948-3099dfb12999", TaskGateway.class); // 3. 远程调用 TaskGateway gateway = gatewayFuture.get(); System.out.println(gateway.sayHello("flink-akka")); } }注意:必须要在 org.apache.flink.runtime.rpc.akka 这个包下,因为AkkaUtils是这个包的可见如感兴趣,点赞加关注,谢谢
2024年09月23日
18 阅读
0 评论
0 点赞
2024-01-17
LSM索引原理
LSM作为一种设计思想,它把数据拆分为两个部分,一部分放在内存,一部分放在磁盘。内存中的数据检索可以使用红黑树,调表等时间复杂度比较低的结构进行检索。当数据到达一定阈值的时候则会将数据写到磁盘文件中,此时的写入的方式是顺序写,所以LSM写入性能很高。并发读写问题内存在写入磁盘过程中,如果有新的数据插入,则会带来并发读写的问题,所以就需要对这部分内存区域进行加锁。加锁的话又会导致写入过程阻塞,所以业界一般是当内存到达某个阈值之后,将这片区域标记为可读,然后新的数据将插入到新的内存区域,而旧的内存区域是只读的,所以可以不加锁的进行同步到磁盘的过程。小文件问题众所周知由于内存的容量有限,并且进行了分区,导致每次生成文件必然不会很大,这样就会造成检索效率很慢的问题。LSM是这样解决问题的:查找数据时候从多个磁盘文件中读取数据,然后进行合并,取最新的数据(Merge On Read)。由于写入的数据在内存是有序的,所以磁盘的小文件也是有序的(sstable)。这样可以保证单个文件中的检索是非常快的,但是存在一个问题:如果查找一个值的时候,在多个文件的索引有重叠的话就需要在多个sstable中查找数据(最坏的可能需要检索所有的文件),所以需要将小文件进行合并,让索引不再有重叠,就可以解决很好的剪枝文件。这也是Hudi点查性能不好的原因,没有保证索引不重叠。文件合并虽然文件合并带来的好处很多,但是合并的时机非常重要,如果新增一个就去进行合并全部文件,就会造成磁盘IO一直处于一个很高的水平,这样性能反而不好。所以LSM采用的是多层合并的方法,每一层的容量是上一层的10倍。level0层是内存直接写入的文件,当写满这一层的个数上限之后,再进行合并然后存入下一层,然后当下一层写满之后再继续合并到下一层,直到合并到最大层数则不再合并。这样就只存在level0层的索引是有重合的,其他的层的索引数据都是不重合的,可以很好的进行File Skiping,并且由于这种设计,将文件合并的时机分摊到了多次,缓解了写放大的问题。总结LSM在数据写入方面,使用了内存分区标记解决了读写并发问题,并且使用多层合并的机制解决了写放大的问题,提供了非常好的写入性能和小文件合并的机制。在读方面,可以先从内存读取,找不到再从level0一直往高层找,并且由于level0后的数据都是有序且不重合的,通过二分查找,能够很好的进行File Skiping,再配合布隆过滤器来快速判断元素是否存在与文件中。在最差的情况下,可能要遍历所有的文件,所以LSM适合写多读少的场景。
2024年01月17日
46 阅读
0 评论
0 点赞