Reading CSV file in Spark in a distributed manner

I am developing a Spark processing framework that reads large CSV files, loads them into RDDs, performs some transformations, and at the end saves some statistics.

The CSV files in question are around 50GB on average. I'm using Spark 2.0.

My question is:

When I load the files using the sparkContext.textFile() function, does the file need to be stored in the driver's memory first and then distributed to the workers (thus requiring a rather large amount of memory on the driver)? Or is the file read "in parallel" by every worker, so that none of them needs to store the whole file and the driver acts only as a "manager"?

Thanks in advance

Ander Murillo Zohn asked Feb 10 '17 22:02


1 Answer

When you define the read, Spark divides the file into partitions based on your parallelism settings and sends the corresponding instructions to the workers. The workers then read their portions of the file directly from the filesystem (hence the need for a filesystem available to all the nodes, such as HDFS). The driver only coordinates the job and does not need to hold the file in memory.
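To make the idea of workers reading their own portions concrete, here is a minimal pure-Python sketch of splitting a text file into byte ranges and reading each range independently. It is only an illustration of the principle, not Spark's or Hadoop's actual code; the function names plan_splits and read_split and the rule "a line belongs to the range in which its first byte lies" are assumptions made for this example.

```python
import os
import tempfile


def plan_splits(file_size, num_splits):
    """Return (start, end) byte ranges that cover the file without overlap."""
    base = file_size // num_splits
    splits = []
    for i in range(num_splits):
        start = i * base
        # The last range absorbs any remainder bytes.
        end = file_size if i == num_splits - 1 else start + base
        splits.append((start, end))
    return splits


def read_split(path, start, end):
    """Read every line whose first byte falls inside [start, end)."""
    lines = []
    with open(path, "rb") as f:
        if start > 0:
            # Step back one byte and discard up to the next newline. This skips
            # a line that began in the previous range, but keeps a line that
            # starts exactly at `start`.
            f.seek(start - 1)
            f.readline()
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            # A line that starts before `end` is read to completion, even if
            # it runs past `end`.
            lines.append(line.rstrip(b"\n").decode("utf-8"))
    return lines


# Demo with a small temporary CSV file (hypothetical data).
rows = ["id,value"] + ["{},{}".format(i, "x" * (i % 7)) for i in range(50)]
with tempfile.NamedTemporaryFile("wb", suffix=".csv", delete=False) as tmp:
    tmp.write(("\n".join(rows) + "\n").encode("utf-8"))
demo_path = tmp.name

recovered = []
for start, end in plan_splits(os.path.getsize(demo_path), 4):
    # In a cluster, each of these calls would run on a different worker.
    recovered.extend(read_split(demo_path, start, end))
os.remove(demo_path)
print(len(recovered), "lines read across 4 splits")
```

Each call to read_split needs only the path and a byte range, which is why no single process has to hold the whole file, and also why every worker must be able to open the same path.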

As a side note, it would be much better to read the file into a DataFrame using spark.read.csv rather than into an RDD. This typically takes less memory and allows Spark to optimize your queries.
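A rough PySpark sketch of the DataFrame route might look like the following. The helper name load_csv, the application name, and the HDFS path are hypothetical, and header=True assumes the file's first line contains column names.

```python
def load_csv(spark, path):
    """Read a CSV file into a DataFrame using an existing SparkSession."""
    # header=True is an assumption about the file: first line = column names.
    return spark.read.csv(path, header=True)


def main():
    # Imported here so the helper above does not require pyspark at import time.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("csv-stats").getOrCreate()
    df = load_csv(spark, "hdfs:///data/input.csv")  # hypothetical path
    df.describe().show()  # summary statistics, computed on the executors
    spark.stop()

# Call main() on a machine where pyspark is installed and the path exists.
```

Only the small summary produced by describe() comes back to the driver; the rows themselves stay on the executors.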

UPDATE

In the comments, it was asked what would happen if the filesystem were not distributed and the file were located on only one machine. The answer is that if you have more than one machine, the job will most likely fail.

When you call sparkContext.textFile, nothing is actually read; the call only tells Spark what you want to read. Any transformations you then apply still read nothing, because you are only defining a plan. Once you perform an action (e.g. collect), the actual processing begins. Spark divides the job into tasks and sends them to the executors. The executors (which might be on the master node or on worker nodes) then attempt to read their portions of the file. The problem is that any executor not on the master node looks for the file and fails to find it, causing its tasks to fail. Spark retries a failing task several times (I believe the default, set by spark.task.maxFailures, is 4) and then fails the job completely.
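The plan-then-action sequence can be sketched in PySpark roughly as follows. The helper name count_nonempty_lines, the application name, and the path are hypothetical; setting spark.task.maxFailures explicitly is only there to show where the retry limit lives.

```python
def count_nonempty_lines(sc, path):
    """Build a plan over a text file and run it with a single action."""
    lines = sc.textFile(path)  # lazy: only records what to read
    nonempty = lines.filter(lambda line: line.strip() != "")  # lazy: extends the plan
    return nonempty.count()  # action: tasks are sent to the executors now


def main():
    # Imported here so the helper above does not require pyspark at import time.
    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("lazy-read-demo")
            .set("spark.task.maxFailures", "4"))  # failures allowed per task
    sc = SparkContext(conf=conf)
    # Hypothetical local path; it must exist on every node that runs an executor.
    print(count_nonempty_lines(sc, "file:///data/input.csv"))
    sc.stop()

# Call main() on a machine where pyspark is installed.
```

If the path is missing on some nodes, the error surfaces at count(), not at textFile(), because that is the first point at which any executor tries to open the file.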

Of course, if you have just one node, all executors will see the file and everything will be fine. In theory, tasks could also fail on a worker, be rerun on the master, and succeed there, but in any case the workers will not do any useful work unless they can see a copy of the file.

You can solve this by copying the file to the exact same path on all nodes or by using any kind of shared or distributed filesystem (even NFS shares are fine).

Of course, you can always work on a single node, but then you would not be taking advantage of Spark's scalability.

Assaf Mendelson answered Nov 24 '22 07:11