前言
Hudi索引可以加快upsert过程中查询数据物理位置(存储于哪个file group)。Hudi支持多种Index。本篇为大家介绍使用HBase作为Index存储媒介的方式。
注意:经过本人验证Hudi使用HBase存在诸多限制而且存在较多问题。目前不推荐生产环境使用。
环境信息
- Hudi 0.14.0
- Spark 3.3.0
- HBase 2.4.9
- HDFS 3.1.1
环境准备
编译hudi
默认情况HBase依赖的hadoop-hdfs-client为hadoop2.x,直接拿来编译hudi spark bundle使用时候会出现java.lang.NoSuchMethodError: org.apache.hadoop.hdfs.client.HdfsDataInputStream.getReadStatistics()L错误。所以需要提前编译并install带有3.x版本hadoop-hdfs-client的HBase依赖包再去编译Hudi。参见:https://github.com/apache/hudi/pull/6756/files/6923a9acb4872305ffbba1e0b2b1546bae8d0070
首先需要编译HBase。执行:
git clone https://github.com/apache/hbase
进入源代码目录后checkout 2.4.9 tag。
git checkout rel/2.4.9
最后执行如下命令编译HBase:
mvn clean install -Denforcer.skip -DskipTests -Dhadoop.profile=3.0 -Psite-install-step
到这里HBase编译完毕。接下来编译Hudi。
git clone https://github.com/apache/hudi.git
cd hudi
编辑项目根目录的pom.xml文件
去掉HBase的relocation。
修改maven-shade-plugin
插件的配置。注释掉如下部分:
<!-- hbase -->
<!--
<relocation>
<pattern>org.apache.hadoop.hbase.</pattern>
<shadedPattern>org.apache.hudi.org.apache.hadoop.hbase.</shadedPattern>
<excludes>
<exclude>org.apache.hadoop.hbase.KeyValue$KeyComparator</exclude>
</excludes>
</relocation>
<relocation>
<pattern>org.apache.hbase.</pattern>
<shadedPattern>org.apache.hudi.org.apache.hbase.</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.htrace.</pattern>
<shadedPattern>org.apache.hudi.org.apache.htrace.</shadedPattern>
</relocation>
-->
经本人测试,如果不去掉这些relocation,连接HBase region server的时候会出现Connection Closed错误。社区并不推荐这么做,这么做目前带来的问题未知。
执行如下命令编译:
mvn clean package -Dflink1.15 -Dscala2.12 -Dspark3.3 -DskipTests -Pflink-bundle-shade-hive3 -T 4 -Denforcer.skip
准备好编译完成的hudi-spark-bundle,位于packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.14.0.jar
。
部署HBase 2.4.9
必须使用HBase 2.4.9。本人测试了Hudi编译的时候修改HBase版本为2.2.7和2.4.17,前者缺少必须的类编译失败,后者运行时出现ClassNotFound问题(relocation相关)。
下载并解压HBase 2.4.9到任意目录。下载地址为:https://archive.apache.org/dist/hbase/2.4.9/。
修改如下配置文件(位于conf
目录):
hbase-env.sh
:
export HBASE_MANAGES_ZK=false
export JAVA_HOME=/usr/jdk64/java
这里不适用HBase内嵌的Zookeeper。使用独立部署的Zookeeper。
hbase-site.xml
:
<configuration>
<property>
<name>hbase.tmp.dir</name>
<value>./tmp</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hdfs_ip:8020/hbase24</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>
<name>hbase.zookeeper.quorum</name>
<value>zk_ip</value>
<property>
<name>hbase.master.info.port</name>
<value>16010</value>
</property>
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.regionserver.info.port</name>
<value>16030</value>
</property>
<property>
<name>hbase.regionserver.port</name>
<value>16020</value>
</property>
</configuration>
regionservers
文件需要配置region server所在节点。
最后执行start-hbase.sh
启动HBase集群。
确保HBase的能够有权读写HDFS中的数据存放目录。在该例子中为
hdfs://hdfs_ip:8020/hbase24
部署Spark Hudi
参考Spark 使用之操作Hudi表。
写入数据Hudi表
创建HBase存放索引的表
使用HBase作为索引存储之前,需要先创建出索引表。
例如我们的索引表叫做hudi_mor_tbl_shell
,进入HBase shell,执行:
create "hudi_mor_tbl_shell", "_s"
写入Hudi表数据
Hudi使用HBase索引表存在一些问题。按照社区提示参考如下链接:https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-considerations.html。可执行成功。
使用如下命令进入Spark shell:
./spark-shell --jars /opt/zy/hudi-hbase/*.jar --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false"
进入Spark shell之后,执行如下代码插入示例数据:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.hudi.config.HoodieHBaseIndexConfig._
import org.apache.hudi.config.HoodieIndexConfig._
val fields = Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("price", DoubleType, true),
StructField("ts", LongType, true)
)
val simpleSchema = StructType(fields)
val data = Seq(Row(2, "a2", 200.0, 100L))
val df = spark.createDataFrame(data, simpleSchema)
df.write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "id").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(INDEX_TYPE.key(), "HBASE").
option(ZKPORT.key(), "2181").
option(TABLENAME.key(), "hudi_mor_tbl_shell").
option(ZK_NODE_PATH.key(), "/hbase-unsecure").
option(ZKQUORUM.key(), "zk_ip").
option(TABLE_NAME, "hudi_mor_tbl_shell").
option(QPS_FRACTION.key(), 0.5).
option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
mode(Append).
save("hdfs:///hudi/hudi_mor_tbl_shell")
其中涉及HBase索引表的配置为:
-
INDEX_TYPE.key()
: 值为HBASE表示使用HBase索引类型。 -
TABLENAME.key()
:HBase索引表名。不要和TABLE_NAME
弄混。这个是Hudi表本身的名字。 -
ZKQUORUM.key()
:存放HBase节点信息的Zookeeper地址。 -
ZKPORT.key()
:Zookeeper的端口号。 -
ZK_NODE_PATH.key()
:HBase信息在Zookeeper中的parent path。 - QPS相关配置:用于访问HBase限流。
按照本篇的配置方式,到这一步Hudi表数据应能够成功插入。
查看HBase索引表内容
上一步Hudi表数据对应的HBase索引数据都写入成功。可以使用如下方式查看索引表中的数据:
hbase:001:0> scan "hudi_mor_tbl_shell"
ROW COLUMN+CELL
2 column=_s:commit_ts, timestamp=2023-11-27T18:03:13.225, value=20231127180307879
2 column=_s:file_name, timestamp=2023-11-27T18:03:13.225, value=0c550d74-eeb3-4b58-92c5-c39504fb910b-0
2 column=_s:partition_path, timestamp=2023-11-27T18:03:13.225, value=
1 row(s)
可以发现HBase索引表存放了Hudi record key和提交时间(commit_ts
),文件名(file_name
)和分区路径(partition_path
)的对应关系。