自定义函数包含三种UDF, UDAF, UDTF
- UDF(User-Defined-Function):一进一出
- UDAF(User-Defined Aggregation Function):聚集函数,多进一出
- UDTF(User-Defined Table-Generating Functions): 一进多处
使用方式:在HIVE会话中add自定义函数jar文件,然后创建function继而使用函数
自定义函数的步骤(UDF)
- UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容
- 编写UDF函数的时候需要注意以下几点
- 自定义UDF需要继承
org.apache.hadoop.hive.ql.UDF
- 需要实现evaluate函数,evaluate函数支持重载
- 自定义UDF需要继承
以下实现了一个简单的 脱敏工具类,值展示文本的第一个字符即可:
package org.hadoop.hive.learn.udf; import org.apache.hadoop.hive.ql.exec.UDF; /** * 只展示第一个字符的操作 */ public class SensitiveFunc extends UDF { public String evaluate(String str) { if (str == null || str.length() <= 1) { return str; } int length = str.length(); String res = str.substring(0,1); StringBuilder sb = new StringBuilder(res); for (int i = 1; i < length; i++) { sb.append("*"); } return sb.toString(); } public static void main(String[] args) { SensitiveFunc sensitiveFunc = new SensitiveFunc(); String res = sensitiveFunc.evaluate("你好劜"); System.out.println(res); } }
- 将以上代码打包为jar包,并上传到hive所在服务器中。
- 进入hive客户端,添加jar包
add jar /root/hive-learn-1.0-SNAPSHOT.jar
- 创建临时函数:
create temporary function tuomin as 'org.hadoop.hive.learn.udf.SensitiveFunc'
在创建完成以上的函数之后,就可以使用这个函数了:
我们可以看到自己定义的脱敏函数已经完成啦~~~~
这样的缺点是,因为创建的时temporary的函数,因此在hive客户端关闭之后,这个函数就无法再次被找到。
在以上的步骤中,我们也可以将jar包上传的hdfs上,然后通过一下命令使用:
create temporary function tumin as 'org.hadoop.hive.learn.udf.SensitiveFunc' using jar 'hdfs://node1:8020/path/tuomin.jar'
创建永久函数
永久函数因为需要使用到jar, 这个时候我们可以将jar包上传到hdfs中,然后创建永久函数,当在任何机器上使用函数时,都可以直接从hdfs上加载jar包并使用,具体步骤如下:
# 创建目录 hdfs dfs -mkdir -p /lib/hive/udf # 上传jar包 hdfs dfs -put /root/hive-learn-1.0-SNAPSHOT.jar /lib/hive/udf/hive-sql-func.jar
然后有了以上的jar包之后,则在Hive客户端,创建永久的函数:
hive> create function sens as 'org.hadoop.hive.learn.udf.SensitiveFunc' using jar 'hdfs://mycluster/lib/hive/udf/hive-sql-func.jar'
有了这个函数之后,我就可以在任何地方使用这个函数了:
一下为在hive的客户端工具查询的情况:
函数的维护
#删除函数名 drop function sens
UDAF自定义集函数
多行进一行出,如sum(), min(),用在group by时
实现步骤如下
- 必须继承
org.apache.hadoop.hive.ql.exec.UDAF
(函数类继承)org.apache.hadoop.hive.ql.exec.UDAFEvaluator
(内部类 Evaluator实现UDAFEvaluator接口)
- Evaluator需要实现
init
,iterate
,terminatePartial
,merge
,terminate
这几个函数- init():类似于构造函数,用于UDAF初始化
- iterate():接受传入的参数,并进行内部的轮转,返回boolean
- termiatePartial():无参数,其为iterate函数轮转结束后,返回轮转数据,类似于hadoop的Combiner
- merge():接受terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
- terminate():返回最终的聚集函数结果
关于UDAF的四个阶段
- PARTIAL1:这个阶段用于接受数据库的原始数据,
iterate()
和terminatePartial()
方法将被调用。一般指代了mapreduce任务中的map阶段 - PARTIAL2: 这个阶段接受了部分的合并数据,主要是多map阶段产生的数据做合并操作。相当于mapreduce任务中的conbiner. 这个阶段
merge()
和terminatePartial()
任务将被调用 - FINAL:这个阶段对PARTIAL2产生的数据做整体的合并操作。这个阶段中,
merge()
和terminate()
方法将被调用。这个阶段就相当于mapreduce任务中的reduce阶段 - COMPLETE:这个阶段就是完结的阶段,
terminate()
和iterate()
方法将被调用。这个阶段如果mapreduce没有reduce任务,那么将会直接到COMPLETE阶段。这个阶段也是接受的数据库的原始数据
实现concat功能
这段代码主要实现字符串concat功能,实现在group的时候,将对应的字段拼接为一个字符串,具体代码如下:
package org.hadoop.hive.learn.udaf; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConcatStrFunc extends AbstractGenericUDAFResolver { private static final Logger LOG = LoggerFactory.getLogger(ConcatStrFunc.class); /** * 创建Evaluator对象 * * @param info The types of the parameters. We need the type information to know * which evaluator class to use. * @return * @throws SemanticException */ @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException { LOG.info("获取Evaluator对象.."); return new ConcatStrEvaluator(); } public static class ConcatStrAggregationBuffer implements GenericUDAFEvaluator.AggregationBuffer { private StringBuilder concatStrBuilder = new StringBuilder(); private Object monitor = new Object(); private static final String SPLITTER = ","; public ConcatStrAggregationBuffer() { LOG.info("创建buffer对象.."); } public ConcatStrAggregationBuffer(String str) { if (str != null && str.length() > 0) { concatStrBuilder.append(str); } } public String get() { return this.concatStrBuilder.toString(); } public void add(String str) { if (str == null) { str = ""; } synchronized (monitor) { this.concatStrBuilder.append(SPLITTER).append(str); } } public void reset() { synchronized (monitor) { this.concatStrBuilder.delete(0, concatStrBuilder.length()); } } } public static class ConcatStrEvaluator extends GenericUDAFEvaluator { private PrimitiveObjectInspector inputObjectInspector; private ObjectInspector outputOI; public ConcatStrEvaluator() { } /** * 这里主要是用来初始化聚合函数,这里需要对传入的参数进行解析。这个函数在每个阶段都会被调用 * * @param m The mode of aggregation. * @param parameters The ObjectInspector for the parameters: In PARTIAL1 and COMPLETE * mode, the parameters are original data; In PARTIAL2 and FINAL * mode, the parameters are just partial aggregations (in that case, * the array will always have a single element). * @return * @throws HiveException */ @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { LOG.info("初始化Evaluator#init()方法..."); super.init(m, parameters); this.inputObjectInspector = (PrimitiveObjectInspector) parameters[0]; if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { // 这里是因为PARTIAL1和COMPLETE阶段都是输入的数据库的原始数据,所以他们的处理逻辑是一样的 } else { // 这里主要是指代的时对PARTIAL2和FINAL阶段的数据,这里接受到的是上一个阶段计算出的结果,因此这里数据主要来自PARTIAL1和PARTIAL2的结果 } // 这里是对输出数据的处理,保证我们输出的数据类型和我们需要的类型保持一致 this.outputOI = ObjectInspectorFactory.getReflectionObjectInspector( String.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA ); return this.outputOI; } /** * 创建buffer对象,用于存储计算的中间结果 * * @return * @throws HiveException */ @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { LOG.info("调用Evaluator#getNewAggregationBuffer()方法..."); return new ConcatStrAggregationBuffer(); } /** * 重置buffer中的缓存数据 * * @param agg * @throws HiveException */ @Override public void reset(AggregationBuffer agg) throws HiveException { LOG.info("调用Evaluator#reset()方法..."); ((ConcatStrAggregationBuffer) agg).reset(); } /** * 遍历数据, 这里因为只会在partial1和COMPLETE中被调用,因此这里遍历的是从数据库中获取的原始数据 * * @param agg * @param parameters The objects of parameters. * @throws HiveException */ @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { LOG.info("调用Evaluator#iterate()方法..."); if (parameters == null || parameters.length == 0) { return; } ConcatStrAggregationBuffer buffer = (ConcatStrAggregationBuffer) agg; for (Object parameter : parameters) { buffer.add((String) this.inputObjectInspector.getPrimitiveJavaObject(parameter)); } } /** * 这里调用的是一个阶段性的成果,用于在分布式计算中,将各个mapper的计算结果进行合并 * * @param agg * @return * @throws HiveException */ @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { LOG.info("调用Evaluator#terminatePartial()"); return this.terminate(agg); } @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { LOG.info("调用Evaluator#merge()"); if (partial != null) { ConcatStrAggregationBuffer buffer = (ConcatStrAggregationBuffer) agg; // debug info if (this.inputObjectInspector == null) { buffer.add("inputObjectInspector is null!"); } else { buffer.add((String) this.inputObjectInspector.getPrimitiveJavaObject(partial)); } } } @Override public Object terminate(AggregationBuffer agg) throws HiveException { LOG.info("调用Evaluator#terminate()"); return ConcatStrAggregationBuffer.class.cast(agg).get(); } } public static void main(String[] args) { ConcatStrEvaluator concatStrEvaluator = ReflectionUtils.newInstance(ConcatStrEvaluator.class, new Configuration()); System.out.println(concatStrEvaluator); } }
自定义UDAF函数
在有了上面的实现之后,就需要自己定义Hive函数了,这个步骤需要在Hive客户端中进行操作,具体操作步骤如下:
- 将打包好的jar包上传到服务器或者HDFS
- 定义Hive函数
- 使用Hive函数验证
create function s_concat as 'org.hadoop.hive.learn.udaf.ConcatStrFunc' using jar 'hdfs://mycluster/lib/hive/udf/hive-sql-func.jar';
创建完成后,将输出如下的提示,表示函数创建成功:
这里面的输出表示了临时的jar存放的位置,在测试阶段建议使用
create temporary function
创建,因为创建永久函数调试程序,如果程序有BUG会导致很多奇怪的问题,在调试好了之后,再创建永久函数
在创建函数完成后,我们就可以使用SQL查询验证是否生效,我的等结果如下:
这就表示函数自定完成并成功投入使用。
Pingback: Hive SQL - 专注着的博客