Let's say that I have the following RDD:
a = [('a',1),('a',2),('a',3),('b',1),('b',4),('c',3)]
anRDD = sc.parallelize(a)
I want to get the rows whose key occurs at least N times (for this example, let's say N = 2), and, in another RDD, the rows that were excluded.
What I do is the following:
threshold = 2
anRDD.persist()
grouped_counts = anRDD \
    .toDF(['letter','number']) \
    .groupBy('letter') \
    .count()
downFromLimit = grouped_counts.filter(grouped_counts['count'] < threshold).select("letter").rdd.map(lambda x: x.letter).collect()
upTheLimit = grouped_counts.filter(grouped_counts['count'] >= threshold).select("letter").rdd.map(lambda x: x.letter).collect()
upData = anRDD.filter(lambda x:x[0] in upTheLimit)
downData = anRDD.filter(lambda x:x[0] in downFromLimit)
anRDD.unpersist()
It does what I want, but there should be something clearer, easier, and more efficient than this.
If I used reduceByKey and counted the length of the values, would that be more beneficial?
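Roughly what I have in mind, as an untested sketch reusing anRDD and threshold from above:
counts = anRDD.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
joined = anRDD.join(counts)  # (letter, (number, count))
upData = joined.filter(lambda x: x[1][1] >= threshold).map(lambda x: (x[0], x[1][0]))
downData = joined.filter(lambda x: x[1][1] < threshold).map(lambda x: (x[0], x[1][0]))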
Any other ideas?
You could use countByKey if the number of keys isn't too big. The result can then be broadcast and used to filter the RDD.
rdd = sc.parallelize([('a',1),('a',2),('a',3),('b',1),('b',4),('c',3)])
broadcast = sc.broadcast(rdd.countByKey())
broadcast.value
# defaultdict(<type 'int'>, {'a': 3, 'c': 1, 'b': 2})
rdd.filter(lambda x: broadcast.value[x[0]] >= 2).take(10)
# [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 4)]
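The excluded rows would then presumably just be the complement of that filter:
rdd.filter(lambda x: broadcast.value[x[0]] < 2).take(10)
# [('c', 3)]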
Both your approach and Alberto's pull all the keys back to the driver, which will be a problem if you have a lot of them.
Instead of doing that, I would create the aggregated DataFrame and then join it against the original data. Then you can partition on write to save out both groups in one pass.
You should keep all your data in DataFrames whenever possible instead of RDDs. There are a large number of optimizations when using DataFrames, which is especially relevant when using PySpark.
from pyspark.sql import functions as F
df = anRDD.toDF(['letter','number'])
counts = df.groupBy('letter') \
    .count()
# Join the counts back to the original data; joining on the column name
# avoids ending up with two ambiguous 'letter' columns
df = df.join(counts, 'letter')
# Add the column to partition on ('count' is referenced via F.col,
# since df.count is the DataFrame method, not the column)
updated = df.withColumn('group', F.when(F.col('count') < 2, 'down').otherwise('up')) \
    .select('letter', 'group')
# Partitioning lets us write out both groups at once, instead of recomputing stages
updated.write.partitionBy('group').text('/some/output/path')
This will create a folder structure for you like this:
/some/output/path
group=down
part-0000
...
group=up
part-0000
...
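If you later need each group back as a DataFrame, you should be able to read a single partition directory (assuming a SparkSession is available as spark):
up = spark.read.text('/some/output/path/group=up')
down = spark.read.text('/some/output/path/group=down')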