I am a Spark newbie and have a simple Spark application that uses Spark SQL/HiveContext in the three steps shown below.
I submit the job to a YARN cluster (100 executors), but it is slow. Looking at the DAG visualization in the Spark UI, it seems only the Hive table scan tasks run in parallel; the rest of the work in steps 2 and 3 below runs in a single instance, which presumably could be parallelized.
The application looks like:
Step 1:
val input = hiveContext.sql(
  """
    |SELECT
    |  user_id, address, age, phone_number, first_name, last_name, server_ts
    |FROM (
    |  SELECT
    |    user_id, address, age, phone_number, first_name, last_name, server_ts,
    |    row_number() OVER (
    |      PARTITION BY user_id, address, phone_number, first_name, last_name
    |      ORDER BY user_id, address, phone_number, first_name, last_name, server_ts DESC, age
    |    ) AS rn
    |  FROM (
    |    SELECT
    |      user_id, address, age, phone_number, first_name, last_name, server_ts
    |    FROM table
    |    WHERE
    |      phone_number <> '911'
    |      AND server_date >= '2015-12-01' AND server_date < '2016-01-01'
    |      AND user_id IS NOT NULL
    |      AND first_name IS NOT NULL
    |      AND last_name IS NOT NULL
    |      AND address IS NOT NULL
    |      AND phone_number IS NOT NULL
    |  ) all_rows
    |) all_rows_with_row_number
    |WHERE rn = 1
  """.stripMargin)

// Register the deduplicated rows so step 2 can refer to them by name.
input.registerTempTable("input_tbl")
Step 2:
val result = hiveContext.sql(
  """
    |SELECT
    |  state,
    |  phone_number,
    |  address,
    |  COUNT(*) AS hash_count,
    |  MAX(server_ts) AS latest_ts
    |FROM (
    |  SELECT
    |    udf_getState(address) AS state,
    |    user_id, address, age, phone_number, first_name, last_name, server_ts
    |  FROM input_tbl
    |) input
    |WHERE state IS NOT NULL AND state != ''
    |GROUP BY state, phone_number, address
  """.stripMargin)
Step 3:
// cache() is lazy; it only takes effect when the action below runs.
result.cache()
// Convert each row and write it out through the Hadoop OutputFormat configured in `conf`.
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
The DAG Visualization looks like:
As you can see, the "Filter", "Project" and "Exchange" in stage 0 run in only one instance, and the same goes for stage 1 and stage 2, so a few questions (apologies if they are dumb):
Spark SQL can read different columns of a Parquet file in parallel. In this example there are six columns, and by default all six are stored in a single Parquet file.
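Either way, you can check from code how many partitions (and therefore parallel tasks) the scan and the deduplication step actually get. A minimal sketch, assuming Spark 1.x and the table/DataFrame names from the question:
// Hedged sketch: a DataFrame's partition count equals the number of tasks
// in the final stage that produces it.
println(s"raw table scan partitions: ${hiveContext.table("table").rdd.partitions.length}")
println(s"deduplicated input partitions: ${input.rdd.partitions.length}")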
You're not reading the DAG graph correctly - the fact that each step is visualized using a single box does not mean that it isn't using multiple tasks (and therefore cores) to calculate that step.
You can see how many tasks are used for each step by drilling-down into the stage view, that displays all tasks for this stage.
For example, here's a sample DAG visualization similar to yours:
You can see each stage is depicted by a "single" column of steps.
But if we look at the table below, we can see the number of tasks per stage:
One of them is using only 2 tasks, but the other uses 220, which means data is split into 220 partitions and partitions are processed in parallel, given enough available resources.
If you drill-down into that stage, you can see again that it used 220 tasks and details for all the tasks.
Only tasks reading data from disk are shown in the graph with these "multiple dots", to help you understand how many files were read.
So, as Rashid's answer suggests, check the number of tasks for each stage.
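If the "Exchange" steps (the window function in step 1 and the GROUP BY in step 2) really do end up with few tasks, note that their parallelism is governed by spark.sql.shuffle.partitions, which defaults to 200 in Spark 1.x. A hedged sketch of raising it before running the queries; 400 is just an example value:
// Assumption: more shuffle partitions only help if the cluster has idle cores
// and the data is large enough to justify the extra tasks.
hiveContext.setConf("spark.sql.shuffle.partitions", "400")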
It is not obvious, so I would do the following things to zero in on the problem.
Hopefully, you will be able to zero in on the problem.
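Along the same lines, if one of the DataFrames turns out to have too few partitions for 100 executors, repartitioning before the heavy work spreads it across the cluster. A hedged sketch, where 200 is an arbitrary example value:
// Repartition the deduplicated data and re-register it under the same
// temp-table name before running the aggregation in step 2.
val widened = input.repartition(200)
widened.registerTempTable("input_tbl")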