 

How to speed up spark df.write jdbc to postgres database?

I am new to Spark and am attempting to speed up appending the contents of a dataframe (which can have between 200k and 2M rows) to a Postgres database using df.write:

df.write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=2000000,
      queryTimeout=690
      ).mode(mode).save()

I tried increasing the batchsize, but that didn't help, as completing this task still took ~4 hours. I've also included some snapshots below from AWS EMR showing more details about how the job ran. The task to save the dataframe to the Postgres table was assigned to only one executor (which I found strange); would speeding this up involve dividing this task between executors?

Also, I have read Spark's performance tuning docs, but increasing the batchsize and queryTimeout has not seemed to improve performance. (I tried calling df.cache() in my script before df.write, but the runtime for the script was still 4 hours.)

Additionally, my AWS EMR hardware setup and spark-submit are:

Master Node (1): m4.xlarge

Core Nodes (2): m5.xlarge

spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...

[Two screenshots from the AWS EMR console showing how the job ran]

asked Nov 03 '19 by Megan


People also ask

How do parallel connections work between Spark and a database using JDBC?

Spark's partitions dictate the number of connections used to push data through the JDBC API. You can control the parallelism by calling coalesce(<N>) or repartition(<N>) depending on the existing number of partitions.
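
For example, a minimal sketch of the two calls mentioned above, assuming an existing dataframe named df:

# Assuming a dataframe `df` that is about to be written over JDBC
df_fewer = df.coalesce(8)      # merges existing partitions, avoids a full shuffle
df_more = df.repartition(16)   # full shuffle, can also increase the partition count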

How do I convert a Spark DataFrame to a database?

Spark DataFrames (as of Spark 1.4) have a write() method that can be used to write to a database. The write() method returns a DataFrameWriter object. DataFrameWriter objects have a jdbc() method, which is used to save DataFrame contents to an external database table via JDBC.
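
As a rough sketch (the URL, table name, and credentials below are placeholders, not values from the question):

# Minimal sketch of DataFrameWriter.jdbc(); all connection details are placeholders
df.write.jdbc(
    url="jdbc:postgresql://hostname:5432/postgresDB",
    table="schema.table",
    mode="append",
    properties={"user": "master",
                "password": "password",
                "driver": "org.postgresql.Driver"})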

Why is Spark SQL slow?

Sometimes Spark runs slowly because too many concurrent tasks are running. The capacity for high concurrency is a beneficial feature, as it provides Spark-native fine-grained sharing; this leads to maximum resource utilization while cutting down query latencies.

Can Spark connect to PostgreSQL?

Access and process PostgreSQL Data in Apache Spark using the CData JDBC Driver. Apache Spark is a fast and general engine for large-scale data processing. When paired with the CData JDBC Driver for PostgreSQL, Spark can work with live PostgreSQL data.


4 Answers

Spark is a distributed data processing engine, so when you are processing your data or saving it to the file system it uses all of its executors to perform the task. Spark JDBC is slow because when you establish a JDBC connection, only one of the executors establishes a link to the target database, hence resulting in slow speeds and failures.

To overcome this problem and speed up data writes to the database, you need to use one of the following approaches:

Approach 1:

In this approach you use the Postgres COPY command to speed up the write operation. This requires you to have the psycopg2 library on your EMR cluster.

The documentation for the COPY utility is here.

If you want to know the benchmark differences and why COPY is faster, visit here!

Postgres also suggests using the COPY command for bulk inserts. So how do you bulk insert a Spark dataframe? To implement faster writes, first save your Spark dataframe to the EMR file system in CSV format, and repartition your output so that no file contains more than 100k rows.

# Repartition your dataframe based on its number of rows (10 partitions here is an example)
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data")

Now read the files using Python and execute the COPY command for each file.

import psycopg2

# Iterate over your files here and create file objects;
# you can also get the file list using the os module.
file = open('path/to/save/data/part-00000_0.csv')
file1 = open('path/to/save/data/part-00000_1.csv')

# Define a function that copies one file into the target table
def execute_copy(file_obj):
    con = psycopg2.connect(database=dbname, user=user, password=password, host=host, port=port)
    cursor = con.cursor()
    # copy_from expects an open file-like object, not a path
    cursor.copy_from(file_obj, 'table_name', sep=",")
    con.commit()
    con.close()

To gain an additional speed boost, since you are using an EMR cluster, you can leverage Python multiprocessing to copy more than one file at a time.

from multiprocessing import Pool, cpu_count

with Pool(cpu_count()) as p:
    print(p.map(execute_copy, [file, file1]))

This is the recommended approach, as Spark JDBC can't be tuned to gain higher write speeds due to connection constraints.

Approach 2: Since you are already using an AWS EMR cluster, you can always leverage Hadoop capabilities to perform your table writes faster. Here we will use Sqoop export to export our data from EMRFS to the Postgres DB.

#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

Why Sqoop? Because Sqoop opens multiple connections to the database based on the number of mappers specified. So if you specify -m 8, there will be 8 concurrent connection streams writing data to Postgres.

Also, for more information on using Sqoop, go through this AWS Blog, SQOOP Considerations, and the SQOOP Documentation.

If you can hack your way around with code, then Approach 1 will definitely give you the performance boost you seek, and if you are comfortable with Hadoop components like Sqoop, then go with the second approach.

Hope it helps!

answered Oct 19 '22 by Shubham Jain


Spark side tuning => Perform a repartition on the dataframe so that there will be multiple executors writing to the DB in parallel:

(df
 .repartition(10)   # number of concurrent connections from Spark to PostgreSQL
 .write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=2000000,
      queryTimeout=690
      ).mode(mode).save())

PostgreSQL side tuning => You will need to bump up the below parameters on PostgreSQL (a sketch of setting them from Python follows the list).

  1. max_connections determines the maximum number of concurrent connections to the database server. The default is typically 100 connections.
  2. shared_buffers configuration parameter determines how much memory is dedicated to PostgreSQL to use for caching data.
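
A minimal sketch of bumping both settings from Python; the connection details are placeholders, the role needs superuser rights, and both parameters only take effect after a server restart:

import psycopg2

# Placeholder connection details; the role needs superuser rights.
con = psycopg2.connect(database="postgresDB", user="master",
                       password="password", host="hostname", port=5432)
con.autocommit = True   # ALTER SYSTEM cannot run inside a transaction block
cur = con.cursor()
cur.execute("ALTER SYSTEM SET max_connections = 200;")
cur.execute("ALTER SYSTEM SET shared_buffers = '2GB';")   # needs a restart to apply
cur.close()
con.close()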
answered Oct 19 '22 by QuickSilver


To solve the performance issue, you generally need to resolve the below 2 bottlenecks:

  1. Make sure the Spark job is writing the data in parallel to the DB - To resolve this, make sure you have a partitioned dataframe. Use "df.repartition(n)" to partition the dataframe so that each partition is written to the DB in parallel. Note - A large number of executors will also lead to slow inserts. So start with 5 partitions and increase the number of partitions by 5 until you get optimal performance (see the timing sketch after this list).
  2. Make sure the DB has enough compute, memory and storage required for ingesting bulk data.
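
A possible timing sketch for point 1, reusing the variables from the question; the 5% sample fraction and the throwaway "<table>_tuning" target table are assumptions made here for illustration:

import time

# Time writes of a small sample at increasing partition counts (5, 10, 15, ...)
sample = df.sample(fraction=0.05)
for n in range(5, 30, 5):
    start = time.time()
    (sample.repartition(n)
           .write.format('jdbc')
           .options(url=psql_url_spark,
                    driver=spark_env['PSQL_DRIVER'],
                    dbtable="{schema}.{table}_tuning".format(schema=schema, table=table),
                    user=spark_env['PSQL_USER'],
                    password=spark_env['PSQL_PASS'])
           .mode("overwrite")
           .save())
    print(n, "partitions:", round(time.time() - start, 1), "seconds")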
answered Oct 19 '22 by Abraham


That repartitioning the dataframe achieves better write performance is a known answer, but there is an optimal way of repartitioning your dataframe. Since you are running this process on an EMR cluster, first get to know the instance type and the number of cores running on each of your slave instances, and specify the number of partitions on the dataframe accordingly. In your case you are using m5.xlarge (2 slaves), which have 4 vCPUs each, meaning 4 threads per instance. So 8 partitions will give you an optimal result when you are dealing with huge data.

Note: The number of partitions should be increased or decreased based on your data size.

Note: Batch size is also something you should consider in your writes; the bigger the batch size, the better the performance.
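
Putting the sizing and the batch size note together, a sketch reusing the variables from the question (the batchsize value here is just an example to tune, not a recommendation):

# 2 m5.xlarge slaves x 4 vCPUs each => 8 partitions, per the sizing above
num_slaves = 2
vcpus_per_slave = 4
num_partitions = num_slaves * vcpus_per_slave   # 8

df.repartition(num_partitions).write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=100000,          # example value; larger batches may help per the note above
      queryTimeout=690
      ).mode(mode).save()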

answered Oct 19 '22 by Krishna Chaitanya V