I’m writing a data source that shares similarities with Spark’s JDBC data source implementation, and I’d like to ask how Spark handles certain failure scenarios. To my understanding, if an executor dies while it’s running a task, Spark will revive the executor and try to re-run that task. However, how does this play out in terms of data integrity with Spark’s JDBC data source API (e.g. df.write.format("jdbc").option(...).save())?
In the savePartition function of JdbcUtils.scala, we see Spark calling the commit and rollback functions of the Java connection object generated from the database url/credentials provided by the user (see below). But if an executor dies right after commit() finishes or before rollback() is called, does Spark try to re-run the task and write the same data partition again, essentially creating duplicate committed rows in the database? And what happens if the executor dies in the middle of calling commit() or rollback()?
try {
  ...
  if (supportsTransactions) {
    conn.commit()
  }
  committed = true
  Iterator.empty
} catch {
  case e: SQLException =>
    ...
    throw e
} finally {
  if (!committed) {
    // The stage must fail. We got here through an exception path, so
    // let the exception through unless rollback() or close() want to
    // tell the user about another problem.
    if (supportsTransactions) {
      conn.rollback()
    }
    conn.close()
  } else {
    ...
  }
}
What else would you expect, given that Spark SQL (which is a high-level API over the RDD API) does not really know much about all the peculiarities of JDBC or any other protocol? Not to mention the underlying execution runtime, i.e. Spark Core.
When you write a structured query like df.write.format("jdbc").option(...).save(), Spark SQL translates it into a distributed computation using the low-level, assembly-like RDD API. Since it tries to embrace as many "protocols" as possible (incl. JDBC), Spark SQL's DataSource API leaves much of the error handling to the data source itself.
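As a rough illustration (a simplified sketch, not the actual JdbcUtils code), the whole write boils down to invoking a partition-writing function once per partition, each invocation running as a task on some executor:

import org.apache.spark.sql.{DataFrame, Row}

// Simplified sketch: Spark Core only sees whether each task succeeded or
// failed; everything JDBC-specific (commit, rollback, close) lives inside
// the partition-writing function supplied by the data source.
def saveTable(df: DataFrame, savePartition: Iterator[Row] => Unit): Unit = {
  df.rdd.foreachPartition(savePartition)
}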
The core of Spark that schedules tasks (and does not know or even care what the tasks do) simply watches the execution, and if a task fails it will attempt to execute it again, up to spark.task.maxFailures attempts per task (4 by default).
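The retry budget is the regular spark.task.maxFailures setting. For example (the app name below is just illustrative):

import org.apache.spark.sql.SparkSession

// spark.task.maxFailures = number of failures of any particular task before
// the stage (and the job) is aborted; the default is 4.
val spark = SparkSession.builder()
  .appName("jdbc-write-example")
  .config("spark.task.maxFailures", "4")
  .getOrCreate()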
So when you write a custom data source, you know the drill and have to deal with such retries in your code.
One way to handle errors is to register a task listener using TaskContext (e.g. addTaskCompletionListener or addTaskFailureListener).
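A minimal sketch of what that could look like inside a per-partition write (the url, user, password and the insert step are placeholders; this is not the code Spark itself uses):

import java.sql.DriverManager
import org.apache.spark.TaskContext
import org.apache.spark.sql.DataFrame

// Hypothetical per-partition write that relies on task listeners for cleanup.
def writeWithListeners(df: DataFrame, url: String, user: String, password: String): Unit = {
  df.rdd.foreachPartition { rows =>
    val conn = DriverManager.getConnection(url, user, password)
    conn.setAutoCommit(false)

    val ctx = TaskContext.get()
    // Runs whether the task succeeds or fails: always close the connection.
    ctx.addTaskCompletionListener[Unit] { _ =>
      if (!conn.isClosed) conn.close()
    }
    // Runs only if the task fails: roll back anything uncommitted so that the
    // retried task attempt starts from a clean state.
    ctx.addTaskFailureListener { (_, _) =>
      if (!conn.isClosed) conn.rollback()
    }

    // ... INSERT the rows of this partition into the target table, then ...
    conn.commit()
  }
}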