Error caused by a custom Hive UDAF

Posted on 2022-09-05 01:31:55

I'm doing a big-data internship at a company, and today I got a requirement: count each user's service requests within a given time window, i.e. end up with a key-value result of the form <userId, int>.
My plan was a UDAF: iterate over every API log row in the window, accumulate userId → call count in a map, and finally output a String built from that map (e.g. 1001:5;1002:2).
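For context, the per-user numbers themselves are easy to get as one row per user with a plain GROUP BY over the same table and filters as my query further down; what I want is those pairs collapsed into a single string, which is why I reached for a UDAF:

-- Row-per-user version of the count, for comparison; the UDAF is meant to
-- collapse these rows into one "userId:count;userId:count" string.
select `userId`, count(*) as cnt
from userTemp
where d = '2016-06-01' and h = '10' and action = 'loginAction'
group by `userId`;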
However, after countless attempts it always fails in the reduce stage with the following error:

2017-06-13 18:59:25,164 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":{}}}
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:265)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":{}}}
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:253)
    ... 7 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: length should be positive!
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:763)
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:244)
    ... 7 more
Caused by: java.lang.RuntimeException: length should be positive!
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryNonPrimitive.init(LazyBinaryNonPrimitive.java:54)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct.init(LazyBinaryStruct.java:95)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct.uncheckedGetField(LazyBinaryStruct.java:264)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct.getField(LazyBinaryStruct.java:201)
    at org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector.getStructFieldData(LazyBinaryStructObjectInspector.java:64)
    at org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator._evaluate(ExprNodeColumnEvaluator.java:98)
    at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:77)
    at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:65)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.updateAggregations(GroupByOperator.java:584)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.processAggr(GroupByOperator.java:848)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.processKey(GroupByOperator.java:692)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:758)
    ... 8 more

From the error it looks like the reducer received nothing at all, but I can't work out why. I added some logging, and the map definitely does contain data. The full UDAF code is below:

package com.jason.hive;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class GenericUDAFCount extends AbstractGenericUDAFResolver {

    // Resolver: accept exactly one STRING argument and hand back the evaluator.
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentException("only one argument is expected.");
        }
        switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
            case STRING:
                return new GenericUDAFCountEvaluator();
            default:
                throw new UDFArgumentTypeException(0,
                        "Only string type arguments are accepted but "
                                + parameters[0].getTypeName() + " is passed.");
        }
    }

    public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
        ObjectInspector inputOI;               // inspector for the userId argument
        MapObjectInspector partialCountAggOI;  // inspector for the partial Map<String, Long>
        ObjectInspector outputOI;              // inspector for what the current phase returns

        // Declares what each phase consumes and produces.
        @Override
        public ObjectInspector init(GenericUDAFEvaluator.Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // Treat parameters[0] as the raw input column in every mode.
            inputOI = parameters[0];
            // Inspectors for the partial result, a Map<String, Long> of userId -> count.
            ObjectInspector keyInspector = ObjectInspectorFactory.getReflectionObjectInspector(String.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            ObjectInspector valueInspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            partialCountAggOI = ObjectInspectorFactory.getStandardMapObjectInspector(keyInspector, valueInspector);
            if (m == Mode.FINAL) {
                // FINAL returns the serialized String built in terminate().
                outputOI = ObjectInspectorFactory.getReflectionObjectInspector(String.class,
                        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            } else {
                // Other modes return the raw map from terminatePartial().
                outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Map.class,
                        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            }
            return outputOI;
        }

        @Override
        public GenericUDAFEvaluator.AggregationBuffer getNewAggregationBuffer() throws HiveException {
            CountAgg buffer = new CountAgg();
            reset(buffer);
            return buffer;
        }

        @Override
        public void reset(GenericUDAFEvaluator.AggregationBuffer agg) throws HiveException {
            ((CountAgg) agg).map = new HashMap<>();
        }

        // Called once per input row on the map side.
        @Override
        public void iterate(GenericUDAFEvaluator.AggregationBuffer agg, Object[] parameters) throws HiveException {
            // Bails out unless at least two parameters are present.
            if (parameters == null || parameters.length < 2) {
                return;
            }
            if (parameters[0] != null && parameters[1] != null) {
                CountAgg myagg = (CountAgg) agg;
                Object object = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
                if(object == null){
                    return;
                }
                String key = String.valueOf(object);
                myagg.add(key, 1);
            }
        }

        // Reduce side: fold a partial Map<String, Long> into this buffer.
        @Override
        public void merge(GenericUDAFEvaluator.AggregationBuffer agg, Object partial) throws HiveException {
            if (partial == null) {
                return;
            }
            Map<?, ?> map = partialCountAggOI.getMap(partial);
            if (map == null) {
                return;
            }
            for (Object object : map.entrySet()) {
                Map.Entry<String, Long> entry = (Map.Entry<String, Long>) object;
                ((CountAgg) agg).add(entry.getKey(), entry.getValue());
            }
        }

        // FINAL/COMPLETE: serialize the counts as "key:value;key:value".
        @Override
        public Object terminate(GenericUDAFEvaluator.AggregationBuffer agg) throws HiveException {
            return agg.toString();
        }

        // PARTIAL1/PARTIAL2: emit the raw map for the next stage.
        @Override
        public Object terminatePartial(GenericUDAFEvaluator.AggregationBuffer agg) throws HiveException {
            // (Leftover debug loop; it does nothing.)
            for (Map.Entry<String, Long> entry : ((CountAgg) agg).map.entrySet()) {
                break;
            }
            return ((CountAgg) agg).map;
        }

        static class CountAgg extends GenericUDAFEvaluator.AbstractAggregationBuffer {
            Map<String, Long> map = new HashMap<>();

            // Add `value` to the running count for `key`, ignoring null keys.
            private void add(String key, long value) {
                if (key == null) {
                    return;
                }
                if (map.containsKey(key)) {
                    map.put(key, map.get(key) + value);
                } else {
                    map.put(key, value);
                }
            }

            // Serialize as "key:value;key:value" (trailing ';' stripped).
            @Override
            public String toString() {
                StringBuilder sb = new StringBuilder();
                for (Map.Entry<String, Long> entry : map.entrySet()) {
                    sb.append(entry.getKey()).append(":").append(entry.getValue()).append(";");
                }
                if (sb.length() > 0) {
                    return sb.substring(0, sb.length() - 1);
                }
                return super.toString();
            }
        }
    }
}
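While digging around I noticed that the built-in UDAF implementations in Hive's source write init() mode-aware: in PARTIAL1/COMPLETE the parameters are the raw column values, but in PARTIAL2/FINAL parameters[0] describes the partial aggregate coming out of terminatePartial(), and the partial result is declared with a standard map inspector rather than a reflection inspector over the raw Map interface. Below is a sketch of that pattern adapted to my evaluator's field names; I haven't confirmed this is the cause of my error, and it needs an extra import of org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory:

// Sketch of the mode-aware init() pattern from Hive's built-in UDAFs,
// adapted to the evaluator above; not verified to fix the error.
@Override
public ObjectInspector init(GenericUDAFEvaluator.Mode m, ObjectInspector[] parameters) throws HiveException {
    super.init(m, parameters);
    if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
        // Map side (or single stage): parameters[0] is the raw userId column.
        inputOI = parameters[0];
    } else {
        // Reduce side: parameters[0] is the partial Map from terminatePartial().
        partialCountAggOI = (MapObjectInspector) parameters[0];
    }
    if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
        // Partial result: declare a standard Map<String, Long> inspector that
        // matches the HashMap returned by terminatePartial().
        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
    }
    // FINAL/COMPLETE: terminate() returns a plain Java String.
    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}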

The query is roughly as follows (gcount is the function above):

select gcount(`userId`) from userTemp where d = '2016-06-01' and h = '10' and action = 'loginAction';
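To poke at the evaluator outside Hive, I also wrote a rough standalone harness (the class name GcountHarness and the sample userIds are just mine) that drives one map-side pass and one reduce-side pass by hand:

package com.jason.hive;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class GcountHarness {
    public static void main(String[] args) throws Exception {
        GenericUDAFCount.GenericUDAFCountEvaluator eval =
                new GenericUDAFCount.GenericUDAFCountEvaluator();
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;

        // "Map side": PARTIAL1 = iterate() + terminatePartial().
        eval.init(GenericUDAFEvaluator.Mode.PARTIAL1, new ObjectInspector[]{stringOI});
        GenericUDAFEvaluator.AggregationBuffer buf = eval.getNewAggregationBuffer();
        for (String userId : new String[]{"1001", "1001", "1002"}) {
            eval.iterate(buf, new Object[]{userId});
        }
        Object partial = eval.terminatePartial(buf);
        System.out.println("partial = " + partial);

        // "Reduce side": FINAL = merge() + terminate().
        eval.init(GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{stringOI});
        GenericUDAFEvaluator.AggregationBuffer buf2 = eval.getNewAggregationBuffer();
        eval.merge(buf2, partial);
        System.out.println("final = " + eval.terminate(buf2));
    }
}

It runs the PARTIAL1 path first and feeds its output to the FINAL path, mirroring what a two-stage MapReduce job would do.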

I only just started learning this and was put straight onto real work, so my understanding of how Hive and MapReduce actually work is still shaky. Please bear with a beginner; any pointers would be much appreciated.
