I notice that when I run the same code as in my example over here, but with a union, unionByName, or unionAll instead of the join, my query planning takes significantly longer and can result in a driver OOM.
The code is included here for reference, with a slight difference in what occurs inside the for() loop.
from pyspark.sql import types as T, functions as F, SparkSession
# Obtain the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Left-hand frame: two integer columns followed by two float measures,
# every field declared with nullable=False.
schema = T.StructType(
    [T.StructField(name, T.IntegerType(), False) for name in ("col_1", "col_2")]
    + [T.StructField(name, T.FloatType(), False) for name in ("measure_1", "measure_2")]
)
data = [
    dict(col_1=1, col_2=2, measure_1=0.5, measure_2=1.5),
    dict(col_1=2, col_2=3, measure_1=2.5, measure_2=3.5),
]
df = spark.createDataFrame(data, schema)

# Right-hand frame: only the join key, with each key value listed twice.
right_schema = T.StructType([T.StructField("col_1", T.IntegerType(), False)])
right_data = [{"col_1": key} for key in (1, 1, 2, 2)]
right_df = spark.createDataFrame(right_data, right_schema)

# Union the frame with itself, then join against the right-hand frame on col_1.
df = df.unionByName(df).join(right_df, on="col_1")
df.show()
"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
| 1| 2| 0.5| 1.5|
| 1| 2| 0.5| 1.5|
| 1| 2| 0.5| 1.5|
| 1| 2| 0.5| 1.5|
| 2| 3| 2.5| 3.5|
| 2| 3| 2.5| 3.5|
| 2| 3| 2.5| 3.5|
| 2| 3| 2.5| 3.5|
+-----+-----+---------+---------+
"""
df.explain()
"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
: +- Union
: :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
+- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
+- *(4) Scan ExistingRDD[col_1#1808]
"""
# Columns to test in turn; each pass appends the rows where that column is < 1.
filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]

# Placeholder tag column so the untagged rows can be unioned with tagged ones.
df = df.withColumn("found_filter", F.lit(None))

for col_name in filter_union_cols:
    # Matching rows of the frame accumulated so far, without their previous tag.
    matches = df.filter(F.col(col_name) < F.lit(1)).drop("found_filter")
    # Tag the matches with the name of the column that selected them.
    tagged = matches.select("*", F.lit(col_name).alias("found_filter"))
    # Each pass unions onto the result of the previous pass, so the plan of
    # `df` contains every earlier pass again.
    df = df.unionByName(tagged)

