
How does Spark partition(ing) work on files in HDFS?

I'm working with Apache Spark on a cluster using HDFS. As far as I understand, HDFS distributes files across the data nodes. So if I put a "file.txt" on the filesystem, it will be split into partitions. Now I'm calling

rdd = SparkContext().textFile("hdfs://.../file.txt")  

from Apache Spark. Does rdd now automatically have the same partitions as "file.txt" on the filesystem? What happens when I call

rdd.repartition(x) 

where x is greater than the number of partitions used by HDFS? Will Spark physically rearrange the data on HDFS to work locally?

Example: I put a 30 GB text file on the HDFS system, which distributes it across 10 nodes. Will Spark a) use the same 10 partitions, and b) shuffle 30 GB across the cluster when I call repartition(1000)?

asked Mar 12 '15 by Degget

People also ask

How a file is partitioned in Spark?

Spark automatically partitions RDDs and distributes the partitions across different nodes. A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster. Partitions are the basic units of parallelism in Apache Spark, and RDDs in Apache Spark are collections of partitions.
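
A minimal PySpark sketch of this (the local[4] master and data size are illustrative assumptions, not from the question):

from pyspark import SparkContext

# Illustrative local "cluster" with 4 cores; on a real cluster the
# partitions would be spread across executors instead.
sc = SparkContext("local[4]", "partition-demo")

rdd = sc.parallelize(range(100))     # Spark splits the data into partitions
print(rdd.getNumPartitions())        # 4 here: one partition per core

# glom() turns each partition into a list, making the division visible.
print(rdd.glom().map(len).collect()) # e.g. [25, 25, 25, 25]

sc.stop()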

How does hash partitioning work in Spark?

Spark's default partitioner is the Hash Partitioner, which works using the hashCode() function. The idea behind hashCode() is that equal objects have the same hash code. On the basis of this concept, the Hash Partitioner assigns keys with the same hash code to the same partition and distributes the keys across the partitions.
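
As a small illustration (keys and sizes are made up), partitionBy() in PySpark hashes each key so equal keys always land in the same partition:

from pyspark import SparkContext

sc = SparkContext("local[4]", "hash-partition-demo")

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)])

# partitionBy() assigns each record to partition hash(key) % numPartitions
# (portable_hash by default in PySpark).
partitioned = pairs.partitionBy(3)

# Inspect the partitions: both ("a", ...) records share one partition,
# both ("b", ...) records share another.
print(partitioned.glom().collect())

sc.stop()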

What is partition and how Spark partitions the data?

In Spark, a partition is an atomic chunk of data. Simply put, it is a logical division of the data stored on a node of the cluster. In Apache Spark, partitions are the basic units of parallelism, and RDDs are collections of partitions.

How does Spark write data into HDFS?

You can try the saveAsTextFile method. It writes the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
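
A minimal sketch (the HDFS URL is a placeholder for your namenode):

from pyspark import SparkContext

sc = SparkContext(appName="save-demo")

rdd = sc.parallelize(["first line", "second line", "third line"], 2)

# Writes a directory with one part-file per partition (part-00000,
# part-00001); each element becomes one line of text.
rdd.saveAsTextFile("hdfs://namenode:8020/user/demo/output")

sc.stop()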


2 Answers

When Spark reads a file from HDFS, it creates a single partition for a single input split. The input split is set by the Hadoop InputFormat used to read the file. For instance, if you use textFile() it would be TextInputFormat in Hadoop, which would return a single partition for a single HDFS block (but the split between partitions would be made on line boundaries, not the exact block boundary), unless you have a compressed text file. In the case of a compressed file you would get a single partition for the whole file (as compressed text files are not splittable).
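
A quick way to check this (the path is a placeholder):

from pyspark import SparkContext

sc = SparkContext(appName="read-demo")

# One uncompressed HDFS block -> one input split -> one partition.
rdd = sc.textFile("hdfs://namenode:8020/user/demo/file.txt")
print(rdd.getNumPartitions())   # roughly file size / 128 MB block size

# minPartitions asks Hadoop for at least this many splits; asking for
# fewer than the number of blocks has no effect.
rdd2 = sc.textFile("hdfs://namenode:8020/user/demo/file.txt", minPartitions=500)
print(rdd2.getNumPartitions())

sc.stop()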

When you call rdd.repartition(x), it performs a shuffle of the data from the N partitions you have in rdd to the x partitions you want to have; the partitioning is done on a round-robin basis.

If you have a 30 GB uncompressed text file stored on HDFS, then with the default HDFS block size setting (128 MB) it would be stored in 240 blocks (30 × 1024 MB / 128 MB), which means that the RDD you read from this file would have 240 partitions. When you call repartition(1000), your RDD would be marked as to be repartitioned, but in fact it would be shuffled to 1000 partitions only when you execute an action on top of this RDD (lazy execution concept).
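
A minimal sketch of that lazy-execution point (paths and counts are placeholders):

from pyspark import SparkContext

sc = SparkContext(appName="repartition-demo")

rdd = sc.textFile("hdfs://namenode:8020/user/demo/file.txt")  # e.g. 240 partitions

# No data moves here: repartition() only records a shuffle in the lineage.
repartitioned = rdd.repartition(1000)

# The action triggers the job; only now are the records shuffled
# (round-robin) into 1000 partitions across the cluster.
print(repartitioned.count())
print(repartitioned.getNumPartitions())  # 1000

sc.stop()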

answered Sep 22 '22 by 0x0FFF


When reading non-bucketed HDFS files (e.g. Parquet) with Spark SQL, the number of DataFrame partitions df.rdd.getNumPartitions depends on these factors:

  • spark.default.parallelism (roughly the number of cores available to the application)
  • spark.sql.files.maxPartitionBytes (default 128 MB)
  • spark.sql.files.openCostInBytes (default 4 MB)
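
For reference, a minimal sketch of where these settings live (values illustrative, path a placeholder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("partition-config-demo")
         .config("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)  # 128 MB
         .config("spark.sql.files.openCostInBytes", 4 * 1024 * 1024)      # 4 MB
         .getOrCreate())

print(spark.sparkContext.defaultParallelism)  # spark.default.parallelism

df = spark.read.parquet("hdfs://namenode:8020/user/demo/table")
print(df.rdd.getNumPartitions())

spark.stop()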

A rough estimation of the number of partitions is:

  • If you have enough cores to read all your data in parallel (i.e. at least one core for every 128 MB of your data):

    AveragePartitionSize ≈ max(4 MB, TotalDataSize / #cores)
    NumberOfPartitions ≈ TotalDataSize / AveragePartitionSize

  • If you don't have enough cores:

    AveragePartitionSize ≈ 128 MB
    NumberOfPartitions ≈ TotalDataSize / AveragePartitionSize

The exact calculation is slightly more complicated and can be found in the code base, in FileSourceScanExec.
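
A rough sketch of that logic in plain Python (constants are the defaults listed above; the real code also adds openCostInBytes once per file, which this simplification skips):

def estimate_num_partitions(total_bytes, num_cores,
                            max_partition_bytes=128 * 1024 * 1024,  # 128 MB
                            open_cost_in_bytes=4 * 1024 * 1024):    # 4 MB
    bytes_per_core = total_bytes / num_cores
    # Split size is floored at the 4 MB open cost and capped at 128 MB.
    max_split_bytes = min(max_partition_bytes,
                          max(open_cost_in_bytes, bytes_per_core))
    return int(total_bytes / max_split_bytes) + 1  # rough ceiling

# Enough cores: 10 GB / 128 cores = 80 MB partitions -> ~129 partitions.
print(estimate_num_partitions(10 * 1024**3, 128))
# Not enough cores: split size caps at 128 MB -> ~81 partitions.
print(estimate_num_partitions(10 * 1024**3, 16))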

answered Sep 23 '22 by Apoorve Dave