Spark structured streaming - update data frame's schema on the fly

I have a simple structured streaming job which monitors a directory for CSV files and writes parquet files - no transformation in between.

The job starts by building a data frame from the CSV files using readStream(), with a schema that I get by calling a function named buildSchema(). Here is the code:

  var df = spark
    .readStream
    .option("sep", "|")
    .option("header","true")
    .schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
    .csv(input_base_dir + table_name + "*")

  logger.info(" new batch indicator")

  if (df.schema != buildSchema(spark, table_name).get) {
    df = spark.sqlContext.createDataFrame(df.collectAsList(), buildSchema(spark, table_name).get)
  }

  val query =
    df.writeStream
      .format("parquet")
      .queryName("convertCSVtoPqrquet for table " + table_name)
      .option("path", output_base_dir + table_name + "/")
      .trigger(ProcessingTime(60.seconds))
      .start()

The job runs fine, but I'd like to always use the latest schema to build my data frame, i.e. to read the CSV files with it. buildSchema() can give me the latest schema, but I'm not sure how to call it periodically (or once per CSV file) and then use the result to re-generate or modify the data frame.

When testing, I observed that only the query object runs continuously, batch after batch; the log statement I added and the if() statement for the schema comparison only executed once, at the start of the application.

Can a data frame's schema be modified in a structured streaming job after query.start() is called? If not, what would you suggest as a good workaround?

Thanks in advance.

asked Feb 12 '18 by Howard Xie
1 Answer

You can use the foreachBatch method to load the latest schema once per micro-batch and compare it to the schema of that micro-batch's DataFrame. Example:

    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.ProcessingTime

    val streamingDF = spark
      .readStream
      .option("sep", "|")
      .option("header", "true")
      .schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
      .csv(input_base_dir + table_name + "*")

    val query =
      streamingDF
        .writeStream
        .foreachBatch((ds, i) => {
          logger.info(s"New batch indicator(${i})")
          // Re-load the schema for every micro-batch; if it differs from the
          // batch's schema, rebuild the DataFrame (collectAsList() pulls the
          // micro-batch to the driver, so this assumes batches are small).
          val batchDf =
            if (ds.schema != buildSchema(spark, table_name).get) {
              spark.sqlContext.createDataFrame(ds.collectAsList(), buildSchema(spark, table_name).get)
            } else {
              ds
            }
          // Append so that subsequent micro-batches don't fail on an existing output path.
          batchDf.write.mode("append").parquet(output_base_dir + table_name + "/")
        })
        .trigger(ProcessingTime(60.seconds))
        .start()
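
Note that foreachBatch is available from Spark 2.4 onward. Also, the schema passed to readStream is fixed once the query starts, so the snippet above only adjusts what gets written out. If the source schema itself needs to change, one workaround (a minimal sketch, not part of the original answer, using a hypothetical startQuery() helper that wraps the readStream/writeStream code above) is to stop and restart the query whenever buildSchema() returns something new:

    import org.apache.spark.sql.streaming.StreamingQuery
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper: builds and starts the writeStream shown above with the given schema.
    // def startQuery(schema: StructType): StreamingQuery = ???

    var currentSchema: StructType = buildSchema(spark, table_name).get
    var query: StreamingQuery = startQuery(currentSchema)

    while (true) {
      query.awaitTermination(60 * 1000L)   // wait roughly one trigger interval
      val latest = buildSchema(spark, table_name).get
      if (latest != currentSchema) {       // schema on disk has changed
        query.stop()                       // stop the running query
        currentSchema = latest
        query = startQuery(currentSchema)  // restart with the new schema
      }
    }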

answered Nov 24 '22 by Gal Shaboodi