Count number of elements in each pyspark RDD partition

I'm looking for the PySpark equivalent of this question: How to get the number of elements in partition?

Specifically, I want to programmatically count the number of elements in each partition of a PySpark RDD or DataFrame (I know this information is available in the Spark Web UI).

This attempt:

df.foreachPartition(lambda iter: sum(1 for _ in iter))

results in:

AttributeError: 'NoneType' object has no attribute '_jvm'

I do not want to collect the contents of the iterator into memory.

asked Aug 12 '16 by Matt Frei

People also ask

How do I get partition count in Spark?

In PySpark you can get the current number of partitions by calling getNumPartitions() on an RDD; to use it with a DataFrame, first convert the DataFrame to an RDD.
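A minimal sketch of both calls, assuming an active SparkSession named spark:

rdd = spark.sparkContext.parallelize(range(100), 4)
rdd.getNumPartitions()  # 4

df = spark.range(100)
# A DataFrame has no getNumPartitions() of its own; go through its underlying RDD.
df.rdd.getNumPartitions()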

How do I know how many partitions my RDD has?

Turn the DataFrame into an RDD and call partitions followed by size (in Scala), or getNumPartitions() in PySpark. On a freshly shuffled DataFrame you would typically see 200 partitions, the default value of spark.sql.shuffle.partitions.

How many tasks does Spark run on each partition?

Spark assigns one task per partition, and each executor core processes one task at a time.


1 Answer

If you are asking whether we can get the number of elements in an iterator without iterating through it: the answer is no.

But we don't have to store the whole partition in memory to count it, as the post you mentioned shows:

def count_in_a_partition(idx, iterator):
  # Consume the iterator one element at a time; nothing is accumulated in memory.
  count = 0
  for _ in iterator:
    count += 1
  # Yield a single (partition_index, count) pair per partition.
  yield idx, count

data = sc.parallelize([1, 2, 3, 4], 4)

data.mapPartitionsWithIndex(count_in_a_partition).collect()
# [(0, 1), (1, 1), (2, 1), (3, 1)]
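Because the function walks the iterator lazily, only one element is ever materialized at a time, so nothing is collected into memory.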

EDIT

Note that your code is very close to the solution. foreachPartition is an action that discards its function's return value; mapPartitions works here, provided the function returns an iterator:

def count_in_a_partition(iterator):
  # Yield a single value per partition: its element count.
  yield sum(1 for _ in iterator)

data.mapPartitions(count_in_a_partition).collect()
# [1, 1, 1, 1]
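The same idea carries over to a DataFrame through its underlying RDD. A minimal sketch, assuming an active SparkSession named spark; the DataFrame here is purely illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical DataFrame; substitute your own.
df = spark.range(0, 1000).repartition(4)

def count_in_a_partition(iterator):
  yield sum(1 for _ in iterator)

# df.rdd exposes the DataFrame's rows as an RDD of Row objects.
df.rdd.mapPartitions(count_in_a_partition).collect()
# e.g. [250, 250, 250, 250]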
answered Oct 13 '22 by shuaiyuancn