
How to optimize spark sql to run it in parallel

I am a Spark newbie and have a simple Spark application that uses Spark SQL/HiveContext to:

  1. select data from hive table (1 billion rows)
  2. do some filtering and aggregation, including row_number() over a window to select the first row, group by, count(), max(), etc.
  3. write the result into HBase (hundreds of millions of rows)

I submit the job to a YARN cluster (100 executors). It is slow, and when I look at the DAG Visualization in the Spark UI, it seems only the Hive table scan tasks run in parallel; the rest of steps #2 and #3 above appear to run in only one instance, which presumably should be possible to parallelize?

The application looks like:

Step 1:

val input = hiveContext
  .sql("""
    SELECT
        user_id
        , address
        , age
        , phone_number
        , first_name
        , last_name
        , server_ts
    FROM
    (
        SELECT
            user_id
            , address
            , age
            , phone_number
            , first_name
            , last_name
            , server_ts
            , row_number() over
              (partition by user_id, address, phone_number, first_name, last_name
               order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
        FROM
        (
            SELECT
                user_id
                , address
                , age
                , phone_number
                , first_name
                , last_name
                , server_ts
            FROM
                table
            WHERE
                phone_number <> '911' AND
                server_date >= '2015-12-01' AND server_date < '2016-01-01' AND
                user_id IS NOT NULL AND
                first_name IS NOT NULL AND
                last_name IS NOT NULL AND
                address IS NOT NULL AND
                phone_number IS NOT NULL
        ) all_rows
    ) all_rows_with_row_number
    WHERE rn = 1
  """)

input.registerTempTable("input_tbl")
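As an aside, a quick way to check how many partitions (and therefore how many parallel tasks) this DataFrame has, and to force more downstream parallelism if needed. A minimal sketch; the partition count of 200 is just an illustrative value:

// Each partition becomes one task; this shows how many tasks later steps can use.
println(s"input partitions: ${input.rdd.partitions.length}")

// Optionally repartition (triggers a shuffle) if downstream parallelism is too low.
val inputRepartitioned = input.repartition(200)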

Step 2:

val result = hiveContext.sql("""
    SELECT state,
           phone_number,
           address,
           COUNT(*) AS hash_count,
           MAX(server_ts) AS latest_ts
    FROM
    (   SELECT
            udf_getState(address) AS state
            , user_id
            , address
            , age
            , phone_number
            , first_name
            , last_name
            , server_ts
        FROM
            input_tbl ) input
    WHERE state IS NOT NULL AND state != ''
    GROUP BY state, phone_number, address
  """)
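Note that udf_getState must be registered on the HiveContext before this query runs. A hypothetical sketch; the state-extraction logic is only a placeholder, since the real UDF is not shown in the question:

// Hypothetical registration of the UDF referenced in the query above;
// the parsing logic is a placeholder, not the actual implementation.
hiveContext.udf.register("udf_getState", (address: String) => {
  Option(address).map(_.split(",").last.trim.take(2).toUpperCase).orNull
})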

Step 3:

result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
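For reference, the conf and map in step 3 typically pair each row with an HBase Put and write through TableOutputFormat. A rough sketch only; the table name, column family, row-key layout, and column types below are placeholders, not the question's actual mapping:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// Placeholder table name and output format wiring.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "result_table")
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

result.cache()
result.map { row =>
  // Row key and columns are illustrative; adapt to the real schema.
  val rowKey = Bytes.toBytes(row.getString(0) + "|" + row.getString(1))   // state|phone_number
  val put = new Put(rowKey)
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hash_count"), Bytes.toBytes(row.getLong(3)))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("latest_ts"), Bytes.toBytes(row.get(4).toString))
  (new ImmutableBytesWritable(rowKey), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)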

The DAG Visualization looks like this: [DAG visualization screenshot]

As you can see, the "Filter", "Project" and "Exchange" in stage 0 appear to run in only one instance, as do stage 1 and stage 2, so a few questions (and apologies if they are dumb):

  1. Does "Filter", "Project" and "Exchange" happen in Driver after data shuffling from each executor?
  2. What code maps to "Filter", "Project" and "Exchange"?
  3. how I could run "Filter", "Project" and "Exchange" in parallel to optimize the performance?
  4. is it possible to run stage1 and stage2 in parallel?
asked Apr 27 '16 by user_not_found


2 Answers

You're not reading the DAG graph correctly - the fact that each step is visualized using a single box does not mean that it isn't using multiple tasks (and therefore cores) to calculate that step.

You can see how many tasks are used for each step by drilling down into the stage view, which displays all tasks for that stage.

For example, here's a sample DAG visualization similar to yours:

[screenshot: sample DAG visualization]

You can see each stage is depicted by a "single" column of steps.

But if we look at the table below, we can see the number of tasks per stage:

[screenshot: stage summary table showing tasks per stage]

One of them uses only 2 tasks, while the other uses 220, which means the data is split into 220 partitions that are processed in parallel, given enough available resources.
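As a side note, the number of partitions an Exchange produces in Spark SQL is controlled by spark.sql.shuffle.partitions (200 by default), so it can be raised if more parallelism is needed. A minimal sketch, assuming the hiveContext from the question:

// Raise the number of partitions used after shuffles (Exchange) in Spark SQL queries.
// Default is 200; a higher value can increase parallelism for large aggregations.
hiveContext.setConf("spark.sql.shuffle.partitions", "400")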

If you drill down into that stage, you can see again that it used 220 tasks, along with details for all of them.

[screenshot: stage detail view listing the 220 tasks]

Only tasks reading data from disk are shown in the graph with these "multiple dots", to help you understand how many files were read.

SO - as Rashid's answer suggests, check the number of tasks for each stage.

answered Sep 23 '22 by Tzach Zohar


It is not obvious, so I would do the following things to zero in on the problem.

  1. Calculate the execution time of each step (see the timing sketch after this list).
  2. The first step may be slow if your table is in text format; Spark usually works better if the data is stored in Hive in Parquet format.
  3. See if your table is partitioned by the column used in the WHERE clause.
  4. If saving data to HBase is slow, you may need to pre-split the HBase table, since by default data is stored in a single region.
  5. Look at the Stages tab in the Spark UI to see how many tasks are started for each stage, and also look at the data locality level as described here.
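A rough way to time each step (item 1) is to force an action on each DataFrame. A minimal sketch, assuming the input and result DataFrames from the question:

// Simple timing helper: runs a block and prints how long it took.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val out = block
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  out
}

// count() forces each query to actually execute.
time("step 1 (dedup)")       { input.count() }
time("step 2 (aggregation)") { result.count() }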

Hopefully, you will be able to zero in on the problem.

answered Sep 23 '22 by Rashid Ali