
Random number generation in PySpark

Let's start with a simple function which always returns a random integer:

import numpy as np

def f(x):
    return np.random.randint(1000)

and an RDD filled with zeros and mapped using f:

rdd = sc.parallelize([0] * 10).map(f)

Since the above RDD is not persisted, I expect to get a different output every time I collect:

> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]

If we ignore the fact that the distribution of values doesn't really look random, this is more or less what happens. The problem starts when we take only the first element:

assert len(set(rdd.first() for _ in xrange(100))) == 1

or

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1

It seems to return the same number each time. I've been able to reproduce this behavior on two different machines with Spark 1.2, 1.3 and 1.4. Here I am using np.random.randint, but it behaves the same way with random.randint.

This issue, like the not-exactly-random results from collect, seems to be Python-specific; I couldn't reproduce it using Scala:

def f(x: Int) = scala.util.Random.nextInt(1000)

val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size

rdd.collect()

Did I miss something obvious here?

Edit:

Turns out the source of the problem is Python RNG implementation. To quote official documentation:

The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don’t share state.
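
To make the quote concrete, here is a minimal sketch (not part of the original question) of the stdlib equivalent: give each call its own random.Random instance instead of relying on the shared module-level generator:

import os
import binascii
import random

def f_stdlib(x):
    # A dedicated Random instance per call, seeded from the OS,
    # so no generator state is shared between worker processes
    seed = int(binascii.hexlify(os.urandom(4)), 16)
    rng = random.Random(seed)
    return rng.randint(0, 999)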

I assume NumPy works the same way, and rewriting f to use a RandomState instance as follows

import os
import binascii

def f(x, seed=None):
    seed = (
        seed if seed is not None 
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

makes it slower but solves the problem.

While the above explains the not-so-random results from collect, I still don't understand how it affects first / take(1) between multiple actions.

zero323 asked Aug 09 '15



2 Answers

For my use case, most of the solution was buried in an edit at the bottom of the question. However, there is another complication: I wanted to use the same function to generate multiple (different) random columns. It turns out that Spark has an assumption that the output of a UDF is deterministic, which means that it can skip later calls to the same function with the same inputs. For functions that return random output this is obviously not what you want.
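
For illustration only (this sketch is not from the original answer; my_df stands for an existing DataFrame, as in the snippet further down), the naive approach of calling the same seed-less random UDF twice can yield identical columns precisely because of that determinism assumption:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# A seed-less UDF: Spark treats it as deterministic, so the optimizer may
# evaluate it once and reuse the result for both columns below
naive_rand = F.udf(lambda: int(np.random.randint(1000)), IntegerType())

df_bad = (
    my_df
    .withColumn('r0', naive_rand())
    .withColumn('r1', naive_rand())  # may simply mirror r0
)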

To work around this, I generated a separate seed column for every random column that I wanted using the built-in PySpark rand function:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np

@F.udf(IntegerType())
def my_rand(seed):
    rs = np.random.RandomState(seed)
    # cast the NumPy integer to a plain Python int so the IntegerType UDF
    # doesn't silently return null
    return int(rs.randint(1000))

# draw an independent 32-bit seed for each row using Spark's built-in rand()
# (np.random.RandomState accepts seeds in [0, 2**32 - 1])
seed_expr = (F.rand()*F.lit(4294967295).astype('double')).astype('bigint')
my_df = (
    my_df
    .withColumn('seed_0', seed_expr)
    .withColumn('seed_1', seed_expr)
    .withColumn('myrand_0', my_rand(F.col('seed_0')))
    .withColumn('myrand_1', my_rand(F.col('seed_1')))
    .drop('seed_0', 'seed_1')
)

I'm using the DataFrame API rather than the RDD of the original problem because that's what I'm more familiar with, but the same concepts presumably apply.

NB: apparently it is possible to disable the assumption of determinism for Scala Spark UDFs since v2.3: https://jira.apache.org/jira/browse/SPARK-20586.
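
PySpark has a similar escape hatch: since 2.3 a Python UDF can be marked with asNondeterministic(), which tells the optimizer not to deduplicate or cache its calls. A minimal sketch of how the UDF above could be declared that way (the my_rand_fn name is only for illustration):

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

def my_rand_fn(seed):
    # fresh RandomState per call, seeded from the input column
    rs = np.random.RandomState(seed)
    return int(rs.randint(1000))

# Marking the UDF as non-deterministic (Spark >= 2.3) stops the optimizer
# from collapsing repeated calls with the same input into one evaluation
my_rand = F.udf(my_rand_fn, IntegerType()).asNondeterministic()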

abeboparebop answered Oct 03 '22


This seems to be a bug (or feature) of randint. I see the same behavior, but as soon as I change f, the values do indeed change. So I'm not sure about the actual randomness of this method... I can't find any documentation, but it seems to be using some deterministic math algorithm instead of more variable features of the running machine. Even if I go back and forth, the numbers seem to be the same upon returning to the original value...

Justin Pihony answered Oct 03 '22