I am new to Spark and am attempting to speed up appending the contents of a dataframe (which can have between 200k and 2M rows) to a Postgres database using df.write:
df.write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()
I tried increasing the batchsize but that didn't help, as completing this task still took ~4 hours. I've also included some snapshots below from AWS EMR showing more details about how the job ran. The task to save the dataframe to the Postgres table was only assigned to one executor (which I found strange). Would speeding this up involve dividing this task between executors?
Also, I have read Spark's performance tuning docs, but increasing the batchsize and queryTimeout has not seemed to improve performance. (I tried calling df.cache() in my script before df.write, but runtime for the script was still 4 hours.)
Additionally, my AWS EMR hardware setup and spark-submit are:
Master Node (1): m4.xlarge
Core Nodes (2): m5.xlarge
spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...
Spark's partitions dictate the number of connections used to push data through the JDBC API. You can control the parallelism by calling coalesce(<N>) or repartition(<N>) depending on the existing number of partitions.
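As a rough sketch of that choice, the helper below picks coalesce when the dataframe has more partitions than wanted (it merges partitions without a full shuffle) and repartition when it has fewer. The name with_write_parallelism is hypothetical and not part of Spark; it relies only on df.rdd.getNumPartitions(), df.coalesce() and df.repartition().

```python
def with_write_parallelism(df, target_partitions):
    """Return df with target_partitions partitions (hypothetical helper).

    Each partition is written over its own JDBC connection, so this number
    is also the number of concurrent connections to the database.
    """
    if target_partitions < 1:
        raise ValueError("target_partitions must be at least 1")
    current = df.rdd.getNumPartitions()
    if current > target_partitions:
        # Too many partitions: coalesce merges them without a full shuffle.
        return df.coalesce(target_partitions)
    if current < target_partitions:
        # Too few partitions: repartition shuffles the rows to spread them out.
        return df.repartition(target_partitions)
    return df
```

It would be used as with_write_parallelism(df, 8).write.format('jdbc')... in place of df.write.format('jdbc')..., where 8 is only an example value.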
Spark DataFrames (as of Spark 1.4) have a write() method that can be used to write to a database. The write() method returns a DataFrameWriter object. DataFrameWriter objects have a jdbc() method, which is used to save DataFrame contents to an external database table via JDBC.
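A minimal sketch of that call chain, assuming the caller supplies the URL, table name and credentials (append_via_jdbc is a hypothetical wrapper name, and the default driver class is the standard PostgreSQL JDBC driver):

```python
def append_via_jdbc(df, url, table, user, password,
                    driver="org.postgresql.Driver"):
    """Append the rows of df to an existing table over JDBC (hypothetical wrapper)."""
    properties = {"user": user, "password": password, "driver": driver}
    # df.write is a DataFrameWriter; its jdbc() method performs the save.
    df.write.jdbc(url=url, table=table, mode="append", properties=properties)
```

This is equivalent in spirit to the format('jdbc').options(...).save() form used in the question; both end up writing one partition per JDBC connection.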
Sometimes, Spark runs slowly because there are too many concurrent tasks running. The capacity for high concurrency is a beneficial feature, as it provides Spark-native fine-grained sharing. This leads to maximum resource utilization while cutting down query latencies.
Spark is a distributed data processing engine, so when you process your data or save it to a file system it uses all of its executors to perform the task. Spark JDBC is slow because, when you establish a JDBC connection, only one of the executors establishes a link to the target database, which results in slow speeds and failures.
To overcome this problem and speed up data writes to the database you need to use one of the following approaches:
Approach 1:
In this approach you use the Postgres COPY command to speed up the write operation. This requires the psycopg2 library on your EMR cluster.
The documentation for COPY utility is here
If you want to know the benchmark differences and why copy is faster visit here!
Postgres also suggests using the COPY command for bulk inserts. To bulk insert a Spark dataframe this way, first save the dataframe to the EMR file system in CSV format, repartitioning the output so that no file contains more than 100k rows.
# Repartition your dataframe dynamically based on the number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data")
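The comment above asks for a partition count derived from the row count, while the snippet uses a fixed 10. One way to compute it is sketched below, assuming rows are spread roughly evenly across partitions (partitions_for_rows is a hypothetical helper name):

```python
import math

def partitions_for_rows(row_count, max_rows_per_file=100_000):
    """Smallest partition count that keeps each partition at or under
    max_rows_per_file rows, assuming an even spread (hypothetical helper)."""
    if max_rows_per_file < 1:
        raise ValueError("max_rows_per_file must be at least 1")
    return max(1, math.ceil(row_count / max_rows_per_file))
```

For the sizes mentioned in the question, 200k rows gives 2 partitions and 2M rows gives 20. It could be plugged in as df.repartition(partitions_for_rows(df.count())), keeping in mind that df.count() itself triggers a Spark job.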
Now read the files using python and execute copy command for each file.
import psycopg2
# List the part files Spark wrote; you can also build this list with the os module.
# Pass paths rather than open file objects, because file objects cannot be pickled for worker processes.
files = ['path/to/save/data/part-00000_0.csv', 'path/to/save/data/part-00000_1.csv']
# Copy one CSV file into the target table over its own connection
def execute_copy(path):
    con = psycopg2.connect(database=dbname, user=user, password=password, host=host, port=port)
    cursor = con.cursor()
    with open(path) as f:
        cursor.copy_from(f, 'table_name', sep=",")
    con.commit()
    con.close()
To gain an additional speed boost, since you are using an EMR cluster, you can leverage Python multiprocessing to copy more than one file at once.
from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
    print(p.map(execute_copy, files))
This is the recommended approach, as Spark JDBC can't be tuned to gain higher write speeds due to connection constraints.
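The COPY steps above can also be sketched with two small helpers. This variant swaps copy_from for psycopg2's cursor.copy_expert with FORMAT csv, so that quoted fields and embedded commas are parsed as CSV rather than split on every comma. The names list_part_files and copy_csv_file are hypothetical, connect is assumed to be a zero-argument callable such as lambda: psycopg2.connect(...), and the table name is assumed to come from trusted code.

```python
import glob
import os

def list_part_files(directory):
    """Return the CSV part files Spark wrote into directory, sorted by name."""
    return sorted(glob.glob(os.path.join(directory, "part-*.csv")))

def copy_csv_file(connect, path, table):
    """Load one CSV file into table with COPY ... FROM STDIN.

    connect: zero-argument callable returning a psycopg2 connection (assumption).
    table:   trusted table name; it is interpolated into the SQL as-is.
    """
    con = connect()
    try:
        with open(path) as f:
            cursor = con.cursor()
            # FORMAT csv makes PostgreSQL honour CSV quoting rules.
            cursor.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT csv)", f)
        con.commit()
    finally:
        # Close the connection even if the copy fails.
        con.close()
    return path
```

Because copy_csv_file takes a path and opens its own connection, it can be handed to a multiprocessing Pool through a module-level wrapper function that fixes connect and table.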
Approach 2: Since you are already using an AWS EMR cluster you can always leverage the hadoop capabilities to perform your table writes faster. So here we will be using sqoop export to export our data from emrfs to the postgres db.
#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16
#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16
Why sqoop? Because sqoop opens multiple connections to the database based on the number of mappers specified. So if you specify -m as 8, there will be 8 concurrent connection streams writing data to Postgres.
Also, for more information on using sqoop go through this AWS Blog, SQOOP Considerations and SQOOP Documentation.
If you can hack around your way with code then Approach 1 will definitely give you the performance boost you seek and if you are comfortable with hadoop components like SQOOP then go with second approach.
Hope it helps!
Spark side tuning => Perform a repartition on the DataFrame so that multiple executors write to the DB in parallel
# repartition(10) => number of concurrent connections from Spark to PostgreSQL
df.repartition(10).write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()
PostgreSQL side tuning => You will need to increase the parameters below on PostgreSQL accordingly.
max_connections determines the maximum number of concurrent connections to the database server. The default is typically 100 connections.
shared_buffers determines how much memory is dedicated to PostgreSQL for caching data.
To solve the performance issue, you generally need to resolve the 2 bottlenecks below:
Use "df.repartition(n)" to partition the dataframe so that each partition is written to the DB in parallel.
Note - A large number of executors will also lead to slow inserts. So start with 5 partitions and increase the number of partitions by 5 until you get optimal performance.
Repartitioning the dataframe to achieve better write performance is a known answer, but there is an optimal way of doing it. Since you are running this process on an EMR cluster, first find out the instance type and the number of cores running on each of your slave instances, and set the number of partitions on the dataframe accordingly. In your case you are using m5.xlarge (2 slaves), which has 4 vCPUs each, meaning 4 threads per instance. So 8 partitions will give you an optimal result when you are dealing with huge data.
Note : Number of partitions should be increased or decreased based on your data size.
Note : Batch size is also something you should consider in your writes. A bigger batch size generally gives better performance.
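The two sizing rules from the answers above (one partition per vCPU on the core nodes, or starting at 5 and stepping by 5) can be written down as a small sketch. Both function names are hypothetical, and the upper limit of 30 for the sweep is an arbitrary assumption, not a recommendation from the answers.

```python
def partitions_for_cluster(core_nodes, vcpus_per_node):
    """One partition per vCPU across the core nodes (hypothetical helper)."""
    return core_nodes * vcpus_per_node

def partition_candidates(start=5, step=5, limit=30):
    """Partition counts to try in turn while timing the write.

    limit is an assumed cut-off; stop earlier once timings stop improving.
    """
    return list(range(start, limit + 1, step))
```

For the cluster in the question, partitions_for_cluster(2, 4) gives the 8 partitions suggested above. Whichever count is chosen should stay below the PostgreSQL max_connections setting, since each partition holds its own connection.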