 

Can't zip RDDs with unequal numbers of partitions

I have 3 RDDs like this:

rdd1:

1 2
3 4
5 6
7 8
9 10

rdd2:

11 12
13 14

rdd3:

15 16
17 18
19 20

I want to do this:

rdd1.zip(rdd2.union(rdd3))

and I want the result to look like this:

1 2 11 12
3 4 13 14
5 6 15 16
7 8 17 18
9 10 19 20

but I get this exception:

Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

Someone told me I can avoid the exception by doing this:

rdd1.zip(rdd2.union(rdd3).repartition(1))

But that seems a bit costly, so I want to know if there are other ways to solve this problem.

asked Mar 17 '23 07:03 by 赵祥宇


1 Answer

I'm not sure what you mean by "cost", but you're right to suspect that repartition(1) is not the right solution. It will repartition the RDD to a single partition.

  • If your data does not fit on a single machine, this will fail.
  • It only works if rdd1 also has a single partition. When you have more data, this will probably no longer hold.
  • repartition performs a shuffle, so your data can end up ordered differently.
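To make the partition-layout problem concrete, here is a toy model in plain Scala, not Spark. A dataset is modeled as a Vector of partitions; the hypothetical `zipParts` pairs partition i with partition i and rejects mismatched layouts roughly the way RDD.zip does, `unionParts` concatenates the partition lists (which is what union typically does for RDDs without a shared partitioner), and `toOnePartition` stands in for repartition(1). All names are made up for illustration and the error messages only approximate Spark's.

```scala
import scala.util.Try

// Toy model of an RDD's layout: an outer Vector of partitions.
// This is plain Scala, not Spark; it only mimics the checks zip makes.
object ZipModel {
  type Parts[A] = Vector[Vector[A]]

  // Pair partition i of `a` with partition i of `b`, element by element,
  // rejecting layouts that Spark's zip would also reject.
  def zipParts[A, B](a: Parts[A], b: Parts[B]): Parts[(A, B)] = {
    require(a.length == b.length,
      "Can't zip RDDs with unequal numbers of partitions")
    a.zip(b).map { case (pa, pb) =>
      require(pa.length == pb.length,
        "Can only zip RDDs with same number of elements in each partition")
      pa.zip(pb)
    }
  }

  // Models union of two RDDs without a common partitioner:
  // the partition lists are simply concatenated.
  def unionParts[A](a: Parts[A], b: Parts[A]): Parts[A] = a ++ b

  // Models repartition(1): everything lands in a single partition.
  // (Order is kept here; a real repartition shuffles and does not promise that.)
  def toOnePartition[A](a: Parts[A]): Parts[A] = Vector(a.flatten)

  def main(args: Array[String]): Unit = {
    val rdd2 = Vector(Vector((11, 12), (13, 14)))
    val rdd3 = Vector(Vector((15, 16), (17, 18), (19, 20)))
    val tail = unionParts(rdd2, rdd3) // 2 partitions

    val rdd1OnePart = Vector(Vector((1, 2), (3, 4), (5, 6), (7, 8), (9, 10)))
    val rdd1TwoParts = Vector(Vector((1, 2), (3, 4), (5, 6)), Vector((7, 8), (9, 10)))

    println(Try(zipParts(rdd1OnePart, tail)))                  // fails: 1 vs 2 partitions
    println(Try(zipParts(rdd1OnePart, toOnePartition(tail))))  // succeeds
    println(Try(zipParts(rdd1TwoParts, toOnePartition(tail)))) // fails: 2 vs 1 partitions
  }
}
```

The first zip fails like the exception in the question, the second succeeds, and the third fails again as soon as rdd1 itself is split into two partitions, which is the point of the second bullet.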

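I think the right solution is to give up on using zip. zip assumes that both RDDs have the same number of partitions and the same number of elements in each partition, and you likely cannot ensure that the partitioning will match up. Create a key and use join instead: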

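// Key each row by its position; zipWithIndex yields (value, index) pairs.
val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
// rdd3's rows continue where rdd2's end, so shift its indices by rdd2's size.
val offset = rdd2.count
val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
// Join on the index: each row of rdd1 takes its partner from rdd2 or, failing that, rdd3.
// (v3Opt.get fails if rdd1 has more rows than rdd2 and rdd3 together.)
val combined =
  indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
    case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
  }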

This works no matter how the RDDs are partitioned. The join does not keep the rows in their original order, so if you need that order, sort the result by the index and then drop it:

val unindexed = combined.sortByKey().values
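To check the index-and-join logic against the question's expected output without a cluster, here is a sketch that models it on ordinary Scala collections: `Seq.zipWithIndex` stands in for `RDD.zipWithIndex`, `Map` lookups stand in for `leftOuterJoin`, and `sortBy` for `sortByKey`. The object and method names are hypothetical; this is not Spark code and says nothing about distributed performance.

```scala
// Plain-Scala model of the index-and-join approach (not Spark).
object IndexJoinModel {
  // first/second/third play the roles of rdd1/rdd2/rdd3.
  def combine[A, B](first: Seq[A], second: Seq[B], third: Seq[B]): Seq[(A, B)] = {
    // Key every element by its position, like RDD.zipWithIndex (Long indices).
    val indexed1 = first.zipWithIndex.map { case (v, i) => (i.toLong, v) }
    val indexed2 = second.zipWithIndex.map { case (v, i) => (i.toLong, v) }.toMap
    // Continue the third collection's keys where the second one's stop.
    val offset = second.size.toLong
    val indexed3 = third.zipWithIndex.map { case (v, i) => (i + offset, v) }.toMap

    indexed1
      .map { case (i, v1) =>
        // Prefer the partner from `second`, else take it from `third`;
        // like v3Opt.get, this throws if neither has key i.
        (i, (v1, indexed2.getOrElse(i, indexed3(i))))
      }
      .sortBy(_._1) // the role of sortByKey()
      .map(_._2)    // the role of .values
  }

  def main(args: Array[String]): Unit = {
    val rdd1 = Seq((1, 2), (3, 4), (5, 6), (7, 8), (9, 10))
    val rdd2 = Seq((11, 12), (13, 14))
    val rdd3 = Seq((15, 16), (17, 18), (19, 20))
    combine(rdd1, rdd2, rdd3).foreach { case ((a, b), (c, d)) =>
      println(s"$a $b $c $d")
    }
  }
}
```

Its main method prints the five rows the question asks for, from `1 2 11 12` down to `9 10 19 20`.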
answered Mar 19 '23 13:03 by Daniel Darabos