Is Spark zipWithIndex safe with parallel implementation?

If I have a file and I call zipWithIndex on the RDD of its rows, I get something like:

([row1, id1001, name, address], 0)
([row2, id1001, name, address], 1)
...
([row100000, id1001, name, address], 99999)

Will I get the same indices if I reload the file? Since Spark runs in parallel, might the rows be partitioned differently and therefore receive different indices?
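For concreteness, a minimal sketch of this setup in Scala (the file name, the comma-delimited layout, and the local master are assumptions, not from the question):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setAppName("zip-demo").setMaster("local[*]"))

    // Read the file and split each line into its fields
    val rows = sc.textFile("people.csv").map(_.split(","))

    // Pair every row with a Long index: RDD[(Array[String], Long)]
    val indexed = rows.zipWithIndex()

    indexed.take(3).foreach { case (fields, idx) =>
      println(s"(${fields.mkString("[", ", ", "]")}, $idx)")
    }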

asked Aug 06 '15 by sophie

1 Answer

RDDs can be sorted, and so they do have an order. That order is what .zipWithIndex() uses to assign the indices.

Whether you get the same order on each run depends on what the preceding calls in your program do. The docs note that .groupBy() can destroy the order or produce a different one; other transformations may do the same.

If you need to guarantee a specific ordering, you can always call .sortBy() before .zipWithIndex(), as in the sketch below.
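A hedged sketch of that approach, reusing the sc and file from above (sorting on the first field of each row is an assumption; any deterministic key works):

    // Sorting on a stable key fixes a total order before indexing,
    // so the indices are reproducible across reloads of the same file.
    val indexed = sc.textFile("people.csv")
      .map(_.split(","))
      .sortBy(fields => fields(0))
      .zipWithIndex()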

This is explained in the .zipWithIndex() Scala API docs:

public RDD<scala.Tuple2<T,Object>> zipWithIndex()

Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. This method needs to trigger a spark job when this RDD contains more than one partitions.

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
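To illustrate that caveat, a toy sketch (the data is made up, not from the docs): after .groupByKey() the order of the resulting pairs is not guaranteed, so the indices may change between evaluations unless the RDD is sorted first.

    val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("b", 3)))

    // Order of groups is not guaranteed: indices may differ on reevaluation
    val unstable = pairs.groupByKey().zipWithIndex()

    // Sorting by key first fixes the order, hence stable indices
    val stable = pairs.groupByKey().sortByKey().zipWithIndex()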

answered Sep 21 '22 by Paul