Now I have 3 RDDs like this:
rdd1:
1 2
3 4
5 6
7 8
9 10
rdd2:
11 12
13 14
rdd3:
15 16
17 18
19 20
and I want to do this:
rdd1.zip(rdd2.union(rdd3))
and I want the result to look like this:
1 2 11 12
3 4 13 14
5 6 15 16
7 8 17 18
9 10 19 20
but I get this exception:
Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
Someone told me I can do this without the exception:
rdd1.zip(rdd2.union(rdd3).repartition(1))
But it seems a little costly. So I want to know if there are other ways to solve this problem.
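A minimal sketch of where the exception comes from. It assumes the rows are (Int, Int) pairs built with sc.parallelize and split into an explicit 2 slices per RDD (both assumptions, not from the question). When neither input has a partitioner, union keeps every partition of both inputs, so the unioned RDD has as many partitions as rdd2 and rdd3 combined, and zip rejects the mismatch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object PartitionCounts {
  // Returns (rdd1 partitions, union partitions, whether zip failed).
  // The data and the 2-slice split are illustrative assumptions.
  def describe(sc: SparkContext): (Int, Int, Boolean) = {
    val rdd1 = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (7, 8), (9, 10)), 2)
    val rdd2 = sc.parallelize(Seq((11, 12), (13, 14)), 2)
    val rdd3 = sc.parallelize(Seq((15, 16), (17, 18), (19, 20)), 2)
    // Neither input has a partitioner, so union concatenates their partitions: 2 + 2.
    val unioned = rdd2.union(rdd3)
    // zip's partition-count check runs when its partitions are computed,
    // at the latest when collect() submits the job, so wrap both in Try.
    val zipFailed = Try(rdd1.zip(unioned).collect()).isFailure
    (rdd1.partitions.length, unioned.partitions.length, zipFailed)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-counts").setMaster("local[2]"))
    try {
      val (p1, pu, failed) = describe(sc)
      println(s"rdd1 partitions: $p1, union partitions: $pu, zip failed: $failed")
    } finally {
      sc.stop()
    }
  }
}
```

With Spark on the classpath this should report 2 partitions for rdd1, 4 for the union, and a failed zip.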
I'm not sure what you mean by "cost", but you're right to suspect that repartition(1) is not the right solution. It will repartition the RDD to a single partition, which causes two problems:
- zip needs both RDDs to have the same number of partitions, so this only works as long as rdd1 happens to have a single partition. When you have more data this will probably no longer hold.
- repartition performs a shuffle, so your data can end up ordered differently.
I think the right solution is to give up on using zip, because you likely cannot ensure that the partitioning (the number of partitions and the number of elements in each) will match up. Create a key and use join instead:
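// Key every element by its position (zipWithIndex numbers elements in partition order).
val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
// rdd3's indices continue where rdd2's end, as if rdd2 and rdd3 were concatenated.
val offset = rdd2.count
val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
// Each index matches in indexedRDD2 or indexedRDD3, so take whichever one matched
// (v3Opt.get assumes rdd1 has no more elements than rdd2 and rdd3 combined).
val combined =
  indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
    case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
  }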
This will work no matter the partitioning. If you like, you can sort the result and remove the index at the end:
val unindexed = combined.sortByKey().values
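Putting the pieces together, here is a self-contained sketch that wraps the index-and-join approach above in a helper, zipWithConcat (a hypothetical name of mine, not a Spark API), and runs it on the question's data in local mode. Modeling each row as an (Int, Int) pair and using local[2] are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object ZipByIndex {
  // Hypothetical helper: pairs the i-th element of rdd1 with the i-th element
  // of rdd2 followed by rdd3, using keys and joins instead of zip.
  def zipWithConcat[A: ClassTag, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B], rdd3: RDD[B]): RDD[(A, B)] = {
    val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
    val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
    val offset = rdd2.count
    val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
    indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3)
      .map { case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get)) }
      .sortByKey() // restore positional order
      .values      // drop the index
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("zip-by-index").setMaster("local[2]"))
    try {
      // Sample data from the question; the (Int, Int) representation is an assumption.
      val rdd1 = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (7, 8), (9, 10)))
      val rdd2 = sc.parallelize(Seq((11, 12), (13, 14)))
      val rdd3 = sc.parallelize(Seq((15, 16), (17, 18), (19, 20)))
      zipWithConcat(rdd1, rdd2, rdd3).collect().foreach { case ((a, b), (c, d)) =>
        println(s"$a $b $c $d")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Run with Spark on the classpath, this should print the five rows from the question (1 2 11 12 through 9 10 19 20), whatever partitioning parallelize picks, because the order comes from the index rather than from the partition layout.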