I get an error when using mllib RandomForest to train on my data. My dataset is huge and the default number of partitions is relatively small, so an exception is thrown indicating "Size exceeds Integer.MAX_VALUE". The original stack trace is as follows:
15/04/16 14:13:03 WARN scheduler.TaskSetManager: Lost task 19.0 in stage 6.0 (TID 120, 10.215.149.47): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:146)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
Integer.MAX_VALUE is about 2 GB, so it seems that some partition grew beyond that size. I repartitioned my RDD into 1000 partitions so that each partition holds far less data than before. Finally, the problem was solved!
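For reference, the fix boiled down to a repartition before training; here is a rough sketch (the input path and RandomForest hyperparameters are illustrative, not my exact job):

    import org.apache.spark.mllib.tree.RandomForest
    import org.apache.spark.mllib.util.MLUtils

    // Load the training data (path is illustrative).
    val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/train.libsvm")

    // Spread the data over many more partitions so each stays well under 2 GB.
    val repartitioned = data.repartition(1000)

    // Train as before; these hyperparameters are placeholders.
    val model = RandomForest.trainClassifier(
      repartitioned,
      2,                 // numClasses
      Map[Int, Int](),   // categoricalFeaturesInfo
      100,               // numTrees
      "auto",            // featureSubsetStrategy
      "gini",            // impurity
      10,                // maxDepth
      32)                // maxBins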
So, my question is: why does a partition have this 2 GB size limit? It seems that there is no configuration setting for the limit in Spark.
Spark can run one concurrent task for every partition of an RDD (up to the number of cores in the cluster). If your cluster has 20 cores, you should have at least 20 partitions (in practice 2–3x more).
The ideal size of each partition is around 100–200 MB. Smaller partitions increase the number of tasks that can run in parallel, which can improve performance, but partitions that are too small cause scheduling overhead and increase GC time.
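As a rough sizing rule (the dataset size and the 128 MB target below are illustrative assumptions), you can derive a partition count from the data volume and the core count:

    // Aim for roughly 128 MB per partition (illustrative target).
    val totalSizeBytes = 500L * 1024 * 1024 * 1024   // assume a ~500 GB dataset
    val targetPartitionBytes = 128L * 1024 * 1024

    // Keep at least 2-3x the cluster's core count so every core stays busy.
    val numPartitions = math.max(
      (totalSizeBytes / targetPartitionBytes).toInt,
      sc.defaultParallelism * 3)

    val resized = rdd.repartition(numPartitions)   // rdd is the dataset being tuned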
Yes, while repartitioning, the data gets shuffled into the 1000 partitions.
Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
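The storage level chosen when caching controls that behaviour. A minimal sketch (the RDD name is illustrative):

    import org.apache.spark.storage.StorageLevel

    // MEMORY_ONLY (the default for cache()) recomputes partitions that don't fit in memory;
    // MEMORY_AND_DISK spills them to local disk instead of recomputing.
    val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)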
An RDD does not provide a schema view of the data and has no provision for handling structured data. Dataset and DataFrame do provide a schema view: a distributed collection of data organized into named columns. These are among the limitations of the RDD API in Apache Spark.
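For instance (a minimal sketch; sqlContext is the shell-provided SQLContext and the column names are illustrative), converting an RDD of tuples to a DataFrame attaches the named columns that the plain RDD lacks:

    // An RDD of tuples carries no column names or schema of its own.
    val peopleRdd = sc.parallelize(Seq(("Alice", 34), ("Bob", 45)))

    // The DataFrame view adds a schema with named, typed columns.
    import sqlContext.implicits._
    val peopleDf = peopleRdd.toDF("name", "age")
    peopleDf.printSchema()   // name: string, age: integer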
In the example below we limit our partitions to 100. A Spark DataFrame that originally has 1000 partitions will be reduced to 100 partitions without shuffling. By no shuffling we mean that each of the 100 new partitions is assembled from 10 of the existing partitions.
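A minimal sketch of that example (the DataFrame name and partition counts are illustrative):

    // df starts with 1000 partitions (assumed).
    println(df.rdd.getNumPartitions)         // 1000

    // coalesce() narrows the partitioning without a shuffle:
    // each of the 100 resulting partitions is built from 10 existing ones.
    val narrowed = df.coalesce(100)
    println(narrowed.rdd.getNumPartitions)   // 100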
Understanding Spark partitioning:
1. By default, Spark/PySpark creates partitions that are equal to the number of CPU cores in the machine.
2. The data of each partition resides on a single machine.
3. Spark/PySpark creates a task for each partition.
4. Spark shuffle operations move data from one partition to other partitions.
When you run Spark jobs on a Hadoop cluster, the default number of partitions is based on the following: on an HDFS cluster, by default, Spark creates one partition for each block of the file. In Hadoop version 1 the HDFS block size is 64 MB, and in Hadoop version 2 it is 128 MB.
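As a quick sanity check (the path is illustrative), the partition count of a freshly read HDFS file mirrors its block count:

    // A ~1 GB file stored with 128 MB blocks yields about 8 partitions.
    val lines = sc.textFile("hdfs:///data/big-file.txt")
    println(lines.getNumPartitions)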
The basic abstraction for blocks in Spark is a ByteBuffer, which unfortunately has a limit of Integer.MAX_VALUE (~2 GB).
It is a critical issue that prevents the use of Spark with very large datasets. Increasing the number of partitions can resolve it (as in the OP's case), but that is not always feasible, for instance when there is a long chain of transformations, part of which can grow the data (flatMap etc.), or in cases where the data is skewed.
The proposed solution is to come up with an abstraction like LargeByteBuffer, which can back a block with a list of ByteBuffers. This impacts the overall Spark architecture, so it has remained unresolved for quite a while.
The problem is that when using datastores like Cassandra, HBase, or Accumulo, the block size is based on the datastore splits (which can be over 10 GB). When loading data from these datastores you have to repartition immediately into thousands of partitions so you can operate on the data without blowing the 2 GB limit.
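A sketch of that pattern for HBase (the configuration object and partition count are illustrative; the same idea applies to any loader that produces an RDD):

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    // hbaseConf is an illustrative, pre-built HBase/Hadoop Configuration.
    val raw = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    // Repartition immediately so no single partition exceeds the 2 GB block limit.
    val workable = raw.repartition(2000)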
Most people who use Spark are not really working with large data; to them, anything bigger than Excel or Tableau can hold counts as big data. They are mostly data scientists who work with clean data, or who use a sample size small enough to stay within the limit.
When processing large volumes of data, I end up having to go back to MapReduce and only use Spark once the data has been cleaned up. This is unfortunate; however, the majority of the Spark community has no interest in addressing the issue.
A simple solution would be to create an abstraction that uses a byte array by default, but allows a Spark job to be overloaded with a 64-bit data pointer to handle large jobs.