首页
关于
Search
1
SteamOS安装paru和yay
172 阅读
2
欢迎使用 Typecho
127 阅读
3
徕卡X | 冬日漫步
127 阅读
4
https自动跳转问题终极解决方案
116 阅读
5
Spark3中的Catalog组件设计
104 阅读
默认
摄影
后端开发
大数据
Spark
Flink
登录
Search
标签搜索
转载
ArlenDu
累计撰写
18
篇文章
累计收到
10
条评论
首页
栏目
默认
摄影
后端开发
大数据
Spark
Flink
页面
关于
搜索到
11
篇与
的结果
2025-05-10
【转载】用certbot申请Let's Encrypt泛域名证书
转载来源:https://www.cnblogs.com/oboth-zl/p/14330854.html正文什么是Let's Encrypt?目前世界上就只有为数不多的几家域名证书签发机构得到浏览器的认可,而Let‘s Encrypt就是其中一家,并且你可以申请到免费的证书,当然你如果想要付费也行,很多机构证书动辄几千几万一年。如果我们只想搭建个测试环境有需要https,我们肯定不会去花这个冤枉钱,当然免费的午餐并没有那么好吃,Let's Encrypt申请的证书只有90天有效期,所以到期你得进行续期操作。并且还有各种各样得条件限制,比如一周你只能申请多少次,同一个ip一天只能操作多少次之类,详见https://letsencrypt.org/docs/rate-limits/假如你使用jdk生成的自签证书能不能用呢?当然是可以用的,只不过浏览器会告诉你这个证书我不承认。也就是左上角会给你挂个不安全的警告。什么是泛域名证书?例如:*.xxx.cn 也就是这个证书可以给某个域名的所有二级域名使用,就叫做泛域名证书(也称作通配符证书)。Let's Encrypt 官方推荐我们使用certbot 脚本申请证书(当然也可以使用acme.sh等方式),以下是申请步骤基于Debian10 python3.7.3如果你在操作过程中遇到什么报错,请多考虑python工具包的版本问题之类的。Let's Encrypt自2018年开始支持申请泛域名证书,相比于单域名证书,泛域名证书更利于日常的维护。准备工作下载 certbot,这个很多发行版的源中都已经自带了。比如Debian的:sudo apt install certbot需要有域名的管理权限,因为申请泛解析证书需要使用 DNS 验证,这就需要你能够根据要求操作 DNS 解析记录,以此证明你对域名的权限。开始申请泛域名证书certbot certonly --preferred-challenges dns --manual -d *.xx.cn --server https://acme-v02.api.letsencrypt.org/directorySaving debug log to /var/log/letsencrypt/letsencrypt.logPlugins selected: Authenticator manual, Installer NoneObtaining a new certificatePerforming the following challenges:dns-01 challenge for xx.cn- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -NOTE: The IP of this machine will be publicly logged as having requested thiscertificate. If you're running certbot in manual mode on a machine that is notyour server, please ensure you're okay with that.Are you OK with your IP being logged?- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -(Y)es/(N)o: y- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -Please deploy a DNS TXT record under the name_acme-challenge.xx.cn with the following value:nI0DhzH-vn0W7STVuLi2O-oIKuFNlqQx5EnjB-zewvsBefore continuing, verify the record is deployed.- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -Press Enter to Continue #让你的二级域名_acme-challenge.xx.cn用TXT方式解析到nI0DhzH-vn0W7STVuLi2O-oIKuFNlqQx5EnjB-zewvs#可用dig -t txt _acme-challenge.xx.cn验证解析是否生效,然后按下Enter通过验证。 debian可用apt install dnsutils来安装dig命令。 #也可以用nslookup命令来验证。Waiting for verification...Cleaning up challengesIMPORTANT NOTES:- Congratulations! Your certificate and chain have been saved at:/etc/letsencrypt/live/xx.cn/fullchain.pemYour key file has been saved at:/etc/letsencrypt/live/xx.cn/privkey.pemYour cert will expire on 2021-04-26. To obtain a new or tweakedversion of this certificate in the future, simply run certbotagain. To non-interactively renew all of your certificates, run"certbot renew"- If you like Certbot, please consider supporting our work by:Donating to ISRG / Let's Encrypt: https://letsencrypt.org/donateDonating to EFF: https://eff.org/donate-le\#至此证书申请成功参数说明:certonly 表示只申请证书。--no-bootstrap 需要用户同意的系统级操作直接选N。--manual 表示交互式申请。-d 为那些主机申请证书如 *.xxx.cn(此处为泛域名)--preferred-challenges dns,使用 DNS 方式校验域名所有权,可以配置多个--server Let's Encrypt ACME v2 版本使用的服务器不同于 v1 版本(V2版本才支持泛域名解析),需要显示指定。证书签发成功后去Nginx或Apache配置新生成的证书文件即可。撤销证书倘若有不需要的证书了,可撤销删除。sudo certbot revoke --cert-path /etc/letsencrypt/archive/xx.cn/cert1.pemSaving debug log to /var/log/letsencrypt/letsencrypt.log- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -Would you like to delete the cert(s) you just revoked, along with all earlierand later versions of the cert?- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -(Y)es (recommended)/(N)o: y #是否删除证书自动验证的方式申请证书上面的方式需要手动去到DNS解析服务商更改解析通过验证,其实通过Certbot 官方的插件可以实现自动更新DNS通过验证。https://certbot.eff.org/docs/using.html#manual支持上面这些服务商。准备安装对应的DNS插件sudo apt install python3-certbot-dns-cloudflare #sudo apt install python2-certbot-dns-cloudflare#我用到的服务商是Cloudflare*配置 DNS 插件*在cloudflare获取KEY/TOKEN****两种方式均可,只是token的方式需要2.3.1版本以上的python*-certbot-dns-cloudflare插件。出于安全因素,推荐前者的方式。# Cloudflare API credentials used by Certbot dns_cloudflare_email = cloudflare@example.com dns_cloudflare_api_key = 0123456789abcdef0123456789abcdef01234567将上面的配置信息写入~/cloudflare.ini。申请sudo certbot certonly --dns-cloudflare --dns-cloudflare-credentials ~/cloudflare.ini --dns-cloudflare-propagation-seconds 60 --preferred-challenges dns -d *.example.win --server https://acme-v02.api.letsencrypt.org/directory #--dns-cloudflare-propagation-seconds 60 #等待60秒, 等DNS解析生效由于这里的泛域名证书之前已申请过,且存在于服务器,Certbot自动进行了续期操作。至此,泛域名证书申请完成。续期若是通过手动更改DNS解析的方式申请的域名,我们执行certbot renew时会报错。此时只有重新按照申请证书的步骤再来一遍。certbot certonly --preferred-challenges dns --manual -d *.xx.cn --server https://acme-v02.api.letsencrypt.org/directory若是通过API自动验证的方式申请的,则可以直接用cerbot renew的方式更新证书!
2025年05月10日
12 阅读
0 评论
0 点赞
2025-01-06
SQL优化技巧
存储优化1. 行/列存储顾名思义,行存储就是数据的存储按照标准的一行一行的对数据进行存储,而列存储则是一列的数据是连续的存储在一起,这一列存完之后再进行下一列的存储,这样有个好处是当仅需查询某几列的时候能快速读取这几列的数据,减少数据的检索和读取。行存储:由于每行数据都存储在一起,追加数据比较简单,相对来说效率较好;另外如果使用select *的场景比较多,列存储没有什么优势,可以使用行存储。列存储:具有良好的读取性能,能够快速读取需要的列,更适合大数据集的数据查询,比如数据仓库的表一般来说不需要查询全部字段,故绝大部分表都是使用的列存储。另外列存储支持更好的压缩比,能够更好的节省空间。2. 表分区表分区在大数据中是一个非常重要的优化手段,以Hive表为例,每一个分区在存储上都是一个独立的目录,当SQL中,如果使用分区限制条件能快速的定位到这个数据目录,然后进行数据的读取,可以极大的提升查询效率。另外Hive表也需要使用分区目录进行整个目录的覆盖,所以如果表设计成需要能重刷的能力,最好建成分区表以便支持重刷。SQL案例:1,尽早的过滤数据说明:尽早的过滤数据可以减少网络IO,也可以减少不必要的计算,比如尽早加上分区条件,where dt='日期'×:select * from test.test_city_base_info where create_time >='2024-06-11 12:00:00' and create_time<='2024-06-11 13:00:00'√:select * from test.test_city_base_info where dt='2024-06-11' and create_time>='2024-06-11 12:00:00' and create_time<='2024-06-11 13:00:00'2,只查询需要的字段,避免使用Select * 写法说明:数仓表大多数时候都使用列式存储, 查询字段能很快的查到对应的列,并且也可以减少数据量,减少网络IO。×:select * from test.test_city_base_info√:select city_id,province_id from test.test_city_base_info
2025年01月06日
26 阅读
0 评论
0 点赞
2024-12-27
SteamOS安装paru和yay
1.修改配置文件sudo vim /etc/pacman.conf#国内镜像源 [archlinuxcn] Server = https://mirrors.ustc.edu.cn/archlinuxcn/$arch2.生成签名keysudo pacman-key --lsign-key "ci-package-builder-1@steamos.cloud" sudo pacman -Syy && sudo pacman -S archlinuxcn-keyring3.安装paru/yay(二选一即可)安装fakeroot注意:最新版yay需要依赖pacman 6.1.0, steamdeck已经无法安装,建议使用parusudo pacman -S paru sudo pacman -S yay sudo pacman -S fakeroot sudo pacman -S --needed base-devel4.附上安装clash-verge命令paru -S clash-verge-rev-binPS:如果安装报错签名问题,修改配置文件禁用签名sudo vim /etc/pacman.conf SigLevel = Never
2024年12月27日
172 阅读
0 评论
0 点赞
2024-11-09
Flink内存模型
Flink的内存分为堆外和堆内内存。假设一个任务的TaskManager的任务配置了4G,内存模型如下图所示。其中托管内存(1.34G)和网络内存(343M)基本没用,那就可以把托管内存和网络内存设置为最小值64M,那就可以省出1340-64+343-64=1555M, 这部分内存就会加到堆内存上面去,此时堆内存的容量就变成了3G,而仅仅使用了1G,完全可以省出1-1.5G出来。堆内内存:FreamworkHeap:框架使用的堆内存(默认固定128M)TaskHeap:任务使用的堆内存堆外内存:ManagedMemory:托管内存,默认情况分配40%,但是不一定使用,是内存治理的重灾区。托管内存的用处有:批处理算法,比如排序,hashjoin等。RocksDB StateBackend,Flink 只会预留一部分空间并扣除预算,但是不介入实际内存分配PyFlink, 与 JNI 类似,在与 Python 进程交互的过程中,也会用到一部分托管内存。Freamwork Off-Heap:框架使用的堆外内存,(默认固定128M)DirectMemory:框架自身(taskmanager.memory.framework.off-heap.size 参数,默认 128M,例如 Sort-Merge Shuffle 算法所需的内存)用户任务(taskmanager.memory.task.off-heap.size 参数,默认设为 0)Netty 对 Network Buffer 的网络传输(taskmanager.memory.network.fraction 等参数,默认 0.1 即 10% 的 Flink 总内存)。JVM Metaspace:JVM Metaspace 主要保存了加载的类和方法的元数据,Flink 配置的参数是 taskmanager.memory.jvm-metaspace.size,默认大小为 256M,JVM 参数是 -XX:MaxMetaspaceSize.JVM OverHead:JVM运行时开销,除了上述描述的内存区域外,JVM 自己还有一小块 “自留地”,用来存放线程栈、编译的代码缓存、JNI 调用的库所分配的内存等等,Flink 配置参数是 taskmanager.memory.jvm-overhead.fraction,默认是 JVM 总内存的 10%。
2024年11月09日
33 阅读
0 评论
0 点赞
2024-10-22
<转载>网易Spark Kyuubi核心架构设计与源码实现剖析
版权声明:本文为 xpleaf(香飘叶子)博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本文较为系统、全面并且由浅入深地介绍了网易Spark Kyuubi出现的背景、核心架构设计与关键源码实现,是学习、应用和对Kyuubi进行二次开发不可多得的技术干货,但由于作者认知水平有限,文中难免会出现描述不准确的措辞,还请多多包容和指出。1 概述 Kyuubi是网易数帆旗下易数大数据团队开源的一个高性能的通用JDBC和SQL执行引擎,建立在Apache Spark之上,Kyuubi的出现,较好的弥补了Spark ThriftServer在多租户、资源隔离和高可用等方面的不足,是一个真正可以满足大多数生产环境场景的开源项目。通过分析Spark ThriftServer的设计与不足,本文会逐渐带你深入理解Kyuubi的核心设计与实现,同时会选取多个关键场景来剖析其源码,通过本文的阅读,希望能让读者对网易Kyuubi的整体架构设计有一个较为清晰的理解,并能够用在自己的生产环境中解决更多实际应用问题。本文主要主要选取 Kyuubi 1.1.0版本来对其设计与实现进行分析,后续的版本迭代社区加入了数据湖等概念和实现,本文不会对这方面的内容进行探讨。2 Spark ThriftServer的设计、实现与不足2.1 产生背景在最初使用Spark时,只有理解了Spark RDD模型和其提供的各种算子时,才能比较好地使用Spark进行数据处理和分析,显然由于向上层暴露了过多底层实现细节,Spark有一定的高使用门槛,在易用性上对许多初入门用户来说并不太友好。SparkSQL的出现则较好地解决了这一问题,通过使用SparkSQL提供的简易API,用户只需要有基本的编程基础并且会使用SQL,就可以借助Spark强大的快速分布式计算能力来处理和分析他们的大规模数据集。而Spark ThriftServer的出现使Spark的易用性又向前迈进了一步,通过提供标准的JDBC接口和命令行终端的方式,平台开发者可以基于其提供的服务来快速构建它们的数据分析应用,普通用户甚至不需要有编程基础即可借助其强大的能力来进行交互式数据分析。2.2 核心设计顾名思义,本质上,Spark ThriftServer是一个基于Apache Thrift框架构建并且封装了SparkContext的RPC服务端,或者从Spark的层面来讲,我们也可以说,Spark ThriftServer是一个提供了各种RPC服务的Spark Driver。但不管从哪个角度去看Spark ThriftServer,有一点可以肯定的是,它是一个Server,是需要对外提供服务的,因此其是常驻的进程,并不会像一般我们构建的Spark Application在完成数据处理的工作逻辑后就退出。其整体架构图如下所示: Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。当用户通过JDBC或beeline方式执行一条SQL语句时,TThreadPoolServer会接收到该SQL,通过一系列的Session和Operation的管理,最终会使用在启动Spark ThriftServer时已经构建好的SparkContext来执行该SQL,并获取最后的结果集。从上面的基本分析中我们可以看到,在不考虑Spark ThrfitServer的底层RPC通信框架和业务细节时,其整体实现思路是比较清晰和简单的。当然实际上要构建一个对外提供SQL能力的RPC服务时,是有许多细节需要考虑的,并且工作量也会非常巨大,Spark ThriftServer在实现时实际上也没有自己重复造轮子,它复用了hiveserver2的许多组件和逻辑,并根据自身的业务需求来对其进行特定改造;同样的,后面当我们去看Kyuubi时,也会发现它复用了hiveserver2和Spark ThriftServer的一些组件和逻辑,并在此基础上创新性地设计自己的一套架构。2.3 基本实现这里列举的代码是基于Spark 2.1的源码,新版本在结构上可能有所有区别,但不影响我们对其本质实现原理的理解。前面提到的TThreadPoolServer是Apache Thrift提供的用于构建RPC Server的一个工作线程池类,在Spark ThriftServer的Service体系结构中,ThriftBinaryService正是使用TThreadPoolServer来构建RPC服务端并对外提供一系列RPC服务接口:Spark ThriftServer Service体系ThriftBinaryService基于TThreadPoolServer构建RPC服务端ThriftBinaryService提供的RPC服务接口可以看到,其提供的相当一部分接口都是提供SQL服务时所必要的能力。当然,不管是使用标准的JDBC接口还是通过beeline的方式来访问Spark ThriftServer,必然都是通过Spark基于Apache Thrift构建的RPC客户端来访问这些RPC服务接口的,因此我们去看Spark ThriftServer提供的RPC客户端,其提供的方法接口与RPC服务端提供的是对应的,可以参考 org.apache.hive.service.cli.thrift.TCLIService.Client。如果比较难以理解,建议可以先研究一下RPC框架的本质,然后再简单使用一下Apache Thrift来构建RPC服务端和客户端,这样就会有一个比较清晰的理解,这里不对其底层框架和原理做更多深入的分析。个人觉得,要理解Spark ThriftServer,或是后面要介绍的Kyubbi,本质上是理解其通信框架,也就是其是怎么使用Apache Thrift来进行通信的,因为其它的细节都是业务实现。2.4 主要不足Spark ThriftServer在带来各种便利性的同时,其不足也是显而易见的。首先,Spark ThriftServer难以满足生产环境下多租户与资源隔离的场景需求。由于一个Spark ThriftServer全局只有一个SparkContext,也即只有一个Spark Application,其在启动时就确定了全局唯一的用户名,因此在Spark ThriftServer的维护人员看来,所有通过Spark ThriftServer下发的SQL都是来自同一用户(也就是启动时确定的全局唯一的用户名),尽管其背后实际上是由使用Spark ThriftServer服务的不同用户下发的,但所有背后的这些用户都共享使用了Spark ThriftServer的资源、权限和数据,因此我们难以单独对某个用户做资源和权限上的控制,操作审计和其它安全策略。在Spark ThriftServer执行的一条SQL实际上会被转换为一个job执行,如果用户A下发的SQL的job执行时间较长,必然也会阻塞后续用户B下发的SQL的执行。其次,单个Spark ThriftServer也容易带来单点故障问题。从Spark ThriftServer接受的客户端请求和其与Executor的通信来考虑,Spark ThriftServer本身的可靠性也难以满足生产环境下的需求。因此,在将Spark ThriftServer应用于生产环境当中,上面提及的问题和局限性都会不可避免,那业界有没有比较好的解决方案呢?网易开源的Spark Kyuubi就给出了比较好的答案。3 Kyuubi核心架构设计与源码实现剖析3.1 Kyuubi核心架构设计Kyuubi的整体架构设计如下:Kyuubi从整体上可以分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其整体概述如下:用户层指通过不同方式使用Kyuubi的用户,比如通过JDBC或beeline方式使用Kyuubi的用户。服务发现层服务发现层依赖于Zookeeper实现,其又分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。Kyuubi Server层由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。Kyuubi Engine层由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。下面将会对每一层以及它们的协作与交互展开较为详细的分析。3.1.1 用户层用户层就是指实际需要使用Kyuubi服务的用户,它们通过不过的用户名进行标识,以JDBC或beeline方式进行连接。比如我们可以在beeline中指定以不同用户名进行登录:使用xpleaf用户名进行登录使用yyh用户名进行登录使用leaf用户名进行登录当然,这里的用户名或登录标识并不是可以随意指定或使用的,它应该根据实际使用场景由运维系统管理人员进行分配,并且其背后应当有一整套完整的认证、授权和审计机制,以确保整体系统的安全。3.1.2 服务发现层服务发现层主要是指Zookeepr服务以及Kyuubi Server层的KyuubiServer实例和Kyuubi Engine层的SparkSQLEngine在上面注册的命名空间(即node节点),以提供负载均衡和高可用等特性,因此它分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。Kyuubi Server层的服务发现Kyuubi Server层的服务发现是需要用户感知的。KyuubiServer实例在启动之后都会向Zookeeper的/kyuubi节点下面创建关于自己实例信息的节点,主要是包含KyuubiServer实例监听的host和port这两个关键信息,这样用户在连接KyuubiServer时,只需要到Zookeeper的/kyuubi节点下面获取对应的服务信息即可,当有多个KyuubiServer实例时,选取哪一个实例进行登录,这个是由用户自行决定的,Kyuubi本身并不会进行干预。在实际应用时也可以封装接口实现随机返回实例给用户,以避免直接暴露Kyuubi的底层实现给用户。另外,KyuubiServer实例是对所有用户共享,并不会存在特定KyuubiServer实例只对特定用户服务的问题。当然在实际应用时你也可以这么做,比如你可以不对用户暴露服务发现,也就是不对用户暴露Zookeeper,对于不同用户,直接告诉他们相应的KyuubiServer实例连接信息即可。不过这样一来,Kyuubi Server层的高可用就难以保证了。比如有多个在不同节点上启动的KyuubiServer实例,其在Zookeeper上面注册的信息如下:Kyuubi Engine层的服务发现Kyuubi Engine层的服务发现是不需要用户感知的,其属于Kyuubi内部不同组件之间的一种通信协作方式。SparkSQLEngine实例在启动之后都会向Zookeeper的/kyuubi_USER节点下面创建关于自己实例信息的节点,主要是包含该实例监听的host和port以及其所属user的相关信息,也就是说SparkSQLEngine实例并不是所有用户共享的,它是由用户独享的。比如Kyuubi系统中有多个不同用户使用了Kyuubi服务,启动了多个SparkSQLEngine实例,其在Zookeeper上面注册的信息如下:3.1.3 Kyuubi Server层Kyuubi Server层由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。整个Kyuubi系统中需要存在多少个KyuubiServer实例是由Kyuubi系统管理员决定的,根据实际使用Kyuubi服务的用户数和并发数,可以部署一个或多个KyuubiServer实例,以满足SLA要求。当然后续发现KyuubiServer实例不够时,可以横向动态扩容,只需要在Kyuubi中系统配置好host和port,启动新的KyuubiServer实例即可。3.1.4 Kyuubi Engine层Kyuubi Engine层由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。SparkSQLEngine实例是针对不同的用户按需启动的。在Kyuubi整体系统启动之后,如果没有用户访问Kyuubi服务,实际上在整个系统中只有一个或多个KyuubiServer实例,当有用户通过JDBC或beeline的方式连接KyuubiServer实例时,其会在Zookeeper上去查找是否存在用户所属的SparkSQLEngine实例,如果没有,则通过spark-submit提交一个Spark应用,而这个Spark应用本身就是SparkSQLEngine,启动后,基于其内部构建的SparkSession实例,即可为特定用户执行相关SQL操作。3.1.5 整体协作流程通过前面对各层的介绍,结合KyubbiServer架构图,以用户xpleaf访问Kyuubi服务为例来描述整个流程。1.Kyuubi系统管理员在大数据集群中启动了3个KyuubiServer实例和1个Zookeeper集群,其中3个KyuubiServer实例的连接信息分别为10.2.10.1:10009、10.2.10.1:10010和10.2.10.2:1009;2.用户xpleaf通过beeline终端的方式连接了其中一个KyuubiServer实例;在这里我们假设用户xpleaf事先已经通过管理员告知的方式知道了该KyuubiServer实例的连接信息。3.KyuubiServer\_instance1接收到xpleaf的连接请求,会为该用户创建session会话,同时会去Zookeeper上检查是否已经存在xpleaf所属的SparkSQLEngine实例;3.1 如果已经存在,则获取其连接信息;3.2 如果不存在,则通过spark-submit的方式提交一个Spark应用,启动一个SparkSQLEngine实例;4.KyuubiServer\_instance1在Zookeeper上没有找到xpleaf所属的SparkSQLEngine实例信息,其通过spark-submit的方式启动了一个SparkSQLEngine实例;5.属于xpleaf用户的新的SparkSQLEngine\_instance1实例在10.2.10.1节点上进行启动,并且监听的52463端口,启动后,其向Zookeeper注册自己的连接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463;6.KyuubiServer\_instance1在检测到SparkSQLEngine\_instance1启动成功后,会向其发送创建session会话的连接请求;7.SparkSQLEngine\_instance1收到KyuubiServer\_instance1创建session会话的连接请求,则创建一个新的session会话;8.用户启动beeleine完成并成功创建会话,接着用户执行SQL查询;9.KyuubiServer\_instance1接收到xpleaf的执行SQL查询的请求,会先检查是否存在xpleaf所属的SparkSQLEngine实例;10.KyuubiServer\_instance1找到xpleaf所属的SparkSQLEngine\_instance1实例,接着会为这次执行SQL的操作创建一个Operation;11.KyuubiServer\_instance1根据连接信息创建了一个RPC Client,并且构建SQL执行的RPC请求,发到对应的SparkSQLEngine\_instance1实例上;12.SparkSQLEngine\_instance1接收到该请求后,会创建一个该SQL操作的Operation,并且使用其内部的SparkSession实例来进行执行,最后将执行结果返回给KyuubiServer\_instance1;13.KyuubiServer\_instance1接收到SparkSQLEngine\_instance1的执行结果,返回给用户,这样一次SQL查询操作就完成了。透过整体协作流程我们可以看到:站在用户层视角来看,其为RPC客户端,而为其提供RPC服务的是Kyuubi Server层,在这里,Kyuubi Server是RPC服务端;站在Kyuubi Server层视角来看,其既是为用户层提供RPC服务的RPC服务端,同时也是使用Kyuubi Engine层RPC服务的RPC客户端;站在Kyuubi Engine层视角来看,其为RPC服务端,其为Kyuubi Server层提供RPC服务;Kyuubi在整体Server端和Client端以及其实现功能的设计上,是十分清晰的。3.2 Kyuubi源码实现剖析通过前面对Kyuubi各层以及整体协作流程的描述,相信对Kyuubi的核心架构设计会有一个比较清晰的理解,这样再去分析Kyuubi的源码时就会简单很多。首先我们会来介绍Kyuubi整体的Service体系与组合关系,以对Kyuubi整体核心代码有一个概览性的理解,接着会选取多个关键场景来对Kyuubi的源码进行分析,并且给出每个场景的代码执行流程图。确实没有办法在较为简短的篇幅里为大家介绍Kyuubi源码的方方面面,但我个人认为不管对于哪个大数据组件,在理解了其底层通信框架的基础上,再选取关于该组件的几个或多个关键场景来分析其源码,基本上对其整体设计就会有概览性的理解,这样后面对于该组件可能出现的Bug进行排查与修复,或是对该组件进行深度定制以满足业务的实际需求,我相信问题都不大——这也就达到了我们的目的,就是去解决实际问题。当然,在这个过程当中你也可以欣赏到漂亮的代码,这本身也是一种享受。3.2.1 RPC与Apache Thrift基本概述RPCRPC(Remote Procedure Call)远程过程调用,如果按照百度百科的解释会非常羞涩难懂(上面提供的图应该还是《TCP/IP详解卷1:协议》上面的一个图),但实际上我们就可以简单地把它理解为,一个进程调用另外一个进程的服务即可,不管是通过Socket、内存共享或是网络的方式,只要其调用的服务的具体实现不是在调用方的进程内完成的就可以,目前我们见得比较多的是通过网络通信调用服务的方式。在Java语言层面上比较普遍的RPC实现方式是,反射+网络通信+动态代理的方式来实现RPC,而网络通信由于需要考虑各种性能指标,主要用的Netty或者原生的NIO比较多,Socket一般比较少用,比如可以看一下阿里Doubbo的实现。如果想加深这方面的理解,可以参考我的一个开源RPC框架,其实就是非常mini版的Doubbo实现: https://github.com/xpleaf/minidubbo,建议有时间可以看下,实际上这会非常有用,因为几乎所有的大数据组件都会用到相关的RPC框架,不管是开源三方的还是其自己实现的(比如Hadoop的就是使用自己实现的一套RPC框架)。Apache Thrift Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。3.2.2 Kyuubi Service体系与组合关系在看Kyuubi的源码时,我们可以把较多精力放在某几种较重要的类和其体系上,这样有助于我们抓住重点,理解Kyuubi最核心的部分。仅考虑Kyuubi整体的架构设计和实现,比较重要的是Service、Session和Operation等相关的类和体系。Service体系Service,顾名思义就是服务,在Kyuubi中,各种不同核心功能的提供都是通过其Service体系下各个实现类来进行提供的。我们前面提到的服务发现层、Kyuubi Server层和Kyuubi Engine层,在代码实现上绝大部分核心功能都是由Kyuubi源码项目的Server类体系来完成的,可以这么说,理解了Service体系涉及类的相关功能,就基本上从源码级别上理解了整个Kyuubi的体系架构设计和实现。当然这些Service的实现类并不一定使用Service结尾,比如SessionManager、OperationManager等,但基本上从名字我们就能对其功能窥探一二。其完整的继承关系如下:基于Kyuubi提供的核心功能,我们可以大致按Kyuubi Server层和Kyuubi Engine层来将整个体系中的Service类进行一个划分:Kyuubi Server层功能入口 - KyuubiServer:提供main方法,是Kyuubi Server层KyuubiServer实例初始化和启动的入口;服务发现 - KyuubiServiceDiscovery:封装了zkClient,用来与Zookeeper服务进行交互;核心功能 - FrontendService:封装了Apache Thrift的TThreadPoolServer,在Kyuubi Server层,其主要用于向用户层提供RPC服务; - KyuubiBackendService:封装了来自用户层不同RPC请求的处理逻辑,比如openSession、executeStatement、fetchResults等;Session管理 - KyuubiSessionManager:提供对用户层的请求会话(session)管理;Operation管理 - KyuubiOperationManager:提供对用户层的请求操作(operation)管理;Kyuubi Engine层功能入口 - SparkSQLEngine:提供main方法,是Kyuubi Engine层SparkSQLEngine实例初始化和启动的入口;服务发现 - EngineServiceDiscovery:封装了zkClient,用来与Zookeeper服务进行交互;核心功能 - FrontendService:封装了Apache Thrift的TThreadPoolServer,在Kyuubi Engine层,其主要用于向Kyuubi Server层提供RPC服务; - SparkSQLBackendService:封装了来自Kyuubi Server层不同RPC请求的处理逻辑,比如openSession、executeStatement、fetchResults等;Session管理 - SparkSQLSessionManager:提供对Kyuubi Server层的请求会话(session)管理;Operation管理 - SparkSQLOperationManager:提供对Kyuubi Server层的请求操作(operation)管理;这里我们只对具体实现类进行归类,因为中间抽象类只是提取多个子类的公共方法,不影响我们对其体系功能的说明和讲解;而以Noop开头的实际上是Kyuubi的测试实现类,因此我们也不展开说明;KinitAuxiliaryService是Kyuubi中用于认证的类,这里我们不对其认证功能实现进行说明。通过对Service体系各个具体实现类的介绍,再回顾前面对Kyuubi整体架构和协作流程的介绍,其抽象的功能在源码实现类上面就有了一个相对比较清晰的体现,并且基本上也是可以一一对应上的。Service组合关系为了理解Kyuubi在源码层面上是如何进行整体协作的,除了前面介绍的Service体系外,我们还有必要理清其各个Service之间的组合关系。在整个Service体系中,CompositeService这个中间抽象类在设计上是需要额外关注的,它表示的是在它之下的实现类都至少有一个成员为其它Service服务类对象,比如对于KyuubiServer,它的成员则包含有KyuubiBackdService、KyuubiServiceDiscovery等多个Service实现类,SparkSQLEngine也是如此。我们将一些关键的Service类及其组合关系梳理如下,这对后面我们分析关键场景的代码执行流程时会提供很清晰的思路参考:Session与SessionHandleSession当我们使用通过JDBC或beeline的方式连接Kyuubi时,实际上在Kyuubi内部就为我们创建了一个Session,用以标识本次会话的所有相关信息,后续的所有操作都是基于这次会话来完成的,我们可以在一次会话下执行多个操作(比如多次执行某次SQL,我们只需要建立一次会话连接即可)。Session在Kyuubi中又分为Kyuubi Server层的Session和Kyuubi Engine层的Session。Kyuubi Server层的Session实现类为KyuubiSessionImpl,用来标识来自用户层的会话连接信息;Kyuubi Engine层的Session实现类为SparkSessionImpl,用来标识来自Kyuubi Server层的会话连接信息。两个Session实现类都有一个共同的抽象父类AbstractSession,用于Session操作的主要功能逻辑都是在该类实现的。SessionHandleSession对象的存储实际上由SessionManager来完成,在SessionManager内部其通过一个Map来存储Session的详细信息,其中key为SessionHandle,value为Session对象本身。SessionHandle可以理解为就是封装了一个唯一标识一个用户会话的字符串,这样用户在会话建立后进行通信时只需要携带该字符串标识即可,并不需要传输完整的会话信息,以避免网络传输带来的开销。Operation与OperationHandleOperation用户在建立会话后执行的相关语句在Kyuubi内部都会抽象为一个个的Operation,比如执行一条SQL语句对应的Operation实现类为Executement,不过需要注意,Operation又分为Kyuubi Server层的KyuubiOperation和Kyuubi Engine层的SparkOperation。Kyuubi Server层的Operation并不会执行真正的操作,它只是一个代理,它会通过RPC Client请求Kyuubi Engine层来执行该Operation,因此所有Operation的真正执行都是在Kyuubi Engine层来完成的。由于Operation都是建立在Session之下的,所以我们在看前面的组合关系时可以看到,用于管理Operation的OperationManager为SessionManager的成员属性。OperationHandleOperation对象的存储实际上由OprationManager来完成,在SessioOprationManagerManager内部其通过一个Map来存储Session的详细信息,其中key为OperationHandle,value为Operation对象本身。OperationHandle可以理解为就是封装了一个唯一标识一个用户操作的字符串,这样用户基于会话的操作时只需要携带该字符串标识即可,并不需要传输完整的操作信息,以避免网络传输带来的开销。第一次提交Operation时还是需要完整信息,后续只需要提供OperationHandle即可,实际上SQL语句的执行在Kyuubi内部是异步执行的,用户端在提交Opeation后即可获得OperationHandle,后续只需要持着该OperationHandle去获取结果即可,我们在分析SQL执行的代码时就可以看到这一点。3.2.3 Kyuubi启动流程Kyuubi的启动实际上包含两部分,分别是KyuubiServer的启动和SparkSQLEngine的启动。KyuubiServer实例的启动发生在系统管理员根据实际业务需要启动KyuubiServer实例,这个是手动操作完成的;而SparkSQLEngine实例的启动则是在为用户建立会话时为由KyuubiServer实例通过spark-submit的方式去提交一个Spark应用来完成的。KyuubiServer启动流程当我们在Kyuubi的bin目录下去执行./kyuubi run命令去启动KyuubiServer时,就会去执行KyuubiServer的main方法:在加载完配置信息后,通过调用startServer(conf)方法,就开始了KyuubiServer的启动流程:可以看到,实际上KyuubiServer的启动包括两部分:初始化和启动。KyuubiServer的初始化和启动实际上是一个递归初始化和启动的过程。我们前面提到,KyuubiServer为Service体系下的一个CompositeService,参考前面给出的组合关系图,它本身的成员又包含了多个Service对象,它们都保存在保存在serviceList这个成员当中,因此初始化和启动KyuubiServer实际上就是初始化和启动serviceList中所包含的各个Service对象。而这些Service对象本身又可能是CompositeService,因此KyuubiServer的启动和初始化实际上就是一个递归初始化和启动的过程。这样一来,整个KyuubiServer的启动流程就比较清晰了,这也是我们在最开始就列出其Service体系和组合关系的原因,由于整体的启动流程和细节所包含的代码比较多,我们就没有必要贴代码了,这里我把整个初始化和启动流程步骤的流程图梳理了出来,待会再对其中一些需要重点关注的点进行说明,如下:我们重点关注一下FontendService和ServiceDiscoveryService的初始化和启动流程。FrontendService的初始化和启动我们需要重点关注一下FrontendService,因为KyuubiServer实例对外提供RPC服务都是由其作为入口来完成的。其初始化时主要是获取和设置了Apache Thrift内置的用于构建RPC服务端的TThreadPoolServer的相关参数:可以看到主要是host、port、minThreads、maxThreads、maxMessageSize、requestTimeout等,这些参数都是可配置的,关于其详细作用可以参考KyuubiConf这个类的说明。其启动比较简单,主要是调用TThreadPoolServer的server()方法来完成:ServiceDiscoveryService的初始化和启动初始化时主要是创建一个用于后续连接ZooKeeper的zkClient:当然这里还看到其获取了一个HA_ZK_NAMESPACE的配置值,其默认值为kyuubi:在ServiceDiscoveryService进行启动的时候,就会基于该namesapce来构建在Kyuubi Server层进行服务发现所需要的KyuubiServer实例信息:在这里,就会在Zookeeper的/kyuubi节点下面创建一个包含KyuubiServer实例详细连接信息的节点,假设KyuubiServer实例所配置的host和post分别为10.2.10.1和10009,那么其所创建的zk节点为:KyuubiSessionManager的初始化和启动我们主要关注一下其启动过程:在这里主要完成的事情:1.获取session check interval;2.获取session timout;3.起一个schedule的调度线程;4.根据interval和timeout对handleToSession的session进行检查;5.如果session超时(超过timeout没有access),则closesession;那么对于KyuubiServer的启动过程我们就分析到这里,更多细节部分大家可以结合我的流程图来自行阅读代码即可,实际上当我们把Kyuubi的Service体系和组合关系整理下来之后,再去分析它的启动流程时就会发现简单很多,这个过程中无非就是要关注它的一些相关参数获取和设置是在哪里完成的,它是怎么侦听服务的(真正用于侦听host和port的server的启动)。SparkSQLEngine启动流程在KyuubiServer为用户建立会话时会去通过服务发现层去Zookeeper查找该用户是否存在对应的SparkSQLEngine实例,如果没有则通过spark-submit的启动一个属于该用户的SparkSQLEngine实例。后面在分析KyuubiServer Session建立过程会提到,实际上KyuubiServer是通过调用外部进程命令的方式来提交一个Spark应用的,为了方便分析SparkSQLEngine的启动流程,这里我先将其大致的命令贴出来:kyuubi-spark-sql-engine-1.1.0.jar是Kyuubi发布版本里面的一个jar包,里面就包含了SparkSQLEngine这个类,通过-class参数我们可以知道,实际上就是要运行SparkSQLEngine的main方法,由于开启了SparkSQLEngine的启动流程。需要说明的是,提交Sparkk App的这些参数在SparkSQLEngine启动之前都会被设置到SparkSQLEngine的成员变量kyuubiConf当中,获取方法比较简单,通过scala提供的sys.props就可以获取,这些参数在SparkSQLEngine的初始化和启动中都会起到十分关键的作用。接下来我们看一下SparkSQLEngine的main方法:首先会通过createSpark()创建一个SparkSession对象,后续SQL的真正执行都会交由其去执行,其创建方法如下:这里主要是设置了一些在创建SparkSession时需要的参数,包括appName、spark运行方式、spark ui的端口等。另外这里还特别对frontend.bind.port参数设置为0,关于该参数本身的定义如下:可以看到其默认值为10009,前面KyuubiServer在构建TThreadPoolServer时就直接使用了默认值,这也是我们启动的KyuubiServer实例侦听10009端口的原因,而在这里,也就是SparkSQLEngine启动时将其设置为0是有原因,我们将在下面继续说明。创建完成SparkSession后才调用startEngine(spark)方法启动SparkSQLEngine本身:可以看到也是先进行初始化,然后再启动,SparkSQLEngine本身是CompositeService,所以初始化和启动过程跟KyuubiServer是一模一样的(当然其包含的成员会有所差别),都是递归对serviceList中所包含的各个Service对象进行初始化和启动:FrontendService的初始化和启动FrontendService在SparkSQLEngine中的启动流程与在KyuubiServer中的启动流程是基本一样的,可以参考前面的说明,这里主要说明一些比较细微的差别点。前面已经设置了frontend.bind.port参数的值为0,在FrontendService这个类当中,它会赋值给portNum这个变量,用以构建TThreadPoolServer所需要的参数ServerSocket对象:所以实际上,不管是KyuubiServer还是SparkSQLEngine,其所侦听的端口是在这里构建ServerSocket对象的时候确定下来的,对ServerSocket对象,如果传入一个为0的portNum,则表示使用系统随机分配的端口号,所以这也就是我们在启动了SparkSQLEngine之后看到其侦听的端口号都是随机端口号的原因。ServiceDiscoveryService的初始化和启动与KyuubiServer类似,这里分析一下其差别点。前面在通过spark-submit提交应用时传入了--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf的参数,实际上在SparkSQLEngine初始化KyuubiConfig对象时会设置到KyuubiConfig.HA_ZK_NAMESPACE属性上,因此在ServiceDiscoveryService初始化时获取的namespace实际上就为/kyuubi_USER/xpleaf,而不是默认的kyuubi,这点是需要注意的:因此在启动调用start()方法时,其在Zookeeper上构建的znode节点也就不同:比如其创建的znode节点为:SparkSQLSessionManager的初始化和启动SparkSQLSessionManager也是继承自SessionManager,因此与KyuubiServer的KyuubiSessionManager一样,其也启动了一个用于检查Session是否超时的checker。此外,还启动了另外一个checker,如下:实际上这个checker是在SparkSQLEngine递归初始化和启动其serviceList之前就已经启动,从它的实现当中我们可以看到,当超过一定时时并且SparkSQLEngine维护的Session为0时,整个SparkSQLEngine实例就会退出,这样做的好处就是,如果一个用户的SparkSQLEngine实例长期没有被使用,我们就可以将其占用的资源释放出来,达到节省资源的目的。3.2.4 Kyuubi Session建立过程Kyuubi Session的建立实际上包含两部分,分别是KyuubiServer Session建立和SparkSQLEngine Session建立,这两个过程不是独立进行的,KyuubiServer Session的建立伴随着SparkSQLEngine Session的建立,KyuubiServer Session和SparkSQLEngine Session才完整构成了Kyuubi中可用于执行特定Operation操作的Session。KyuubiServer Session建立过程当用户通过JDBC或beeline的方式连接Kyuubi时,实际上就开启了KyuubiServer Session的一个建立过程,此时KyuubiServer中FrontedService的OpenSession方法就会被执行:进而开启了KyuubiServer Session建立以及后续SparkSQLEngine实例启动(这部分前面已经单独介绍)、SparkSQLEngine Session建立的过程:整体流程并不复杂,在执行FrontendService#OpenSession方法时,最终会调用到KyuubiSessionImpl#open方法,这是整个KyuubiServer Session建立最复杂也是最为关键的一个过程,为此我们单独将其流程整理出来进行说明:流程中其实已经可以比较清晰地说明其过程,这里我们再详细展开说下,其主要分为下面的过程:服务发现与SparkSQLEngine实例启动第一次建立特定user的session时,在zk的/kyuubi\_USER path下是没有相关user的节点的,比如/kyuubi\_USER/xpleaf,因此在代码执行流程中,其获取的值会为None,这就触发了其调用外部命令来启动一个SparkSQLEngine实例:而调用的外部命令实际上就是我们在前面讲解SparkSQLEngine实例中提到的spark-submit命令:之后就是SparkSQLEngine实例的启动过程,其启动完成之后,就会在Zookeeper上面注册自己的节点信息。对于KyuubiSessionImpl#open方法,在不超时的情况下,循环会一直执行,直到其获取到用户的SparkSQLEngine实例信息,循环结束,进入下面跟SparkSQLEngine实例建立会话的过程。建立与SparkSQLEngine实例的会话SparkSQLEngine本质上也是一个RPC服务端,为了与其进行通信以建立会话,就需要构建RPC客户端,这里KyuubiSessionImpl#openSession方法中构建RPC客户端的方法主要是Apache Thrift的一些模板代码,如下:在发送请求给SparkSQLEngine的时候,又会触发SparkSQLEngine Session建立的过程(这个接下来说明),在跟其建立完Session之后,KyuubiSessionImpl会将其用于标识用户端会话的sessionHandle、用于跟SparkSQLEngine进行通信的RPC客户端和在SparkSQLEngine实例中进行Session标识的remoteSessionHandle缓存下来,这样在整个Kyuubi体系中,就构建了一个完整的Session映射关系:userSessionInKyuubiServer-RPCClient-KyuubiServerSessionInSparkSQLEngine,后续的Operation都是建立在这样一个体系之下。KyuubiServer在Session建立完成后会给客户端返回一个SessionHandle,后续客户端在与KyuubiServer进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。SparkSQLEngine Session建立过程在接收到来自KyuubiServer的建立会话的RPC请求之后,SparkSQLEngine中FrontedService的OpenSession方法就会被执行,其整体流程与KyuubiServer Session的建立过程是类似的,主要不同在于SparkSQLSessionManager#openSession方法执行上面,如下:其对应的关键代码如下:sessionImpl.open()实际上只是做了日志记录的一些操作,所以其实这里的核心是将创建的Session记录下来。SparkSQLEngine在Session建立完成后会给KyuubiServer返回一个SessionHandle,后续KyuubiServer在与SparkSQLEngine进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。3.2.5 Kyuubi SQL执行流程Kyuubi SQL的执行流程实际上包含两部分,分别是KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程,其结合起来才是一个完整的SQL执行流程,KyuubiServer只是一个代理,真正的SQL执行是在SparkSQLEngine中完成。另外由于在Kyuubi中,SQL的执行是异步的,也就是可以先提交一个SQL让其去执行,后续再通过其返回的operationHandle去获取结果,所以在KyuubiServer和SparkSQLEngine内部,SQL的执行流程又可以再细分为提交Statement和FetchResults两个过程,在分别分析KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程时,我们就是对提交Statment和FetchResults这两个过程来展开详细的分析,整体会有些繁多,但并不复杂。KyuubiServer SQL执行流程1.提交Statement当用户通过JDBC或beeline的方式执行一条SQL语句时,就开启了SQL语句在Kyuubi中的执行流程,此时KyuubiServer中FrontedService的ExecuteStatement方法就会被执行:runAsync值为true,因此会通过异步的方式来执行SQL,也就是会执行BackendService的executeStatementAsync方法,开启了异步执行SQL的流程:首先会通过KyuubiOperationManager去创建一个表示执行SQL的ExecuteStatement:client实际上就是我们前面在KyuubiServer Session建立过程中建立的用于与SparkSQLEngine通信的RPC客户端,ExecuteStatement需要client来发送执行SQL语句的请求给SparkSQLEngine实例,不过需要注意的是,这里的ExecuteStatement是KyuubiServer体系下的,其类全路径为org.apache.kyuubi.operation.ExecuteStatement,因为后面在分析SparkSQLEngine SQL执行流程时,在SparkSQLEngine体系下也有一个ExecuteStatement,但其类全路径为org.apache.kyuubi.engine.spark.operation.ExecuteStatement。这里的整个流程关键在于后面执行operation.run()方法,进而执行runInternal()方法:这里会通过异步的方式来执行,其先同步执行executeStatement()方法,然后再提交一个异步线程来执行asyncOperation(sessionManager.submitBackgroundOperation(asyncOperation)实际上就是通过线程池来提交一个线程线程),我们先看一下其executeStatement()方法:这里statement实际上就是要执行的SQL语句,所以本质上就是向SparkSQLEngine发送了一个用于执行SQL语句的RPC请求,这样就会触发SparkSQLEngine执行提交Statement的一个过程(这个接下来会分析),请求成功后,KyuubiServer会将SparkSQLEngine实例用于记录该操作的operationHandle记录下来,就是赋值给成员变量_remoteOpHandle,_remoteOpHandle用后续用于查询statement在SparkSQLEngine实例中的执行状态和FetchResults。执行完executeStatement()方法后,我们再看一下其提交异步线程时所执行的操作,也就是waitStatementComplete()方法:可以看到其主要操作是构建用于查询SparkSQLEngine实例中Operation的执行状态。再回过来看一下runInternal()方法:这里提交一个线程后的返回结果backgroundOperation实际上为一个FutureTask对象,后续在FetchResults过程中通过该对象就可以知道Operation在SparkSQLEngine实例中的执行状态。在提交完Statement之后,KyuubiServer会将operationHandle返回给用户端,用于后续获取执行结果。2.FetchResults提交完Statement后,用户层的RPC客户端就会去获取结果,此时KyuubiServer中FrontedService的FetchResults方法就会被执行:在获取真正执行结果之前,会有多次获取操作日志的请求,也就是req.getFetchType == 1的情况,这里我们只关注fetchLog为false的情况:获取执行结果的过程就比较简单,主要是调用RPC客户端的FetchResults方法,这样就会触发SparkSQLEngine FetchResults的一个过程(这个接下来会分析),不过在获取执行结果前会检查其执行状态,前面在分析在提交Statement时,异步线程waitStatementComplete()就会请求SparkSQLEngine更新其状态为FINISHED,因此这里可以正常获取执行结果。SparkSQLEngine SQL执行流程1.提交Statement接收到KyuubiServer提交Statement的RPC请求时,此时SparkSQLEngine中FrontedService的ExecuteStatement方法就会被执行,进而触发接下来提交Statement的整个流程:其整体流程与KyuubiServer是十分相似的,主要区别在于:1.其创建的Statement为SparkSQLEngine体系下的ExecuteStatement;2.其异步线程是通过SparkSession来执行SQL语句;因此我们来看一下其runInternal()方法和异步线程执行的executeStatement()方法:可以看到其执行非常简单,就是直接调用SparkSession的sql()方法来执行SQL语句,最后再将结果保存到迭代器iter,并设置执行状态为完成。在提交完Statement之后,SparkSQLEngine会将operationHandle返回给KyuubiServer,用于后续获取执行结果。2.FetchResults接收到KyuubiServer获取结果的RPC请求时,此时SparkSQLEngine中FrontedService的FetchResults方法就会被执行,进而触发接下来FetchResults的整个流程:整个过程比较简单,就是将iter的结果转换为rowSet的对象格式,最后返回给KyuubiServer。参考资料《Spark SQL内核剖析》《网易数帆开源Kyuubi:基于Spark的高性能JDBC和SQL执行引擎》《大数据实战:Kyuubi 与 Spark ThriftServer 的全面对比分析》Apache Thriftminidubbo:A Full RPC Framework Based on Netty.
2024年10月22日
41 阅读
0 评论
0 点赞
1
2
3