Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to partition Spark RDD when importing Postgres using JDBC?

I am importing a Postgres database into Spark. I know that I can partition on import, but that requires that I have a numeric column (I don't want to use the value column because it's all over the place and doesn't maintain order):

df = spark.read.format('jdbc').options(url=url, dbtable='tableName', properties=properties).load()
df.printSchema()

root
 |-- id: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- key: string (nullable = false)
 |-- value: double (nullable = false)

Instead, I am converting the dataframe into an rdd (of enumerated tuples) and trying to partition that instead:

rdd = df.rdd.flatMap(lambda x: enumerate(x)).partitionBy(20)

Note that I used 20 because I have 5 workers with one core each in my cluster, and 5*4=20.

Unfortunately, the following command still takes forever to execute:

result = rdd.first()

Therefore I am wondering if my logic above makes sense? Am I doing anything wrong? From the web GUI, it looks like the workers are not being used:

enter image description here

like image 449
FullStack Avatar asked Sep 20 '16 15:09

FullStack


People also ask

Can RDD be partitioned?

Apache Spark's Resilient Distributed Datasets (RDD) are a collection of various data that are so big in size, that they cannot fit into a single node and should be partitioned across various nodes. Apache Spark automatically partitions RDDs and distributes the partitions across different nodes.

How many partitions should a Spark RDD have?

As already mentioned above, one partition is created for each block of the file in HDFS which is of size 64MB. However, when creating a RDD a second argument can be passed that defines the number of partitions to be created for an RDD. The above line of code will create an RDD named textFile with 5 partitions.

Which of the function is used to reduce the number of partitions for RDD?

All RDDs offer two functions to change the number of partitions - repartition and coalesce. Repartition is used to increase/decrease the number the number partition while coalesce is used to decrease the number of partition.

Does RDD reside in default memory?

Yes, All 10 RDDs data will spread in spark worker machines RAM. but not necessary to all machines must have a partition of each RDD. off course RDD will have data in memory only if any action performed on it as it's lazily evaluated.


1 Answers

Since you already know you can partition by a numeric column this is probably what you should do. Here is the trick. First lets find a minimum and maximum epoch:

url = ...
properties = ...

min_max_query = """(
    SELECT
        CAST(min(extract(epoch FROM timestamp)) AS bigint), 
        CAST(max(extract(epoch FROM timestamp)) AS bigint)
    FROM tablename
) tmp"""

min_epoch, max_epoch = spark.read.jdbc(
    url=url, table=min_max_query, properties=properties
).first()

and use it to query the table:

numPartitions = ...

query = """(
    SELECT *, CAST(extract(epoch FROM timestamp) AS bigint) AS epoch
    FROM tablename) AS tmp"""

spark.read.jdbc(
    url=url, table=query,
    lowerBound=min_epoch, upperBound=max_epoch + 1, 
    column="epoch", numPartitions=numPartitions, properties=properties
).drop("epoch")

Since this splits data into ranges of the same size it is relatively sensitive to data skew so you should use it with caution.

You could also provide a list of disjoint predicates as a predicates argument.

predicates= [
    "id BETWEEN 'a' AND 'c'",
    "id BETWEEN 'd' AND 'g'",
    ...   # Continue to get full coverage an desired number of predicates
]

spark.read.jdbc(
    url=url, table="tablename", properties=properties, 
    predicates=predicates
)

The latter approach is much more flexible and can address certain issues with non-uniform data distribution but requires more knowledge about the data.

Using partitionBy fetches data first and then performs full shuffle to get desired number of partitions so it is relativistically expensive.

like image 52
zero323 Avatar answered Nov 15 '22 08:11

zero323