df.show()
"""
+-----+-----+---------+---------+------------+
|col_1|col_2|measure_1|measure_2|found_filter|
+-----+-----+---------+---------+------------+
| 1| 2| 0.5| 1.5| null|
| 1| 2| 0.5| 1.5| null|
| 1| 2| 0.5| 1.5| null|
| 1| 2| 0.5| 1.5| null|
| 2| 3| 2.5| 3.5| null|
| 2| 3| 2.5| 3.5| null|
| 2| 3| 2.5| 3.5| null|
| 2| 3| 2.5| 3.5| null|
| 1| 2| 0.5| 1.5| measure_1|
| 1| 2| 0.5| 1.5| measure_1|
| 1| 2| 0.5| 1.5| measure_1|
| 1| 2| 0.5| 1.5| measure_1|
+-----+-----+---------+---------+------------+
"""
df.explain()
# REALLY long query plan.....
"""
== Physical Plan ==
Union
:- *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, null AS found_filter#1855]
: +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7637]
: : +- Union
: : :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
: +- *(4) Scan ExistingRDD[col_1#1808]
:- *(12) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_1 AS found_filter#1860]
: +- *(12) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(9) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7654]
: : +- Union
: : :- *(7) Filter (col_1#1800 < 1)
: : : +- *(7) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(8) Filter (col_1#1800 < 1)
: : +- *(8) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(11) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
: +- *(10) Filter (col_1#1808 < 1)
: +- *(10) Scan ExistingRDD[col_1#1808]
:- *(18) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#1880]
: +- *(18) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(15) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7671]
: : +- Union
: : :- *(13) Filter (measure_1#1802 < 1.0)
: : : +- *(13) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(14) Filter (measure_1#1802 < 1.0)
: : +- *(14) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(17) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(24) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#2022]
: +- *(24) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(21) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7688]
: : +- Union
: : :- *(19) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
: : : +- *(19) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(20) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
: : +- *(20) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(23) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(30) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#1900]
: +- *(30) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(27) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7705]
: : +- Union
: : :- *(25) Filter (col_2#1801 < 1)
: : : +- *(25) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(26) Filter (col_2#1801 < 1)
: : +- *(26) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(29) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(36) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2023]
: +- *(36) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(33) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7722]
: : +- Union
: : :- *(31) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
: : : +- *(31) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(32) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
: : +- *(32) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(35) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(42) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2024]
: +- *(42) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(39) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7739]
: : +- Union
: : :- *(37) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
: : : +- *(37) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(38) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
: : +- *(38) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(41) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(48) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2028]
: +- *(48) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(45) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7756]
: : +- Union
: : :- *(43) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
: : : +- *(43) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(44) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
: : +- *(44) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(47) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(54) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#1920]
: +- *(54) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(51) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7773]
: : +- Union
: : :- *(49) Filter (measure_2#1803 < 1.0)
: : : +- *(49) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(50) Filter (measure_2#1803 < 1.0)
: : +- *(50) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(53) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(60) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2025]
: +- *(60) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(57) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7790]
: : +- Union
: : :- *(55) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
: : : +- *(55) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(56) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
: : +- *(56) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(59) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(66) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2026]
: +- *(66) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(63) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7807]
: : +- Union
: : :- *(61) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
: : : +- *(61) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(62) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
: : +- *(62) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(65) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(72) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2029]
: +- *(72) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(69) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7824]
: : +- Union
: : :- *(67) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
: : : +- *(67) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(68) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
: : +- *(68) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(71) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(78) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2027]
: +- *(78) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(75) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7841]
: : +- Union
: : :- *(73) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
: : : +- *(73) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(74) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
: : +- *(74) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(77) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(84) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2030]
: +- *(84) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(81) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7858]
: : +- Union
: : :- *(79) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : : +- *(79) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(80) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : +- *(80) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(83) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(90) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2031]
: +- *(90) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(87) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7875]
: : +- Union
: : :- *(85) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : : +- *(85) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(86) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : +- *(86) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(89) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
+- *(96) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2032]
+- *(96) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:- *(93) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7892]
: +- Union
: :- *(91) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : +- *(91) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(92) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: +- *(92) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
+- *(95) Sort [col_1#1808 ASC NULLS FIRST], false, 0
+- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
"""
I'm seeing a significantly longer query plan here, and as the number of iterations of the for() loop increases, the performance degrades terribly.
How can I improve my performance?
Microsoft Access does not store the definition for a pass-through query. Therefore, the pass-through query may be executed at various times to retrieve information related to its definition. When the pass-through query takes considerable time to execute, Access appears to hang as the pass-through query is being executed.
The query will hang! Why does this happen? By default, creating an index blocks all other operations on a database. When building an index on a collection, the database that holds the collection is unavailable for read or write operations until the index build completes.
Note that that's largely about avoiding wide results and indexes, but it can be effective for compilation if you're able to eliminate some of the joins in the first query entirely.
I have an SQL Server 2014 instance (12.0.2000.8) and a quite complex SELECT statement with about 20 joins. This query works fine with the same data set on PostgreSQL, Oracle and other databases, and entire execution takes about 1 minute. But on SQL Server it takes about 40 minutes.
This is a known limitation of iterative algorithms in Spark. At the moment, every iteration of the loop causes the inner nodes to be re-evaluated and stacked upon the outer df variable.
This means your query planning process takes O(exp(n)) time, where n is the number of iterations of your loop.
There's a tool in Palantir Foundry called Transforms Verbs that can help with this.
Simply import transforms.verbs.dataframes.union_many and call it upon the total set of DataFrames you wish to materialize (assuming your logic will allow for it, i.e. one iteration of the loop doesn't depend upon the result of a prior iteration of the loop).
The code above should instead be modified to:
from pyspark.sql import types as T, functions as F, SparkSession
from transforms.verbs.dataframes import union_many
# Obtain the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Left-hand frame: two integer columns followed by two float measures,
# every field declared with nullable=False.
schema = T.StructType(
    [T.StructField(name, T.IntegerType(), False) for name in ("col_1", "col_2")]
    + [T.StructField(name, T.FloatType(), False) for name in ("measure_1", "measure_2")]
)
data = [
    dict(col_1=1, col_2=2, measure_1=0.5, measure_2=1.5),
    dict(col_1=2, col_2=3, measure_1=2.5, measure_2=3.5),
]
df = spark.createDataFrame(data, schema)

# Right-hand frame: only the join key, with each key value listed twice.
right_schema = T.StructType([T.StructField("col_1", T.IntegerType(), False)])
right_data = [{"col_1": key} for key in (1, 1, 2, 2)]
right_df = spark.createDataFrame(right_data, right_schema)

# Union the frame with itself, then join against the right-hand frame on col_1.
df = df.unionByName(df).join(right_df, on="col_1")
df.show()
"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
| 1| 2| 0.5| 1.5|
| 1| 2| 0.5| 1.5|
| 1| 2| 0.5| 1.5|
| 1| 2| 0.5| 1.5|
| 2| 3| 2.5| 3.5|
| 2| 3| 2.5| 3.5|
| 2| 3| 2.5| 3.5|
| 2| 3| 2.5| 3.5|
+-----+-----+---------+---------+
"""
df.explain()
"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
: +- Union
: :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
+- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
+- *(4) Scan ExistingRDD[col_1#1808]
"""
# Columns to test; each one contributes the rows where that column is < 1.
filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]

# Placeholder tag column so the untagged rows can be unioned with tagged ones.
df = df.withColumn("found_filter", F.lit(None))

# Build every filtered frame from the same base `df` and only collect them
# here, so no union is stacked onto the result of a previous iteration.
union_dfs = []
for filter_col in filter_union_cols:
    stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
    union_df = stats.select(
        "*",
        F.lit(filter_col).alias("found_filter"),
    )
    union_dfs.append(union_df)

# union_many expects the DataFrames as separate positional arguments, so the
# collected list is unpacked with `*`; passing the list itself fails with an
# attribute error on `list`.
df = df.unionByName(
    union_many(*union_dfs)
)
This will optimize your unions and take significantly less time.
The bottom line: beware of using any union calls inside for/while loops. If you must use this behavior, use the transforms.verbs.dataframes.union_many verb to optimize your final set of DataFrames.
Check out your platform documentation for more information and more helpful Verbs.
Protip: Use the included optimization over here to further increase your performance
If you receive an error about lists not having a unionByName/union_many attribute while trying to implement the OP's solution, try unpacking union_dfs in the last line with an asterisk prefix, like so:
# Unpack the list so each DataFrame is passed to union_many as its own argument.
combined = union_many(*union_dfs)
df = df.unionByName(combined)
Apparently, Foundry does not unpack the list of dataframe references on its own.
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With