I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]). I feel like I should be able to do this using the reduceByKey function with something of the flavor:
My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
The error that I get when this occurs is:
'NoneType' object has no attribute 'append'.
My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).
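As an aside, the error can be reproduced in plain Python, without Spark: list.append mutates the list in place and always returns None, so the next reduction step receives None instead of a list. A minimal sketch of the failure:

x = [1, 2, 3]
y = x.append([4, 5])  # append returns None; x is now [1, 2, 3, [4, 5]]
y.append([6])         # AttributeError: 'NoneType' object has no attribute 'append'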
Spark reduceByKey Function
In Spark, the reduceByKey function is a frequently used transformation that aggregates data. It receives key-value pairs (K, V) as input, aggregates the values by key, and produces a dataset of (K, V) pairs as output.
The PySpark reduceByKey() transformation merges the values of each key using an associative reduce function. It is a wide transformation, since it shuffles data across multiple partitions, and it operates on pair RDDs (key/value pairs).
Both reduceByKey and groupByKey are wide transformations, meaning both trigger a shuffle. The key difference is that reduceByKey performs a map-side combine and groupByKey does not.
reduceByKey is not available on a regular (single-value) RDD, only on a pair RDD.
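To make that concrete, here is a minimal reduceByKey example on a pair RDD; sc is assumed to be an existing SparkContext:

rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
summed = rdd.reduceByKey(lambda a, b: a + b)  # merges values per key, partly on the map side
print(sorted(summed.collect()))               # [('a', 4), ('b', 2)]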
Map and ReduceByKey
The input type and output type of reduce must be the same, therefore if you want to aggregate a list, you have to map the input to lists. Afterwards, you combine the lists into one list.
Combining lists
You'll need a method that combines lists into one list. Python provides a few ways to do this.
append modifies the first list and will always return None.

x = [1, 2, 3]
x.append([4, 5])  # x is [1, 2, 3, [4, 5]]
extend also modifies the first list in place, but adds the elements of the second list individually instead of nesting it:

x = [1, 2, 3]
x.extend([4, 5])  # x is [1, 2, 3, 4, 5]
Both methods return None, but you'll need a method that returns the combined list, therefore just use the plus sign.

x = [1, 2, 3] + [4, 5]  # x is [1, 2, 3, 4, 5]
Spark
file = spark.textFile("hdfs://...")  # spark is a SparkContext here
counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda actor: (actor.split(",")[0], actor))
              # transform each value into a list
              .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))
              # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
              .reduceByKey(lambda a, b: a + b))
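Applied to the data shape from the question (integer keys, tuple values), the same map-then-reduce pattern can be written with mapValues; the sample data below is made up for illustration:

My_KV = sc.parallelize([(1, (2, 3)), (1, (4, 5)), (2, (6, 7))])  # illustrative data
My_KMV = My_KV.mapValues(lambda v: [v]).reduceByKey(lambda a, b: a + b)
print(sorted(My_KMV.collect()))  # [(1, [(2, 3), (4, 5)]), (2, [(6, 7)])]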
CombineByKey
It's also possible to solve this with combineByKey, which is used internally to implement reduceByKey, but it's more complex and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the solution above.
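For completeness, a sketch of the combineByKey variant for the same task; its three arguments create the initial list from the first value, merge further values into a partition-local list, and merge lists across partitions:

My_KMV = My_KV.combineByKey(
    lambda v: [v],             # createCombiner: start a list from the first value
    lambda acc, v: acc + [v],  # mergeValue: add a value within a partition
    lambda a, b: a + b)        # mergeCombiners: merge lists across partitions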
GroupByKey
It's also possible to solve this with groupByKey, but since it does no map-side combine, every value is shuffled across the network, which can make it much slower for big data sets.
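For comparison, the groupByKey version; groupByKey yields an iterable of values per key, so mapValues(list) materializes each one as a list:

My_KMV = My_KV.groupByKey().mapValues(list)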