
Spark DataFrame InsertIntoJDBC - TableAlreadyExists Exception

Using Spark 1.4.0, I am trying to insert data from a Spark DataFrame into a MemSQL database (which should behave exactly like a MySQL database) using insertIntoJDBC(). However, I keep getting a runtime TableAlreadyExists exception.

First I create the MemSQL table like this:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);

Then I create a simple dataframe in Spark and try to insert into MemSQL like this:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.
asked Oct 02 '15 by DJElbow


3 Answers

This solution applies to general JDBC connections, although the answer by @wayne is probably a better solution for MemSQL specifically.

insertIntoJDBC seems to have been deprecated as of 1.4.0, and calling it actually delegates to write.jdbc().

df.write returns a DataFrameWriter object. If you want to append data to an existing table, you have to set the writer's save mode to "append" (the default, "error", throws if the table already exists, which is what you are seeing).

Another issue with the example in the question above is that the DataFrame schema didn't match the schema of the target table (the DataFrame only has a val column, while the table has id and val).
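For reference, the same setting can be written with the typed SaveMode enum instead of a string (a minimal sketch, assuming `df` is already defined as in the example below):

```scala
import org.apache.spark.sql.SaveMode

// Equivalent to df.write.mode("append"); SaveMode.ErrorIfExists is the
// default, which is why writing to the existing table threw an exception.
val dfWriter = df.write.mode(SaveMode.Append)
```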

The code below gives a working example from the Spark shell. I am using spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar to start my spark-shell session.

import java.util.Properties

val prop = new Properties() 
prop.put("user", "root")
prop.put("password", "")  

val df = sc.parallelize(Array((1, 234), (2, 1233))).toDF("id", "val")
val dfWriter = df.write.mode("append")

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
answered Oct 17 '22 by DJElbow


The insertIntoJDBC docs are actually incorrect: they say the table must already exist, but in fact if it does exist, the call throws an error, as you can see above:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

We recommend using our MemSQL Spark connector, which you can find here:

https://github.com/memsql/memsql-spark-connector

If you include that library and import com.memsql.spark.connector._ in your code, you can use df.saveToMemSQL(...) to save your DataFrame to MemSQL. You can find documentation for our connector here:

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions
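A minimal sketch of what a connector-based save might look like. The database name, table name, and argument list here are illustrative assumptions; the exact saveToMemSQL signature depends on the connector version, so check the DataFrameFunctions docs linked above:

```scala
import com.memsql.spark.connector._

// Hypothetical usage: append the DataFrame's rows to table1 in the test
// database, letting the connector handle MemSQL-specific load semantics.
df.saveToMemSQL("test", "table1")
```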

answered Oct 17 '22 by Wayne Song


I had the same issue. Updating the Spark version to 1.6.2 fixed it for me.

answered Oct 17 '22 by Dinesh Parmar