I need lots of random numbers, one per line. The result should be something like this:
24324
4234234
1310313
...
So I wrote this Spark code (sorry, I'm new to Spark and Scala):
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object RandomIntegerWriter {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: RandomIntegerWriter <num Integers> <outDir>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark RandomIntegerWriter")
    val spark = new SparkContext(conf)
    val distData = spark.parallelize(Seq.fill(args(0).toInt)(Random.nextInt))
    distData.saveAsTextFile(args(1))
    spark.stop()
  }
}
Note: for now I just want to generate one number per line.
But it seems that when the number of integers gets large, the program reports an error. Any idea what is wrong with this piece of code?
Thank you.
In Spark 1.4 you can use the DataFrame API to do this (the example below uses PySpark):
In [1]: from pyspark.sql.functions import rand, randn
In [2]: # Create a DataFrame with one int column and 10 rows.
In [3]: df = sqlContext.range(0, 10)
In [4]: df.show()
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+
In [4]: # Generate two other columns using uniform distribution and normal distribution.
In [5]: df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
+--+-------------------+--------------------+
|id| uniform| normal|
+--+-------------------+--------------------+
| 0| 0.7224977951905031| -0.1875348803463305|
| 1| 0.2953174992603351|-0.26525647952450265|
| 2| 0.4536856090041318| -0.7195024130068081|
| 3| 0.9970412477032209| 0.5181478766595276|
| 4|0.19657711634539565| 0.7316273979766378|
| 5|0.48533720635534006| 0.07724879367590629|
| 6| 0.7369825278894753| -0.5462256961278941|
| 7| 0.5241113627472694| -0.2542275002421211|
| 8| 0.2977697066654349| -0.5752237580095868|
| 9| 0.5060159582230856| 1.0900096472044518|
+--+-------------------+--------------------+
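Since the question's code is in Scala, a rough Scala equivalent of the same approach looks like the sketch below. This is only a sketch: the SQLContext setup, the variable names, and the output path are assumptions for illustration, not part of the answer above.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, rand, randn}

val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext

// Ten rows with an "id" column plus uniform and normal random columns.
val df = sqlContext.range(0, 10)
  .select(col("id"), rand(10).alias("uniform"), randn(27).alias("normal"))
df.show()

// To get one random number per line in text files ("randomNumbers" is a placeholder path):
df.select("uniform").rdd.map(_.getDouble(0)).saveAsTextFile("randomNumbers")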
Try:
val distData = spark.parallelize(Seq[Int](), numPartitions)
  .mapPartitions { _ =>
    (1 to recordsPerPartition).map(_ => Random.nextInt).iterator
  }
This creates an empty collection on the driver side but generates the random integers on the worker side. The total number of records is numPartitions * recordsPerPartition.
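Wired into the original program, this approach might look like the following sketch. The extra <numPartitions> command-line argument and the variable names are assumptions added here for illustration; only the mapPartitions idea comes from the answer above.
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}

object RandomIntegerWriter {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: RandomIntegerWriter <num Integers> <numPartitions> <outDir>")
      System.exit(1)
    }
    val numIntegers         = args(0).toLong
    val numPartitions       = args(1).toInt
    val recordsPerPartition = (numIntegers / numPartitions).toInt

    val conf  = new SparkConf().setAppName("Spark RandomIntegerWriter")
    val spark = new SparkContext(conf)

    // The driver only parallelizes an empty Seq into numPartitions partitions;
    // each worker then fills its own partition with random integers.
    val distData = spark.parallelize(Seq[Int](), numPartitions)
      .mapPartitions { _ =>
        Iterator.fill(recordsPerPartition)(Random.nextInt)
      }

    distData.saveAsTextFile(args(2))
    spark.stop()
  }
}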
The current version materializes the whole collection of random numbers in the driver's memory. If that collection is very large, the driver will run out of memory. Note that this version does not make use of Spark's processing capabilities: it only uses Spark to save the data after it has been created.
Assuming we are working on a cluster, we need to distribute the work of generating the data among the executors. One way of doing that is to transform the original algorithm into a version that divides the work across the cluster:
val numRecords: Int = ???
val partitions: Int = ???
// We assume here that numRecords is divisible by partitions;
// otherwise we need to compensate for the remainder.
val recordsPerPartition = numRecords / partitions

val seedRdd   = sparkContext.parallelize(Seq.fill(partitions)(recordsPerPartition), partitions)
val randomNrs = seedRdd.flatMap(records => Seq.fill(records)(Random.nextInt))
randomNrs.saveAsTextFile(...)
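The comment above leaves the non-divisible case open. One way to compensate for the remainder (a sketch building on the values defined above, not part of the original answer) is to give the first few partitions one extra record each:
// Sketch: spread the leftover records when numRecords is not a multiple of partitions.
val base      = numRecords / partitions
val remainder = numRecords % partitions
// The first `remainder` partitions get one extra record each.
val counts    = Seq.tabulate(partitions)(i => if (i < remainder) base + 1 else base)

val seedRdd   = sparkContext.parallelize(counts, partitions)
val randomNrs = seedRdd.flatMap(records => Seq.fill(records)(Random.nextInt))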
If we don't have a cluster and this is meant to run on a single machine, the question becomes "why use Spark?". This random-number generation is basically I/O-bound and can be done in O(1) memory by sequentially writing random numbers to a file:
import java.io._
import scala.util.Random

def randomFileWriter(file: String, records: Long): Unit = {
  val pw = new PrintWriter(new BufferedWriter(new FileWriter(file)))

  // Tail-recursive loop that writes one random integer per line.
  @annotation.tailrec
  def loop(count: Long): Unit = {
    if (count > 0) {
      pw.println(Random.nextInt)
      loop(count - 1)
    }
  }

  loop(records)
  pw.close()
}
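For example (the file name and count here are just placeholders):
// Write one million random integers, one per line, to a local file.
randomFileWriter("random-ints.txt", 1000000L)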