Apache Beam seems to refuse to recognise Kotlin's `Iterable`. Here is some sample code:
```kotlin
@ProcessElement
fun processElement(
    @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
}
```
I get the following weird error:
```
java.lang.IllegalArgumentException:
...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
@Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>
```
Sure enough, if I replace `Iterable` with `java.lang.Iterable`, the same code works just fine. What am I doing wrong?
Dependency versions:

- Kotlin: 1.3.21
- Apache Beam: 2.11.0
Here is a gist with the full code and stack trace:
Update:
After a bit of trial and error, I found that while `List<String>` throws a similar exception, `MutableList<String>` actually works:
```kotlin
class PrintString : DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
        @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
    ) {
        val output = input.key + "|" + input.value.toString()
        println("output: $output")
        receiver.output(output)
    }
}
```
This reminded me that Kotlin's immutable (read-only) collections are really only interfaces, and that the underlying collection is still mutable. However, replacing `Iterable` with `MutableIterable` still raises the error.
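A likely explanation for why `MutableList` behaves differently from `List`, `Iterable` and `MutableIterable` is declaration-site variance. In the Kotlin standard library, `Iterable<out T>`, `MutableIterable<out T>` and `List<out E>` are declared covariant, while `MutableList<E>` is invariant, and the Kotlin compiler can translate that covariance into `? extends` wildcards in the Java generic signatures that Beam inspects via reflection. The sketch below (class and function names are hypothetical; it needs only the standard library and Java reflection) reads those signatures back. It uses `CharSequence` rather than `String` because Kotlin omits the wildcard in parameter position when the type argument is a final class such as `String`.

```kotlin
// Hypothetical holder class: only the JVM signatures of these methods matter.
class VarianceDemo {
    // List<out E> is covariant -> java.util.List<? extends java.lang.CharSequence>
    fun readOnlyList(values: List<CharSequence>) = Unit

    // MutableList<E> is invariant -> java.util.List<java.lang.CharSequence>
    fun mutableList(values: MutableList<CharSequence>) = Unit

    // Iterable<out T> is covariant -> java.lang.Iterable<? extends java.lang.CharSequence>
    fun readOnlyIterable(values: Iterable<CharSequence>) = Unit

    // MutableIterable<out T> is also covariant, so it gets the same wildcard
    fun mutableIterable(values: MutableIterable<CharSequence>) = Unit
}

// Returns the Java generic type of the first parameter of the named method.
fun paramTypeOf(name: String): String =
    VarianceDemo::class.java.declaredMethods
        .single { it.name == name }
        .genericParameterTypes[0]
        .typeName
```

Only the invariant `MutableList` comes out without a wildcard, which is consistent with it being the one variant Beam accepted at pipeline-construction time.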
Update 2:
I deployed my Kotlin Dataflow job using the `MutableList` version above, and the job failed with:
```
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
```
I had to switch back to using `java.lang.Iterable`.
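The Update 2 failure can be reproduced without Beam. Declaring the parameter as `MutableList` only changes what the compiler believes about the value; at runtime the runner still supplies whatever object it built for the grouped values (a `WindowReiterable` in the trace above), and as soon as something casts that object to `java.util.List`, the JVM throws. In this sketch, `LazyGroup` is a hypothetical stand-in for such an Iterable-but-not-List object, and both helper functions are made up for illustration.

```kotlin
// Hypothetical stand-in for a runner's lazily iterated group of values:
// it implements Iterable but is NOT a java.util.List.
class LazyGroup(private val backing: List<String>) : Iterable<String> {
    override fun iterator(): Iterator<String> = backing.iterator()
}

// Mirrors declaring the grouped values as MutableList: the unchecked cast
// compiles, but the runtime check against java.util.List fails.
fun countAssumingList(grouped: Any): Int {
    @Suppress("UNCHECKED_CAST")
    val values = grouped as MutableList<String>
    return values.size
}

// Treating the grouped values as a plain Iterable works for any runner.
fun countAsIterable(grouped: Iterable<String>): Int = grouped.count()
```

If a real `List` is needed, copying the values with `toList()` avoids the cast, at the cost of materialising the whole group in memory.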
I ran into this problem as well when using a `ParDo` following a `GroupByKey`. It turns out that a `@JvmWildcard` annotation is needed on the `Iterable` type argument when writing a transformation that accepts the result of a `GroupByKey`.
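The Beam error message suggests that the `DoFn`'s declared input type was recorded as `Iterable<? extends String>` while the `@Element` parameter was not. A minimal, Beam-free sketch of what `@JvmWildcard` changes, assuming only the Kotlin standard library and Java reflection (the class and function names are hypothetical):

```kotlin
// Hypothetical holder class; only the JVM signatures of its methods matter here.
class WildcardDemo {
    // String is final, so Kotlin omits the wildcard: java.lang.Iterable<java.lang.String>
    fun withoutAnnotation(values: Iterable<String>) = Unit

    // @JvmWildcard forces it: java.lang.Iterable<? extends java.lang.String>
    fun withAnnotation(values: Iterable<@JvmWildcard String>) = Unit
}

// Renders the Java generic type of the first parameter of the named method.
fun jvmParamType(name: String): String =
    WildcardDemo::class.java.declaredMethods
        .single { it.name == name }
        .genericParameterTypes[0]
        .typeName
```

Annotating the type argument in both the `DoFn` supertype and the `@Element` parameter, as the answer's example does, makes the two signatures line up.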
See the contrived example below that reads a file and groups by the first character of each line.
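```kotlin
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.transforms.Contextful
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.Element
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.GroupByKey
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.ProcessFunction
import org.apache.beam.sdk.transforms.WithKeys
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.TypeDescriptors

class BeamPipe {
    // @JvmWildcard makes Kotlin emit Iterable<? extends String> in the JVM signature,
    // in the supertype and in the @Element parameter alike.
    class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
        @ProcessElement
        fun processElement(
            @Element input: KV<String, Iterable<@JvmWildcard String>>,
            receiver: OutputReceiver<KV<String, String>>
        ) {
            receiver.output(KV.of(input.key, input.value.joinToString("\n")))
        }
    }

    fun pipe(options: PipelineOptions) {
        val file = "testFile.txt"
        val p = Pipeline.create(options)
        p.apply(TextIO.read().from(file))
            // take(1) instead of line[0], so an empty line does not throw
            .apply("Key lines by first character",
                WithKeys.of { line: String -> line.take(1) }
                    .withKeyType(TypeDescriptors.strings()))
            .apply("Group lines by first character", GroupByKey.create<String, String>())
            .apply("Concatenate lines", ParDo.of(ConcatLines()))
            .apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
                .by { it.key }
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
                .to("whatever")
                .withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
            )
        p.run()
    }
}
```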
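To reason about what each output file should contain, the keying, grouping and joining steps can be mirrored with plain Kotlin collections. This is only an in-memory analogue (the function name is hypothetical), not Beam code. Note also that Beam's `GroupByKey` does not guarantee any order of values within a group, whereas `groupBy` preserves input order.

```kotlin
// In-memory analogue of the pipeline's logic (not Beam code):
// key each line by its first character (take(1) yields "" for an empty line),
// group lines per key, then join each group with "\n".
fun concatByFirstChar(lines: List<String>): Map<String, String> =
    lines.groupBy { it.take(1) }
        .mapValues { (_, group) -> group.joinToString("\n") }
```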