接口定义
CatalogPlugin:catalog的顶级接口,该接口主要实现了catalog的name和defaultNamespace。比如V2SessionCatalog的默认实现就是name="spark_catalog",defaultNamespace="defualt"
TableCatalog:定义Table的相关接口,包括listTable, loadTable, CreateTable, DropTable,AlterTable等。
CatalogExtension:继承自TableCatalog,通过setDelegateCatalog()方法把上面的V2SessionCatalog 实例进行代理。经过实际测试发现,无法改变catalog的name,所以只能对原始的catalog进行功能扩展。
DelegatingCatalogExtension:抽象类,包含了对CatalogExtendsion的所有方法实现。
总结:如果需要自定义catalog,则需实现TableCatalog即可。如果需要对当前的catalog进行扩展,则实现CatalogExtension或者重写DelegatingCatalogExtension的方法。
Catalog初始化过程
Spark通过CatalogManager管理多个catalog,通过spark.sql.catalog.${name} 可以注册多个catalog,Spark的默认实现则是spark.sql.catalog.spark_catalog。
1.SparkSession在创建时,可以选择是否enableHiveSupport()来指定catalog类型,如果指定了则会实例化HiveExternalCatalog,不指定则实例InMemoryCatalog。
2.在BaseSessionStateBuilder/HiveSessionStateBuilder则会使用上面实例化的externalCatalog创建catalog对象,再根据catalog对象创建V2SessionCatalog对象(V2SessionCatalog就是对SessionCatalog的封装)
3.根据catalog和v2SessionCatalog创建CatalogManager实例,CatalogManager中内置一个catalogs的HashMap<String,CatalogPlugin>来管理catalog。
4.再SQL中调用catalog时候,会通过CatalogManger.catalog(String name)返回对应名称的catalog实例,如果没有则通过Catalog.load(name, conf)来进行实例化
5.对于默认的spark_catalog,如果需要进行扩展,则可以通过spark.sql.catalog.spark_catalog来指定对应的实现类,该类可以通过实现CatalogExtension,然后会自动调用其setDelegateCatalog()来设置默认的spark_catalog为代理对象,然后进行扩展即可。
总结:在DQC SQL升级引擎的过程中,使用了Spark3的Catalog特性来支持原生的multi catalog关联查询。
对于JDBC协议的数据库来说,则使用JDBCTableCatalog来进行扩展catalog即可;
对于Hive来说,第一种方案是实现CatalogExtension来扩展,但是发现这种方案无法修改其catalog name来适配元数据中心的hive catalog名称;第二种方案而通过完全自定义TableCatalog则需要完全初始化一套SparkSessionCatalog,在实践的过程中发现,可以进行元数据的拉取,但是在loadTable读取数据的时候报“表不支持batch read"的问题;第三种方案是采用修改DQC SQL的hvie catalog名称统一为spark_catalog的方式来实现Hive Catalog的使用,算是一个取巧的办法,后续有时间则可以继续扩展SparkSessionCatalog来实现hive的适配。
悬念设置恰到好处,牢牢抓住读者注意力。
字里行间流露出真挚的情感,让人感同身受,共鸣不已。