Problem: I want to import data into Spark on EMR from S3 using:
data = sqlContext.read.json("s3n://.....")
Is there a way I can set the number of nodes that Spark uses to load and process the data? This is an example of how I process the data:
data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")
Context: The data is not too big, but it takes a long time to load into Spark and also to query. I think Spark partitions the data across too many nodes, and I want to be able to set that manually. I know that when dealing with RDDs and sc.parallelize I can pass the number of partitions as an input. I have also seen repartition(), but I am not sure whether it solves my problem. The variable data is a DataFrame in my example.
Let me define partition more precisely. Definition one: commonly referred to as a "partition key", where a column is selected and indexed to speed up queries (that is not what I want). Definition two (this is where my concern is): suppose you have a data set; Spark decides to distribute it across many nodes so it can run operations on the data in parallel. If the data size is too small, this may actually slow down the process. How can I set that value?
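For reference, here is a minimal sketch of how I check how many partitions Spark actually created for the DataFrame (the bucket path is a placeholder, not my real location):
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(appName="partition-check")   # already defined as sc in the pyspark shell
sqlContext = SQLContext(sc)                    # already defined as sqlContext in the shell
data = sqlContext.read.json("s3n://my-bucket/path/to/data")   # placeholder path
print(data.rdd.getNumPartitions())             # number of partitions Spark chose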
If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions (or by a target number of partitions); the resulting DataFrame is hash partitioned.
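For example, a minimal sketch using the data DataFrame from the question (some_column is a hypothetical column name, not from the original post):
# data is the DataFrame loaded from S3 in the question
repartitioned = data.repartition(10)            # full shuffle into 10 hash partitions
print(repartitioned.rdd.getNumPartitions())     # 10
# Spark 1.6+ also accepts partitioning expressions (hash partitioned on the columns)
by_column = data.repartition(10, "some_column") # hypothetical column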
Spark's coalesce() (available on both RDDs and DataFrames) is used only to reduce the number of partitions. It is an optimized version of repartition() for that case: coalesce() merges existing partitions instead of performing a full shuffle, so less data is moved across the cluster.
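A short sketch of the difference, again using data from the question:
print(data.rdd.getNumPartitions())      # e.g. a large number chosen by Spark when reading from S3
# coalesce() merges existing partitions without a full shuffle, so it is
# cheaper than repartition() when you only want fewer partitions
smaller = data.coalesce(10)
print(smaller.rdd.getNumPartitions())   # 10 (assuming the original count was larger)
# Asking coalesce() for more partitions than currently exist has no effect;
# use repartition() if you need to increase the count.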
By default, Spark SQL uses 200 shuffle partitions. You can change this with a SET command on the SQL context:
sqlContext.sql("set spark.sql.shuffle.partitions=10")
However, you need to set it with caution based on your data characteristics.
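A sketch of that, reusing the statements from the question (the column name is hypothetical and the value 10 is only an illustration):
# Lower the shuffle-partition count before running the query (default is 200)
sqlContext.sql("set spark.sql.shuffle.partitions=10")
data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")
# The setting takes effect on shuffle-producing operations such as joins and
# aggregations; e.g. this grouped result will end up with 10 partitions
counts = SqlData.groupBy("some_column").count()   # hypothetical column; groupBy triggers a shuffle
print(counts.rdd.getNumPartitions())              # 10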
You can call repartition() on the DataFrame to set the number of partitions. You can also set the spark.sql.shuffle.partitions property after creating the Hive context, or pass it to spark-submit:
spark-submit .... --conf spark.sql.shuffle.partitions=100
or
dataframe.repartition(100)
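As a quick sanity check from inside the job (a sketch; the path is a placeholder):
# Confirm the value passed via --conf (or set in code); default is 200
print(sqlContext.getConf("spark.sql.shuffle.partitions", "200"))
# Confirm the effect of repartition() on the DataFrame itself
dataframe = sqlContext.read.json("s3n://my-bucket/path/to/data")   # placeholder path
print(dataframe.repartition(100).rdd.getNumPartitions())           # 100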