Why is my build hanging / taking a long time to generate my query plan with many unions?

I notice that when I run the same code as my example over here, but with a `union`, `unionByName`, or `unionAll` instead of the join, query planning takes significantly longer and can result in a driver OOM.

The code is included here for reference, with a slight difference in what occurs inside the `for` loop.

from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("measure_1", T.FloatType(), False),
  T.StructField("measure_2", T.FloatType(), False),
])
data = [
  {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
  {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]

df = spark.createDataFrame(data, schema)

right_schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
  {"col_1": 1},
  {"col_1": 1},
  {"col_1": 2},
  {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)

df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))
for filter_col in filter_union_cols:
  stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
  df = df.unionByName(
    stats.select(
      "*",
      F.lit(filter_col).alias("found_filter")
    )
  )

df.show()

"""
+-----+-----+---------+---------+------------+                                  
|col_1|col_2|measure_1|measure_2|found_filter|
+-----+-----+---------+---------+------------+
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
+-----+-----+---------+---------+------------+
"""

df.explain()

# REALLY long query plan.....

"""
== Physical Plan ==
Union
:- *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, null AS found_filter#1855]
:  +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7637]
:     :     +- Union
:     :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:           +- *(4) Scan ExistingRDD[col_1#1808]
:- *(12) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_1 AS found_filter#1860]
:  +- *(12) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(9) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7654]
:     :     +- Union
:     :        :- *(7) Filter (col_1#1800 < 1)
:     :        :  +- *(7) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(8) Filter (col_1#1800 < 1)
:     :           +- *(8) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(11) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:           +- *(10) Filter (col_1#1808 < 1)
:              +- *(10) Scan ExistingRDD[col_1#1808]
:- *(18) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#1880]
:  +- *(18) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(15) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7671]
:     :     +- Union
:     :        :- *(13) Filter (measure_1#1802 < 1.0)
:     :        :  +- *(13) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(14) Filter (measure_1#1802 < 1.0)
:     :           +- *(14) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(17) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(24) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#2022]
:  +- *(24) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(21) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7688]
:     :     +- Union
:     :        :- *(19) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :        :  +- *(19) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(20) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :           +- *(20) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(23) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(30) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#1900]
:  +- *(30) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(27) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7705]
:     :     +- Union
:     :        :- *(25) Filter (col_2#1801 < 1)
:     :        :  +- *(25) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(26) Filter (col_2#1801 < 1)
:     :           +- *(26) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(29) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(36) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2023]
:  +- *(36) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(33) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7722]
:     :     +- Union
:     :        :- *(31) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :        :  +- *(31) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(32) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :           +- *(32) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(35) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(42) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2024]
:  +- *(42) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(39) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7739]
:     :     +- Union
:     :        :- *(37) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :        :  +- *(37) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(38) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :           +- *(38) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(41) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(48) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2028]
:  +- *(48) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(45) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7756]
:     :     +- Union
:     :        :- *(43) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :        :  +- *(43) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(44) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :           +- *(44) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(47) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(54) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#1920]
:  +- *(54) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(51) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7773]
:     :     +- Union
:     :        :- *(49) Filter (measure_2#1803 < 1.0)
:     :        :  +- *(49) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(50) Filter (measure_2#1803 < 1.0)
:     :           +- *(50) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(53) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(60) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2025]
:  +- *(60) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(57) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7790]
:     :     +- Union
:     :        :- *(55) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(55) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(56) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(56) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(59) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(66) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2026]
:  +- *(66) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(63) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7807]
:     :     +- Union
:     :        :- *(61) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :        :  +- *(61) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(62) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :           +- *(62) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(65) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(72) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2029]
:  +- *(72) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(69) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7824]
:     :     +- Union
:     :        :- *(67) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(67) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(68) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :           +- *(68) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(71) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(78) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2027]
:  +- *(78) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(75) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7841]
:     :     +- Union
:     :        :- *(73) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(73) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(74) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(74) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(77) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(84) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2030]
:  +- *(84) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(81) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7858]
:     :     +- Union
:     :        :- *(79) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(79) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(80) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(80) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(83) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(90) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2031]
:  +- *(90) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(87) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7875]
:     :     +- Union
:     :        :- *(85) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(85) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(86) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(86) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(89) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
+- *(96) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2032]
   +- *(96) SortMergeJoin [col_1#1800], [col_1#1808], Inner
      :- *(93) Sort [col_1#1800 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7892]
      :     +- Union
      :        :- *(91) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :        :  +- *(91) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      :        +- *(92) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :           +- *(92) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      +- *(95) Sort [col_1#1808 ASC NULLS FIRST], false, 0
         +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
"""

I'm seeing a significantly longer query plan here, and as the number of iterations of the `for` loop increases, performance degrades dramatically.

How can I improve my performance?

asked Aug 16 '21 by vanhooser


2 Answers

This is a known limitation of iterative algorithms in Spark: each iteration of the loop re-evaluates the inner nodes of the plan and stacks them onto the outer `df` variable, so the logical plan grows with every pass.

This means your query planning takes O(exp(n)) time, where n is the number of iterations of your loop.
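To see why the plan blows up, here's a toy model (plain Python, not actual Spark internals) of how the plan tree grows when each iteration unions the running DataFrame with a filtered copy of itself:

```python
def plan_size(iterations: int) -> int:
    """Toy count of plan-tree nodes after n loop iterations.

    Each iteration builds union(old_plan, filter(old_plan)), so the
    tree roughly doubles: size -> 1 + size + (1 + size) = 2*size + 2.
    """
    size = 1  # the initial scan node
    for _ in range(iterations):
        # one union node + the original subtree + a filtered copy of it
        size = 1 + size + (1 + size)
    return size

sizes = [plan_size(n) for n in range(5)]
# O(2^n) growth: 1, 4, 10, 22, 46, ...
```

The exact node counts are illustrative, but the doubling recurrence is the point: by iteration 20 the modeled tree has millions of nodes, which is the shape of plan the driver is being asked to analyze.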

There's a utility in Palantir Foundry, the Transforms Verbs library, that can help with this.

Simply import `transforms.verbs.dataframes.union_many` and call it on the full set of DataFrames you wish to materialize (assuming your logic allows for it, i.e. one iteration of the loop doesn't depend on the result of a prior iteration).

The code above should instead be modified to:

from pyspark.sql import types as T, functions as F, SparkSession
from transforms.verbs.dataframes import union_many

spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("measure_1", T.FloatType(), False),
  T.StructField("measure_2", T.FloatType(), False),
])
data = [
  {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
  {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]

df = spark.createDataFrame(data, schema)

right_schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
  {"col_1": 1},
  {"col_1": 1},
  {"col_1": 2},
  {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)

df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))
union_dfs = []
for filter_col in filter_union_cols:
  stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
  union_df = stats.select(
    "*",
    F.lit(filter_col).alias("found_filter")
  )
  union_dfs += [union_df]

df = df.unionByName(
  union_many(union_dfs)
)

This will optimize your unions and take significantly less time.

The bottom line: beware of using union calls inside for/while loops. If you must, collect the DataFrames as you go and use the `transforms.verbs.dataframes.union_many` verb to combine the final set of DataFrames in one step.

Check out your platform documentation for more information and more helpful Verbs.
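If you're not on Foundry, the same effect can be approximated with a plain `functools.reduce` over `unionByName` — a minimal stand-in for the verb, not the actual Foundry implementation:

```python
from functools import reduce

def union_many(*dfs):
    """Union an arbitrary number of DataFrames by column name.

    Spark still builds n-1 union nodes, but they are applied once to
    the finished list instead of compounding the growing plan inside
    the loop.
    """
    if not dfs:
        raise ValueError("union_many requires at least one DataFrame")
    return reduce(lambda left, right: left.unionByName(right), dfs)
```

Called as `union_many(*union_dfs)` — note the list is unpacked into separate positional arguments.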

Protip: use the optimization included over here to further increase your performance.

answered Oct 08 '22 by vanhooser


If, while implementing the OP's solution, you receive an error about lists not having a `unionByName`/`union_many` attribute, unpack `union_dfs` in the last line with an asterisk prefix, like so:

df = df.unionByName(
    union_many(*union_dfs)
)

Apparently `union_many` takes its DataFrames as separate positional arguments, so Foundry does not unpack the list of DataFrame references on its own.
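The distinction is ordinary Python argument unpacking: a variadic function sees a list as a single argument unless it is unpacked. A minimal illustration (hypothetical `count_args` stands in for `union_many`):

```python
def count_args(*dfs):
    # A variadic function receives its positional arguments as a tuple.
    return len(dfs)

union_dfs = ["df_a", "df_b", "df_c"]

count_args(union_dfs)   # 1 -- the whole list arrives as one argument
count_args(*union_dfs)  # 3 -- the list is unpacked into three arguments
```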

answered Oct 08 '22 by Misa