 

Kotlin Iterable not supported in Apache Beam?

Apache Beam seems to refuse to recognise Kotlin's Iterable. Here is some sample code:

@ProcessElement
fun processElement(
    @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
}

I get the following weird error:

java.lang.IllegalArgumentException:
   ...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
   @Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>

Sure enough, if I replace Iterable with java.lang.Iterable, the same code works just fine. What am I doing wrong?

Dependency versions:

  • kotlin-jvm: 1.3.21
  • org.apache.beam: 2.11.0

Here is a gist with the full code and stack trace:

  • https://gist.github.com/marcoslin/e1e19afdbacac9757f6974592cfd8d7f#file-apache-beam-iterable-notworking-kt

Update:

After a bit of trial and error, I found that List<String> throws a similar exception, but MutableList<String> actually works:

class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
        @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
    ) {
        val output = input.key + "|" + input.value.toString()
        println("output: $output")
        receiver.output(output)
    }
}

This reminded me that Kotlin's "immutable" collections are really just read-only interfaces, and that the underlying collections are still mutable. However, replacing Iterable with MutableIterable still raises the error.

Update 2:

I deployed my Kotlin Dataflow job using the MutableList version above, and the job failed with:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)

I had to switch back to java.lang.Iterable.
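Why the MutableList version compiles but then fails on Dataflow can be sketched without Beam. GroupByKey only promises an Iterable of values for each key, and a runner is free to pass an implementation that is not a java.util.List (the trace above names Dataflow's WindowReiterable). In this minimal sketch, LazyGroupedValues is a hypothetical stand-in for such a runner-supplied Iterable, and joinAsMutableList mimics the cast implied by declaring the element as MutableList<String>.

```kotlin
// Hypothetical stand-in for a runner-supplied grouped-values Iterable
// (such as Dataflow's WindowReiterable): it implements Iterable but is NOT a java.util.List.
class LazyGroupedValues(private val source: List<String>) : Iterable<String> {
    override fun iterator(): Iterator<String> = source.iterator()
}

// Safe: relies only on the Iterable contract that GroupByKey guarantees.
fun joinGrouped(values: Iterable<String>): String = values.joinToString("|")

// Unsafe: mirrors declaring the element as MutableList<String>. The runner still hands
// over an arbitrary Iterable, so the cast to java.util.List fails at runtime.
fun joinAsMutableList(values: Any): String {
    val list = values as MutableList<*>
    return list.joinToString("|")
}
```

Accepting Iterable (annotated with @JvmWildcard, or java.lang.Iterable as in the question) avoids the cast entirely. If list operations are genuinely needed, calling toList() on the Iterable inside the method copies the group into memory first, which may be costly for very large groups.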

asked by marcoseu on Apr 29 '19 at 18:04


1 Answer

I ran into this problem as well when using a ParDo after a GroupByKey. It turns out that a @JvmWildcard annotation is needed on the Iterable's type argument when writing a transform that accepts the output of a GroupByKey.
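What the annotation changes can be checked without Beam by comparing the generic JVM signatures of two methods with plain Java reflection. This is a minimal sketch: KvStandIn and ElementSignatures are hypothetical names standing in for Beam's KV and a DoFn. Under Kotlin's documented wildcard rules, no wildcard is emitted for a final type such as String unless @JvmWildcard asks for one, so only the annotated parameter is expected to carry "? extends String", which is the shape Beam's error message demands.

```kotlin
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.WildcardType

// Hypothetical invariant stand-in for Beam's KV<K, V>, so the sketch needs no Beam dependency.
class KvStandIn<K, V>(val key: K, val value: V)

class ElementSignatures {
    // Same shape as the failing @Element parameter in the question.
    fun plain(input: KvStandIn<String, Iterable<String>>) {}

    // Same shape as the @JvmWildcard fix.
    fun annotated(input: KvStandIn<String, Iterable<@JvmWildcard String>>) {}
}

// Returns the JVM-level type argument of the Iterable nested in the method's first parameter.
fun iterableElementType(methodName: String): Type {
    val method = ElementSignatures::class.java.methods.first { it.name == methodName }
    val kv = method.genericParameterTypes[0] as ParameterizedType
    val iterable = kv.actualTypeArguments[1] as ParameterizedType
    return iterable.actualTypeArguments[0]
}
```

Calling iterableElementType("plain") should give the String class itself, while iterableElementType("annotated") should give a WildcardType whose upper bound is String.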

See the contrived example below, which reads a file and groups its lines by their first character.

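import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.transforms.Contextful
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.GroupByKey
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.ProcessFunction
import org.apache.beam.sdk.transforms.WithKeys
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.TypeDescriptors

class BeamPipe {
  // @JvmWildcard makes the parameter's JVM signature Iterable<? extends String>,
  // which matches the DoFn input type reported in Beam's error message.
  class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
    @ProcessElement
    fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
      // Join all lines that share the same key into one block of text.
      receiver.output(KV.of(input.key, input.value.joinToString("\n")))
    }
  }

  fun pipe(options: PipelineOptions) {
    val file = "testFile.txt"
    val p = Pipeline.create(options)
    p.apply(TextIO.read().from(file))
        .apply("Key lines by first character",
            // Assumes every line is non-empty: line[0] throws on an empty line.
            WithKeys.of { line: String -> line[0].toString() }
                .withKeyType(TypeDescriptors.strings()))
        .apply("Group lines by first character", GroupByKey.create<String, String>())
        .apply("Concatenate lines", ParDo.of(ConcatLines()))
        .apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
            .by { it.key }
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
            .to("whatever")
            .withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
        )
    p.run()
  }
}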
answered by AlecBrooks on Sep 21 '22 at 16:09