Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to optimize a group by statement in PIG latin?

Tags:

apache-pig

I have a skewed data set and I need to do a group by operation and then do a nested foreach on it. Because of the skewed data, few reducers are taking long time and others are taking no time. I know there exists skewed join but what is there for group by and foreach? Here is my pig code (renamed the variables):

foo_grouped = GROUP foo_grouped by FOO;
FOO_stats = FOREACH foo_grouped 
{ 
a_FOO_total = foo_grouped.ATTR; 
a_FOO_total = DISTINCT a_FOO_total; 

bar_count = foo_grouped.BAR; 
bar_count = DISTINCT bar_count; 

a_FOO_type1 = FILTER foo_grouped by COND1=='Y';
a_FOO_type1 = a_FOO_type1.ATTR; 
a_FOO_type1 = DISTINCT a_FOO_type1;

a_FOO_type2 = FILTER foo_grouped by COND2=='Y' OR COND3=='HIGH'; 
a_FOO_type2 = a_FOO_type2.ATTR; 
a_FOO_type2 = DISTINCT a_FOO_type2; 

generate group as FOO, 
COUNT(a_FOO_total) as a_FOO_total, COUNT(a_FOO_type1) as a_FOO_type1, COUNT(a_FOO_type2)     as a_FOO_type2, COUNT(bar_count) as bar_count; }
like image 611
Karthik Ramasamy Avatar asked Jan 16 '23 14:01

Karthik Ramasamy


1 Answers

In your example there are a lot of nested DISTINCT operators within FOREACH which are executed in the reducer, it relies on RAM to calculate unique values and this query produces just one Job. In case of just too many unique elements in a group you could get memory related exceptions as well.

Luckily PIG Latin is a dataflow language and you write sort of execution plan. In order to utilize more CPUs you could change your code in such way that forces more MapReduce jobs which could be executed in parallel. For that we should rewrite query without using nested DISTINCT, the trick is to do distinct operations and than group by as if you had just one column and than merge the results. It is very SQL like, but it works. Here it is:

records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count;

grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count;

grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count;

grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count;

mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out into '....' USING PigStorage(',');

After execution I got following summary which shows that distinct operations did not suffer from the skew in data were processed by the first Job:

Job Stats (time in seconds):
      JobId            Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201206061712_0244   669     45      75      8       13      376     18      202     grouped_a,grouped_b,grouped_c,grouped_d,records,selected        DISTINCT,MULTI_QUERY
job_201206061712_0245   1       1       3       3       3       12      12      12      grouped_c_count GROUP_BY,COMBINER
job_201206061712_0246   1       1       3       3       3       12      12      12      grouped_b_count GROUP_BY,COMBINER
job_201206061712_0247   5       1       48      27      33      30      30      30      grouped_a_count GROUP_BY,COMBINER
job_201206061712_0248   1       1       3       3       3       12      12      12      grouped_d_count GROUP_BY,COMBINER
job_201206061712_0249   4       1       3       3       3       12      12      12      mrg,out HASH_JOIN       ...,
Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."

Output(s):
Successfully stored 9 records (181 bytes) in: "..."

From Job DAG we can see that groupby operations were executed in parallel:

Job DAG:
job_201206061712_0244   ->      job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248   ->      job_201206061712_0249,
job_201206061712_0246   ->      job_201206061712_0249,
job_201206061712_0247   ->      job_201206061712_0249,
job_201206061712_0245   ->      job_201206061712_0249,
job_201206061712_0249

It works fine on my datasets where one of the group key values (in column g) makes 95% of the data. It also gets rid of memory related exceptions.

like image 107
alexeipab Avatar answered Feb 19 '23 01:02

alexeipab