 

JDBC to Spark Dataframe - How to ensure even partitioning?

I am new to Spark, and am working on creating a DataFrame from a Postgres database table via JDBC, using spark.read.jdbc.

I am a bit confused about the partitioning options, in particular partitionColumn, lowerBound, upperBound, and numPartitions.


  • The documentation seems to indicate that these fields are optional. What happens if I don't provide them?
  • How does Spark know how to partition the queries? How efficient will that be?
  • If I DO specify these options, how do I ensure that the partition sizes are roughly even if the partitionColumn is not evenly distributed?

Let's say I'm going to have 20 executors, so I set my numPartitions to 20.
My partitionColumn is an auto-incremented ID field, and let's say the values range from 1 to 2,000,000.
However, because the user chooses to process some really old data along with some really new data, with nothing in the middle, most of the data has ID values either under 100,000 or over 1,900,000.
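For reference, this is roughly the call I have in mind (a sketch only; the connection URL, table name, and credentials are placeholders, and the column/bounds match the scenario above):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-read").getOrCreate()

df = spark.read.jdbc(
    url="jdbc:postgresql://dbhost:5432/mydb",   # placeholder URL
    table="my_table",                           # placeholder table name
    column="id",                                # i.e. the partitionColumn
    lowerBound=1,
    upperBound=2_000_000,
    numPartitions=20,
    properties={"user": "me", "password": "secret",
                "driver": "org.postgresql.Driver"},
)
# As I understand it, Spark issues 20 queries whose WHERE clauses stride
# evenly over [lowerBound, upperBound) on `id`, roughly:
#   "id < 100000", "id >= 100000 AND id < 200000", ..., "id >= 1900000"
```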

  • Will my 1st and 20th executors get most of the work, while the other 18 executors sit there mostly idle?

  • If so, is there a way to prevent this?
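For what it's worth, this is how I've been checking how the rows actually land per partition after the read (assumes the `df` from the sketch above):

```python
from pyspark.sql.functions import spark_partition_id

# Count rows per partition to see whether the first and last partitions
# end up holding nearly all of the data.
df.groupBy(spark_partition_id().alias("partition")) \
  .count() \
  .orderBy("partition") \
  .show(20)
```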

asked Jun 10 '19 by JoeMjr2



1 Answer

I found a way to manually specify the partition boundaries by using the jdbc constructor with the predicates parameter.

It lets you explicitly provide the condition inserted into the WHERE clause for each partition, so you control exactly which range of rows each partition receives. If you don't have a uniformly distributed column to auto-partition on, you can build your own partitioning strategy.

An example of how to use it can be found in the accepted answer to this question.
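A rough sketch of what this looks like in PySpark (the boundaries, URL, table name, and credentials here are hypothetical; in practice you'd pick ranges that match where your data actually lives):

```python
# One WHERE-clause fragment per desired partition, sized to the dense regions.
predicates = [
    "id >= 1 AND id < 50000",
    "id >= 50000 AND id < 100000",
    # ... more ranges covering the old and new ID clusters ...
    "id >= 1900000 AND id < 1950000",
    "id >= 1950000",
]

df = spark.read.jdbc(
    url="jdbc:postgresql://dbhost:5432/mydb",   # placeholder URL
    table="my_table",                           # placeholder table name
    predicates=predicates,
    properties={"user": "me", "password": "secret",
                "driver": "org.postgresql.Driver"},
)
# The resulting DataFrame has len(predicates) partitions, one per condition.
```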

answered Nov 11 '22 by JoeMjr2