An error caused by using a custom Hive UDAF
I'm currently doing a big-data internship at a company. Today I got a requirement: for a given time window, count how many service requests each user made, i.e. the final result should be a set of key-value pairs, something like <userId, int>.
My idea was to write a UDAF: iterate over every API-log row in that time window, accumulate each userId and its call count in a map, and finally output a String built from that map.
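For example (the userIds here are made up, just to illustrate the intended format), the string the UDAF should return for one window would look like:

user_1001:17;user_1002:3;user_1003:42

i.e. userId:count pairs joined by semicolons, which is exactly what the toString() of the aggregation buffer further down produces.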
However, after countless attempts, the job always fails in the reduce phase with the following error:
2017-06-13 18:59:25,164 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":{}}}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:265)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":{}}}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:253)
... 7 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: length should be positive!
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:763)
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:244)
... 7 more
Caused by: java.lang.RuntimeException: length should be positive!
at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryNonPrimitive.init(LazyBinaryNonPrimitive.java:54)
at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct.init(LazyBinaryStruct.java:95)
at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct.uncheckedGetField(LazyBinaryStruct.java:264)
at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct.getField(LazyBinaryStruct.java:201)
at org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector.getStructFieldData(LazyBinaryStructObjectInspector.java:64)
at org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator._evaluate(ExprNodeColumnEvaluator.java:98)
at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:77)
at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:65)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.updateAggregations(GroupByOperator.java:584)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processAggr(GroupByOperator.java:848)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processKey(GroupByOperator.java:692)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:758)
... 8 more
From the error it looks like the reducer received no content at all, but I just can't figure out why. I added some logging, and the map does contain data on the map side. The full UDAF code is as follows:
package com.jason.hive;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class GenericUDAFCount extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        // Only a single string argument (the userId column) is accepted.
        if (parameters.length != 1) {
            throw new UDFArgumentException("only one argument is expected.");
        }
        switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
            case STRING:
                return new GenericUDAFCountEvaluator();
            default:
                throw new UDFArgumentTypeException(0,
                        "Only string type arguments are accepted but "
                                + parameters[0].getTypeName() + " is passed.");
        }
    }

    public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
        ObjectInspector inputOI;
        MapObjectInspector partialCountAggOI;
        ObjectInspector outputOI;

        @Override
        public ObjectInspector init(GenericUDAFEvaluator.Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = parameters[0];
            // Inspector used in merge() to read the partial map<userId, count> back.
            ObjectInspector keyInspector = ObjectInspectorFactory.getReflectionObjectInspector(String.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            ObjectInspector valueInspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            partialCountAggOI = ObjectInspectorFactory.getStandardMapObjectInspector(keyInspector, valueInspector);
            if (m == Mode.FINAL) {
                // Final output: the formatted string produced by terminate().
                outputOI = ObjectInspectorFactory.getReflectionObjectInspector(String.class,
                        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            } else {
                // Partial output: the raw map returned by terminatePartial().
                outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Map.class,
                        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            }
            return outputOI;
        }

        @Override
        public GenericUDAFEvaluator.AggregationBuffer getNewAggregationBuffer() throws HiveException {
            CountAgg buffer = new CountAgg();
            reset(buffer);
            return buffer;
        }

        @Override
        public void reset(GenericUDAFEvaluator.AggregationBuffer agg) throws HiveException {
            ((CountAgg) agg).map = new HashMap<>();
        }

        @Override
        public void iterate(GenericUDAFEvaluator.AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters == null || parameters.length < 2) {
                return;
            }
            if (parameters[0] != null && parameters[1] != null) {
                CountAgg myagg = (CountAgg) agg;
                Object object = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
                if (object == null) {
                    return;
                }
                String key = String.valueOf(object);
                // Count one call for this userId.
                myagg.add(key, 1);
            }
        }

        @Override
        public void merge(GenericUDAFEvaluator.AggregationBuffer agg, Object partial) throws HiveException {
            if (partial == null) {
                return;
            }
            Map<?, ?> map = partialCountAggOI.getMap(partial);
            if (map == null) {
                return;
            }
            // Fold the partial map into the current buffer.
            for (Object object : map.entrySet()) {
                Map.Entry<String, Long> entry = (Map.Entry<String, Long>) object;
                ((CountAgg) agg).add(entry.getKey(), entry.getValue());
            }
        }

        @Override
        public Object terminate(GenericUDAFEvaluator.AggregationBuffer agg) throws HiveException {
            return agg.toString();
        }

        @Override
        public Object terminatePartial(GenericUDAFEvaluator.AggregationBuffer agg) throws HiveException {
            return ((CountAgg) agg).map;
        }

        static class CountAgg extends GenericUDAFEvaluator.AbstractAggregationBuffer {
            Map<String, Long> map = new HashMap<>();

            private void add(String key, long value) {
                if (key == null) {
                    return;
                }
                if (map.containsKey(key)) {
                    map.put(key, map.get(key) + value);
                } else {
                    map.put(key, value);
                }
            }

            @Override
            public String toString() {
                // Render the buffer as "userId:count;userId:count;...".
                StringBuffer sb = new StringBuffer();
                for (Map.Entry<String, Long> entry : map.entrySet()) {
                    sb.append(entry.getKey()).append(":").append(entry.getValue()).append(";");
                }
                if (sb.length() > 0) {
                    // Drop the trailing ';'.
                    return sb.substring(0, sb.length() - 1);
                }
                return super.toString();
            }
        }
    }
}
The query I run looks roughly like this (gcount is the function shown above):
select gcount(`userId`) from userTemp where d = '2016-06-01' and h = '10' and action = 'loginAction';
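For reference, the function is registered the usual way before running the query, roughly like this (the jar path below is just a placeholder, not the real one):

add jar /path/to/hive-udaf.jar;
create temporary function gcount as 'com.jason.hive.GenericUDAFCount';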
It's probably because I had to start building things right after I began learning this stuff, so I don't really understand how Hive and MapReduce work under the hood yet. Please bear with me if this is a newbie question~