 

How to use JDBC source to write and read data in (Py)Spark?

The goal of this question is to document:

  • steps required to read and write data using JDBC connections in PySpark

  • possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages including Scala and R.

asked Jun 22 '15 15:06 by zero323




1 Answer

Writing data

  1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

     bin/pyspark --packages group:name:version   

or combine driver-class-path and jars:

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR 

These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance is started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
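For reference, the same driver settings can be expressed as config fragments (the group:name:version coordinates and the driver path are placeholders, as in the examples above):

```shell
# Environment variable, set before the JVM starts; the trailing
# "pyspark-shell" token is required when launching the PySpark shell:
export PYSPARK_SUBMIT_ARGS="--packages group:name:version pyspark-shell"

# ...or equivalently in conf/spark-defaults.conf:
# spark.jars.packages            group:name:version
# spark.driver.extraClassPath    /path/to/driver.jar
```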

  2. Choose the desired mode. The Spark JDBC writer supports the following modes:

    • append: Append contents of this DataFrame to existing data.
    • overwrite: Overwrite existing data.
    • ignore: Silently ignore this operation if data already exists.
    • error (default case): Throw an exception if data already exists.

    Upserts or other fine-grained modifications are not supported.

     mode = ... 
  3. Prepare the JDBC URI, for example:

     # You can encode credentials in the URI or pass them
     # separately using the properties argument
     # of the jdbc method or options
     url = "jdbc:postgresql://localhost/foobar"
  4. (Optional) Create a dictionary of JDBC arguments:

     properties = {
         "user": "foo",
         "password": "bar"
     }

    properties / options can also be used to set supported JDBC connection properties.

  5. Use DataFrame.write.jdbc:

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties) 

to save the data (see pyspark.sql.DataFrameWriter for details).
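Put together, the writing steps above might look like the following sketch. save_to_jdbc is a hypothetical wrapper (not part of the PySpark API), and it assumes a DataFrame df and target table "baz" already exist:

```python
# A minimal sketch of steps 2-5 above. Only the final call needs a live
# SparkSession; everything else is plain Python.

mode = "append"                                   # step 2: append/overwrite/ignore/error
url = "jdbc:postgresql://localhost/foobar"        # step 3: JDBC URI
properties = {"user": "foo", "password": "bar"}   # step 4: connection properties

def save_to_jdbc(df, url, table, mode, properties):
    # step 5: delegate to pyspark.sql.DataFrameWriter.jdbc
    df.write.jdbc(url=url, table=table, mode=mode, properties=properties)

# usage (requires an active SparkSession and a DataFrame df):
# save_to_jdbc(df, url, "baz", mode, properties)
```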

Known issues:

  • Suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Assuming there is no driver version mismatch, you can solve this by adding the driver class to the properties. For example:

      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }
  • using df.write.format("jdbc").options(...).save() may result in:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

    Solution unknown.

  • In PySpark 1.3 you can try calling the Java method directly:

      df._jdf.insertIntoJDBC(url, "baz", True) 

Reading data

  1. Follow steps 1-4 from Writing data

  2. Use sqlContext.read.jdbc:

     sqlContext.read.jdbc(url=url, table="baz", properties=properties) 

or sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
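Both read variants above accept the same information. As a plain-Python sketch (names mirror the earlier steps; nothing here touches a live database, and read_jdbc is a hypothetical wrapper):

```python
# Arguments shared by both read variants above.
url = "jdbc:postgresql://localhost/foobar"
properties = {"user": "foo", "password": "bar"}

# For sqlContext.read.format("jdbc"), the url/dbtable and the connection
# properties are flattened into a single options mapping:
options = {"url": url, "dbtable": "baz", **properties}

def read_jdbc(sqlContext, url, table, properties):
    # Hypothetical wrapper around the first variant.
    return sqlContext.read.jdbc(url=url, table=table, properties=properties)
```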

Known issues and gotchas:

  • Suitable driver cannot be found - see: Writing data

  • Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery. See for example:

    • Does spark predicate pushdown work with JDBC?
    • More than one hour to execute pyspark.sql.DataFrame.take(4)
    • How to use SQL query to define table in dbtable?
  • By default, JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can:

    • Provide a partitioning column (must be IntegerType), lowerBound, upperBound, and numPartitions.
    • Provide a list of mutually exclusive predicates (the predicates argument), one for each desired partition.

    See:

    • Partitioning in spark while reading from RDBMS via JDBC,
    • How to optimize partitioning when migrating data from JDBC source?,
    • How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
    • How to partition Spark RDD when importing Postgres using JDBC?
  • In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time, there is no guarantee that the final view will be consistent.
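The subquery and predicate workarounds above can be sketched in plain Python. range_predicates is a hypothetical helper, not part of the PySpark API; its output is what you would pass as the predicates argument:

```python
# A subquery can stand in for a table name in the dbtable option:
dbtable = "(SELECT id, value FROM baz WHERE value > 0) AS tmp"

def range_predicates(column, lower, upper, num_partitions):
    """Build mutually exclusive predicates covering [lower, upper),
    one per desired partition (for the `predicates` argument)."""
    step = (upper - lower) / num_partitions
    bounds = [lower + i * step for i in range(num_partitions)] + [upper]
    return [f"{column} >= {lo} AND {column} < {hi}"
            for lo, hi in zip(bounds, bounds[1:])]

preds = range_predicates("id", 0, 100, 4)
# e.g. the first predicate covers id >= 0.0 AND id < 25.0
```

Note that the predicates must not overlap, otherwise rows are read more than once.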

Where to find suitable drivers:

  • Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab in the form compile-group:name:version, substituting the respective fields) or Maven Central Repository:

    • PostgreSQL
    • MySQL
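For example, the coordinates might look like the following (version numbers are illustrative; check Maven Central for current releases):

```shell
# PostgreSQL driver:
bin/pyspark --packages org.postgresql:postgresql:42.2.5

# MySQL driver:
bin/pyspark --packages mysql:mysql-connector-java:8.0.28
```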

Other options

Depending on the database, a specialized source might exist and be preferred in some cases:

  • Greenplum - Pivotal Greenplum-Spark Connector
  • Apache Phoenix - Apache Spark Plugin
  • Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
  • Amazon Redshift - Databricks Redshift connector (current versions are available only in the proprietary Databricks Runtime; the discontinued open source version is available on GitHub).
answered Sep 18 '22 04:09 by zero323