COLLECT_SET() in Hive, keep duplicates?

Is there a way to keep the duplicates in a collected set in Hive, or simulate the sort of aggregate collection that Hive provides using some other method? I want to aggregate all of the items in a column that have the same key into an array, with duplicates.


    hash_id | num_of_cats
    =====================
    ad3jkfk            4
    ad3jkfk            4
    ad3jkfk            2
    fkjh43f            1
    fkjh43f            8
    fkjh43f            8
    rjkhd93            7
    rjkhd93            4
    rjkhd93            7

should return:

    hash_agg | cats_aggregate
    ===========================
    ad3jkfk   Array<int>(4,4,2)
    fkjh43f   Array<int>(1,8,8)
    rjkhd93   Array<int>(7,4,7)
2 Answers

Use COLLECT_LIST(col), which is available since Hive 0.13.0. Unlike COLLECT_SET, it keeps duplicate values:

-- COLLECT_LIST (Hive 0.13.0+) gathers every value per group into an array,
-- keeping duplicates; COLLECT_SET would de-duplicate them.
SELECT
    hash_id AS hash_agg,
    COLLECT_LIST(num_of_cats) AS cats_aggregate
FROM
    tablename
-- WHERE <optional filter condition>
GROUP BY
    hash_id;
Before Hive 0.13.0 there is nothing built in, but writing user-defined functions, including aggregates, isn't that hard. The only rough part is making them type-generic. Here is a collect example:

package com.example;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

/**
 * Aggregate function that collects every non-NULL value of a primitive column into an array,
 * keeping duplicates (unlike {@code collect_set}).
 *
 * <p>Register with {@code CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';}.
 * The order of the elements in the resulting array is not defined.
 */
public class CollectAll extends AbstractGenericUDAFResolver {

    /**
     * Validates the argument types and returns the evaluator.
     *
     * @param tis type info of the call's arguments
     * @return a new {@link CollectAllEvaluator}
     * @throws SemanticException if there is not exactly one argument or it is not primitive
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
            throws SemanticException
    {
        if (tis.length != 1)
        {
            throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
        }
        if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE)
        {
            throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but "
                    + tis[0].getTypeName() + " was passed as parameter 1.");
        }
        return new CollectAllEvaluator();
    }

    /** Evaluator that appends every input value (duplicates included) to a per-group list. */
    public static class CollectAllEvaluator extends GenericUDAFEvaluator
    {
        // PARTIAL1/COMPLETE: inspector of the raw values passed to iterate().
        // PARTIAL2/FINAL: inspector of the elements inside the partial lists passed to merge().
        private ObjectInspector inputOI;

        // Inspector of the partial results passed to merge(); only set in PARTIAL2/FINAL.
        private ListObjectInspector internalMergeOI;

        /**
         * Captures the input inspectors for the given mode and returns the output inspector,
         * which is always a standard list of standard elements.
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException
        {
            super.init(m, parameters);
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE)
            {
                // iterate() receives raw column values: keep their real inspector so
                // copyToStandardObject reads them correctly (they may be lazy objects).
                inputOI = parameters[0];
                return ObjectInspectorFactory.getStandardListObjectInspector(
                        ObjectInspectorUtils.getStandardObjectInspector(inputOI));
            }
            // PARTIAL2 / FINAL: merge() receives partial lists. After the shuffle these are not
            // necessarily StandardListObjectInspector instances, so accept any list inspector.
            internalMergeOI = (ListObjectInspector) parameters[0];
            inputOI = internalMergeOI.getListElementObjectInspector();
            return ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
        }

        /** Per-group state: every collected value, already copied to its standard Java form. */
        static class ArrayAggregationBuffer implements AggregationBuffer
        {
            ArrayList<Object> container;
        }

        @Override
        public void reset(AggregationBuffer ab)
                throws HiveException
        {
            ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer()
                throws HiveException
        {
            ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
            reset(ret);
            return ret;
        }

        /** Appends one input value to the group's list; NULL values are skipped. */
        @Override
        public void iterate(AggregationBuffer ab, Object[] parameters)
                throws HiveException
        {
            assert (parameters.length == 1);
            Object p = parameters[0];
            if (p != null)
            {
                ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                // Copy: Hive may reuse the input object for the next row.
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, inputOI));
            }
        }

        /** Returns a copy of the values collected so far, for shipping to merge(). */
        @Override
        public Object terminatePartial(AggregationBuffer ab)
                throws HiveException
        {
            return snapshot((ArrayAggregationBuffer) ab);
        }

        /** Appends every element of a partial list to the group's list; a NULL partial is ignored. */
        @Override
        public void merge(AggregationBuffer ab, Object o)
                throws HiveException
        {
            if (o == null)
            {
                return;
            }
            List<?> partial = internalMergeOI.getList(o);
            if (partial == null)
            {
                return;
            }
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            for (Object element : partial)
            {
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(element, inputOI));
            }
        }

        /** Returns the final array of collected values (a copy of the buffer's contents). */
        @Override
        public Object terminate(AggregationBuffer ab)
                throws HiveException
        {
            return snapshot((ArrayAggregationBuffer) ab);
        }

        // Copies the buffer so the returned list is independent of later resets/appends.
        private static ArrayList<Object> snapshot(ArrayAggregationBuffer agg)
        {
            return new ArrayList<Object>(agg.container);
        }
    }
}

Then, in Hive, run `add jar Whatever.jar;` followed by `CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';`. You should then be able to use it as expected:

    hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id;
    OK
    ad3jkfk [4,4,2]
    fkjh43f [1,8,8]
    rjkhd93 [7,4,7]

Note that the order of the elements in the array should be considered undefined. If you intend to feed the result into n_grams, you may need to extend the function to sort the data as needed. Also, NULL values are skipped and do not appear in the array.

