 

COLLECT_SET() in Hive, keep duplicates?

Is there a way to keep the duplicates in a collected set in Hive, or simulate the sort of aggregate collection that Hive provides using some other method? I want to aggregate all of the items in a column that have the same key into an array, with duplicates.

For example:

hash_id | num_of_cats
=====================
ad3jkfk            4
ad3jkfk            4
ad3jkfk            2
fkjh43f            1
fkjh43f            8
fkjh43f            8
rjkhd93            7
rjkhd93            4
rjkhd93            7

should return:

hash_agg | cats_aggregate
===========================
ad3jkfk   Array<int>(4,4,2)
fkjh43f   Array<int>(1,8,8)
rjkhd93   Array<int>(7,4,7)
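For context, a minimal sketch of the behavior being avoided, assuming the sample table above is named test: plain COLLECT_SET() drops the repeated values (and the order of elements within each result is not guaranteed):

-- COLLECT_SET() returns only the distinct values per group:
SELECT hash_id, COLLECT_SET(num_of_cats) FROM test GROUP BY hash_id;
-- ad3jkfk [4,2]
-- fkjh43f [1,8]
-- rjkhd93 [7,4]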
asked Jun 22 '11 by batman



2 Answers

Use COLLECT_LIST(col), available as of Hive 0.13.0; unlike COLLECT_SET, it keeps duplicates:

SELECT
    hash_id,
    COLLECT_LIST(num_of_cats) AS aggr_set
FROM
    tablename
WHERE
    blablabla
GROUP BY
    hash_id;
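Run against the sample data from the question (tablename and the WHERE clause are placeholders to adapt), this should produce something like:

hash_id  aggr_set
ad3jkfk  [4,4,2]
fkjh43f  [1,8,8]
rjkhd93  [7,4,7]

Note that the order of elements within each list is not strictly guaranteed under distributed execution.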
answered Oct 03 '22 by Marvin W

There is nothing built in, but creating user-defined functions, including aggregates, isn't that bad. The only rough part is trying to make them type-generic, but here is a collect example.

package com.example;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class CollectAll extends AbstractGenericUDAFResolver
{
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
            throws SemanticException
    {
        if (tis.length != 1)
        {
            throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
        }
        if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE)
        {
            throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1.");
        }
        return new CollectAllEvaluator();
    }

    public static class CollectAllEvaluator extends GenericUDAFEvaluator
    {
        private PrimitiveObjectInspector inputOI;
        private StandardListObjectInspector loi;
        private StandardListObjectInspector internalMergeOI;

        // init is called once per mode. In PARTIAL1 the input is the raw
        // column value; in later modes the input is the partial list
        // produced by terminatePartial, so the object inspectors differ.
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException
        {
            super.init(m, parameters);
            if (m == Mode.PARTIAL1)
            {
                inputOI = (PrimitiveObjectInspector) parameters[0];
                return ObjectInspectorFactory
                        .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils
                        .getStandardObjectInspector(inputOI));
            }
            else
            {
                if (!(parameters[0] instanceof StandardListObjectInspector))
                {
                    inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils
                            .getStandardObjectInspector(parameters[0]);
                    return (StandardListObjectInspector) ObjectInspectorFactory
                            .getStandardListObjectInspector(inputOI);
                }
                else
                {
                    internalMergeOI = (StandardListObjectInspector) parameters[0];
                    inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
                    loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                    return loi;
                }
            }
        }

        // The buffer simply accumulates every value seen, duplicates included.
        static class ArrayAggregationBuffer implements AggregationBuffer
        {
            ArrayList<Object> container;
        }

        @Override
        public void reset(AggregationBuffer ab)
                throws HiveException
        {
            ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer()
                throws HiveException
        {
            ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
            reset(ret);
            return ret;
        }

        // Called once per original row: append the value to the buffer.
        @Override
        public void iterate(AggregationBuffer ab, Object[] parameters)
                throws HiveException
        {
            assert (parameters.length == 1);
            Object p = parameters[0];
            if (p != null)
            {
                ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));
            }
        }

        // Emit the partial aggregation (a copy of the list so far).
        @Override
        public Object terminatePartial(AggregationBuffer ab)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
            ret.addAll(agg.container);
            return ret;
        }

        // Fold another task's partial list into this buffer.
        @Override
        public void merge(AggregationBuffer ab, Object o)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> partial = (ArrayList<Object>) internalMergeOI.getList(o);
            for (Object i : partial)
            {
                agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));
            }
        }

        // Emit the final aggregated array for the group.
        @Override
        public Object terminate(AggregationBuffer ab)
                throws HiveException
        {
            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
            ret.addAll(agg.container);
            return ret;
        }
    }
}

Then in Hive, just issue:

add jar Whatever.jar;
CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';

You should then be able to use it as expected.

hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id;
OK
ad3jkfk [4,4,2]
fkjh43f [1,8,8]
rjkhd93 [7,4,7]

It's worth noting that the order of the elements should be considered undefined, so if you intend to use this to feed information into n_grams you might need to expand it a bit to sort the data as needed.
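If a deterministic (sorted) order is sufficient, one lightweight option is Hive's built-in sort_array(), available since Hive 0.9.0; a minimal sketch, assuming the test table and collect_all function from above:

-- sort_array() sorts each collected list in ascending order,
-- giving deterministic output at the cost of losing row order:
SELECT hash_id, sort_array(collect_all(num_of_cats)) AS cats_sorted
FROM test
GROUP BY hash_id;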

answered Oct 03 '22 by Jeff Mc