
Apache Pig: FLATTEN and parallel execution of reducers

I have implemented an Apache Pig script. When I execute the script it results in many mappers for a specific step, but has only one reducer for that step. Because of this condition (many mappers, one reducer) the Hadoop cluster is almost idle while the single reducer executes. In order to better use the resources of the cluster I would like to also have many reducers running in parallel.

Even when I set the parallelism in the Pig script with the SET DEFAULT_PARALLEL command, I still end up with only one reducer.

The part of the code that causes the problem is the following:

SET DEFAULT_PARALLEL 5;

inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);

inputDataGrouped = GROUP inputData BY (group_name);

-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;

pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

The 'inputData' and 'inputDataGrouped' aliases are computed in the mapper, while 'pairs' and 'pairsFlat' are computed in the reducer.

If I change the script by removing the line with the FLATTEN command (pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);) then the execution results in 5 reducers (and thus in a parallel execution).

It seems that the FLATTEN command is the problem and prevents multiple reducers from being created.

How can I get the same result as FLATTEN while having the script execute in parallel (with many reducers)?

Edit:

EXPLAIN plan with two FOREACH statements (as above):

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
|   |
|   Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][0] - scope-23
    |   |
    |   Cast[int] - scope-27
    |   |
    |   |---Project[bytearray][1] - scope-26
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------

Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
    |   |
    |   Project[bag][0] - scope-39
    |
    |---pairs: New For Each(false)[bag] - scope-38
        |   |
        |   POUserFunc(GeneratePairsUDF)[bag] - scope-36
        |   |
        |   |---Project[bag][1] - scope-35
        |       |
        |       |---Project[bag][1] - scope-34
        |
        |---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

EXPLAIN plan with only one FOREACH, where FLATTEN wraps the UDF call:

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
|   |
|   Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
    |   |
    |   Cast[chararray] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[int] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------

Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
    |   |
    |   POUserFunc(GeneratePairsUDF)[bag] - scope-33
    |   |
    |   |---Project[bag][1] - scope-32
    |       |
    |       |---Project[bag][1] - scope-31
    |
    |---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false
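For reference, the single-FOREACH variant that this second plan describes would look roughly like the sketch below. It is reconstructed from the plan and from the original script, so it may differ in detail from the script that was actually run. GeneratePairsUDF is the author's own UDF and is assumed to be registered already.

```pig
SET DEFAULT_PARALLEL 5;

inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);

inputDataGrouped = GROUP inputData BY (group_name);

-- FLATTEN is applied directly to the bag returned by the UDF,
-- so the intermediate 'pairs_bag' field and the second FOREACH are not needed.
pairs = FOREACH inputDataGrouped
        GENERATE FLATTEN(GeneratePairsUDF(inputData.item)) AS (item1:int, item2:int);
```

In both plans the FOREACH operators sit in the same Reduce Plan directly after the Package operator, so merging the two statements changes only the shape of the reduce pipeline, not where the work runs.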
user2964640 asked Nov 07 '13 12:11




2 Answers

There is no guarantee that Pig applies the DEFAULT_PARALLEL value to every step of the script. Try adding a PARALLEL clause to the specific join/group step that you think is taking the time (in your case, the GROUP step).

 inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67; 

If it still does not work, you may need to check your data for skew.

Ashish answered Nov 05 '22 10:11



I think the data is skewed: only a small number of mappers are producing very large output. Look at the distribution of keys in your data. For example, the data may contain only a few groups, each with a large number of records.
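One way to look at the key distribution is to count the records per group and list the largest groups. The following is a minimal sketch that reuses the LOAD statement from the question; the aliases groupSizes, sortedSizes and largestGroups, and the limit of 20, are illustrative assumptions and not part of the original script.

```pig
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);

-- Count how many records fall under each group_name.
byGroup = GROUP inputData BY group_name;
groupSizes = FOREACH byGroup GENERATE group AS group_name, COUNT(inputData) AS num_records;

-- Show the largest groups first; 20 is an arbitrary cut-off.
sortedSizes = ORDER groupSizes BY num_records DESC;
largestGroups = LIMIT sortedSizes 20;

DUMP largestGroups;
```

All records with the same group_name go to the same reducer, so if one or two groups dominate this list, most of the work lands on one or two reducers no matter how many are configured.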

Tanveer answered Nov 05 '22 11:11
