I have a file in HDFS that is distributed across the nodes in the cluster.
I'm trying to get a random sample of 10 lines from this file.
In the PySpark shell, I read the file into an RDD using:
>>> textFile = sc.textFile("/user/data/myfiles/*")
and then I want to simply take a sample... the cool thing about Spark is that there are commands like takeSample. Unfortunately, I think I'm doing something wrong, because the following takes a really long time:
>>> textFile.takeSample(False, 10, 12345)
So I tried creating a partition on each node, and then instructing each node to sample its partition using the following command:
>>> textFile.partitionBy(4).mapPartitions(lambda blockOfLines: blockOfLines.takeSample(False, 10, 1234)).first()
but this gives an error, ValueError: too many values to unpack:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/serializers.py", line 117, in dump_stream
for obj in iterator:
File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/rdd.py", line 821, in add_shuffle_key
for (k, v) in iterator:
ValueError: too many values to unpack
How can I sample 10 lines from a large distributed data set using Spark or PySpark?
Try using textFile.sample(False, fraction, seed) instead. takeSample will generally be very slow because it calls count() on the RDD. It needs to do this because otherwise it wouldn't take evenly from each partition; basically it uses the count, along with the sample size you asked for, to compute the fraction, and then calls sample internally. sample is fast because it just uses a random boolean generator that returns True fraction percent of the time, and thus doesn't need to call count.
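To make that concrete, here is a rough sketch (not Spark's actual internals) of the relationship just described: deriving a fraction from a desired sample size and a full count(), which is exactly the expensive step takeSample has to perform:
>>> num_samples = 10
>>> total = textFile.count()  # full pass over the data; this is what makes takeSample slow
>>> fraction = min(1.0, float(num_samples) / total)  # fraction needed to expect ~10 lines
>>> approx_sample = textFile.sample(False, fraction, 12345)  # cheap Bernoulli sample, no count() needed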
In addition (I don't think this is happening to you, but) if the sample returned is not big enough, it calls sample again, which can obviously slow it down further. Since you should have some idea of the size of your data, I would recommend calling sample and then cutting the sample down to size yourself, since you know more about your data than Spark does.
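As a rough sketch of that suggestion, assuming your file has on the order of a million lines (tune the fraction from whatever you know about your data), you could oversample with sample and then trim to exactly 10 lines on the driver:
>>> fraction = 0.0001  # assumed: yields comfortably more than 10 lines for ~1M rows; adjust for your data
>>> sampled = textFile.sample(False, fraction, 12345)  # fast, no count() involved
>>> ten_lines = sampled.take(10)  # cut the sample down to size yourself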