Let's start with a simple function which returns a random integer:
import numpy as np
def f(x):
    return np.random.randint(1000)
and an RDD filled with zeros and mapped using f:
rdd = sc.parallelize([0] * 10).map(f)
Since the above RDD is not persisted, I expect to get a different output every time I collect:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
If we ignore the fact that the distribution of values doesn't really look random, it is more or less what happens. The problem starts when we take only the first element:
assert len(set(rdd.first() for _ in xrange(100))) == 1
or
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
It seems to return the same number each time. I've been able to reproduce this behavior on two different machines with Spark 1.2, 1.3 and 1.4. Here I am using np.random.randint, but it behaves the same way with random.randint.
This issue, like the non-exactly-random results from collect, seems to be Python-specific; I couldn't reproduce it using Scala:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
Did I miss something obvious here?
Edit:
It turns out the source of the problem is the Python RNG implementation. To quote the official documentation:
The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don’t share state.
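As a minimal illustration of that quote (my own aside, not part of the original setup): separate random.Random instances carry separate state, while the module-level functions all share one hidden instance.

import random

shared = random.randint(0, 1000)     # module-level call: uses the hidden shared instance

own_a = random.Random(42)            # independent generator with its own state
own_b = random.Random(42)            # another one, seeded identically
assert own_a.randint(0, 1000) == own_b.randint(0, 1000)  # same seed -> same stream

own_c = random.Random()              # seeded from OS entropy, independent of the shared instance
print(shared, own_c.randint(0, 1000))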
I assume NumPy works the same way, and rewriting f using a RandomState instance as follows
import os
import binascii

def f(x, seed=None):
    seed = (
        seed if seed is not None
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)
makes it slower but solves the problem.
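A quick way to check this (my own sketch, assuming the same SparkContext sc and the rewritten f above) is to re-run the earlier experiment; first() should now vary between actions, since each call draws a fresh seed from os.urandom:

rdd = sc.parallelize([0] * 10).map(f)
assert len(set(rdd.first() for _ in xrange(100))) > 1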
While the above explains the non-random results from collect, I still don't understand how it affects first / take(1) between multiple actions.
For my use case, most of the solution was buried in an edit at the bottom of the question. However, there is another complication: I wanted to use the same function to generate multiple (different) random columns. It turns out that Spark has an assumption that the output of a UDF is deterministic, which means that it can skip later calls to the same function with the same inputs. For functions that return random output this is obviously not what you want.
To work around this, I generated a separate seed column for every random column that I wanted, using the built-in PySpark rand function:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np

@F.udf(IntegerType())
def my_rand(seed):
    # build a fresh generator from the per-row seed so workers don't share RNG state
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

# per-row seed in [0, 2**32 - 1], drawn from Spark's built-in rand()
seed_expr = (F.rand() * F.lit(4294967295).astype('double')).astype('bigint')

my_df = (
    my_df
    .withColumn('seed_0', seed_expr)
    .withColumn('seed_1', seed_expr)
    .withColumn('myrand_0', my_rand(F.col('seed_0')))
    .withColumn('myrand_1', my_rand(F.col('seed_1')))
    .drop('seed_0', 'seed_1')
)
I'm using the DataFrame API rather than the RDD of the original problem because that's what I'm more familiar with, but the same concepts presumably apply.
NB: apparently it is possible to disable the assumption of determinism for Scala Spark UDFs since v2.3: https://jira.apache.org/jira/browse/SPARK-20586.
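For what it's worth, PySpark 2.3+ exposes the same switch on Python UDFs via asNondeterministic(). A rough sketch (my own, not tested against the original setup; _fresh_rand, my_rand and the spark session name are placeholders I chose) would be:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np

def _fresh_rand(dummy):
    # new generator per call, seeded from OS entropy; the dummy arg just ties the call to a row
    return int(np.random.RandomState().randint(1000))

# asNondeterministic() tells the optimizer not to collapse or re-use calls with the same input
my_rand = F.udf(_fresh_rand, IntegerType()).asNondeterministic()

df = (spark.range(10)
      .withColumn('r0', my_rand(F.col('id')))
      .withColumn('r1', my_rand(F.col('id'))))

This avoids the explicit seed columns, at the cost of losing reproducibility of the generated values.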
This seems to be a bug (or feature) of randint. I see the same behavior, but as soon as I change f, the values do indeed change. So I'm not sure about the actual randomness of this method. I can't find any documentation, but it seems to be using some deterministic math algorithm instead of relying on more variable features of the running machine. Even if I go back and forth, the numbers seem to be the same upon returning to the original value.
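To make the "deterministic math algorithm" point concrete (my own illustration, run locally rather than in Spark): NumPy's module-level randint draws from a single global Mersenne Twister, so re-seeding that global state reproduces exactly the same "random" values.

import numpy as np

np.random.seed(0)
first_run = [np.random.randint(1000) for _ in range(5)]

np.random.seed(0)
second_run = [np.random.randint(1000) for _ in range(5)]

assert first_run == second_run  # same global state -> same sequence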