Spark Structured Streaming ForeachWriter and database performance

I've had a go at implementing a structured stream like so...

myDataSet
  .map(r =>  StatementWrapper.Transform(r))
  .writeStream
  .foreach(MyWrapper.myWriter)
  .start()
  .awaitTermination()

This all seems to work, but the throughput of MyWrapper.myWriter is horrible. It's effectively trying to be a JDBC sink, and looks like this:

import java.sql.{Connection, SQLException, SQLSyntaxErrorException}
import org.apache.spark.sql.ForeachWriter
import scala.util.Try

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {

  var connection: Connection = _

  // getRemoteConnection is a helper defined elsewhere that returns a JDBC connection
  override def open(partitionId: Long, version: Long): Boolean = {
    Try(connection = getRemoteConnection).isSuccess
  }

  // Each row is a sequence of SQL statements to execute
  override def process(row: Seq[String]): Unit = {
    val statement = connection.createStatement()
    try {
      row.foreach(s => statement.execute(s))
    } catch {
      case e: SQLSyntaxErrorException => println(e)
      case e: SQLException => println(e)
    } finally {
      statement.closeOnCompletion()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

So my question is: is a new ForeachWriter instantiated for each row, so that open() and close() are called for every row in the dataset?

Is there a better design to improve throughput?

How can I parse the SQL statement once and execute it many times, while keeping the database connection open?

Asked Oct 18 '17 by Exie

People also ask

What is the difference between Spark Streaming and Structured Streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized using the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

Can the Spark Structured Streaming API be used to process graph data?

Spark SQL implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of them. The libraries built on top of these are: MLlib for machine learning, GraphFrames for graph analysis, and two APIs for stream processing: Spark Streaming and Structured Streaming.

How does new data arriving in a stream get represented in Structured Streaming?

Every data item that is arriving on the stream is like a new row being appended to the Input Table. A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table.
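
For a concrete (if minimal) illustration of that model, here is the standard socket word-count query in Scala; the host, port, and one-second trigger are only illustrative. Each incoming line becomes new rows of the Input Table, and the running counts form the Result Table that the console sink prints on every trigger.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("InputTableDemo").getOrCreate()
import spark.implicits._

// Each line read from the socket is appended to the unbounded Input Table.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// The aggregation over the Input Table defines the Result Table.
val counts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// Every trigger, the updated Result Table is emitted in full to the console.
counts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()
  .awaitTermination()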

Which property must a Spark Structured Streaming sink possess to ensure end-to-end exactly-once semantics?

Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.


1 Answer

Opening and closing of the underlying sink depends on your implementation of ForeachWriter.

The relevant class which invokes your ForeachWriter is ForeachSink, and this is the code that calls your writer:

data.queryExecution.toRdd.foreachPartition { iter =>
  if (writer.open(TaskContext.getPartitionId(), batchId)) {
    try {
      while (iter.hasNext) {
        writer.process(encoder.fromRow(iter.next()))
      }
    } catch {
      case e: Throwable =>
        writer.close(e)
        throw e
    }
    writer.close(null)
  } else {
    writer.close(null)
  }
}

Opening and closing of the writer is attempted for each batch generated from your source, once per partition. Whether open and close literally open and close the connection to the sink each time is up to your implementation.
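
Since open and close already bracket a whole partition of each batch, the biggest practical win is usually to stop paying one network round trip per execute() call. Below is a rough sketch, not code from the question or the answer, of the same writer using JDBC statement batching with manual commits; the 1000-statement flush threshold is arbitrary and getRemoteConnection is the question's own helper, assumed to be in scope.

import java.sql.{Connection, SQLException, Statement}
import org.apache.spark.sql.ForeachWriter
import scala.util.Try

val batchingWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {

  var connection: Connection = _
  var statement: Statement = _
  var pending = 0

  // One connection and one reusable Statement per partition per batch.
  override def open(partitionId: Long, version: Long): Boolean =
    Try {
      connection = getRemoteConnection   // helper from the question, assumed in scope
      connection.setAutoCommit(false)    // commit once per flush instead of per statement
      statement = connection.createStatement()
    }.isSuccess

  // Queue the statements client-side and ship them in bulk.
  override def process(row: Seq[String]): Unit = {
    row.foreach(s => statement.addBatch(s))
    pending += row.size
    if (pending >= 1000) flush()
  }

  override def close(errorOrNull: Throwable): Unit = {
    try {
      if (errorOrNull == null && pending > 0) flush()
    } finally {
      if (statement != null) statement.close()
      if (connection != null) connection.close()
    }
  }

  private def flush(): Unit = {
    try {
      statement.executeBatch()           // one round trip for up to 1000 statements
      connection.commit()
    } catch {
      case e: SQLException =>
        connection.rollback()
        throw e
    }
    pending = 0
  }
}

If the rows carried parameter values rather than complete SQL strings, a single PreparedStatement created in open and reused in process would additionally let the database parse the statement once and execute it many times.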

If you want more control over how the data is handled, you can implement the Sink trait, which gives you the batch id and the underlying DataFrame:

trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}
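
As a rough sketch of what that could look like, not code from the answer, here is a JDBC-flavoured Sink together with the StreamSinkProvider that Spark 2.x needs to register it; the class names and the JDBC option keys are placeholders. The batch is copied into a plain DataFrame before writing (mirroring what the built-in console sink did in Spark 2.x), because the DataFrame handed to addBatch is tied to the streaming query's incremental execution.

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Writes each micro-batch to a JDBC table; the options map is expected to
// carry the usual JDBC keys (url, dbtable, user, password).
class JdbcSink(options: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Collect the batch and rebuild a plain DataFrame before writing.
    // Fine for a sketch, but it pulls every batch through the driver.
    val batch = data.sparkSession.createDataFrame(
      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
    batch.write
      .mode(SaveMode.Append)
      .format("jdbc")
      .options(options)
      .save()
  }
}

// Spark resolves the sink through this provider, e.g. with something like
// .writeStream.format(classOf[JdbcSinkProvider].getName).options(...).start()
class JdbcSinkProvider extends StreamSinkProvider {
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink =
    new JdbcSink(parameters)
}

A production sink would write per partition instead of collecting to the driver, but the wiring (a batch id plus a DataFrame per trigger) is the part the Sink trait gives you.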
Answered Sep 19 '22 by Yuval Itzchakov