 

Reduce a key-value pair into a key-list pair with Apache Spark

I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]). I feel like I should be able to do this using the reduceByKey function with something of the flavor:

My_KMV = My_KV.reduce(lambda a, b: a.append([b])) 

The error that I get when this occurs is:

'NoneType' object has no attribute 'append'.

My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).

asked Nov 18 '14 by TravisJ

People also ask

How do I use reduce by key in Spark?

In Spark, the reduceByKey function is a frequently used transformation that aggregates data. It receives key-value pairs (K, V) as input, aggregates the values by key, and produces a dataset of (K, V) pairs as output.

What is reduce by key in PySpark?

PySpark's reduceByKey() transformation merges the values of each key using an associative reduce function. It is a wide transformation because it shuffles data across partitions, and it operates on a pair RDD (key/value pairs).
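
For instance, a minimal sketch (assuming a SparkContext named sc is already available):

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
summed = pairs.reduceByKey(lambda a, b: a + b)  # merge values per key with an associative function
# summed.collect() -> [('a', 4), ('b', 2)] (order not guaranteed)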

What is group by key and reduce by key in Spark?

Both reduceByKey and groupByKey are wide transformations, which means both trigger a shuffle operation. The key difference is that reduceByKey does a map-side combine and groupByKey does not.
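
To illustrate, a sketch of the two side by side (assuming a SparkContext named sc; both produce the same result, but reduceByKey pre-combines values within each partition before the shuffle):

pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
pairs.reduceByKey(lambda a, b: a + b).collect()  # combines map-side, then shuffles partial sums
pairs.groupByKey().mapValues(sum).collect()      # shuffles every value, then sums
# both yield [('a', 3), ('b', 3)] (order not guaranteed)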

Can we use reduceByKey in Spark DataFrame?

reduceByKey is not available on a regular (single-value) RDD; it is only defined on a pair RDD.
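
For example (a sketch, assuming a SparkContext named sc), calling reduceByKey on a plain RDD fails, so you build a pair RDD first:

nums = sc.parallelize([1, 2, 3, 4])
# nums.reduceByKey(lambda a, b: a + b)  # fails: elements are not (key, value) pairs
pairs = nums.map(lambda n: (n % 2, n))  # key each number by parity to get a pair RDD
pairs.reduceByKey(lambda a, b: a + b).collect()  # [(0, 6), (1, 4)] (order not guaranteed)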


1 Answer

Map and ReduceByKey

The input and output types of reduce must be the same, so if you want to aggregate values into a list, you have to map each value to a one-element list first. Afterwards you combine the lists into one list.
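
Applied to the question's RDD, the pattern looks like this (a sketch, assuming My_KV is the (K, V) pair RDD from the question; the list concatenation used here is explained below):

My_KMV = My_KV.map(lambda kv: (kv[0], [kv[1]])) \
              .reduceByKey(lambda a, b: a + b)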

Combining lists

You'll need an operation that combines two lists into one. Python provides several ways to combine lists.

append modifies the first list and will always return None.

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend also modifies the first list in place, but appends the elements of its argument individually instead of nesting it:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

Both methods return None, but you need an expression that returns the combined list, so use the plus operator instead.

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

Spark

file = spark.textFile("hdfs://...")
counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda actor: (actor.split(",")[0], actor))
              # transform each value into a list
              .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))
              # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
              .reduceByKey(lambda a, b: a + b))

CombineByKey

It's also possible to solve this with combineByKey, which is used internally to implement reduceByKey, but it's more complex, and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the solution above.
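
For reference, the same aggregation with combineByKey might look like this (a sketch, assuming the question's pair RDD My_KV; the three functions create, extend, and merge per-key lists):

My_KMV = My_KV.combineByKey(
    lambda v: [v],             # createCombiner: start a new list from the first value
    lambda acc, v: acc + [v],  # mergeValue: add a value to a partition-local list
    lambda a, b: a + b         # mergeCombiners: concatenate lists from different partitions
)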

GroupByKey

It's also possible to solve this with groupByKey, but since it does no map-side combining, every value is shuffled across the network, which can make it much slower for big data sets.
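
The groupByKey version is the shortest (a sketch, again assuming the question's pair RDD My_KV; mapValues(list) converts each grouped iterable into a plain list):

My_KMV = My_KV.groupByKey().mapValues(list)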

answered Sep 30 '22 by Christian Strempfer