 

Spark - Random Number Generation

I have written a method that must draw a random number to simulate a Bernoulli distribution. I use random.nextDouble to generate a number between 0 and 1, then make my decision based on that value and my probability parameter.
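For concreteness, a minimal sketch of the draw I am describing (p stands in for my probability parameter):

import scala.util.Random

val rng = new Random()
def bernoulli(p: Double): Boolean = rng.nextDouble() <= p  // true with probability p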

My problem is that Spark generates the same random numbers within each iteration of my for loop's mapping function. I am using the DataFrame API. My code follows this format:

val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map { row => RowFactory.create(
      row.getString(0),
      myClass.myMethod(row.getString(2), rand.nextDouble()))
    }, myDF.schema)
}

Here is the class:

class MyClass extends Serializable {
  val q = qProb

  def myMethod(s: String, rand: Double) = {
    if (rand <= q) {
      // do something
    } else {
      // do something else
    }
  }
}

I need a new random number every time myMethod is called. I also tried generating the number inside my method with java.util.Random (scala.util.Random in Scala 2.10 does not extend Serializable) as below, but I'm still getting the same numbers within each for loop:

val r = new java.util.Random(s.hashCode.toLong)
val rand = r.nextDouble()
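Seeding with s.hashCode is presumably part of the problem: a seeded PRNG always produces the same sequence, so re-creating the generator from the same string yields the same "random" value on every call:

val r1 = new java.util.Random("foo".hashCode.toLong)
val r2 = new java.util.Random("foo".hashCode.toLong)
r1.nextDouble() == r2.nextDouble()  // true: same seed, same sequence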

I've done some research, and it seems this has to do with Spark's deterministic nature: my seeded Random is serialized into the map closure, so each task starts from the same generator state.

— asked by Brian, Apr 06 '16

2 Answers

Just use the SQL function rand:

import org.apache.spark.sql.functions._

//df: org.apache.spark.sql.DataFrame = [key: int]

df.select($"key", rand() as "rand").show
+---+-------------------+
|key|               rand|
+---+-------------------+
|  1| 0.8635073400704648|
|  2| 0.6870153659986652|
|  3|0.18998048357873532|
+---+-------------------+


df.select($"key", rand() as "rand").show
+---+------------------+
|key|              rand|
+---+------------------+
|  1|0.3422484248879837|
|  2|0.2301384925817671|
|  3|0.6959421970071372|
+---+------------------+
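To express the Bernoulli decision from the question directly on the DataFrame, the draw can stay in a column expression. This is a sketch where q stands in for the probability parameter (rand(seed: Long) is also available when reproducibility matters):

import org.apache.spark.sql.functions._

val q = 0.3  // stand-in for the probability parameter
df.select($"key", (rand() <= q) as "bernoulli").show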
— answered by David Griffin, Sep 21 '22

According to this post, the best solution is to create the new scala.util.Random neither inside the map nor completely outside of it (i.e., in the driver code), but in an intermediate mapPartitionsWithIndex:

import scala.util.Random

val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
  // one RNG per partition, seeded from the partition index so each
  // partition draws a different, reproducible sequence
  val rand = new Random(indx + myAppSeed)
  iter.map(x => (x, Array.fill(10)(rand.nextDouble)))
}
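Adapted to the DataFrame from the question, the same idea might look like the sketch below (it reuses the question's myDF, myClass, and myAppSeed):

val newRDD = myDF.rdd.mapPartitionsWithIndex { (indx, iter) =>
  val rand = new scala.util.Random(indx + myAppSeed)
  iter.map(row => RowFactory.create(
    row.getString(0),
    myClass.myMethod(row.getString(2), rand.nextDouble())))
}
val newDF = sqlContext.createDataFrame(newRDD, myDF.schema)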
— answered by leo9r, Sep 17 '22