 

Spark union of multiple RDDs

In my Pig code I do this:

all_combined = UNION relation1, relation2, relation3, relation4, relation5, relation6;

I want to do the same with Spark. Unfortunately, it seems I have to keep doing it pairwise:

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# ... and so on

Is there a union operator that will let me operate on multiple RDDs at a time? For example:

union(rdd1, rdd2, rdd3, rdd4, rdd5, rdd6)

It is a matter of convenience.

asked Nov 16 '15 by user3803714

People also ask

How do you perform a union of multiple DataFrames in PySpark?

The PySpark unionByName() function also combines two or more DataFrames, but it matches columns by name rather than by position, so it works even when the DataFrames list their columns in a different order. With allowMissingColumns=True (Spark 3.1+), it can also combine DataFrames whose schemas differ.
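
For illustration, a minimal sketch (assuming a SparkSession named spark; the frames and values here are hypothetical):

# Same columns, different order; unionByName matches them by name
df_a = spark.createDataFrame([(1, "x")], ("id", "val"))
df_b = spark.createDataFrame([("y", 2)], ("val", "id"))

df_a.unionByName(df_b).show()
## +---+---+
## | id|val|
## +---+---+
## |  1|  x|
## |  2|  y|
## +---+---+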

What is the difference between UNION and UNION ALL in Spark?

UNION and UNION ALL both return the rows that are found in either relation. UNION (alternatively, UNION DISTINCT) keeps only distinct rows, while UNION ALL does not remove duplicates from the result.
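
The same distinction shows up in the DataFrame API, where DataFrame.union has UNION ALL semantics. A minimal sketch (again assuming a SparkSession named spark):

df_a = spark.createDataFrame([(1,), (2,)], ("n",))
df_b = spark.createDataFrame([(2,), (3,)], ("n",))

df_a.union(df_b).count()             ## 4 -- duplicates kept, like UNION ALL
df_a.union(df_b).distinct().count()  ## 3 -- add distinct() for UNION semantics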


1 Answer

If these are RDDs, you can use the SparkContext.union method:

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()

## [1, 2, 3, 4, 5, 6, 7, 8, 9]
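
As a side note, SparkContext.union builds a single union over the whole list in one step, rather than the chain of nested pairwise unions you get from calling rdd.union repeatedly.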

There is no DataFrame equivalent, but it is just a matter of a simple one-liner:

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()

## +---+----+
## |  k|   v|
## +---+----+
## |  1|foo1|
## |  2|bar1|
## |  3|foo2|
## |  4|bar2|
## |  5|foo3|
## |  6|bar3|
## +---+----+

If the number of DataFrames is large, using SparkContext.union on the underlying RDDs and recreating the DataFrame may be a better choice, to avoid issues related to the cost of preparing an execution plan:

def unionAll(*dfs):
    first, *_ = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )
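
As a side note, in Spark 2.0+ DataFrame.unionAll is deprecated in favor of DataFrame.union (which still has UNION ALL semantics), so on newer versions the reduce-based helper might be written as a sketch like this:

from functools import reduce
from pyspark.sql import DataFrame

def union_all(*dfs):
    # DataFrame.union keeps duplicates (UNION ALL semantics)
    return reduce(DataFrame.union, dfs)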
answered Sep 29 '22 by zero323