Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Hive cluster by vs order by vs sort by

Tags:

hql

hadoop

hive

As far as I understand;

  • sort by only sorts with in the reducer

  • order by orders things globally but shoves everything into one reducers

  • cluster by intelligently distributes stuff into reducers by the key hash and make a sort by

So my question is does cluster by guarantee a global order? distribute by puts the same keys into same reducers but what about the adjacent keys?

The only document I can find on this is here and from the example it seems like it orders them globally. But from the definition I feel like it doesn't always do that.

like image 260
cashmere Avatar asked Dec 05 '12 01:12

cashmere


People also ask

What is the difference between ORDER BY sort by and distribute by?

The SORT BY and ORDER BY clauses are used to define the order of the output data. However, DISTRIBUTE BY and CLUSTER BY clauses are used to distribute the data to multiple reducers based on the key columns.

Why does the sort by clause function faster than the ORDER BY clause in Hive?

Hive: Why does the SORT BY clause function faster than the ORDER BY clause? In the SORT BY clause, data is sorted at the reducer level in small groups; therefore, it functions faster than the ORDER BY clause.

What is cluster by in Hive?

“clustered by” clause is used to divide the table into buckets. Each bucket will be saved as a file under table directory. Bucketing can be done along with partitioning or without partitioning on Hive tables. Bucketed tables will create almost equally distributed data file parts.


3 Answers

A shorter answer: yes, CLUSTER BY guarantees global ordering, provided you're willing to join the multiple output files yourself.

The longer version:

  • ORDER BY x: guarantees global ordering, but does this by pushing all data through just one reducer. This is basically unacceptable for large datasets. You end up one sorted file as output.
  • SORT BY x: orders data at each of N reducers, but each reducer can receive overlapping ranges of data. You end up with N or more sorted files with overlapping ranges.
  • DISTRIBUTE BY x: ensures each of N reducers gets non-overlapping ranges of x, but doesn't sort the output of each reducer. You end up with N or more unsorted files with non-overlapping ranges.
  • CLUSTER BY x: ensures each of N reducers gets non-overlapping ranges, then sorts by those ranges at the reducers. This gives you global ordering, and is the same as doing (DISTRIBUTE BY x and SORT BY x). You end up with N or more sorted files with non-overlapping ranges.

Make sense? So CLUSTER BY is basically the more scalable version of ORDER BY.

like image 142
Lars Yencken Avatar answered Oct 21 '22 14:10

Lars Yencken


Let me clarify first: clustered by only distributes your keys into different buckets, clustered by ... sorted by get buckets sorted.

With a simple experiment (see below) you can see that you will not get global order by default. The reason is that default partitioner splits keys using hash codes regardless of actual key ordering.

However you can get your data totally ordered.

Motivation is "Hadoop: The Definitive Guide" by Tom White (3rd edition, Chapter 8, p. 274, Total Sort), where he discusses TotalOrderPartitioner.

I will answer your TotalOrdering question first, and then describe several sort-related Hive experiments that I did.

Keep in mind: what I'm describing here is a 'proof of concept', I was able to handle a single example using Claudera's CDH3 distribution.

Originally I hoped that org.apache.hadoop.mapred.lib.TotalOrderPartitioner will do the trick. Unfortunately it did not because it looks like Hive partitions by value, not key. So I patch it (should have subclass, but I do not have time for that):

Replace

public int getPartition(K key, V value, int numPartitions) {     return partitions.findPartition(key); } 

with

public int getPartition(K key, V value, int numPartitions) {     return partitions.findPartition(value); } 

Now you can set (patched) TotalOrderPartitioner as your Hive partitioner:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;  hive> set total.order.partitioner.natural.order=false  hive> set total.order.partitioner.path=/user/yevgen/out_data2 

I also used

hive> set hive.enforce.bucketing = true;   hive> set mapred.reduce.tasks=4; 

in my tests.

File out_data2 tells TotalOrderPartitioner how to bucket values. You generate out_data2 by sampling your data. In my tests I used 4 buckets and keys from 0 to 10. I generated out_data2 using ad-hoc approach:

import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.Tool; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.fs.FileSystem;   public class TotalPartitioner extends Configured implements Tool{     public static void main(String[] args) throws Exception{             ToolRunner.run(new TotalPartitioner(), args);     }      @Override     public int run(String[] args) throws Exception {         Path partFile = new Path("/home/yevgen/out_data2");         FileSystem fs = FileSystem.getLocal(getConf());          HiveKey key = new HiveKey();         NullWritable value = NullWritable.get();          SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);         key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why         writer.append(key, value);         key.set( new byte[]{1, 6}, 0, 2);//partition at 6         writer.append(key, value);         key.set( new byte[]{1, 9}, 0, 2);//partition at 9         writer.append(key, value);         writer.close();         return 0;     }  } 

Then I copied resulting out_data2 to HDFS (into /user/yevgen/out_data2)

With these settings I got my data bucketed/sorted (see last item in my experiment list).

Here is my experiments.

  • Create sample data

    bash> echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt

  • Create basic test table:

    hive> create table test(x int); hive> load data local inpath 'data.txt' into table test;

Basically this table contains values from 0 to 9 without order.

  • Demonstrate how table copying works (really mapred.reduce.tasks parameter which sets MAXIMAL number of reduce tasks to use)

    hive> create table test2(x int);

    hive> set mapred.reduce.tasks=4;

    hive> insert overwrite table test2 select a.x from test a join test b on a.x=b.x; -- stupied join to force non-trivial map-reduce

    bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0

    1

    5

    9

  • Demonstrate bucketing. You can see that keys are assinged at random without any sort order:

    hive> create table test3(x int) clustered by (x) into 4 buckets;

    hive> set hive.enforce.bucketing = true;

    hive> insert overwrite table test3 select * from test;

    bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0

    4

    8

    0

  • Bucketing with sorting. Results are partially sorted, not totally sorted

    hive> create table test4(x int) clustered by (x) sorted by (x desc) into 4 buckets;

    hive> insert overwrite table test4 select * from test;

    bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0

    1

    5

    9

You can see that values are sorted in ascending order. Looks like Hive bug in CDH3?

  • Getting partially sorted without cluster by statement:

    hive> create table test5 as select x from test distribute by x sort by x desc;

    bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0

    9

    5

    1

  • Use my patched TotalOrderParitioner:

    hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    hive> set total.order.partitioner.natural.order=false

    hive> set total.order.partitioner.path=/user/training/out_data2

    hive> create table test6(x int) clustered by (x) sorted by (x) into 4 buckets;

    hive> insert overwrite table test6 select * from test;

    bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0

    1

    2

    0

    bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0

    3

    4

    5

    bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0

    7

    6

    8

    bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0

    9

like image 45
Yevgen Yampolskiy Avatar answered Oct 21 '22 14:10

Yevgen Yampolskiy


CLUSTER BY does not produce global ordering.

The accepted answer (by Lars Yencken) misleads by stating that the reducers will receive non-overlapping ranges. As Anton Zaviriukhin correctly points to the BucketedTables documentation, CLUSTER BY is basically DISTRIBUTE BY (same as bucketing) plus SORT BY within each bucket/reducer. And DISTRIBUTE BY simply hashes and mods into buckets and while the hashing function may preserve order (hash of i > hash of j if i > j), mod of hash value does not.

Here's a better example showing overlapping ranges

http://myitlearnings.com/bucketing-in-hive/

like image 38
Edi Bice Avatar answered Oct 21 '22 14:10

Edi Bice