Environment:
We are using EMR, with Spark 2.1 and EMR FS.
Process we are doing:
We are running a PySpark job that joins two Hive tables and creates another Hive table from the result using saveAsTable, stored as ORC with partitions.
Issue:
18/01/23 10:21:28 INFO OutputCommitCoordinator: Task was denied committing,
stage: 84, partition: 901, attempt: 10364
18/01/23 10:21:28 INFO TaskSetManager: Starting task 901.10365 in stage 84.0
(TID 212686, ip-172-31-46-97.ec2.internal, executor 10, partition 901,
PROCESS_LOCAL, 6235 bytes)
18/01/23 10:21:28 WARN TaskSetManager: Lost task 884.10406 in stage 84.0
(TID 212677, ip-172-31-46-97.ec2.internal, executor 85): TaskCommitDenied
(Driver denied task commit) for job: 84, partition: 884, attemptNumber: 10406
This log message repeats over and over in the Spark logs; by the time we killed the job we had seen it about 170000 (160595) times, the same symptom described in: Spark-Task Commit Denied
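To see which partitions are being retried, the denial messages can be tallied per partition. The following is a minimal sketch, not part of our job: the function name `summarize_commit_denials` is made up for illustration, and the regular expressions assume the two message formats shown in the excerpt above, including the wrapped lines.

```python
import re
from collections import Counter

# Patterns for the two denial messages seen in the log excerpt.
# \s* also matches newlines, so wrapped log lines are still recognised.
PATTERNS = [
    re.compile(
        r"Task was denied committing,\s*stage:\s*(\d+),"
        r"\s*partition:\s*(\d+),\s*attempt:\s*(\d+)"
    ),
    re.compile(
        r"TaskCommitDenied\s*\(Driver denied task commit\)\s*for job:\s*(\d+),"
        r"\s*partition:\s*(\d+),\s*attemptNumber:\s*(\d+)"
    ),
]


def summarize_commit_denials(log_text):
    """Return (denials per partition, highest attempt number per partition)."""
    denials = Counter()
    max_attempt = {}
    for pattern in PATTERNS:
        for match in pattern.finditer(log_text):
            partition = int(match.group(2))
            attempt = int(match.group(3))
            denials[partition] += 1
            max_attempt[partition] = max(attempt, max_attempt.get(partition, -1))
    return denials, max_attempt


# Sample text copied from the excerpt above.
SAMPLE = """18/01/23 10:21:28 INFO OutputCommitCoordinator: Task was denied committing,
stage: 84, partition: 901, attempt: 10364
18/01/23 10:21:28 WARN TaskSetManager: Lost task 884.10406 in stage 84.0
(TID 212677, ip-172-31-46-97.ec2.internal, executor 85): TaskCommitDenied
(Driver denied task commit) for job: 84, partition: 884, attemptNumber: 10406
"""

denials, max_attempt = summarize_commit_denials(SAMPLE)
for partition, count in denials.most_common():
    print(partition, count, max_attempt[partition])
```

Running this over the full driver log would show whether the denials are spread across all partitions or concentrated on a few.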
The Spark source code shows:
/**
 * :: DeveloperApi ::
 * Task requested the driver to commit, but was denied.
 */
@DeveloperApi
case class TaskCommitDenied(
    jobID: Int,
    partitionID: Int,
    attemptNumber: Int) extends TaskFailedReason {
  override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
  /**
   * If a task failed because its attempt to commit was denied, do not count this failure
   * towards failing the stage. This is intended to prevent spurious stage failures in cases
   * where many speculative tasks are launched and denied to commit.
   */
  override def countTowardsTaskFailures: Boolean = false
}
Please note that we have not enabled spark.speculation (it is false), and the property does not appear at all in the Spark job's Environment listing.
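One way to double-check this is to filter the job's effective configuration for speculation-related keys. The helper below is only a sketch: `speculation_settings` is a hypothetical name, and the example pairs are placeholders rather than our real configuration. On a live PySpark session the pairs could be obtained from `spark.sparkContext.getConf().getAll()`, which returns a list of (key, value) tuples.

```python
def speculation_settings(conf_pairs):
    """Return (enabled, related) from an iterable of (key, value) config pairs.

    `related` holds every key starting with "spark.speculation".
    `enabled` falls back to False when the key is absent, which matches
    Spark's default for spark.speculation.
    """
    related = {k: v for k, v in conf_pairs if k.startswith("spark.speculation")}
    enabled = related.get("spark.speculation", "false").strip().lower() == "true"
    return enabled, related


# Placeholder pairs for illustration; on a running job use
#   spark.sparkContext.getConf().getAll()
example = [
    ("spark.sql.shuffle.partitions", "2001"),
    ("spark.app.name", "join-job"),  # hypothetical application name
]

enabled, related = speculation_settings(example)
print(enabled, related)
```

If the property is absent, as in our case, the helper reports speculation as disabled and returns an empty dictionary of related keys.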
While the job is running, we can see the corresponding files being created on EMRFS under the table's temporary directories, for example:
hdfs://ip-172-31-18-155.ec2.internal:8020/hive/location/hive.db/hivetable/_temporary/0/task_1513431588574_1185_3_01_000000/00000_0.orc
We can see about 2001 such folders (we have set spark.sql.shuffle.partitions = 2001).
1) What could cause the job to launch ~170000 tasks even though we have not enabled spark.speculation?
2) Once it has finished writing the data to HDFS (EMRFS), why is each executor trying to launch new tasks?
3) Is there a way to avoid this?
Thanks a lot for looking into this. Any input on this would help us a lot.
Venkat
When you perform a "GroupBy" or "Join" operation, Spark throws an error if a resulting shuffle block exceeds 2 GB. Check whether that is happening here.
Note: Partition the data effectively to improve performance. The most common mistakes made when programming in Spark are described here.
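As a rough way to pick a partition count that keeps each partition well below the 2 GB limit, you can divide the estimated shuffle size by a target partition size. This is only a back-of-the-envelope sketch: the function name `min_partitions`, the 128 MiB target, and the 500 GiB example size are all assumptions for illustration, and the estimate presumes the data is spread evenly across partitions (skewed join keys can still produce oversized partitions).

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def min_partitions(total_bytes, target_bytes=128 * MIB):
    """Smallest partition count so that an even split stays at or below target_bytes."""
    if total_bytes <= 0:
        return 1
    # Integer ceiling division, avoids floating-point rounding.
    return -(-total_bytes // target_bytes)


# Hypothetical example: 500 GiB of shuffle data, 128 MiB per partition.
estimated_shuffle_bytes = 500 * GIB
print(min_partitions(estimated_shuffle_bytes))
```

The result could then be used as the value for spark.sql.shuffle.partitions, adjusted upward if some keys are much heavier than others.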