I've had a go at implementing a structured stream like so:
myDataSet
  .map(r => StatementWrapper.Transform(r))
  .writeStream
  .foreach(MyWrapper.myWriter)
  .start()
  .awaitTermination()
This all seems to work, but the throughput of MyWrapper.myWriter is horrible. It's effectively trying to be a JDBC sink; it looks like this:
import java.sql.{Connection, SQLException, SQLSyntaxErrorException}
import scala.util.Try
import org.apache.spark.sql.ForeachWriter

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    Try(connection = getRemoteConnection).isSuccess
  }

  override def process(row: Seq[String]): Unit = {
    // A new Statement is created for every row, and each SQL string is parsed and executed individually.
    val statement = connection.createStatement()
    try {
      row.foreach(s => statement.execute(s))
    } catch {
      case e: SQLSyntaxErrorException => println(e)
      case e: SQLException => println(e)
    } finally {
      statement.closeOnCompletion()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}
So my question is: is a new ForeachWriter instantiated for each row, so that open() and close() are called for every row in the dataset?
Is there a better design to improve throughput?
How can I parse the SQL statement once and execute it many times, while keeping the database connection open?
Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL API for data stream processing. In the end, these APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.
Spark SQL implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of them. The libraries built on top of these are MLlib for machine learning, GraphFrames for graph analysis, and two APIs for stream processing: Spark Streaming and Structured Streaming.
Every data item arriving on the stream is like a new row being appended to the Input Table. A query on the input generates the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table.
Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.
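As a rough, self-contained illustration of that model (this is not part of the original question), the sketch below uses the built-in rate source and the console sink with a 1-second trigger; it assumes Spark 2.2+ and all option values are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("trigger-example").master("local[*]").getOrCreate()

val counts = spark.readStream
  .format("rate")              // generates (timestamp, value) rows continuously
  .option("rowsPerSecond", 10)
  .load()
  .groupBy().count()           // the query that produces the Result Table

counts.writeStream
  .outputMode("complete")      // the whole Result Table is emitted each trigger
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()
  .awaitTermination()

Each trigger, the newly arrived rate rows are appended to the conceptual Input Table and the aggregation re-emits the updated Result Table to the console.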
Opening and closing of the underlying sink depend on your implementation of ForeachWriter.
The relevant class which invokes ForeachWriter is ForeachSink, and this is the code which calls your writer:
data.queryExecution.toRdd.foreachPartition { iter =>
  if (writer.open(TaskContext.getPartitionId(), batchId)) {
    try {
      while (iter.hasNext) {
        writer.process(encoder.fromRow(iter.next()))
      }
    } catch {
      case e: Throwable =>
        writer.close(e)
        throw e
    }
    writer.close(null)
  } else {
    writer.close(null)
  }
}
Opening and closing of the writer are attempted for each batch that is generated from your source, once per partition. If you want open and close to literally open and close the sink driver each time, you'll need to do so via your implementation.
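As one way to improve throughput, here is a hedged sketch (not the original code) of a ForeachWriter that keeps a single connection open for the whole partition and parses the SQL once as a PreparedStatement, batching the inserts. The table my_table, its (id, name) columns and the (Int, String) row type are placeholder assumptions; getRemoteConnection is the same helper used in the question:

import java.sql.{Connection, PreparedStatement}
import org.apache.spark.sql.ForeachWriter

val batchedWriter: ForeachWriter[(Int, String)] = new ForeachWriter[(Int, String)] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Called once per partition per micro-batch, so the connection and the
    // parsed statement are reused for every row in that partition.
    connection = getRemoteConnection
    connection.setAutoCommit(false)
    statement = connection.prepareStatement("INSERT INTO my_table (id, name) VALUES (?, ?)")
    true
  }

  override def process(row: (Int, String)): Unit = {
    // No re-parsing here: just bind parameters and queue the row.
    statement.setInt(1, row._1)
    statement.setString(2, row._2)
    statement.addBatch()
  }

  override def close(errorOrNull: Throwable): Unit = {
    try {
      if (errorOrNull == null) {
        statement.executeBatch()   // one round trip per partition instead of one per row
        connection.commit()
      }
    } finally {
      if (statement != null) statement.close()
      if (connection != null) connection.close()
    }
  }
}

Because open and close run once per partition per micro-batch (as the ForeachSink code above shows), the connection setup and the statement parse are amortised over all rows in the partition rather than paid per row.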
If you want more control over how the data is handled, you can implement the Sink trait, which gives you the batch id and the underlying DataFrame:
trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}
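For instance, a minimal sketch of such a Sink could hand each micro-batch to Spark's built-in JDBC writer. This assumes the internal org.apache.spark.sql.execution.streaming.Sink trait from Spark 2.x; the URL, table name and properties are placeholders, and depending on the Spark version you may need to materialise the batch DataFrame before writing it:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink: url, table and props are placeholders, not values from the question.
class JdbcBatchSink(url: String, table: String, props: Properties) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Each micro-batch arrives as a DataFrame; the built-in JDBC writer opens
    // connections per partition and batches the inserts itself.
    data.write.mode(SaveMode.Append).jdbc(url, table, props)
  }
}

Wiring this in requires a custom StreamSinkProvider registered as a data source format, which is more work than ForeachWriter but gives you the whole micro-batch at once so you can reuse the batch JDBC write path.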