
Spark SQL and MySQL: SaveMode.Overwrite not inserting modified data

I have a table named test in MySQL with id and name columns, as shown below:

+----+-------+
| id | name  |
+----+-------+
| 1  | Name1 |
+----+-------+
| 2  | Name2 |
+----+-------+
| 3  | Name3 |
+----+-------+

I read this data into a Spark DataFrame over JDBC and modify it like this:

Dataset<Row> modified = sparkSession.sql("select id, concat(name,' - new') as name from test");
modified.write().mode("overwrite").jdbc(AppProperties.MYSQL_CONNECTION_URL,
                "test", connectionProperties);

The problem is that in overwrite mode Spark drops the existing table and creates a new one, but does not insert any data.

I tried the same program reading from a CSV file (with the same data as the test table) and overwriting the table. That worked for me.

Am I missing something here?

Thank You!

Rijo Joseph asked Jan 26 '17 12:01



2 Answers

The problem is in your code. Because you overwrite the table you are reading from, you effectively obliterate all the data before Spark can actually access it.

Remember that Spark is lazy. When you create a Dataset, Spark fetches the required metadata but doesn't load the data, so there is no magic cache that preserves the original content. Data is loaded only when it is actually required. Here that is when you execute the write action, and by the time the write starts reading, the table has already been dropped and recreated, so there is no data left to fetch.
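The effect of lazy evaluation here can be illustrated with a toy model in plain Java. This is a sketch, not Spark: the in-memory list `table` stands in for the MySQL table, and `plan()` and `overwrite()` are hypothetical names for the lazily defined query and the overwrite-mode write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Toy model of the failure, not Spark itself: the "table" is an in-memory list
// and the "Dataset" is a Supplier that reads the table only when invoked.
class LazyOverwriteDemo {

    // Stands in for the MySQL table `test`.
    static final List<String> table =
            new ArrayList<>(List.of("Name1", "Name2", "Name3"));

    // Lazy plan: nothing is read here; the lambda runs only when get() is called.
    static Supplier<List<String>> plan() {
        return () -> table.stream()
                .map(name -> name + " - new")
                .collect(Collectors.toList());
    }

    // Mimics overwrite mode: the target is emptied first, then the plan runs.
    static void overwrite(Supplier<List<String>> lazyRows) {
        table.clear();                  // drop / recreate the target
        table.addAll(lazyRows.get());   // the read happens only now, on an empty table
    }

    public static void main(String[] args) {
        overwrite(plan());
        System.out.println(table);
    }
}
```

Running `main` prints `[]`: the plan is evaluated only after the target has been emptied, so there is nothing left to transform and write back.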

What you need is something like this:

  • Create a Dataset.
  • Apply required transformations and write data to an intermediate MySQL table.
  • TRUNCATE the original input and INSERT INTO ... SELECT from the intermediate table or DROP the original table and RENAME intermediate table.
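The final swap step might look like the following sketch over plain JDBC. It assumes the transformed data has already been written to an intermediate table, for example with the same `write().mode("overwrite").jdbc(...)` call pointed at a different table name. The names `StagingSwap`, `swapStatements`, `swap`, and the `_old` suffix are hypothetical. This is a variant of the DROP and RENAME option: it renames the original aside first, so the multi-table RENAME TABLE (which MySQL performs atomically) never leaves the target name missing.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

// Sketch of the swap step. Table names are hypothetical and must be trusted
// identifiers, because they are concatenated into the SQL text.
class StagingSwap {

    // Builds the statements that replace `target` with the contents of `staging`.
    static List<String> swapStatements(String target, String staging) {
        String old = target + "_old";   // assumed not to exist already
        return List.of(
            // A multi-table RENAME swaps both names in one atomic step in MySQL.
            "RENAME TABLE `" + target + "` TO `" + old + "`, `"
                + staging + "` TO `" + target + "`",
            "DROP TABLE `" + old + "`");
    }

    // Runs the swap over an already opened JDBC connection.
    static void swap(Connection conn, String target, String staging) throws SQLException {
        try (Statement st = conn.createStatement()) {
            for (String sql : swapStatements(target, staging)) {
                st.executeUpdate(sql);
            }
        }
    }
}
```

Calling `StagingSwap.swap(conn, "test", "test_staging")` would run both statements. A table created by Spark may not carry the keys or indexes of the original, which is one reason to prefer the TRUNCATE plus INSERT INTO ... SELECT option when the original table definition matters.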

An alternative, but less favorable, approach would be:

  • Create a Dataset.
  • Apply required transformations and write data to a persistent Spark table (df.write.saveAsTable(...) or equivalent)
  • TRUNCATE the original input.
  • Read data back and save (spark.table(...).write.jdbc(...))
  • Drop Spark table.

We cannot stress enough that using Spark cache / persist is not the way to go. Even with a conservative StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2), cached data can be lost (for example, through node failures), leading to silent correctness errors.
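Why a lost cache turns into silent data loss can be sketched with another toy model in plain Java. It is not Spark: `Cached` is a hypothetical class that recomputes from its source on a miss, which is roughly how a lost cached partition gets rebuilt from lineage, and `evict()` stands in for a node failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model only: a cache that falls back to recomputing from its source.
class FragileCacheDemo {

    static final class Cached<T> {
        private final Supplier<T> source;
        private T value;                 // null means "not cached (any more)"

        Cached(Supplier<T> source) { this.source = source; }

        T get() {
            if (value == null) value = source.get();   // recompute on a miss
            return value;
        }

        void evict() { value = null; }   // stands in for a node failure
    }

    // Returns what ends up in the table after: cache, empty target, optional loss, write.
    static List<String> run(boolean loseCache) {
        List<String> table = new ArrayList<>(List.of("Name1", "Name2", "Name3"));
        Cached<List<String>> rows = new Cached<>(() -> new ArrayList<>(table));
        rows.get();                      // forces the cache to fill, like calling count()
        table.clear();                   // overwrite starts by emptying the target
        if (loseCache) rows.evict();
        table.addAll(rows.get());        // a miss re-reads the now empty table
        return table;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

`run(false)` returns all three rows, while `run(true)` returns an empty list without raising any error, which is the kind of silent failure the warning above refers to.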

zero323 answered Sep 18 '22 15:09


I believe all the steps above are unnecessary. Here's what you need to do:

  • Create a dataset A like val A = spark.read.parquet("....")

  • Read the table to be updated as DataFrame B, and make sure caching is enabled for it: val B = spark.read.jdbc(url, "mytable", connectionProperties).cache()

  • Force a count on B with B.count. This forces execution and caches the table according to the chosen StorageLevel.

  • Now you can do a transformation like val C = A.union(B)

  • Then write C back to the database with C.write.mode(SaveMode.Overwrite).jdbc(url, "mytable", connectionProperties)

Joe Nate answered Sep 21 '22 15:09