Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pyspark: shuffle RDD

I'm trying to randomise the order of elements in an RDD. My current approach is to zip the elements with an RDD of shuffled integers, then later join by those integers.

However, pyspark falls over with only 100000000 integers. I'm using the code below.

My question is: is there a better way to either zip with the random index or otherwise shuffle?

I've tried sorting by a random key, which works, but is slow.

def random_indices(n):
    """
    return an iterable of random indices in range(0,n)
    """
    indices = range(n)
    random.shuffle(indices)
    return indices

The following happens in pyspark:

Using Python version 2.7.3 (default, Jun 22 2015 19:33:41)
SparkContext available as sc.
>>> import clean
>>> clean.sc = sc
>>> clean.random_indices(100000000)
Killed
like image 430
Marcin Avatar asked Aug 19 '15 22:08

Marcin


People also ask

What is shuffle RDD in Spark?

The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions, based on your data size you may need to reduce or increase the number of partitions of RDD/DataFrame using spark.

How do you shuffle data in PySpark?

shuffle() is used to shuffle the values in an array for all rows in the array type column of the pyspark DataFrame. It will return a new array with shuffled values. It takes the array type column name as a parameter. Please note that it shuffles randomly.

Does Spark shuffle write to disk?

Spark gathers the required data from each partition and combines it into a new partition. During a shuffle, data is written to disk and transferred across the network.


1 Answers

One possible approach is to add random keys using mapParitions

import os
import numpy as np

swap = lambda x: (x[1], x[0])

def add_random_key(it):
    # make sure we get a proper random seed
    seed = int(os.urandom(4).encode('hex'), 16) 
    # create separate generator
    rs = np.random.RandomState(seed)
    # Could be randint if you prefer integers
    return ((rs.rand(), swap(x)) for x in it)

rdd_with_keys = (rdd
  # It will be used as final key. If you don't accept gaps 
  # use zipWithIndex but this should be cheaper 
  .zipWithUniqueId()
  .mapPartitions(add_random_key, preservesPartitioning=True))

Next you can repartition, sort each partition and extract values:

n = rdd.getNumPartitions()
(rdd_with_keys
    # partition by random key to put data on random partition 
    .partitionBy(n)
    # Sort partition by random value to ensure random order on partition
    .mapPartitions(sorted, preservesPartitioning=True)
    # Extract (unique_id, value) pairs
    .values())

If sorting per partition is still to slow it could be replaced by Fisher–Yates shuffle.

If you simply need a random data then you can use mllib.RandomRDDs

from pyspark.mllib.random import RandomRDDs

RandomRDDs.uniformRDD(sc, n)

Theoretically it could be zipped with input rdd but it would require matching the number of elements per partition.

like image 124
zero323 Avatar answered Oct 22 '22 10:10

zero323