Spark writing/reading to/from S3 - Partition Size and Compression

I am doing an experiment to understand which file size behaves best with S3 and [EMR + Spark].

Input data:

Incompressible data: random bytes in files
Total data size: 20GB
Each folder has a different input file size, ranging from 2MB to 4GB.

Cluster specifications:

1 master + 4 nodes: c3.8xlarge
--driver-memory 5G \
--executor-memory 3G \
--executor-cores 2 \
--num-executors 60 \

Code:

scala> def time[R](block: => R): R = {
          val t0 = System.nanoTime()
          val result = block    // call-by-name
          val t1 = System.nanoTime()
          println("Elapsed time: " + (t1 - t0) + "ns")
          result
      }
time: [R](block: => R)R

scala> val inputFiles = time{sc.textFile("s3://bucket/folder/2mb-10240files-20gb/*/*")};
scala> val outputFiles = time {inputFiles.saveAsTextFile("s3://bucket/folder-out/2mb-10240files-20gb/")};

Observations

  • 2MB - 32MB: Most of the time is spent opening file handles [not efficient].
  • 64MB - 1GB: Spark launches 320 tasks for all of these file sizes; the task count is no longer tied to the number of files making up the 20GB of data. For example, the 512MB case had only 40 files making up the 20GB and could have been handled in 40 tasks, but instead there were 320 tasks, each dealing with 64MB of data (see the snippet after this list).
  • 4GB file size: 0 bytes outputted [not able to handle in memory / data not even splittable???]
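For reference, here is a minimal way to check how Spark is partitioning each input folder (the path below is a placeholder following the same bucket layout as above):

scala> val rdd512mb = sc.textFile("s3://bucket/folder/512mb-40files-20gb/*/*")
scala> rdd512mb.getNumPartitions   // reports ~320 for the 512MB folder, not 40
scala> sc.defaultParallelism       // cluster-wide default parallelism, for comparison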

Questions

  • Is there a default setting that forces the input to be processed in 64MB chunks?
  • Since the data I am using is random bytes and is already compressed, how is Spark splitting it further? If it can split this data, why is it not able to split the 4GB object?
  • Why did the compressed file size increase after uploading via Spark? The 2MB compressed input file becomes 3.6MB in the output bucket.
Asked Nov 21 '17 by Palak Sukant


1 Answer

Since it is not specified, I'm assuming usage of gzip and Spark 2.2 in my answer.

  • Is there a default setting that forces the input to be processed in 64MB chunks?

Yes, there is. Spark builds on Hadoop's file system layer, and therefore treats S3 as a block-based file system even though it is an object store. So the real question here is: which S3 file system implementation are you using (s3a, s3n, etc.)? A similar question can be found here.
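As a rough sketch (assuming the s3a connector; for s3n the analogous property is fs.s3n.block.size, whose 64MB default matches the 320 tasks you observed, and paths here are placeholders), you can either raise the block size the connector advertises or collapse the partitions after reading:

scala> // Advertise a larger block size so the input splits become larger.
scala> sc.hadoopConfiguration.set("fs.s3a.block.size", (512 * 1024 * 1024).toString)

scala> // Or reduce the number of partitions after reading, without a shuffle.
scala> val rdd = sc.textFile("s3://bucket/folder/512mb-40files-20gb/*/*").coalesce(40)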

  • Since the data I am using is random bytes and is already compressed, how is Spark splitting it further? If it can split this data, why is it not able to split the 4GB object?

Spark docs indicate that it is capable of reading compressed files:

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

This means that your files were read quite easily and converted to a plaintext string for each line.

However, you are using compressed files. Assuming it is a non-splittable format such as gzip, the entire file is needed for decompression. You are running with 3GB executors, which can satisfy the needs of 4MB-1GB files quite well, but cannot handle a file larger than 3GB at once (probably less after accounting for overhead).

Some further info can be found in this question. Details of splittable compression types can be found in this answer.
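As a workaround sketch (the path and partition count are placeholders), a common pattern with non-splittable gzip input is to repartition immediately after reading, so that each file is still decompressed by a single task but all downstream work is spread across the cluster:

scala> // Each .gz file is necessarily read by one task; redistribute the lines afterwards.
scala> val lines = sc.textFile("s3://bucket/folder/1gb-20files-20gb/*/*").repartition(320)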

  • Why did the compressed file size increase after uploading via Spark? The 2MB compressed input file becomes 3.6MB in the output bucket.

As a corollary to the previous point, this means that Spark decompressed the data into plaintext while reading it. When it is re-uploaded, it is no longer compressed. To compress the output, you can pass a compression codec as a parameter:

inputFiles.saveAsTextFile("s3://path", classOf[org.apache.hadoop.io.compress.GzipCodec])

There are other compression formats available.
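For example (a sketch, with a placeholder output path), bzip2 is one of the splittable codecs, which also avoids the single-task decompression problem described above:

scala> // bzip2 output is splittable, so it can be re-read in parallel later.
scala> inputFiles.saveAsTextFile("s3://bucket/folder-out/bzip2/", classOf[org.apache.hadoop.io.compress.BZip2Codec])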

Answered Sep 19 '22 by Ra41P