We need to efficiently convert large lists of key/value pairs, like this:
val providedData = List(
  (new Key("1"), new Val("one")),
  (new Key("1"), new Val("un")),
  (new Key("1"), new Val("ein")),
  (new Key("2"), new Val("two")),
  (new Key("2"), new Val("deux")),
  (new Key("2"), new Val("zwei"))
)
into lists of values per key, like this:
val expectedData = List(
  (new Key("1"), List(
    new Val("one"),
    new Val("un"),
    new Val("ein"))),
  (new Key("2"), List(
    new Val("two"),
    new Val("deux"),
    new Val("zwei")))
)
The key/value pairs come from a large key/value store (Accumulo), so the keys are sorted, but they will usually cross Spark partition boundaries. There can be millions of keys and hundreds of values per key.
I think the right tool for this job is Spark's combineByKey operation, but I have only been able to find terse examples with generic types (like Int) that I have been unable to generalize to user-defined types such as the ones above.
Since I suspect many others will have the same question, I'm hoping someone can provide both fully-specified (verbose) and terse examples of the Scala syntax for using combineByKey with user-defined types as above, or possibly point out a better tool that I've missed.
I'm not really a Spark expert, but based on this question, I think you can do the following:
val rdd = sc.parallelize(providedData)
rdd.combineByKey(
  // createCombiner: start a list from the first value
  (x: Val) => List(x),
  // mergeValue: add a new value to an existing list
  (acc: List[Val], x: Val) => x :: acc,
  // mergeCombiners: combine the two lists
  (acc1: List[Val], acc2: List[Val]) => acc1 ::: acc2
)
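Since the question asks for a fully-specified version too, here is a self-contained sketch. Key and Val aren't defined in the question, so the case classes (and the local SparkContext) below are assumptions; substitute your real types, which need sensible equals/hashCode for the grouping to work:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Assumed stand-ins for the user-defined types in the question; case classes
// supply the equals/hashCode that Spark relies on when grouping keys.
case class Key(id: String)
case class Val(word: String)

// Assumed local context for a quick test; use your real cluster configuration.
val sc = new SparkContext(new SparkConf().setAppName("combineByKey-example").setMaster("local[*]"))

val providedData: List[(Key, Val)] = List(
  (new Key("1"), new Val("one")),
  (new Key("1"), new Val("un")),
  (new Key("2"), new Val("two"))
)

val rdd: RDD[(Key, Val)] = sc.parallelize(providedData)

val grouped: RDD[(Key, List[Val])] = rdd.combineByKey(
  // createCombiner: start a new list the first time a key is seen in a partition
  (x: Val) => List(x),
  // mergeValue: prepend further values for the same key within the same partition
  (acc: List[Val], x: Val) => x :: acc,
  // mergeCombiners: concatenate the per-partition lists for the same key
  (acc1: List[Val], acc2: List[Val]) => acc1 ::: acc2
)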
Using aggregateByKey:
rdd.aggregateByKey(List[Val]())(
  (acc, x) => x :: acc,
  (acc1, acc2) => acc1 ::: acc2
)
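Either version can be sanity-checked the same way, assuming the case-class stand-ins and the rdd defined above. Note that values are prepended with :: and combiners can be merged in any order, so the per-key lists aren't guaranteed to match the original input order:
val aggregated = rdd.aggregateByKey(List[Val]())(
  (acc, x) => x :: acc,
  (acc1, acc2) => acc1 ::: acc2
)

// Collect to the driver only for a small test; with millions of keys, keep working on the RDD.
aggregated.collect().foreach { case (key, values) =>
  println(s"$key -> $values")
}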