 

How does Spark parallelize the processing of a 1TB file?

Imaginary problem

  • A gigantic CSV log file, say 1 TB in size, located on a USB drive.
  • The file contains activity logs of users around the world. Let's assume each line has 50 columns, one of which is Country.
  • We want a line count per country, in descending order.
  • Let's assume the Spark cluster has enough nodes with RAM to process the entire 1 TB in memory (20 nodes, 4 CPU cores each, 64 GB RAM per node).

My poor man's conceptual solution, using Spark SQL & Databricks spark-csv

$ ./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0

// Read the CSV (with header row) into a DataFrame using the spark-csv package
val dfBigLog = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/media/username/myUSBdrive/bogusBigLog1TB.log")

// Count lines per country and sort by the count, descending
dfBigLog.select("Country")
  .groupBy("Country")
  .agg(count($"Country") as "CountryCount")
  .orderBy($"CountryCount".desc).show

Question 1: How does Spark parallelize the processing?

I suppose the majority of the execution time (99%?) of the above solution is spent reading the 1 TB file from the USB drive into the Spark cluster. Reading the file from the USB drive is not parallelizable. But after reading the entire file, what does Spark do under the hood to parallelize the processing?

  • How many nodes are used for creating the DataFrame? (maybe only one?)

  • How many nodes are used for the groupBy & count? Let's assume there are 100+ countries (but Spark doesn't know that yet). How would Spark partition the work to distribute the 100+ country values across the 20 nodes?

Question 2: How to make the Spark application as fast as possible? I suppose the area of improvement would be to parallelize the reading of the 1 TB file.

  • Convert the CSV file into Parquet format with Snappy compression. Let's assume this can be done in advance (a conversion sketch is shown after this list).

  • Copy the Parquet file onto HDFS. Let's assume the Spark cluster is within the same Hadoop cluster and the datanodes are independent of the 20-node Spark cluster.

  • Change the Spark application to read from HDFS. I suppose Spark would now use several nodes to read the file, since Parquet is splittable.

  • Let's assume the Parquet file compressed with Snappy is 10x smaller, i.e. 100 GB, and the HDFS block size is 128 MB: roughly 782 HDFS blocks in total.
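
To make the bullets above concrete, here is a rough sketch (Spark 1.x with spark-csv) of the one-off conversion and the changed read path; the hdfs:/// path is made up for illustration, and the conversion job itself still has to read the CSV from a location reachable by that job:

// One-off preparation job (sketch; the HDFS path is an assumption)
// In Spark 1.x the Parquet codec is configured on the SQLContext:
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

val dfCsv = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/media/username/myUSBdrive/bogusBigLog1TB.log")

// Write the data out as Snappy-compressed, splittable Parquet on HDFS
dfCsv.write.parquet("hdfs:///data/bogusBigLog1TB.parquet")

// The Spark application then reads the Parquet file from HDFS instead:
val dfBigLog = sqlContext.read.parquet("hdfs:///data/bogusBigLog1TB.parquet")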

But then how does Spark manage to use all 20 nodes, both for creating the DataFrame and for the processing (groupBy and count)? Does Spark use all the nodes each time?

asked Apr 09 '16 by Polymerase




2 Answers

Question 1: How does Spark parallelize the processing (of reading a file from a USB drive)?

This scenario is not possible.

Spark relies on a Hadoop-compliant filesystem to read a file. When you mount the USB drive, it is accessible only from the local host. Attempting to execute

.load("/media/username/myUSBdrive/bogusBigLog1TB.log")

will fail in a cluster configuration, as executors in the cluster will not have access to that local path.

It would be possible to read the file with Spark in local mode (master=local[*]), in which case you would only have one host, and hence the rest of the questions would not apply.
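
For completeness, a minimal local-mode sketch (single host, so there is no cluster parallelism beyond the local cores):

$ ./spark-shell --master "local[*]" --packages com.databricks:spark-csv_2.10:1.4.0

// Runs entirely on one machine; the read is bound by the USB drive's throughput
val dfLocal = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/media/username/myUSBdrive/bogusBigLog1TB.log")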

Question 2: How to make the Spark application the fastest possible?

Divide and conquer.
The strategy outlined in the question is good. Using Parquet will allow Spark to do a projection on the data and read only the Country column (.select("Country")), further reducing the amount of data that needs to be ingested and hence speeding things up.
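
One way to see the projection (a sketch; the exact plan output varies by Spark version) is to look at the physical plan of the query:

// Inspect the physical plan to confirm only the Country column is read from Parquet
dfBigLog.select("Country")
  .groupBy("Country")
  .agg(count($"Country") as "CountryCount")
  .explain()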

The cornerstone of parallelism in Spark is partitions. Again, as we are reading from a file, Spark relies on the Hadoop filesystem. When reading from HDFS, the partitioning will be dictated by the splits of the file on HDFS. Those splits will be evenly distributed among the executors. That's how Spark will initially distribute the work across all available executors for the job.
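
As a quick sanity check (Spark 1.x API), you can inspect how many partitions the scan produced, which is roughly the number of parallel tasks in the first stage:

// Number of input partitions ≈ number of HDFS splits (~782 in the 100 GB example)
val numPartitions = dfBigLog.rdd.partitions.size
// With 20 nodes x 4 cores = 80 task slots, ~800 tasks run in roughly 10 waves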

I'm not deeply familiar with the Catalyst optimizations, but I think it is safe to assume that .groupBy("Country").agg(count($"Country")) will become something similar to rdd.map(country => (country, 1)).reduceByKey(_ + _). The map operation does not affect partitioning, so it can be applied in place on each partition. The reduceByKey is applied locally within each partition first, and the partial results are then combined across the executors in a shuffle; only the small final per-country counts are brought back to the driver by show. So most of the counting happens distributed in the cluster, and the centralized part at the end is tiny.
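
A hand-written RDD sketch of that shape (not what Catalyst literally generates, just the equivalent logic):

val countryCounts = dfBigLog
  .select("Country")
  .rdd
  .map(row => (row.getString(0), 1L))  // applied per partition, no shuffle
  .reduceByKey(_ + _)                  // partial sums per partition, then combined after the shuffle
  .sortBy(_._2, ascending = false)     // order by count, descending

countryCounts.take(20).foreach(println)  // only this small result reaches the driver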

answered Oct 19 '22 by maasg


Reading the file from the USB drive is not parallelizable.

Whether it is a USB drive or any other data source, the same rules apply: either the source is accessible from the driver and all worker machines, and the data is accessed in parallel (up to the source's limits), or the data is not accessed at all and you get an exception.

How many nodes are used for creating the DataFrame? (maybe only one?)

Assuming the file is accessible from all machines, it depends on the configuration. For starters, you should take a look at the split size.
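
As a hedged example of the kind of knobs meant here (standard Hadoop 2.x property names; the 128 MB value is only illustrative):

// Influence the input split size used when planning the file scan
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (128 * 1024 * 1024).toString)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (128 * 1024 * 1024).toString)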

How many nodes are used for the groupBy & count?

Once again, it depends on the configuration.
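
For the groupBy & count step specifically, Spark SQL uses a fixed number of shuffle partitions (200 by default), which is one such configuration knob; a small sketch, with the value chosen as an assumption for 20 nodes x 4 cores:

// Partitions used for the shuffle behind groupBy/count (default: 200)
sqlContext.setConf("spark.sql.shuffle.partitions", "80")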

answered Oct 19 '22 by zero323