Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Overloaded method foreachBatch with alternatives

I am trying to serialize a json file to parquet format. I have this error :

Error:(34, 25) overloaded method foreachBatch with alternatives: (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame) askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>

And here is my code :

package fr.fdj
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


object serialize {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("serialize")
    .getOrCreate()


  def main(args : Array[String]) {

  spark.sparkContext.setLogLevel("ERROR")

  //schema definition
  val mySchema = StructType(Array(
    StructField("Species", StringType, true),
    StructField("Race", StringType, true),
    StructField("Color", StringType, true),
    StructField("Age", IntegerType, true)
  ))

  val askDF = spark
  .readStream
  .format("json")
  .option("header", "true")
  .schema(mySchema)
  .load("/src/main/scala/file.json")

  askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
  }.start().awaitTermination()


  }
}
like image 680
roxannefdj Avatar asked Jul 28 '20 15:07

roxannefdj


Video Answer


2 Answers

I suppose you are using Scala 2.12.

Due to some changes in Scala 2.12, the method DataStreamWriter.foreachBatch requires some updates on the code, otherwise this ambiguity happens.

You can check Both foreachBatch methods here: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamWriter.html

I guess you could user scala 2.11 instead, or check the link, where the issue has been addressed: https://docs.databricks.com/release-notes/runtime/7.0.html

In your code, you could try this:

def myFunc( askDF:DataFrame, batchID:Long ) : Unit = {
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
}
askDF.writeStream.foreachBatch(myFunc _).start().awaitTermination()
like image 145
Osvaldo Correia Avatar answered Oct 18 '22 23:10

Osvaldo Correia


Return unit from function in foreach block by adding () at the end. Adding () at the last of inline function changes the method signature and hence the ambiguity in function is resolved

 askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
    ()
  }.start().awaitTermination()
like image 27
chandan Kumar Avatar answered Oct 19 '22 00:10

chandan Kumar