Does Apache Spark SQL support MERGE clause that's similar to Oracle's MERGE SQL clause?
MERGE INTO <table> USING (
  SELECT * FROM <table1>
) ON (...)
WHEN MATCHED THEN UPDATE ...
  DELETE WHERE ...
WHEN NOT MATCHED THEN INSERT ...
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed; internally, Spark SQL uses this extra information to perform extra optimizations.
It also integrates with Hive: you can run SQL or HiveQL queries on existing warehouses, since Spark SQL supports the HiveQL syntax as well as existing Hive metastores, SerDes, and UDFs. In short: 1. it executes SQL queries, and 2. it can read data from an existing Hive installation, running unmodified Hive queries up to 100x faster on existing deployments and data.
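As a quick illustration of the Hive integration, here is a minimal sketch (the table name `sales` is just a placeholder) of querying an existing Hive warehouse through Spark SQL:
import org.apache.spark.sql.SparkSession

// Enable Hive support so the session uses the existing Hive metastore,
// SerDes and UDFs, then query an existing warehouse table with SQL/HiveQL.
val spark = SparkSession.builder()
  .appName("SparkSqlHiveExample")
  .enableHiveSupport()
  .getOrCreate()

val df = spark.sql("SELECT * FROM sales LIMIT 10")
df.show()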
Spark does support the MERGE operation, using Delta Lake as the storage format. The first step is to save the table in the delta format, which adds transactional capabilities and support for DELETE/UPDATE/MERGE operations in Spark.
Python/Scala:
df.write.format("delta").save("/data/events")
SQL: CREATE TABLE events (eventId long, ...) USING delta
Once the table exists, you can run your usual SQL Merge command:
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (date, eventId, data)
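The statement above assumes a source named `updates` is visible to Spark SQL. A minimal sketch of one way to provide it, assuming the new rows live in a DataFrame called `updatesDF` (as in the Scala example below) and that the Delta SQL extension mentioned further down is enabled:
// Expose the DataFrame of new rows as a temporary view named "updates",
// then run the MERGE through the SQL interface.
updatesDF.createOrReplaceTempView("updates")

spark.sql("""
  MERGE INTO events
  USING updates
  ON events.eventId = updates.eventId
  WHEN MATCHED THEN
    UPDATE SET events.data = updates.data
  WHEN NOT MATCHED THEN
    INSERT (date, eventId, data) VALUES (date, eventId, data)
""")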
The same command is also available through the programmatic API (shown here in Scala; an equivalent API exists for Python):
import io.delta.tables.DeltaTable

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
To use the Delta Lake format, you also need the delta package as a dependency in your Spark job:
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_x.xx</artifactId>
  <version>xxxx</version>
</dependency>
See https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge for more details
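Depending on your Spark and Delta Lake versions, the SQL form of MERGE also needs Delta's SQL extension enabled on the session. A minimal sketch of that configuration (the application name is just a placeholder):
import org.apache.spark.sql.SparkSession

// Enable Delta Lake's SQL extension and catalog so that MERGE/UPDATE/DELETE
// SQL statements are understood by open-source Delta on Spark 3.x.
val spark = SparkSession.builder()
  .appName("DeltaMergeExample")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
          "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()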
Alternatively, you can write your own custom JDBC code. The code below performs plain inserts; you can edit it to issue a merge/upsert instead of an INSERT. Be aware that this is a computation-heavy operation.
import java.sql.{Connection, DriverManager, PreparedStatement}

df.rdd.coalesce(2).foreachPartition(partition => {
  // brConnect is a broadcast variable holding the JDBC connection properties
  val connectionProperties = brConnect.value
  val jdbcUrl  = connectionProperties.getProperty("jdbcurl")
  val user     = connectionProperties.getProperty("user")
  val password = connectionProperties.getProperty("password")
  val driver   = connectionProperties.getProperty("Driver")

  Class.forName(driver)
  val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
  dbc.setAutoCommit(false)          // commit manually once per batch
  val db_batchsize = 1000

  val sqlString = "INSERT INTO employee (id, fname, lname, userid) VALUES (?, ?, ?, ?)"
  val pstmt: PreparedStatement = dbc.prepareStatement(sqlString)

  partition.grouped(db_batchsize).foreach(batch => {
    batch.foreach { row =>
      pstmt.setLong(1, row.getAs[Long]("id"))
      pstmt.setString(2, row.getAs[String]("fname"))
      pstmt.setString(3, row.getAs[String]("lname"))
      pstmt.setString(4, row.getAs[String]("userid"))
      pstmt.addBatch()              // queue the row instead of executing it immediately
    }
    pstmt.executeBatch()            // send the whole batch in one round trip
    dbc.commit()
  })

  pstmt.close()
  dbc.close()
})
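To turn the inserts above into a merge, you would replace the prepared statement with an upsert. A minimal sketch, assuming a SQL Server target table `employee` keyed on `id` (other databases use different upsert syntax, e.g. MySQL's INSERT ... ON DUPLICATE KEY UPDATE or PostgreSQL's INSERT ... ON CONFLICT):
// Sketch only: a MERGE-style upsert statement that can replace sqlString above;
// the parameters 1-4 are still bound per row exactly as before.
val mergeSql =
  """MERGE INTO employee AS t
    |USING (VALUES (?, ?, ?, ?)) AS s (id, fname, lname, userid)
    |ON t.id = s.id
    |WHEN MATCHED THEN
    |  UPDATE SET t.fname = s.fname, t.lname = s.lname, t.userid = s.userid
    |WHEN NOT MATCHED THEN
    |  INSERT (id, fname, lname, userid) VALUES (s.id, s.fname, s.lname, s.userid);""".stripMargin

val mergeStmt = dbc.prepareStatement(mergeSql)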