Skip to content
This repository was archived by the owner on Oct 24, 2019. It is now read-only.
This repository was archived by the owner on Oct 24, 2019. It is now read-only.

GenericUDAFCollectSetArray #2

@CipherChen

Description

@CipherChen

Hi, all.
I'm writting a GenericUDAFCollectSetArray, which should worked like:

id ts somedata
1 2 data-1,2
1 3 data-1,3
1 4 data-1,4
2 5 data-2,5
2 3 data-2,3
2 4 data-2,4
3 6 data-3,6
3 1 data-3,1
3 4 data-3,4

SELECT id, collectSetArray(ts, somedata) FROM sometable GROUP BY id;
result:

id col1
1 [{"_col0": "2", "_col1": "data-1,2"}, {"_col0": "3", "_col1": "data-1,3"},
{"_col0": "4", "_col1": "data-1,4"}]
2 [{"_col0": "3", "_col1": "data-2,3"}, {"_col0": "4", "_col1": "data-2,4"},
{"_col0": "5", "_col1": "data-2,5"}]
3 [{"_col0": "1", "_col1": "data-3,1"}, {"_col0": "4", "_col1": "data-3,4"},
{"_col0": "6", "_col1": "data-3,6"}]

And I'm stuck in init() return and merge().
The below would be my own code which imitate
GenericUDAFMaxRow and GenericUDAFCollectSet.

Thanks for your patience for the long code.
And any hint would be helpful.

public class GenericUDAFCollectSet extends AbstractGenericUDAFResolver {
public GenericUDAFCollectSet() {
}

@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
        throws SemanticException {

    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
        throw new UDFArgumentTypeException(0,
                "Only primitive type arguments are accepted but "
                        + parameters[0].getTypeName() + " was passed as parameter 1.");
    }

    return new GenericUDAFMkSetEvaluator();
}

public static class GenericUDAFMkSetEvaluator extends GenericUDAFEvaluator {


    private StandardListObjectInspector internalMergeOI;
    private PrimitiveObjectInspector internalMergeElementOI;

    private ObjectInspector[] inputOIs;
    private ObjectInspector[] outputOIs;
    private ObjectInspector structOI;
    private StandardListObjectInspector loi;

    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
            throws HiveException {
        super.init(m, parameters);

        System.out.println("init() mode: " + m);

        int paramsLength = parameters.length;

        if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
            assert(parameters instanceof Object[]);

            inputOIs = parameters;
        } else {
            assert(paramsLength == 1);

            internalMergeOI = (StandardListObjectInspector) parameters[0];

            internalMergeElementOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();

            System.out.println("internalMergeOI: " + internalMergeOI.getTypeName());
            System.out.println("internalMergeElementOI: " + internalMergeElementOI.getTypeName());
        }

        outputOIs = new ObjectInspector[paramsLength];

        List<String> fieldNames = new ArrayList<String>(paramsLength);
        List<ObjectInspector> fieldOIs = Arrays.asList(outputOIs);

        for (int i = 0; i < paramsLength; i++) {
            fieldNames.add("_col" + i);
            outputOIs[i] = ObjectInspectorUtils.getStandardObjectInspector(parameters[i]);
        }

        structOI = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);

        loi = ObjectInspectorFactory.getStandardListObjectInspector(structOI);

        System.out.println("return from init() ");
        return loi;
    }

    static class MkArrayAggregationBuffer implements AggregationBuffer {
        // What you see is what you get.
        List<Object[]> container;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((MkArrayAggregationBuffer) agg).container = new ArrayList<Object[]>();
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
        reset(ret);
        return ret;
    }

    //mapside
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters)
            throws HiveException {
        System.out.println("iterate() p.length: " + parameters.length);

        if (parameters.length > 0) {
            MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;

            putIntoSet(parameters, listAgg);
        }

        printAgg(agg);
    }

    //mapside
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        System.out.println("terminatePartial() ");
        printAgg(agg);

        MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
        // However, the log said it seems to be ArrayList<Object>.
        ArrayList<Object[]> ret = new ArrayList<Object[]>(listAgg.container.size());
        ret.addAll(listAgg.container);
        return ret;
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial)
            throws HiveException {
        System.out.println("merge() ");

        MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
        ArrayList<Object[]> partialResult = (ArrayList<Object[]>) internalMergeOI.getList(partial);
        for(Object[] i : partialResult) {
            putIntoSet(i, listAgg);
        }

        printAgg(agg);
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        System.out.println("terminate() ");
        printAgg(agg);

        MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
        ArrayList<Object[]> ret = new ArrayList<Object[]>(listAgg.container.size());
        ret.addAll(listAgg.container);
        return ret;
    }

    private void putIntoSet(Object[] p, MkArrayAggregationBuffer myagg) {
        System.out.println("putIntoSet() ");

        Object[] objects = new Object[p.length];

        for (int i = 0; i < p.length; i++) {
            objects[i] = ObjectInspectorUtils.copyToStandardObject(p[i], this.inputOIs[i]);
        }

        myagg.container.add(objects);
    }
}

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions