In my Pig code I do this:
```
all_combined = UNION relation1, relation2, relation3, relation4, relation5, relation6;
```
I want to do the same with Spark. Unfortunately, I see that I have to keep doing it pairwise:
```python
first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# ... and so on
```
Is there a union operator that will let me operate on multiple RDDs at a time?

e.g. `union(rdd1, rdd2, rdd3, rdd4, rdd5, rdd6)`

It is a matter of convenience.
A note on unionByName(): the PySpark unionByName() function is also used to combine two or more DataFrames. Because it matches columns by name rather than by position, it can combine DataFrames whose columns are in a different order (and, on Spark 3.1+, whose column sets differ, via allowMissingColumns=True).
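A minimal sketch of how this behaves (the allowMissingColumns flag assumes Spark 3.1+):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Same columns in a different order: unionByName matches them by name.
df1 = spark.createDataFrame([(1, "a")], ("id", "value"))
df2 = spark.createDataFrame([("b", 2)], ("value", "id"))
df1.unionByName(df2).show()  # rows (1, a) and (2, b) under id, value

# Spark 3.1+: different column sets; missing columns are filled with nulls.
df3 = spark.createDataFrame([(3,)], ("id",))
df1.unionByName(df3, allowMissingColumns=True).show()
```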
UNION and UNION ALL return the rows found in either relation. UNION (alternatively, UNION DISTINCT) keeps only distinct rows, while UNION ALL does not remove duplicates from the result.
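For illustration, a small sketch of the difference (the table names t1 and t2 are just placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1,), (2,)], ("k",))
df2 = spark.createDataFrame([(2,), (3,)], ("k",))
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

# UNION deduplicates: k = 1, 2, 3
spark.sql("SELECT k FROM t1 UNION SELECT k FROM t2").show()

# UNION ALL keeps every row: k = 1, 2, 2, 3
spark.sql("SELECT k FROM t1 UNION ALL SELECT k FROM t2").show()
```

Note that the DataFrame methods union() and unionAll() both behave like UNION ALL; append .distinct() to get UNION semantics.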
If these are RDDs you can use the SparkContext.union method:
```python
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()
## [1, 2, 3, 4, 5, 6, 7, 8, 9]
```
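Besides being shorter, sc.union builds a single union over all the inputs in one step, whereas the pairwise chain nests one union RDD inside another.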
There is no DataFrame equivalent, but it is just a matter of a simple one-liner:
```python
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()
## +---+----+
## |  k|   v|
## +---+----+
## |  1|foo1|
## |  2|bar1|
## |  3|foo2|
## |  4|bar2|
## |  5|foo3|
## |  6|bar3|
## +---+----+
```
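As a side note, DataFrame.unionAll is deprecated since Spark 2.0 in favour of DataFrame.union (same semantics: duplicates are kept), so on newer versions the equivalent one-liner is:

```python
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    # DataFrame.union keeps duplicates, just like the old unionAll
    return reduce(DataFrame.union, dfs)
```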
If the number of DataFrames is large, using SparkContext.union on the underlying RDDs and recreating the DataFrame may be a better choice, to avoid issues related to the cost of preparing an execution plan:
```python
def unionAll(*dfs):
    first, *_ = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )
```
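The reason this can help: every pairwise union grows the logical plan that the optimizer has to analyze, so a long chain of unions can make planning itself slow, while a single createDataFrame over one union of RDDs keeps the plan flat. Note that _sc is an internal attribute, so this approach depends on PySpark implementation details.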