
Scala compiler failed to infer type inside Spark lambda function

Suppose that I have this Spark code written in Scala 2.12

    val dataset = spark.emptyDataset[String]

    dataset.foreachPartition( partition => partition.foreach {
      entry: String => println(entry)
    })

When I compile the code, the compiler gives this error:


[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error]     empty.foreachPartition( partition => partition.foreach{
[error]                                                    ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM


Why did the compiler infer partition as an Object instead of Iterator[String]?

I have to manually annotate the type of partition for the code to work.

    val dataset = spark.emptyDataset[String]

    dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
      entry: String => println(entry)
    })

Asked Apr 25 '26 22:04 by IllSc


2 Answers

This is because of two overloaded versions of foreachPartition and Java-Scala interop.

If the code were Scala-only (here is a minimal example, independent of Spark)

val dataset: Dataset[String] = ???

dataset.foreachPartition(partition => ???)

class Dataset[T] {
  def foreachPartition(f: Iterator[T] => Unit): Unit = ???
  def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
}

trait ForeachPartitionFunction[T] extends Serializable {
  def call(t: Iterator[T]): Unit
}

then the type of partition would be inferred (as scala.collection.Iterator[String]).

But in the actual Spark code, ForeachPartitionFunction is a Java interface whose method call accepts a java.util.Iterator[String].

So both options

dataset.foreachPartition((
  (partition: scala.collection.Iterator[String]) => ??? 
): Iterator[String] => Unit)

dataset.foreachPartition((
  (partition: java.util.Iterator[String]) => ??? 
): ForeachPartitionFunction[String])

are eligible, and the compiler can't infer the type of partition.

And type inference in Scala is local: by the time the compiler sees partition => partition.foreach... (and java.util.Iterator[String] has no foreach method), it is too late to go back and re-type partition.
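
The ambiguity can be reproduced without Spark at all. Below is a self-contained sketch: Dataset and ForeachPartitionFunction here are minimal stand-ins that mirror Spark's two overloads, not the real classes.

```scala
import scala.collection.mutable.ListBuffer

// Stand-in for Spark's Java functional interface (assumed shape, not the real class).
trait ForeachPartitionFunction[T] extends Serializable {
  def call(t: java.util.Iterator[T]): Unit
}

// Stand-in Dataset carrying the same two overloads as Spark's.
class Dataset[T](data: Seq[T]) {
  def foreachPartition(f: Iterator[T] => Unit): Unit = f(data.iterator)
  def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
    func.call(new java.util.Iterator[T] {
      private val it = data.iterator
      def hasNext: Boolean = it.hasNext
      def next(): T = it.next()
    })
}

val seen = ListBuffer.empty[String]
val dataset = new Dataset(Seq("a", "b"))

// An un-annotated `partition => ...` fails here just as in Spark, because
// both overloads are applicable. Annotating the parameter selects the
// Scala overload:
dataset.foreachPartition((partition: Iterator[String]) => partition.foreach(seen += _))

println(seen.toList)  // List(a, b)
```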

Answered Apr 27 '26 13:04 by Dmytro Mitin


Just like @Dmytro said, the Scala compiler can't infer which overloaded method to apply. However, there is a simple workaround using this helper function:

  def helper[I](f: I => Unit): I => Unit = f

Now all you need to do is:

    dataset.foreachPartition(helper[Iterator[String]] { partition =>
      partition.foreach(entry => println(entry))
    })
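
Why the helper resolves the ambiguity: once the lambda is passed through helper, it has the concrete function type I => Unit, a plain Scala Function1, and a value that already has a function type is not eligible for SAM conversion to the Java interface, so only the Scala overload remains applicable. A self-contained sketch (Demo, JFunc, and run are hypothetical names for illustration, not Spark's):

```scala
// Hypothetical reproduction (no Spark): `run` has the same overload pair
// as Dataset.foreachPartition.
object Demo {
  trait JFunc[T] { def call(t: java.util.Iterator[T]): Unit }

  def run[T](f: Iterator[T] => Unit): String = "scala overload"
  def run[T](func: JFunc[T]): String = "java overload"

  def helper[I](f: I => Unit): I => Unit = f

  // The helper pins the lambda to the concrete type Iterator[String] => Unit,
  // so SAM conversion is off the table and only the Scala overload applies:
  val picked: String = run(helper[Iterator[String]](p => p.foreach(_ => ())))
}

println(Demo.picked)  // scala overload
```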
Answered Apr 27 '26 13:04 by linehrr


