Does Apache Spark SQL support MERGE clause?

Does Apache Spark SQL support MERGE clause that's similar to Oracle's MERGE SQL clause?

MERGE INTO <table>
USING (select * from <table1>)
ON (...)
WHEN MATCHED THEN UPDATE ...
  DELETE WHERE ...
WHEN NOT MATCHED THEN INSERT ...
asked Oct 06 '17 by DilTeam

People also ask

What type of SQL does Spark SQL use?

Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to run SQL or HiveQL queries against existing Hive warehouses. It can also use existing Hive metastores.
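
As a hedged illustration, Hive support is switched on when building the SparkSession; the warehouse path and table name below are assumptions for the sketch, not from the original post:

import org.apache.spark.sql.SparkSession

// Minimal sketch: a SparkSession wired to an existing Hive metastore
// (warehouse dir and table name are illustrative assumptions)
val spark = SparkSession.builder()
  .appName("spark-sql-hive")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SELECT count(*) FROM some_hive_table").show()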

How is Spark SQL different from SQL?

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. It enables unmodified Hadoop Hive queries to run up to 100x faster on existing deployments and data.
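
For example, the same data can be queried through the DataFrame API or through SQL on a temporary view; a minimal sketch, where the file path and schema are illustrative assumptions:

// Sketch: DataFrame API and SQL over the same data
val people = spark.read.json("/data/people.json") // illustrative path

// DataFrame API
people.filter("age > 21").select("name").show()

// Equivalent SQL through a temporary view
people.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 21").show()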

What are some uses of Apache Spark SQL?

It executes SQL queries, and it can read data from an existing Hive installation.

What is the difference between Spark and Spark SQL?

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.


2 Answers

Spark does support the MERGE operation when using Delta Lake as the storage format. The first thing to do is to save the table using the Delta format, which provides transactional capabilities and support for DELETE/UPDATE/MERGE operations in Spark:

Python/Scala: df.write.format("delta").save("/data/events")

SQL: CREATE TABLE events (eventId long, ...) USING delta
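
If the data was already saved to a path as in the first snippet, a hedged variant registers that location as a SQL table (the table name and path are illustrative):

CREATE TABLE events
USING delta
LOCATION '/data/events'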

Once the table exists, you can run the usual SQL MERGE command:

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
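
Delta's MERGE also supports a delete action on matched rows, which covers the DELETE WHERE part of the Oracle example in the question; a sketch, assuming the updates source carries a deleted flag column (an assumption for illustration):

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED AND updates.deleted = true THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (date, eventId, data)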

The command is also available in Python/Scala:

import io.delta.tables._

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

To use the Delta Lake format, you also need the Delta package as a dependency in your Spark job:

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_x.xx</artifactId>
  <version>xxxx</version>
</dependency>
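
Depending on your Spark and Delta versions, the session may also need Delta's SQL extension and catalog configured; a minimal sketch following the Delta Lake setup documentation:

import org.apache.spark.sql.SparkSession

// Sketch: enabling Delta's SQL extension and catalog on the session
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()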

See https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge for more details.

answered Oct 02 '22 by Quentin


You can also write your own custom code. The code below can be edited to perform a merge instead of a plain insert. Be aware that this is a computation-heavy operation, but it gets the job done:

import java.sql.{Connection, DriverManager, PreparedStatement}

df.rdd.coalesce(2).foreachPartition(partition => {
  // brConnect is a broadcast variable holding the JDBC connection properties
  val connectionProperties = brConnect.value
  val jdbcUrl = connectionProperties.getProperty("jdbcurl")
  val user = connectionProperties.getProperty("user")
  val password = connectionProperties.getProperty("password")
  val driver = connectionProperties.getProperty("Driver")
  Class.forName(driver)

  val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
  dbc.setAutoCommit(false) // commit explicitly, once per batch
  val db_batchsize = 1000

  // Prepare the statement once per partition and reuse it for every batch;
  // swap this INSERT for your database's MERGE/upsert statement as needed
  val sqlString = "INSERT INTO employee (id, fname, lname, userid) VALUES (?, ?, ?, ?)"
  val pstmt: PreparedStatement = dbc.prepareStatement(sqlString)

  partition.grouped(db_batchsize).foreach(batch => {
    batch.foreach { row =>
      pstmt.setLong(1, row.getAs[Long]("id"))
      pstmt.setString(2, row.getAs[String]("fname"))
      pstmt.setString(3, row.getAs[String]("lname"))
      pstmt.setString(4, row.getAs[String]("userid"))
      pstmt.addBatch()
    }
    // Execute one round trip per batch instead of one per row
    pstmt.executeBatch()
    dbc.commit()
  })
  pstmt.close()
  dbc.close()
})
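
To merge rather than insert, replace the SQL string with the target database's upsert syntax; for example, a hedged MySQL variant, assuming id is the primary key of the employee table:

INSERT INTO employee (id, fname, lname, userid)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
  fname = VALUES(fname),
  lname = VALUES(lname),
  userid = VALUES(userid)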
answered Oct 02 '22 by Dip