
How to use Spark to generate a huge amount of random integers?

I need lots of random numbers, one per line. The result should be something like this:

24324 24324
4234234 4234234
1310313 1310313
...

So I wrote this Spark code (sorry, I'm new to Spark and Scala):

import util.Random

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object RandomIntegerWriter {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: RandomIntegerWriter <num Integers> <outDir>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark RandomIntegerWriter")
    val spark = new SparkContext(conf)
    val distData = spark.parallelize(Seq.fill(args(0).toInt)(Random.nextInt))
    distData.saveAsTextFile(args(1))
    spark.stop()
  }
}

Note: for now I just want to generate one number per line.

But it seems that when the number of integers gets larger, the program reports an error. Any ideas what is wrong with this piece of code?

Thank you.

asked Mar 16 '15 by Haoliang


3 Answers

In Spark 1.4 you can use the DataFrame API to do this:

In [1]: from pyspark.sql.functions import rand, randn
In [2]: # Create a DataFrame with one int column and 10 rows.
In [3]: df = sqlContext.range(0, 10)
In [4]: df.show()
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

In [4]: # Generate two other columns using uniform distribution and normal distribution.
In [5]: df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
+--+-------------------+--------------------+
|id|            uniform|              normal|
+--+-------------------+--------------------+
| 0| 0.7224977951905031| -0.1875348803463305|
| 1| 0.2953174992603351|-0.26525647952450265|
| 2| 0.4536856090041318| -0.7195024130068081|
| 3| 0.9970412477032209|  0.5181478766595276|
| 4|0.19657711634539565|  0.7316273979766378|
| 5|0.48533720635534006| 0.07724879367590629|
| 6| 0.7369825278894753| -0.5462256961278941|
| 7| 0.5241113627472694| -0.2542275002421211|
| 8| 0.2977697066654349| -0.5752237580095868|
| 9| 0.5060159582230856|  1.0900096472044518|
+--+-------------------+--------------------+
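For the original goal of writing plain integers one per line, the same DataFrame API can feed a text output. A minimal Scala sketch along these lines, where the row count, seed, and output path are illustrative assumptions:

import org.apache.spark.sql.functions.rand

// Scale the uniform [0, 1) column up to non-negative Ints, one per row.
val n = 10000000L                                            // illustrative row count
val ints = sqlContext.range(0, n)
  .select((rand(10) * Int.MaxValue).cast("int").alias("value"))

// One integer per line, written in parallel by the executors.
ints.rdd.map(_.getInt(0)).saveAsTextFile("random-integers")  // illustrative path

Because the rows are generated on the executors, nothing large is ever materialized on the driver.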
answered Nov 17 '22 by vmhacker

Try:

val distData = spark.parallelize(Seq[Int](), numPartitions)
  .mapPartitions { _ => {
    (1 to recordsPerPartition).map{_ => Random.nextInt}.iterator
  }}

It will create an empty collection on the driver side, but generate the random integers on the worker side. The total number of records is numPartitions * recordsPerPartition.
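A self-contained sketch of this approach, assuming illustrative values for numPartitions and recordsPerPartition (neither is given in the answer) and an illustrative output directory:

import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}

object RandomIntegerWriter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark RandomIntegerWriter")
    val sc = new SparkContext(conf)

    val numPartitions = 100            // number of tasks / output files (illustrative)
    val recordsPerPartition = 1000000  // integers produced by each task (illustrative)

    val distData = sc.parallelize(Seq[Int](), numPartitions)
      .mapPartitions { _ =>
        // Each partition generates its own numbers; nothing is held on the driver.
        (1 to recordsPerPartition).iterator.map(_ => Random.nextInt())
      }

    distData.saveAsTextFile("random-integers")  // illustrative output directory
    sc.stop()
  }
}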

answered Nov 17 '22 by cloud

Running on a Spark Cluster

The current version materializes the entire collection of random numbers in the memory of the driver. If that collection is very large, the driver will run out of memory. Note that this version does not make use of Spark's processing capabilities either, as it only uses Spark to save the data after it has been created.

Assuming we are working on a cluster, what we need to do is distribute the work required to generate the data among the executors. One way of doing that would be transforming the original algorithm into a version that can work across the cluster by dividing the work among executors:

val numRecords:Int = ???
val partitions:Int = ???
val recordsPerPartition = numRecords / partitions // we are assuming here that numRecords is divisible by partitions, otherwise we need to compensate for the residual 

val seedRdd = sparkContext.parallelize(Seq.fill(partitions)(recordsPerPartition),partitions)
val randomNrs = seedRdd.flatMap(records => Seq.fill(records)(Random.nextInt))
randomNrs.saveAsTextFile(...)
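To compensate for the residual mentioned in the comment above, one possible sketch (giving the remainder to the first partition is an arbitrary choice):

val base     = numRecords / partitions
val residual = numRecords % partitions
val counts   = Seq.fill(partitions)(base).updated(0, base + residual)  // remainder goes to one partition

val seedRdd   = sparkContext.parallelize(counts, partitions)
val randomNrs = seedRdd.flatMap(records => Seq.fill(records)(Random.nextInt))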

Running on a single machine

If we don't have a cluster, and this is meant to run on a single machine, the question would be "why use Spark?". This random generator process is basically I/O bound and could be done with O(1) memory by sequentially writing random numbers to a file:

import java.io._
import scala.util.Random

def randomFileWriter(file: String, records: Long): Unit = {
    val pw = new PrintWriter(new BufferedWriter(new FileWriter(file)))
    @annotation.tailrec
    def loop(count: Long): Unit = {
        if (count <= 0) () else {
          pw.println(Random.nextInt())
          loop(count - 1)  // recurse; the original snippet called an undefined writeRandom(writer, ...)
        }
    }
    loop(records)
    pw.close()
}
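Example call, with an illustrative file name and record count:

randomFileWriter("random-integers.txt", 100000000L)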
answered Nov 17 '22 by maasg