
Spark Structured Streaming MemoryStream + Row + Encoders issue

I am trying to run some tests on my local machine with Spark Structured Streaming.

In batch mode, here are the Rows that I am dealing with:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}

val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))
val rows         = List(
    Row(
      Map("ID" -> "1",
        "STRUCTUREID" -> "MFCD00869853",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "803.482",
        "FORMULA" -> "C44H69NO12",
        "NAME" -> "Tacrolimus",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
       )),
    Row(
      Map("ID" -> "2",
        "STRUCTUREID" -> "MFCD00869854",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "603.482",
        "FORMULA" -> "",
        "NAME" -> "Tacrolimus2",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
      ))
  )
val df  = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)

Working with that in batch mode works like a charm, no issues.

Now I'm trying to move to streaming mode using MemoryStream for testing. I added the following:

implicit val ctx = spark.sqlContext
val intsInput = MemoryStream[Row]

But the compiler complains as follows:

No implicits found for parameter evidence$1: Encoder[Row]

Hence, my question: what should I do here to get that working?

Also, I saw that if I add the following import, the error goes away:

import spark.implicits._

Actually, I now get the following warning instead of an error:

Ambiguous implicits for parameter evidence$1: Encoder[Row]

I do not understand the encoder mechanism well and would appreciate it if someone could explain how to avoid using those implicits. The reason is that I read the following in a book about creating a DataFrame from Rows.

Recommended approach:

val myManualSchema = new StructType(Array(
  new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()

And then the author goes on with this:

In Scala, we can also take advantage of Spark’s implicits in the console (and if you import them in your JAR code) by running toDF on a Seq type. This does not play well with null types, so it’s not necessarily recommended for production use cases.

val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

If someone could take the time to explain what is happening in my scenario when I use the implicit, whether it is safe to do so, and whether there is a way to do it more explicitly without importing the implicits, that would be much appreciated.
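
For reference, the most explicit version I can think of is sketched below. This is only a guess on my part: it assumes Spark 2.x, where the internal RowEncoder(schema) factory is available, and the names recordEncoder and rowsInput are mine.

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val ctx = spark.sqlContext

// Build the Encoder[Row] explicitly from the schema instead of relying on spark.implicits._
implicit val recordEncoder: Encoder[Row] = RowEncoder(recordSchema)

val rowsInput = MemoryStream[Row] // evidence$1 now resolves to recordEncoder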

Finally, if someone could point me to good documentation on Encoders and Spark type mapping, that would be great.

EDIT1

I finally got it to work with:

  import org.apache.spark.sql.execution.streaming.MemoryStream

  implicit val ctx = spark.sqlContext
  import spark.implicits._
  val rows = MemoryStream[Map[String,String]]
  val df = rows.toDF()

My problem here is that I am not confident about what I am doing. It seems that in some situations I need to create a Dataset just to be able to convert it into a DataFrame (i.e. a Dataset[Row]) with the toDF conversion. I understood that working with a Dataset is type-safe but slower than working with a DataFrame. So why this intermediary Dataset? This is not the first time I have seen that pattern in Spark Structured Streaming. Again, if someone could help me with those questions, that would be great.
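
From what I can tell (please correct me if I am wrong), a DataFrame is just a type alias for Dataset[Row], so the toDF call is a cheap, lazy conversion rather than a second copy of the data. Roughly, with the same rows stream as above (the names ds and df are just for illustration):

import org.apache.spark.sql.{DataFrame, Dataset}

// type DataFrame = Dataset[Row]  -- alias defined in the org.apache.spark.sql package object
val ds: Dataset[Map[String, String]] = rows.toDS() // typed view of the memory stream
val df: DataFrame = ds.toDF("Record")              // same data, single column renamed to "Record", static type widened to Row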

Asked Sep 08 '18 by MaatDeamon


1 Answer

I encourage you to use Scala's case classes for data modeling.

final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

Now you can have a List of Product in memory:

  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

The structured streaming API makes it easy to reason about stream processing by using the widely known Dataset[T] abstraction. Roughly speaking, you just have to worry about three things:

  • Source: a source can generate an input data stream which we can represent as a Dataset[Input]. Every new data item Input that arrives is going to be appended into this unbounded dataset. You can manipulate the data as you wish (e.g. Dataset[Input] => Dataset[Output]).
  • StreamingQueries and Sink: a query generates a result table that's updated from the Source every trigger interval. Changes are written into external storage called a Sink.
  • Output modes: there are different modes in which you can write data into the Sink: complete mode, append mode, and update mode (sketched below).
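
For instance, a minimal sketch of where the output mode is chosen (it assumes a streaming Dataset[Product] like the streamingData built further below, plus a hypothetical count per product name; countsByName and countsQuery are illustrative names):

// Hypothetical aggregation so that "complete" mode is meaningful
val countsByName = streamingData.groupBy("name").count()

val countsQuery: StreamingQuery = countsByName.writeStream
  .outputMode("complete") // alternatives: "append", "update"
  .format("console")      // console sink, as in the example below
  .start()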

Let's assume that you want to know which products have a molecular weight greater than 200 units.

As you said, using the batch API is fairly simple and straightforward:

// Create a static dataset using the in-memory data
val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)

// Processing...
val result: Dataset[Product] = staticData.filter(_.weight > 200)

// Print results!
result.show()

When using the Streaming API you just need to define a source and a sink as an extra step. In this example, we can use the MemoryStream and the console sink to print out the results.

// Create a streaming dataset using the in-memory data (memory source)
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)

val streamingData: Dataset[Product] = productSource.toDS()

// Processing...
val result: Dataset[Product] = streamingData.filter(_.weight > 200)

// Print results by using the console sink. 
val query: StreamingQuery = result.writeStream.format("console").start()

// Stop streaming
query.awaitTermination(timeoutMs=5000)
query.stop()

Note that staticData and streamingData have the exact same type signature (i.e., Dataset[Product]). This allows us to apply the same processing steps regardless of whether we use the Batch or the Streaming API. You can also think of implementing a generic method def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ??? to avoid repeating yourself in both approaches.
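
One possible shape for such a generic method is sketched below; the curried transform parameter is my own addition, not something the snippets above require:

// Accept the transformation as a function so the helper works for any In/Out pair
def processing[In, Out](inputData: Dataset[In])(transform: Dataset[In] => Dataset[Out]): Dataset[Out] =
  transform(inputData)

// The very same call works on the static and on the streaming dataset
val staticResult    = processing(staticData)(_.filter(_.weight > 200))
val streamingResult = processing(streamingData)(_.filter(_.weight > 200))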

Complete code example:

import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQuery

object ExMemoryStream extends App {

  // Boilerplate code...
  val spark: SparkSession = SparkSession.builder
    .appName("ExMemoryStreaming")
    .master("local[*]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  // Define your data models 
  final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

  // Create some in-memory instances
  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

  // Defining processing step
  def processing(inputData: Dataset[Product]): Dataset[Product] =
    inputData.filter(_.weight > 200)

  // STATIC DATASET
  val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)

  println("This is the static dataset:")
  processing(datasetStatic).show()

  // STREAMING DATASET
  val productSource = MemoryStream[Product]
  productSource.addData(inMemoryRecords)

  val datasetStreaming: Dataset[Product] = productSource.toDS()

  println("This is the streaming dataset:")
  val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
  query.awaitTermination(timeoutMs=5000)
  
  // Stop query and close Spark
  query.stop()
  spark.close()

}
Answered Sep 24 '22 by Rodrigo Hernández Mota