
How to Read Data from DB in Spark in parallel

I need to read data from a DB2 database using Spark SQL (as Sqoop is not available).

I know about this function, which will read data in parallel by opening multiple connections:

jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties)
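Invoked, it looks roughly like this (the angle-bracket values are placeholders for my environment):

import java.util.Properties

// Placeholder connection details
val props = new Properties()
props.setProperty("user", "<username>")
props.setProperty("password", "<password>")

// Spark splits [lowerBound, upperBound] into numPartitions ranges on the
// given integer column and opens one connection per range.
val df = spark.read.jdbc(
  "jdbc:db2://<host>:<port>/<dbname>", // url
  "<schema>.<table>",                  // table
  "<integer column>",                  // columnName
  1L,                                  // lowerBound
  1000000L,                            // upperBound
  10,                                  // numPartitions
  props)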

My issue is that I don't have a column that is incremental like this. I also need to read the data through a query only, as my table is quite large. Does anybody know of a way to do this through the API, or do I have to build something of my own?

asked Aug 11 '16 by Saurabh Sharma

2 Answers

Saurabh, in order to read in parallel using the standard Spark JDBC data source support, you do indeed need to use the numPartitions option, as you supposed.

But you need to give Spark some clue as to how to split the read into multiple parallel SQL statements. So you need some sort of integer partitioning column with definitive min and max values; Spark then divides that range into numPartitions strides and issues one range query per stride.

If your DB2 system is MPP partitioned, an implicit partitioning already exists, and you can in fact leverage it to read each DB2 database partition in parallel:

// One Spark partition per DB2 database partition
var df = spark.read.
  format("jdbc").
  option("url", "jdbc:db2://<DB2 server>:<DB2 port>/<dbname>").
  option("user", "<username>").
  option("password", "<password>").
  option("dbtable", "<your table>").
  option("partitionColumn", "DBPARTITIONNUM(<a column name>)").
  option("lowerBound", "<lowest partition number>").
  option("upperBound", "<largest partition number>").
  option("numPartitions", "<number of partitions>").
  load()

So, as you can see, the DBPARTITIONNUM() function is the partitioning key here.

Just in case you don't know the partitioning of your DB2 MPP system, here is how you can find it out with SQL (the min and max feed lowerBound and upperBound, and the count suggests numPartitions):

SELECT min(member_number), max(member_number), count(member_number) 
FROM TABLE(SYSPROC.DB_MEMBERS())

In case you use multiple partition groups, so that different tables may be distributed across different sets of partitions, you can use this SQL to figure out the list of partitions per table:

SELECT t2.DBPARTITIONNUM, t3.HOST_NAME
 FROM SYSCAT.TABLESPACES as t1,  SYSCAT.DBPARTITIONGROUPDEF as t2,
      SYSCAT.TABLES t4, TABLE(SYSPROC.DB_MEMBERS()) as t3 
 WHERE t1.TBSPACEID = t4.TBSPACEID AND
       t4.TABSCHEMA='<myschema>' AND
       t4.TABNAME='<mytab>' AND
       t1.DBPGNAME = t2.DBPGNAME AND
       t2.DBPARTITIONNUM = t3.PARTITION_NUMBER;
answered by Torsten Steinbach


You don't need the identity column to read in parallel, and the table variable only specifies the source. After registering the table, you can limit the data read from it in your Spark SQL query using a WHERE clause. If this is not an option, you could use a view instead, or, as described in this post, you can use any arbitrary subquery as your table input (see the sketch after the example below).

val dataframe = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:db2://localhost/sparksql")
  .option("driver", "com.ibm.db2.jcc.DB2Driver") // DB2 JCC driver class
  .option("dbtable", "table")
  .option("user", "root")
  .option("password", "root")
  .load()
// Register and then filter with a WHERE clause in Spark SQL
dataframe.registerTempTable("table")
dataframe.sqlContext.sql("select * from table where dummy_flag=1").collect.foreach(println)
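For the subquery variant, a minimal sketch (the schema, table, and column names here are made up for illustration): the JDBC source accepts a parenthesized query as dbtable, as long as it is aliased like a table.

// Push the filter down to DB2 by using a subquery as the "table";
// only the matching rows are transferred to Spark.
val filtered = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:db2://localhost/sparksql")
  .option("driver", "com.ibm.db2.jcc.DB2Driver")
  .option("dbtable", "(SELECT * FROM myschema.mytable WHERE dummy_flag = 1) AS t")
  .option("user", "root")
  .option("password", "root")
  .load()

This keeps the filtering on the database side instead of pulling the full table into Spark first.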
answered by Alex