I have a need to perform over 300 joins in apache spark. I have reduced my problem to the following examples which still produce out of memory errors in both Spark 1.6 and 2.0 platforms:
Is there a better way to perform excessive joins? I have tried novel combinations of union/ groupByKey which work but do not preserve the columns which is mandatory to understand the data . I have also tried broadcast joins, cogroup, persist and many more variations.
var data = Seq((1,2))
var data2 = Seq((1,3))
var rdd1 = sc.parallelize(data)
var rdd2 = sc.parallelize(data2)
var d2 = rdd1.toDF()
var d5 = rdd2.toDF()
var d3 = d2.join(d5, Seq("_1"), "left_outer")
var d4 = d3.join(d5, Seq("_1"), "left_outer")
var d5a = d4.join(d5, Seq("_1"), "left_outer")
var d6 = d5a.join(d5, Seq("_1"), "left_outer")
var d7 = d6.join(d5, Seq("_1"), "left_outer")
var d8 = d7.join(d5, Seq("_1"), "left_outer")
var d9 = d8.join(d5, Seq("_1"), "left_outer")
var d10 = d9.join(d5, Seq("_1"), "left_outer")
var d11 = d10.join(d5, Seq("_1"), "left_outer")
var d12 = d11.join(d5, Seq("_1"), "left_outer")
var d13 = d12.join(d5, Seq("_1"), "left_outer")
var d14 = d13.join(d5, Seq("_1"), "left_outer")
var d15 = d14.join(d5, Seq("_1"), "left_outer")
var d16 = d15.join(d5, Seq("_1"), "left_outer")
var d17 = d16.join(d5, Seq("_1"), "left_outer")
var d18 = d17.join(d5, Seq("_1"), "left_outer")
var d19 = d18.join(d5, Seq("_1"), "left_outer")
var d20 = d19.join(d5, Seq("_1"), "left_outer")
var d21 = d20.join(d5, Seq("_1"), "left_outer")
var d22 = d21.join(d5, Seq("_1"), "left_outer")
var d23 = d22.join(d5, Seq("_1"), "left_outer")
var d24 = d23.join(d5, Seq("_1"), "left_outer")
var d25 = d24.join(d5, Seq("_1"), "left_outer")
var d26 = d25.join(d5, Seq("_1"), "left_outer")
var d27 = d26.join(d5, Seq("_1"), "left_outer")
var d28 = d27.join(d5, Seq("_1"), "left_outer")
var d29 = d28.join(d5, Seq("_1"), "left_outer")
var d30 = d29.join(d5, Seq("_1"), "left_outer")
var d31 = d30.join(d5, Seq("_1"), "left_outer")
var d32 = d31.join(d5, Seq("_1"), "left_outer")
var d33 = d32.join(d5, Seq("_1"), "left_outer")
var d34 = d33.join(d5, Seq("_1"), "left_outer")
var d35 = d34.join(d5, Seq("_1"), "left_outer")
var d36 = d35.join(d5, Seq("_1"), "left_outer")
var d37 = d36.join(d5, Seq("_1"), "left_outer")
var d38 = d37.join(d5, Seq("_1"), "left_outer")
var d39 = d38.join(d5, Seq("_1"), "left_outer")
var d40 = d39.join(d5, Seq("_1"), "left_outer")
var d41 = d40.join(d5, Seq("_1"), "left_outer")
var d42 = d41.join(d5, Seq("_1"), "left_outer")
var d43 = d42.join(d5, Seq("_1"), "left_outer")
var d44 = d43.join(d5, Seq("_1"), "left_outer")
var d45 = d44.join(d5, Seq("_1"), "left_outer")
var d46 = d45.join(d5, Seq("_1"), "left_outer")
var d47 = d46.join(d5, Seq("_1"), "left_outer")
var d48 = d47.join(d5, Seq("_1"), "left_outer")
var d49 = d48.join(d5, Seq("_1"), "left_outer")
var d50 = d49.join(d5, Seq("_1"), "left_outer")
var d51 = d50.join(d5, Seq("_1"), "left_outer")
var d52 = d51.join(d5, Seq("_1"), "left_outer")
var d53 = d52.join(d5, Seq("_1"), "left_outer")
var d54 = d53.join(d5, Seq("_1"), "left_outer")
var d55 = d54.join(d5, Seq("_1"), "left_outer")
var d56 = d55.join(d5, Seq("_1"), "left_outer")
var d57 = d56.join(d5, Seq("_1"), "left_outer")
var d58 = d57.join(d5, Seq("_1"), "left_outer")
var d59 = d58.join(d5, Seq("_1"), "left_outer")
var d60 = d59.join(d5, Seq("_1"), "left_outer")
var d61 = d60.join(d5, Seq("_1"), "left_outer")
var d62 = d61.join(d5, Seq("_1"), "left_outer")
var d63 = d62.join(d5, Seq("_1"), "left_outer")
var d64 = d63.join(d5, Seq("_1"), "left_outer")
var d65 = d64.join(d5, Seq("_1"), "left_outer")
var d66 = d65.join(d5, Seq("_1"), "left_outer")
var d67 = d66.join(d5, Seq("_1"), "left_outer")
var d68 = d67.join(d5, Seq("_1"), "left_outer")
var d69 = d68.join(d5, Seq("_1"), "left_outer")
var d70 = d69.join(d5, Seq("_1"), "left_outer")
var d71 = d70.join(d5, Seq("_1"), "left_outer")
var d72 = d71.join(d5, Seq("_1"), "left_outer")
var d73 = d72.join(d5, Seq("_1"), "left_outer")
var d74 = d73.join(d5, Seq("_1"), "left_outer")
var d75 = d74.join(d5, Seq("_1"), "left_outer")
var d76 = d75.join(d5, Seq("_1"), "left_outer")
var d77 = d76.join(d5, Seq("_1"), "left_outer")
var d78 = d77.join(d5, Seq("_1"), "left_outer")
var d79 = d78.join(d5, Seq("_1"), "left_outer")
var d80 = d79.join(d5, Seq("_1"), "left_outer")
var d81 = d80.join(d5, Seq("_1"), "left_outer")
var d82 = d81.join(d5, Seq("_1"), "left_outer")
var d83 = d82.join(d5, Seq("_1"), "left_outer")
var d84 = d83.join(d5, Seq("_1"), "left_outer")
var d85 = d84.join(d5, Seq("_1"), "left_outer")
var d86 = d85.join(d5, Seq("_1"), "left_outer")
var d87 = d86.join(d5, Seq("_1"), "left_outer")
var d88 = d87.join(d5, Seq("_1"), "left_outer")
var d89 = d88.join(d5, Seq("_1"), "left_outer")
var d90 = d89.join(d5, Seq("_1"), "left_outer")
var d91 = d90.join(d5, Seq("_1"), "left_outer")
var d92 = d91.join(d5, Seq("_1"), "left_outer")
var d93 = d92.join(d5, Seq("_1"), "left_outer")
var d94 = d93.join(d5, Seq("_1"), "left_outer")
var d95 = d94.join(d5, Seq("_1"), "left_outer")
var d96 = d95.join(d5, Seq("_1"), "left_outer")
var d97 = d96.join(d5, Seq("_1"), "left_outer")
var d98 = d97.join(d5, Seq("_1"), "left_outer")
var d99 = d98.join(d5, Seq("_1"), "left_outer")
var d100 = d99.join(d5, Seq("_1"), "left_outer")
var d101 = d100.join(d5, Seq("_1"), "left_outer")
var d102 = d101.join(d5, Seq("_1"), "left_outer")
var d103 = d102.join(d5, Seq("_1"), "left_outer")
var d104 = d103.join(d5, Seq("_1"), "left_outer")
var d105 = d104.join(d5, Seq("_1"), "left_outer")
var d106 = d105.join(d5, Seq("_1"), "left_outer")
var d107 = d106.join(d5, Seq("_1"), "left_outer")
var d108 = d107.join(d5, Seq("_1"), "left_outer")
var d109 = d108.join(d5, Seq("_1"), "left_outer")
var d110 = d109.join(d5, Seq("_1"), "left_outer")
var d111 = d110.join(d5, Seq("_1"), "left_outer")
var d112 = d111.join(d5, Seq("_1"), "left_outer")
var d113 = d112.join(d5, Seq("_1"), "left_outer")
var d114 = d113.join(d5, Seq("_1"), "left_outer")
var d115 = d114.join(d5, Seq("_1"), "left_outer")
var d116 = d115.join(d5, Seq("_1"), "left_outer")
var d117 = d116.join(d5, Seq("_1"), "left_outer")
var d118 = d117.join(d5, Seq("_1"), "left_outer")
var d119 = d118.join(d5, Seq("_1"), "left_outer")
var d120 = d119.join(d5, Seq("_1"), "left_outer")
var d121 = d120.join(d5, Seq("_1"), "left_outer")
var d122 = d121.join(d5, Seq("_1"), "left_outer")
var d123 = d122.join(d5, Seq("_1"), "left_outer")
var d124 = d123.join(d5, Seq("_1"), "left_outer")
var d125 = d124.join(d5, Seq("_1"), "left_outer")
var d126 = d125.join(d5, Seq("_1"), "left_outer")
var d127 = d126.join(d5, Seq("_1"), "left_outer")
var d128 = d127.join(d5, Seq("_1"), "left_outer")
var d129 = d128.join(d5, Seq("_1"), "left_outer")
var d130 = d129.join(d5, Seq("_1"), "left_outer")
var d131 = d130.join(d5, Seq("_1"), "left_outer")
var d132 = d131.join(d5, Seq("_1"), "left_outer")
var d133 = d132.join(d5, Seq("_1"), "left_outer")
var d134 = d133.join(d5, Seq("_1"), "left_outer")
var d135 = d134.join(d5, Seq("_1"), "left_outer")
var d136 = d135.join(d5, Seq("_1"), "left_outer")
var d137 = d136.join(d5, Seq("_1"), "left_outer")
var d138 = d137.join(d5, Seq("_1"), "left_outer")
var d139 = d138.join(d5, Seq("_1"), "left_outer")
var d140 = d139.join(d5, Seq("_1"), "left_outer")
var d141 = d140.join(d5, Seq("_1"), "left_outer")
var d142 = d141.join(d5, Seq("_1"), "left_outer")
var d143 = d142.join(d5, Seq("_1"), "left_outer")
var d144 = d143.join(d5, Seq("_1"), "left_outer")
var d145 = d144.join(d5, Seq("_1"), "left_outer")
var d146 = d145.join(d5, Seq("_1"), "left_outer")
var d147 = d146.join(d5, Seq("_1"), "left_outer")
var d148 = d147.join(d5, Seq("_1"), "left_outer")
var d149 = d148.join(d5, Seq("_1"), "left_outer")
var d150 = d149.join(d5, Seq("_1"), "left_outer")
var d151 = d150.join(d5, Seq("_1"), "left_outer")
var d152 = d151.join(d5, Seq("_1"), "left_outer")
var d153 = d152.join(d5, Seq("_1"), "left_outer")
var d154 = d153.join(d5, Seq("_1"), "left_outer")
var d155 = d154.join(d5, Seq("_1"), "left_outer")
var d156 = d155.join(d5, Seq("_1"), "left_outer")
var d157 = d156.join(d5, Seq("_1"), "left_outer")
var d158 = d157.join(d5, Seq("_1"), "left_outer")
var d159 = d158.join(d5, Seq("_1"), "left_outer")
var d160 = d159.join(d5, Seq("_1"), "left_outer")
var d161 = d160.join(d5, Seq("_1"), "left_outer")
var d162 = d161.join(d5, Seq("_1"), "left_outer")
var d163 = d162.join(d5, Seq("_1"), "left_outer")
var d164 = d163.join(d5, Seq("_1"), "left_outer")
var d165 = d164.join(d5, Seq("_1"), "left_outer")
var d166 = d165.join(d5, Seq("_1"), "left_outer")
var d167 = d166.join(d5, Seq("_1"), "left_outer")
var d168 = d167.join(d5, Seq("_1"), "left_outer")
var d169 = d168.join(d5, Seq("_1"), "left_outer")
var d170 = d169.join(d5, Seq("_1"), "left_outer")
var d171 = d170.join(d5, Seq("_1"), "left_outer")
var d172 = d171.join(d5, Seq("_1"), "left_outer")
var d173 = d172.join(d5, Seq("_1"), "left_outer")
var d174 = d173.join(d5, Seq("_1"), "left_outer")
var d175 = d174.join(d5, Seq("_1"), "left_outer")
var d176 = d175.join(d5, Seq("_1"), "left_outer")
var d177 = d176.join(d5, Seq("_1"), "left_outer")
var d178 = d177.join(d5, Seq("_1"), "left_outer")
var d179 = d178.join(d5, Seq("_1"), "left_outer")
var d180 = d179.join(d5, Seq("_1"), "left_outer")
var d181 = d180.join(d5, Seq("_1"), "left_outer")
var d182 = d181.join(d5, Seq("_1"), "left_outer")
var d183 = d182.join(d5, Seq("_1"), "left_outer")
var d184 = d183.join(d5, Seq("_1"), "left_outer")
var d185 = d184.join(d5, Seq("_1"), "left_outer")
var d186 = d185.join(d5, Seq("_1"), "left_outer")
var d187 = d186.join(d5, Seq("_1"), "left_outer")
var d188 = d187.join(d5, Seq("_1"), "left_outer")
var d189 = d188.join(d5, Seq("_1"), "left_outer")
var d190 = d189.join(d5, Seq("_1"), "left_outer")
var d191 = d190.join(d5, Seq("_1"), "left_outer")
var d192 = d191.join(d5, Seq("_1"), "left_outer")
var d193 = d192.join(d5, Seq("_1"), "left_outer")
var d194 = d193.join(d5, Seq("_1"), "left_outer")
var d195 = d194.join(d5, Seq("_1"), "left_outer")
var d196 = d195.join(d5, Seq("_1"), "left_outer")
var d197 = d196.join(d5, Seq("_1"), "left_outer")
var d198 = d197.join(d5, Seq("_1"), "left_outer")
var d199 = d198.join(d5, Seq("_1"), "left_outer")
var d200 = d199.join(d5, Seq("_1"), "left_outer")
var d201 = d200.join(d5, Seq("_1"), "left_outer")
var d202 = d201.join(d5, Seq("_1"), "left_outer")
var d203 = d202.join(d5, Seq("_1"), "left_outer")
var d204 = d203.join(d5, Seq("_1"), "left_outer")
var d205 = d204.join(d5, Seq("_1"), "left_outer")
var d206 = d205.join(d5, Seq("_1"), "left_outer")
var d207 = d206.join(d5, Seq("_1"), "left_outer")
var d208 = d207.join(d5, Seq("_1"), "left_outer")
var d209 = d208.join(d5, Seq("_1"), "left_outer")
var d210 = d209.join(d5, Seq("_1"), "left_outer")
var d211 = d210.join(d5, Seq("_1"), "left_outer")
var d212 = d211.join(d5, Seq("_1"), "left_outer")
var d213 = d212.join(d5, Seq("_1"), "left_outer")
var d214 = d213.join(d5, Seq("_1"), "left_outer")
var d215 = d214.join(d5, Seq("_1"), "left_outer")
var d216 = d215.join(d5, Seq("_1"), "left_outer")
var d217 = d216.join(d5, Seq("_1"), "left_outer")
var d218 = d217.join(d5, Seq("_1"), "left_outer")
var d219 = d218.join(d5, Seq("_1"), "left_outer")
var d220 = d219.join(d5, Seq("_1"), "left_outer")
var d221 = d220.join(d5, Seq("_1"), "left_outer")
var d222 = d221.join(d5, Seq("_1"), "left_outer")
var d223 = d222.join(d5, Seq("_1"), "left_outer")
var d224 = d223.join(d5, Seq("_1"), "left_outer")
var d225 = d224.join(d5, Seq("_1"), "left_outer")
var d226 = d225.join(d5, Seq("_1"), "left_outer")
var d227 = d226.join(d5, Seq("_1"), "left_outer")
var d228 = d227.join(d5, Seq("_1"), "left_outer")
var d229 = d228.join(d5, Seq("_1"), "left_outer")
var d230 = d229.join(d5, Seq("_1"), "left_outer")
var d231 = d230.join(d5, Seq("_1"), "left_outer")
var d232 = d231.join(d5, Seq("_1"), "left_outer")
var d233 = d232.join(d5, Seq("_1"), "left_outer")
var d234 = d233.join(d5, Seq("_1"), "left_outer")
var d235 = d234.join(d5, Seq("_1"), "left_outer")
var d236 = d235.join(d5, Seq("_1"), "left_outer")
var d237 = d236.join(d5, Seq("_1"), "left_outer")
var d238 = d237.join(d5, Seq("_1"), "left_outer")
var d239 = d238.join(d5, Seq("_1"), "left_outer")
var d240 = d239.join(d5, Seq("_1"), "left_outer")
var d241 = d240.join(d5, Seq("_1"), "left_outer")
var d242 = d241.join(d5, Seq("_1"), "left_outer")
var d243 = d242.join(d5, Seq("_1"), "left_outer")
var d244 = d243.join(d5, Seq("_1"), "left_outer")
var d245 = d244.join(d5, Seq("_1"), "left_outer")
var d246 = d245.join(d5, Seq("_1"), "left_outer")
var d247 = d246.join(d5, Seq("_1"), "left_outer")
var d248 = d247.join(d5, Seq("_1"), "left_outer")
var d249 = d248.join(d5, Seq("_1"), "left_outer")
var d250 = d249.join(d5, Seq("_1"), "left_outer")
var d251 = d250.join(d5, Seq("_1"), "left_outer")
var d252 = d251.join(d5, Seq("_1"), "left_outer")
var d253 = d252.join(d5, Seq("_1"), "left_outer")
var d254 = d253.join(d5, Seq("_1"), "left_outer")
var d255 = d254.join(d5, Seq("_1"), "left_outer")
var d256 = d255.join(d5, Seq("_1"), "left_outer")
var d257 = d256.join(d5, Seq("_1"), "left_outer")
var d258 = d257.join(d5, Seq("_1"), "left_outer")
var d259 = d258.join(d5, Seq("_1"), "left_outer")
var d260 = d259.join(d5, Seq("_1"), "left_outer")
var d261 = d260.join(d5, Seq("_1"), "left_outer")
var d262 = d261.join(d5, Seq("_1"), "left_outer")
var d263 = d262.join(d5, Seq("_1"), "left_outer")
var d264 = d263.join(d5, Seq("_1"), "left_outer")
var d265 = d264.join(d5, Seq("_1"), "left_outer")
var d266 = d265.join(d5, Seq("_1"), "left_outer")
var d267 = d266.join(d5, Seq("_1"), "left_outer")
var d268 = d267.join(d5, Seq("_1"), "left_outer")
var d269 = d268.join(d5, Seq("_1"), "left_outer")
var d270 = d269.join(d5, Seq("_1"), "left_outer")
var d271 = d270.join(d5, Seq("_1"), "left_outer")
var d272 = d271.join(d5, Seq("_1"), "left_outer")
var d273 = d272.join(d5, Seq("_1"), "left_outer")
var d274 = d273.join(d5, Seq("_1"), "left_outer")
var d275 = d274.join(d5, Seq("_1"), "left_outer")
var d276 = d275.join(d5, Seq("_1"), "left_outer")
var d277 = d276.join(d5, Seq("_1"), "left_outer")
var d278 = d277.join(d5, Seq("_1"), "left_outer")
var d279 = d278.join(d5, Seq("_1"), "left_outer")
var d280 = d279.join(d5, Seq("_1"), "left_outer")
var d281 = d280.join(d5, Seq("_1"), "left_outer")
var d282 = d281.join(d5, Seq("_1"), "left_outer")
var d283 = d282.join(d5, Seq("_1"), "left_outer")
var d284 = d283.join(d5, Seq("_1"), "left_outer")
var d285 = d284.join(d5, Seq("_1"), "left_outer")
var d286 = d285.join(d5, Seq("_1"), "left_outer")
var d287 = d286.join(d5, Seq("_1"), "left_outer")
var d288 = d287.join(d5, Seq("_1"), "left_outer")
var d289 = d288.join(d5, Seq("_1"), "left_outer")
var d290 = d289.join(d5, Seq("_1"), "left_outer")
var d291 = d290.join(d5, Seq("_1"), "left_outer")
var d292 = d291.join(d5, Seq("_1"), "left_outer")
var d293 = d292.join(d5, Seq("_1"), "left_outer")
var d294 = d293.join(d5, Seq("_1"), "left_outer")
var d295 = d294.join(d5, Seq("_1"), "left_outer")
var d296 = d295.join(d5, Seq("_1"), "left_outer")
var d297 = d296.join(d5, Seq("_1"), "left_outer")
var d298 = d297.join(d5, Seq("_1"), "left_outer")
var d299 = d298.join(d5, Seq("_1"), "left_outer")
var d300 = d299.join(d5, Seq("_1"), "left_outer")
not sure if this is a code design issue or configuration issue, I have experimented with numerous configurations including the following :
spark-shell
--master yarn-client
--executor-memory 8g
--executor-cores 2
--driver-java-options
"-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Xms1g -XX:MaxNewSize=2g"
--conf spark.network.timeout=10000000
--conf spark.executor.heartbeatInterval=10000000
--conf spark.dynamicAllocation.enabled=true -i <program name>
you can do the join with a foldleft instead of doing it by manually. You don't need to use var here. Just use val. It's better to persist after each join in cae of failure if the join is heavy.
val joinFunction: (DataFrame, DataFrame) => DataFrame = (accumulator, tableToJoin) => {
val result = accumulator.join(tableToJoin, Seq("_1"), "left_outer").persist()
accumulator.unpersist()
Logger.getLogger(getClass).info(s"Joined table count: ${result.count()}")
result
}
val sqlCtx = sqlContext
import sqlCtx.implicits._
val data = Seq((1,2))
val data2 = Seq((1,3))
val rdd1 = sc.parallelize(data)
val rdd2 = sc.parallelize(data2)
val d2 = rdd1.toDF()
val d5 = rdd2.toDF()
val d3: DataFrame = d2.join(d5, Seq("_1"), "left_outer")
val listDF = Range(1, 10).map(x => d3).toList
val result: DataFrame = listDF.reduce{joinFunction}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With