 

Sampling a large distributed data set using pyspark / spark

I have a file in hdfs which is distributed across the nodes in the cluster.

I'm trying to get a random sample of 10 lines from this file.

In the pyspark shell, I read the file into an RDD using:

>>> textFile = sc.textFile("/user/data/myfiles/*")

Then I simply want to take a sample. The nice thing about Spark is that there are commands like takeSample; unfortunately, I think I'm doing something wrong, because the following takes a really long time:

>>> textFile.takeSample(False, 10, 12345)

So I tried creating a partition on each node and then instructing each node to sample its partition using the following command:

>>> textFile.partitionBy(4).mapPartitions(lambda blockOfLines: blockOfLines.takeSample(False, 10, 1234)).first()

but this gives the error ValueError: too many values to unpack:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/rdd.py", line 821, in add_shuffle_key
    for (k, v) in iterator:
ValueError: too many values to unpack

How can I sample 10 lines from a large distributed data set using spark or pyspark?

asked Jul 17 '14 by mgoldwasser

People also ask

Can PySpark handle big data?

Yes. Using the Spark Python API, PySpark, you can leverage parallel computation on large datasets and build toward high-performance machine learning. From cleaning data to creating features and implementing machine learning models, you can execute end-to-end workflows with Spark.

How do you process terabytes of data in Spark?

This is defined by the Hadoop FileSystem used to read the files, and it applies to files from other filesystems that Spark reads through Hadoop. It also depends on any repartitioning you do: you can call repartition(N) or coalesce(N) on an RDD or a DataFrame to change the number of partitions and thus their size.
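A rough sketch of those two calls in the pyspark shell; the partition counts 200 and 50 are made-up values for illustration:

>>> rdd = sc.textFile("/user/data/myfiles/*")   # partition count comes from the Hadoop input splits
>>> rdd.getNumPartitions()                      # inspect how many partitions were created
>>> wider = rdd.repartition(200)                # full shuffle into 200 partitions
>>> narrower = rdd.coalesce(50)                 # merge down to 50 partitions without a full shuffle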

How do you take a sample of data in PySpark?

You can do stratified sampling in PySpark without replacement by using the sampleBy() method. It takes a sampling fraction for each stratum; if a stratum is not specified, its fraction defaults to zero. The fractions argument is a dictionary mapping each stratum key to its sampling fraction.
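A hedged sketch of what that might look like in a newer pyspark shell (Spark 2.0+, where the built-in spark session is available); the "label" column and the 0.1 / 0.2 fractions are made-up values for illustration:

>>> from pyspark.sql.functions import col
>>> df = spark.range(0, 1000).withColumn("label", col("id") % 2)           # toy DataFrame with a stratum column
>>> sampled = df.sampleBy("label", fractions={0: 0.1, 1: 0.2}, seed=42)    # ~10% of label 0, ~20% of label 1
>>> sampled.show(5)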

Is PySpark good for data science?

PySpark is a big data solution for real-time streaming with the Python programming language, and it provides an efficient way to perform all kinds of calculations and computations.


1 Answer

Try using textFile.sample(False, fraction, seed) instead. takeSample will generally be very slow because it calls count() on the RDD. It needs to do this because otherwise it wouldn't sample evenly from each partition: it uses the count, together with the sample size you asked for, to compute a fraction and then calls sample internally. sample is fast because it just uses a random Boolean generator that returns True fraction percent of the time, so it doesn't need to call count.
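For the original question, a minimal sketch in the pyspark shell, assuming the file was read as above; the 0.0001 fraction is a made-up value you would tune to your data size:

>>> textFile = sc.textFile("/user/data/myfiles/*")
>>> sampledRDD = textFile.sample(False, 0.0001, 12345)   # withReplacement=False, fraction, seed -- no count() needed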

In addition (I don't think this is happening to you), if the sample returned is not big enough, takeSample calls sample again, which can obviously slow it down further. Since you should have some idea of the size of your data, I would recommend calling sample and then cutting the sample down to size yourself, since you know more about your data than Spark does.
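A sketch of "cut the sample down to size yourself", assuming the file holds very roughly 100 million lines (a made-up figure): oversample by about 100x so the sample almost surely contains at least 10 lines, then trim on the driver:

>>> approxTotal = 100000000                       # your rough estimate of the line count
>>> fraction = 10 * 100.0 / approxTotal           # aim for ~1000 sampled lines
>>> tenLines = textFile.sample(False, fraction, 12345).take(10)   # trim to exactly 10 lines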

answered Sep 16 '22 by aaronman