I am generating a hierarchy for a table by determining the parent-child relationships.
Below is the configuration I used; even with these settings I still hit the "Too large frame" error:
--conf spark.yarn.executor.memoryOverhead=1024mb \
--conf yarn.nodemanager.resource.memory-mb=12288mb \
--driver-memory 32g \
--driver-cores 8 \
--executor-cores 32 \
--num-executors 8 \
--executor-memory 256g \
--conf spark.maxRemoteBlockSizeFetchToMem=15g
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

lazy val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
import sparkSession.implicits._

// repartition returns a new DataFrame, so the result must be reassigned
val hiveEmp: DataFrame = sparkSession.sql("select * from db.employee").repartition(300)

val nestedLevel = 3

// self-join the employee table nestedLevel times to walk up the parent chain
val empHierarchy = (1 to nestedLevel).foldLeft(hiveEmp.as("wd0")) { (wDf, i) =>
  val j = i - 1
  wDf.join(hiveEmp.as(s"wd$i"), col(s"wd$j.parent_id") === col(s"wd$i.id"), "left_outer")
}.select(
  col("wd0.id") :: col("wd0.parent_id") ::
    col("wd0.amount").as("amount") :: col("wd0.payment_id").as("payment_id") :: (
      (1 to nestedLevel).toList.map(i => col(s"wd$i.amount").as(s"amount_$i")) :::
      (1 to nestedLevel).toList.map(i => col(s"wd$i.payment_id").as(s"payment_id_$i"))
    ): _*
)

empHierarchy.write.saveAsTable("employee4")
The job fails with the following stack trace:
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
... 3 more
Caused by: org.apache.spark.shuffle.FetchFailedException: Too large frame: 5454002341
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:361)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:336)
Use this Spark config: set spark.maxRemoteBlockSizeFetchToMem to a value below 2g.
Since there are a lot of issues with partitions larger than 2 GB (they cannot be shuffled and cannot be cached on disk), Spark throws the FetchFailedException: Too large frame.
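For example, you could lower the 15g value from the question to something under the 2 GB limit (the exact figure below is only illustrative):

--conf spark.maxRemoteBlockSizeFetchToMem=1g \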
Suresh is right. Here's a better documented & formatted version of his answer with some useful background info:
If you're on version 2.2.x or 2.3.x, you can achieve the same effect by setting the value of the config to Int.MaxValue - 512, i.e. spark.maxRemoteBlockSizeFetchToMem=2147483135. See here for the default value used as of September 2019.
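If you'd rather set this when building the session, here is a minimal sketch (the app name is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("employee-hierarchy") // placeholder name
  // 2147483135 = Int.MaxValue - 512, just under the 2 GB frame limit
  .config("spark.maxRemoteBlockSizeFetchToMem", (Int.MaxValue - 512).toString)
  .enableHiveSupport()
  .getOrCreate()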
This means that the size of your dataset's partitions is enormous. You need to repartition your dataset into more partitions.
You can do this using:
df.repartition(n)
Here, n depends on the size of your dataset.
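As a sketch using the DataFrame from the question (the partition count 2000 is only an example; pick n so each partition ends up well under 2 GB, ideally around 128-256 MB):

// empHierarchy is the joined DataFrame from the question above
val repartitioned = empHierarchy.repartition(2000)
repartitioned.write.saveAsTable("employee4")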