Hive学习3-UDF
1 UDF
1.1 概述
可参考:
- HivePlugins
- Hive Operators and User-Defined Functions (UDFs) Hive自带Function函数
Hive中有三种UDF: UDF(最常使用的)、UDAF(即用户定义聚集函数,user-defined aggregate function
)以及UDTF(用户定义表生成函数,user-defined table-generating function
)。它们所接受的输入和产生的输出的数据行的数量不同。
HiveUDF分为Temporary Functions和Permanent Functions。
1.2 Function Jar上传和注册
这里使用Hive官方例子,即字符串转为小写。
首先我们在pom.xml引入UDF用到的依赖包,version替换为你自己的即可:
<properties>
<hadoop.version>2.7.3</hadoop.version>
<hive.version>2.3.0</hive.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
写出你的UDF代码:
package com.chengc.demos.hive.test.udf.test1;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public final class Lower extends UDF {
public Text evaluate(final Text s) {
if (s == null) { return null; }
return new Text(s.toString().toLowerCase());
}
}
Session级别部署UDF Jar包,重开Cli后失效:
add jar /Users/chengc/cc/work/projects/hive-test/udf-test1/target/udf-test1-1.0-SNAPSHOT.jar;
查看add的所有jar:
list jars;
+----------------------------------------------------+
| resource |
+----------------------------------------------------+
| /Users/chengc/cc/work/projects/hive-test/udf-test1/target/udf-test1-1.0-SNAPSHOT.jar |
+----------------------------------------------------+
我们也可以通过hdfs命令上传UDF Jar到某目录下,适用于permanent function:
hdfs dfs -put udf-test1-1.0-SNAPSHOT.jar /tmp/hive/udf
1.3 Temporary Functions
1.3.1 Create Function
CREATE TEMPORARY FUNCTION my_lower_tmp AS 'com.chengc.demos.hive.test.udf.test1.Lower';;
以上为创建Session级别的临时函数。
现在试用下这个临时函数:
select my_lower_tmp('APPLE');
+--------+
| _c0 |
+--------+
| apple |
+--------+
可以看到该函数将我们传入的大写字符串转为了小写。
1.3.2 Drop Function
DROP TEMPORARY FUNCTION [IF EXISTS] function_name;
需要注意的是,Drop不会删除UDF对应的再HDFS上的Jar包,必须按需手动删除。
1.4 Permanent Functions
1.4.1 Create Function
CREATE FUNCTION [db_name.]function_name AS class_name
[USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
该function将被添加到指定的数据库,或添加到当前数据库。
可以通过全限定函数名称(db_name.function_name
)来引用该函数。或若该函数位于当前数据库中,则可以不带限定地对其进行引用。
例子如下:
create function my_lower_permanent as 'com.chengc.demos.hive.test.udf.test1.Lower' using jar 'hdfs:///tmp/hive/udf/udf-test1-1.0-SNAPSHOT.jar' ;
使用该永久函数:
select my_lower_permanent('APPLE');
+--------+
| _c0 |
+--------+
| apple |
+--------+
而且,就算退出后重启Beeline,该函数依旧有效!
1.4.2 Drop Function
DROP FUNCTION [IF EXISTS] function_name;
1.4.3 Reload Function
RELOAD (FUNCTIONS|FUNCTION);
如果一个Hive CLI会话中的Permanent Function是在HiveServer2或其他Hive CLI开启之后创建的,则它们可能不会反映在HiveServer2或其他Hive CLI会话中。 在HiveServer2或HiveCLI会话中发出RELOAD FUNCTIONS
将允许获取对Permanent Function的任何更改。
2 UDF
2.1 概述
UDF操作作用于单个数据行,且产生一个数据行作为输出。大多数函数
(例如数学函数和字符串函数)都属于这一类。
2.2 例子
3 UDAF
可参考GenericUDAFCaseStudy
3.1 概述
UDAF接受多个输入数据行,并产生一个输出数据行。
像COUNT和MAX这样的函数都是聚集函数。
3.2 例子
4 UDTF
4.1 概述
可参考Hive-UDTF-DeveloperGuide
UDTF操作作用于单个数据行,且产生多个数据行(即一个表)作为输出。
4.2 Hive部分自带TF
4.2.1 explode
4.3 例子
例子如下:
package org.apache.hadoop.hive.contrib.udtf.example;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
* 必须继承GenericUDTF抽象类,为单个输入行生成可变数量的输出行。
* 本测试类的功能是两次输出已知行数,作用是测试从侧面反映出输出的行数。
*/
public class GenericUDTFCount2 extends GenericUDTF {
Integer count = Integer.valueOf(0);
Object forwardObj[] = new Object[1];
// 1.initialize由Hive调用,来通知UDTF期望的参数类型
// 该方法必须返回一个ObjectInspector,与UDTF将生成的行对象相对应
The UDTF must then return an object inspector corresponding to the row objects that the UDTF will generate.
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("col1");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs);
}
// 2.initialize滴啊用后,Hive会调用process方法,将rows传递给该UDTF
// 该方法内部可进行自定义逻辑,
// 也可以通过forward方法将rows传递给其他operator
@Override
public void process(Object[] args) throws HiveException {
count = Integer.valueOf(count.intValue() + 1);
}
// 3.最后,当所有row已经传递给UDTF后,Hive会调用你close方法
@Override
public void close() throws HiveException {
forwardObj[0] = count;
forward(forwardObj);
forward(forwardObj);
}
}
5 FAQ
5.1 多个hiveserver时,UDF不生效
登录到hiveserver2上,执行reload function;
即可。
可参考hive创建UDF未生效;Share UDFs in Multi hiveserver