So my input data has two fields/columns: id1 & id2, and my code is the following:
TextLine(args("input"))
  .read
  .mapTo('line -> ('id1, 'id2)) { line: String =>
    val fields = line.split("\t")
    (fields(0), fields(1))
  }
  .groupBy('id2) { _.size }
  .write(Tsv(args("output")))
The output has (I assume) two fields: id2 and size. I'm a little stuck on finding out whether it is possible to retain the id1 values that were grouped under each id2 and add them as another field.
You can't do this in a nice way, I'm afraid. Think about how it works under the hood: the data to be counted is split into chunks and sent off to different processes, each process counts its own chunk, and a reducer then adds the partial counts up at the end. While a process is counting, it doesn't know the total size, so it can't add that field. The only way is to go back and add it to the data once the total size is known (i.e. a join).
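To make that concrete, here is a minimal sketch in plain Scala collections (not Scalding) of the three steps described above. The object and function names are made up for illustration, and the chunks simply stand in for the input splits handled by separate processes.

```scala
object CountThenJoin {
  // A record is (id1, id2), as in the question.
  type Record = (String, String)

  // Each "process" counts only its own chunk, so it only sees partial sizes.
  def partialCounts(chunk: Seq[Record]): Map[String, Int] =
    chunk.groupBy(_._2).map { case (id2, rs) => (id2, rs.size) }

  // The "reducer" adds up the partial counts for each id2.
  def mergeCounts(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.flatten.groupBy(_._1).map { case (id2, ps) => (id2, ps.map(_._2).sum) }

  // Only now is the total size known, so attaching it to every record
  // is a second pass over the data, i.e. a join on id2.
  def attachSizes(records: Seq[Record], sizes: Map[String, Int]): Seq[(String, String, Int)] =
    records.map { case (id1, id2) => (id1, id2, sizes(id2)) }
}
```

Note that attachSizes needs the original records again, which is exactly why the size cannot be emitted during the counting pass.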
If each group fits in memory (and you can configure how much memory is available), you can collect each group into a list and emit the size alongside every record:
Tsv(args("input"), ('id1, 'id2))
  .groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list))
  .flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) {
    case (list, size) => list.map(record => (record._1, record._2, size))
  }
  .write(Tsv(args("output")))
But if your system doesn't have enough memory, you will have to use an expensive join.
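For reference, the join variant could look roughly like the sketch below. It assumes the Scalding fields API and the same "input" and "output" arguments as in the question; the job name SizeJoinJob and the temporary field 'id2Key are made up for illustration, and I have not run this against a particular Scalding version.

```scala
import com.twitter.scalding._

// Hypothetical job name; reads (id1, id2) pairs and writes (id1, id2, size).
class SizeJoinJob(args: Args) extends Job(args) {
  val records = Tsv(args("input"), ('id1, 'id2)).read

  // One row per id2 with its group size. The key is renamed so that the
  // two sides of the join have distinct field names.
  val sizes = records
    .groupBy('id2) { _.size }
    .rename('id2 -> 'id2Key)

  // Join every original record with the size of its id2 group.
  // The per-key counts are assumed to be the smaller side.
  records
    .joinWithSmaller('id2 -> 'id2Key, sizes)
    .project('id1, 'id2, 'size)
    .write(Tsv(args("output")))
}
```

This reads the input twice (once for counting, once for the join), which is the extra cost compared with the in-memory version, but no single group has to fit in memory.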
Remark: You can use Tsv instead of TextLine followed by mapTo and splitting.