Suppose that I have this Spark code written in Scala 2.12
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( partition => partition.foreach {
entry: String => println(entry)
})
When I compile the code, I get this error:
[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error] empty.foreachPartition( partition => partition.foreach{
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM
Why did the compiler infer partition as an Object instead of Iterator[String]?
I have to manually annotate the partition type for the code to work:
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
entry: String => println(entry)
})
This is because of two overloaded versions of foreachPartition and Java-Scala interop.
If the code were Scala-only (here is a minimal example, independent of Spark)
val dataset: Dataset[String] = ???
dataset.foreachPartition(partition => ???)
class Dataset[T] {
def foreachPartition(f: Iterator[T] => Unit): Unit = ???
def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
}
trait ForeachPartitionFunction[T] extends Serializable {
def call(t: Iterator[T]): Unit
}
then the type of partition would be inferred (as scala.collection.Iterator[String]).
But in the actual Spark code, ForeachPartitionFunction is a Java interface whose method call accepts a java.util.Iterator[String].
So both options
dataset.foreachPartition((
(partition: scala.collection.Iterator[String]) => ???
): Iterator[String] => Unit)
dataset.foreachPartition((
(partition: java.util.Iterator[String]) => ???
): ForeachPartitionFunction[String])
are eligible, so the compiler can't infer the type of partition.
And type inference in Scala is local: once the compiler has seen partition => partition.foreach... (and java.util.Iterator[String] doesn't have a foreach method), it's too late to go back and re-type partition.
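To make this concrete, here is a self-contained sketch of the competing overloads (the Dataset below is a toy stand-in with the same two signatures, not Spark itself). Once the parameter is annotated, only the Scala overload matches, so the code compiles and runs:

```scala
import java.util.{Iterator => JuIterator}

trait ForeachPartitionFunction[T] extends Serializable {
  def call(t: JuIterator[T]): Unit
}

// Toy stand-in for Spark's Dataset with the same pair of overloads.
class Dataset[T](data: Seq[T]) {
  def foreachPartition(f: Iterator[T] => Unit): Unit = f(data.iterator)
  def foreachPartition(func: ForeachPartitionFunction[T]): Unit = {
    val it = data.iterator
    func.call(new JuIterator[T] {
      def hasNext: Boolean = it.hasNext
      def next(): T = it.next()
    })
  }
}

val dataset = new Dataset[String](Seq("a", "b"))

// dataset.foreachPartition(p => p.foreach(println))  // same ambiguity as in the question
dataset.foreachPartition((p: Iterator[String]) => p.foreach(println))  // compiles
```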
As @Dmytro said, the Scala compiler can't infer which overloaded function it should apply. However, there is a simple workaround: use this helper function to pin down the function's type:
def helper[I](f: I => Unit): I => Unit = f
Now all you need to do is:
dataset.foreachPartition(helper[Iterator[String]] { partition =>
  partition.foreach(entry => println(entry))
})
Because helper returns a plain Iterator[String] => Unit value, only the Scala overload of foreachPartition applies, and no parameter annotation is needed inside the lambda.
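Here is a self-contained sketch of why the helper resolves the ambiguity (the overloaded run below is a hypothetical stand-in for Spark's foreachPartition pair). SAM conversion applies only to function literals, so once the lambda is wrapped in helper it is an ordinary Function1 value and only the Scala overload matches:

```scala
// Hypothetical SAM interface, standing in for ForeachPartitionFunction.
trait SamFunc[T] { def call(t: java.util.Iterator[T]): Unit }

object Demo {
  var picked = ""
  // Stand-ins for the two Spark overloads, recording which one ran.
  def run[T](f: Iterator[T] => Unit): Unit = { picked = "scala"; f(Iterator.empty) }
  def run[T](f: SamFunc[T]): Unit = { picked = "java" }
}

def helper[I](f: I => Unit): I => Unit = f

// Demo.run[String](p => p.foreach(println))  // ambiguous, as in the question
Demo.run(helper[Iterator[String]](p => p.foreach(println)))  // picks the Scala overload
```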