Using Spark 1.4.0, I am trying to insert data from a Spark DataFrame into a MemSQL database (which should behave exactly like a MySQL database) using insertIntoJDBC(). However, I keep getting a runtime TableAlreadyExists exception.
First I create the MemSQL table like this:
CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);
Then I create a simple dataframe in Spark and try to insert into MemSQL like this:
val df = sc.parallelize(Array(123, 234)).toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]
df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)
java.lang.RuntimeException: Table table1 already exists.
This solution applies to general JDBC connections, although the answer by @wayne is probably the better solution for MemSQL specifically.
insertIntoJDBC seems to have been deprecated as of 1.4.0, and calling it actually delegates to write.jdbc().
write() returns a DataFrameWriter object. If you want to append data to your table, you will have to set the object's save mode to "append".
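As a minimal sketch of the two equivalent ways to set that mode (assuming a DataFrame df like the one in the full example below):

import org.apache.spark.sql.SaveMode

// String form and enum form both request appending rows
// instead of (re)creating the table
val stringForm = df.write.mode("append")
val enumForm = df.write.mode(SaveMode.Append)

The other SaveMode values are Overwrite, ErrorIfExists (the default, which is what produces the exception in the question), and Ignore.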
Another issue with the example in the question above is that the DataFrame schema didn't match the schema of the target table.
The code below gives a working example from the Spark shell. I start my spark-shell session with:
spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar
import java.util.Properties

// JDBC connection properties (root with an empty password here)
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "")

// Build a DataFrame whose schema matches table1 (id INT, val INT)
val df = sc.parallelize(Array((1, 234), (2, 1233))).toDF("id", "val")

// "append" inserts into the existing table instead of trying to create it
val dfWriter = df.write.mode("append")
dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
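To sanity-check the insert, you can read the table back over the same JDBC connection (a quick verification step, not part of the original answer):

// Read table1 back and print its contents
val readBack = sqlContext.read.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
readBack.show()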
The insertIntoJDBC docs are actually incorrect; they say that the table must already exist, but in fact if it does, it'll throw an error, as you can see above:
https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264
We recommend using our MemSQL Spark connector, which you can find here:
https://github.com/memsql/memsql-spark-connector
If you include that library and import com.memsql.spark.connector._ in your code, you can use df.saveToMemSQL(...) to save your DataFrame to MemSQL. You can find documentation for our connector here:
http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions
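A rough sketch of what that looks like; note the exact saveToMemSQL parameters shown here are an assumption, so check the linked API docs for the precise signature in your connector version:

import com.memsql.spark.connector._

// Hypothetical call shape: target database and table name.
// Connection details are typically supplied through the connector's
// configuration (see the linked documentation).
df.saveToMemSQL("test", "table1")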
I had the same issue. Updating the Spark version to 1.6.2 fixed it for me.