Spark: Removing rows which occur less than N times

Let's say I have the following RDD:

a = [('a',1),('a',2),('a',3),('b',1),('b',4),('c',3)]
anRDD = sc.parallelize(a)

I want to get the rows whose key occurs at least N times (for this example, let's say N = 2), and, in a second RDD, the rows that were excluded.

What I do is the following:

threshold = 2
anRDD.persist()

grouped_counts = (anRDD
                  .toDF(['letter', 'number'])
                  .groupBy('letter')
                  .count())

downFromLimit = grouped_counts.filter(grouped_counts['count'] < threshold) \
                              .select('letter').rdd.map(lambda x: x.letter).collect()
upTheLimit = grouped_counts.filter(grouped_counts['count'] >= threshold) \
                           .select('letter').rdd.map(lambda x: x.letter).collect()

upData = anRDD.filter(lambda x: x[0] in upTheLimit)
downData = anRDD.filter(lambda x: x[0] in downFromLimit)
anRDD.unpersist()

It does what I want, but there should be something clearer, easier and more efficient than this.

Would it be more beneficial to use reduceByKey and count the length of the values for each key?
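
Something like the following is what I have in mind (just an untested sketch, counting per key and joining the counts back):

# Count occurrences per key, then attach the count to every original pair
counts = anRDD.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
withCounts = anRDD.join(counts)  # (key, (value, count))

upData = withCounts.filter(lambda kv: kv[1][1] >= threshold) \
                   .map(lambda kv: (kv[0], kv[1][0]))
downData = withCounts.filter(lambda kv: kv[1][1] < threshold) \
                     .map(lambda kv: (kv[0], kv[1][0]))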

Any other idea?

asked by Mpizos Dimitris


2 Answers

You could use countByKey if the number of keys isn't too big. The result can then be broadcast and used to filter the original RDD.

rdd = sc.parallelize([('a',1),('a',2),('a',3),('b',1),('b',4),('c',3)])

broadcast = sc.broadcast(rdd.countByKey())
broadcast.value
# defaultdict(<type 'int'>, {'a': 3, 'c': 1, 'b': 2})

rdd.filter(lambda x: broadcast.value[x[0]] >= 2).take(10) 
# [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 4)]
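
The second RDD the question asks for (keys below the threshold) can presumably be obtained the same way with the comparison flipped; this is an extension of the snippet above, not part of the original answer:

# Complement: keys that occur fewer than 2 times
rdd.filter(lambda x: broadcast.value[x[0]] < 2).take(10)
# [('c', 3)]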
answered by Alberto Bonsanto


Both your approach and Alberto's pull all the keys back to the driver, which will be a problem if you have a lot of them.

Instead of doing that, I would create the aggregated DataFrame and then join it back against the original data. Then you can use partitioning on write to save out both groups in one pass.

You should keep your data in DataFrames whenever possible instead of RDDs. DataFrames benefit from a large number of optimizations, which is especially relevant when using PySpark.

from pyspark.sql import functions as F

df = anRDD.toDF(['letter', 'number'])

counts = df.groupBy('letter') \
           .count()

# Join the counts back to the original data (joining on the column name
# avoids ending up with two 'letter' columns)
joined = df.join(counts, 'letter')

# Add the column to partition on; use F.col('count') because joined.count
# would refer to the DataFrame's count() method, not the column
updated = joined.withColumn('group', F.when(F.col('count') < 2, 'down').otherwise('up')) \
                .select('letter', 'group')

# Partitioning lets us write out both groups at once, instead of recomputing stages
updated.write.partitionBy('group').text('/some/output/path')

This will create a folder structure for you like this:

/some/output/path
     group=down
          part-0000
          ...
     group=up
          part-0000
          ...
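
Assuming a Spark 2.x SparkSession is available as spark (not shown above), either group can then be read back on its own by pointing at its partition directory:

# 'spark' is assumed to be an existing SparkSession; the path is the example path used above
up = spark.read.text('/some/output/path/group=up')
up.show()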
answered by Ryan Widmaier