 

Does Spark distribute DataFrames across nodes internally?

I am trying to use Spark to process a CSV file on a cluster. I want to understand whether I need to explicitly read the file on each of the worker nodes to process it in parallel, or whether the driver node reads the file and distributes the data across the cluster internally. (I am working with Spark 2.3.2 and Python.)

I know RDDs can be parallelized using SparkContext.parallelize(), but what about Spark DataFrames?

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession.builder.appName('myApp').getOrCreate()
        df = spark.read.csv('dataFile.csv', header=True)
        df = df.filter("date > '2010-12-01' AND date <= '2010-12-02' AND town == 'Madrid'")

So if I run the above code on a cluster, will the entire operation be done by the driver node, or will Spark distribute df across the cluster so that each worker processes its own partition of the data?

asked Apr 03 '19 by Siddhant

People also ask

How DataFrame is distributed in Spark?

In Spark, DataFrames are the distributed collections of data, organized into rows and columns. Each column in a DataFrame has a name and an associated type. DataFrames are similar to traditional database tables, which are structured and concise.

Is DataFrame distributed?

Pandas DataFrame is not distributed and hence processing in the Pandas DataFrame will be slower for a large amount of data.

What is distribute by in Spark?

Description. The DISTRIBUTE BY clause is used to repartition the data based on the input expressions. Unlike the CLUSTER BY clause, this does not sort the data within each partition.
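
A minimal sketch of DISTRIBUTE BY via Spark SQL, for illustration only; the view name 'sales' and the column 'town' are assumptions, not from the question:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('distributeByDemo').getOrCreate()
    df = spark.read.csv('dataFile.csv', header=True)
    df.createOrReplaceTempView('sales')

    # Rows with the same 'town' end up in the same partition;
    # unlike CLUSTER BY, the rows within each partition are not sorted.
    redistributed = spark.sql("SELECT * FROM sales DISTRIBUTE BY town")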

What is the difference between pandas and Spark DataFrame?

pandas-on-Spark DataFrame and pandas DataFrame are similar. However, the former is distributed and the latter is in a single machine. When converting to each other, the data is transferred between multiple machines and the single client machine.
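
A hedged sketch of converting between the two; note this requires Spark 3.2+ (where pyspark.pandas is available), not the Spark 2.3.2 used in the question:

    import pyspark.pandas as ps

    psdf = ps.read_csv('dataFile.csv')   # distributed, pandas-like API
    pdf = psdf.to_pandas()               # pulls all data to the single driver machine
    psdf_back = ps.from_pandas(pdf)      # redistributes the data across the cluster again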


1 Answer

To be strict, if you run the above code it will not read or process any data. DataFrames are basically an abstraction implemented on top of RDDs. As with RDDs, you have to distinguish between transformations and actions. As your code only consists of one filter(...) transformation, nothing will happen in terms of reading or processing data. Spark will only create the DataFrame, which is an execution plan. You have to perform an action like count() or write.csv(...) to actually trigger processing of the CSV file.
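
A minimal sketch building on the question's code, showing that nothing is read until an action is called (the output path 'filtered_output' is an assumption):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('myApp').getOrCreate()

    df = spark.read.csv('dataFile.csv', header=True)   # transformation: only builds a plan
    df = df.filter("date > '2010-12-01' AND date <= '2010-12-02' AND town == 'Madrid'")

    print(df.count())                                  # action: now the CSV is actually read
    df.write.csv('filtered_output', header=True)       # another action: writes the filtered result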

If you do so, the data will then be read and processed by 1..n worker nodes. It is never read or processed by the driver node. How many of your worker nodes are actually involved depends -- with your code -- on the number of partitions of your source file. Each partition of the source file can be processed in parallel by one worker node. In your example it is probably a single CSV file, so when you call df.rdd.getNumPartitions() after reading the file, it should return 1. Hence, only one worker node will read the data. The same is true if you check the number of partitions after your filter(...) operation.
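
A short sketch for checking the partition count, assuming the same 'dataFile.csv' from the question; a single small CSV typically yields exactly one partition:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('myApp').getOrCreate()
    df = spark.read.csv('dataFile.csv', header=True)

    print(df.rdd.getNumPartitions())                              # likely 1 for a single small file
    print(df.filter("town == 'Madrid'").rdd.getNumPartitions())   # filter() keeps the same count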

Here are two ways in which the processing of your single CSV file can be parallelized (a sketch of both follows the list):

  1. You can manually repartition your source DataFrame by calling df.repartition(n), where n is the number of partitions you want. But -- and this is a significant but -- this means that all data is potentially sent over the network (aka shuffled)!

  2. You perform aggregations or joins on the DataFrame. These operations have to trigger a shuffle. Spark then uses the number of partitions specified in spark.sql.shuffle.partitions (default: 200) to partition the resulting DataFrame.
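
A sketch of both options, assuming the df from the question; the partition count 8, the shuffle-partition setting 50, and the groupBy column 'town' are illustrative only (with Spark 3.x, adaptive query execution may coalesce the shuffle partitions further):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('myApp').getOrCreate()
    df = spark.read.csv('dataFile.csv', header=True)

    # Option 1: explicit repartition (shuffles all rows over the network)
    df8 = df.repartition(8)
    print(df8.rdd.getNumPartitions())      # 8

    # Option 2: an aggregation forces a shuffle; the result uses
    # spark.sql.shuffle.partitions partitions (200 by default)
    spark.conf.set("spark.sql.shuffle.partitions", "50")   # optional tuning
    counts = df.groupBy("town").count()
    print(counts.rdd.getNumPartitions())   # 50 here, 200 with the default setting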

answered Sep 23 '22 by effemm