Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark Multiple Joins Out Of memory Error

I have a need to perform over 300 joins in apache spark. I have reduced my problem to the following examples which still produce out of memory errors in both Spark 1.6 and 2.0 platforms:

Is there a better way to perform excessive joins? I have tried novel combinations of union/ groupByKey which work but do not preserve the columns which is mandatory to understand the data . I have also tried broadcast joins, cogroup, persist and many more variations.

var data = Seq((1,2))
var data2 = Seq((1,3))
var rdd1 = sc.parallelize(data)
var rdd2 = sc.parallelize(data2)
var d2 = rdd1.toDF()
var d5 = rdd2.toDF()
var  d3   = d2.join(d5, Seq("_1"), "left_outer")
var  d4   = d3.join(d5, Seq("_1"), "left_outer")
var  d5a   = d4.join(d5, Seq("_1"), "left_outer")
var  d6   = d5a.join(d5, Seq("_1"), "left_outer")
var  d7   = d6.join(d5, Seq("_1"), "left_outer")
var  d8   = d7.join(d5, Seq("_1"), "left_outer")
var  d9   = d8.join(d5, Seq("_1"), "left_outer")
var  d10   = d9.join(d5, Seq("_1"), "left_outer")
var  d11   = d10.join(d5, Seq("_1"), "left_outer")
var  d12   = d11.join(d5, Seq("_1"), "left_outer")
var  d13   = d12.join(d5, Seq("_1"), "left_outer")
var  d14   = d13.join(d5, Seq("_1"), "left_outer")
var  d15   = d14.join(d5, Seq("_1"), "left_outer")
var  d16   = d15.join(d5, Seq("_1"), "left_outer")
var  d17   = d16.join(d5, Seq("_1"), "left_outer")
var  d18   = d17.join(d5, Seq("_1"), "left_outer")
var  d19   = d18.join(d5, Seq("_1"), "left_outer")
var  d20   = d19.join(d5, Seq("_1"), "left_outer")
var  d21   = d20.join(d5, Seq("_1"), "left_outer")
var  d22   = d21.join(d5, Seq("_1"), "left_outer")
var  d23   = d22.join(d5, Seq("_1"), "left_outer")
var  d24   = d23.join(d5, Seq("_1"), "left_outer")
var  d25   = d24.join(d5, Seq("_1"), "left_outer")
var  d26   = d25.join(d5, Seq("_1"), "left_outer")
var  d27   = d26.join(d5, Seq("_1"), "left_outer")
var  d28   = d27.join(d5, Seq("_1"), "left_outer")
var  d29   = d28.join(d5, Seq("_1"), "left_outer")
var  d30   = d29.join(d5, Seq("_1"), "left_outer")
var  d31   = d30.join(d5, Seq("_1"), "left_outer")
var  d32   = d31.join(d5, Seq("_1"), "left_outer")
var  d33   = d32.join(d5, Seq("_1"), "left_outer")
var  d34   = d33.join(d5, Seq("_1"), "left_outer")
var  d35   = d34.join(d5, Seq("_1"), "left_outer")
var  d36   = d35.join(d5, Seq("_1"), "left_outer")
var  d37   = d36.join(d5, Seq("_1"), "left_outer")
var  d38   = d37.join(d5, Seq("_1"), "left_outer")
var  d39   = d38.join(d5, Seq("_1"), "left_outer")
var  d40   = d39.join(d5, Seq("_1"), "left_outer")
var  d41   = d40.join(d5, Seq("_1"), "left_outer")
var  d42   = d41.join(d5, Seq("_1"), "left_outer")
var  d43   = d42.join(d5, Seq("_1"), "left_outer")
var  d44   = d43.join(d5, Seq("_1"), "left_outer")
var  d45   = d44.join(d5, Seq("_1"), "left_outer")
var  d46   = d45.join(d5, Seq("_1"), "left_outer")
var  d47   = d46.join(d5, Seq("_1"), "left_outer")
var  d48   = d47.join(d5, Seq("_1"), "left_outer")
var  d49   = d48.join(d5, Seq("_1"), "left_outer")
var  d50   = d49.join(d5, Seq("_1"), "left_outer")
var  d51   = d50.join(d5, Seq("_1"), "left_outer")
var  d52   = d51.join(d5, Seq("_1"), "left_outer")
var  d53   = d52.join(d5, Seq("_1"), "left_outer")
var  d54   = d53.join(d5, Seq("_1"), "left_outer")
var  d55   = d54.join(d5, Seq("_1"), "left_outer")
var  d56   = d55.join(d5, Seq("_1"), "left_outer")
var  d57   = d56.join(d5, Seq("_1"), "left_outer")
var  d58   = d57.join(d5, Seq("_1"), "left_outer")
var  d59   = d58.join(d5, Seq("_1"), "left_outer")
var  d60   = d59.join(d5, Seq("_1"), "left_outer")
var  d61   = d60.join(d5, Seq("_1"), "left_outer")
var  d62   = d61.join(d5, Seq("_1"), "left_outer")
var  d63   = d62.join(d5, Seq("_1"), "left_outer")
var  d64   = d63.join(d5, Seq("_1"), "left_outer")
var  d65   = d64.join(d5, Seq("_1"), "left_outer")
var  d66   = d65.join(d5, Seq("_1"), "left_outer")
var  d67   = d66.join(d5, Seq("_1"), "left_outer")
var  d68   = d67.join(d5, Seq("_1"), "left_outer")
var  d69   = d68.join(d5, Seq("_1"), "left_outer")
var  d70   = d69.join(d5, Seq("_1"), "left_outer")
var  d71   = d70.join(d5, Seq("_1"), "left_outer")
var  d72   = d71.join(d5, Seq("_1"), "left_outer")
var  d73   = d72.join(d5, Seq("_1"), "left_outer")
var  d74   = d73.join(d5, Seq("_1"), "left_outer")
var  d75   = d74.join(d5, Seq("_1"), "left_outer")
var  d76   = d75.join(d5, Seq("_1"), "left_outer")
var  d77   = d76.join(d5, Seq("_1"), "left_outer")
var  d78   = d77.join(d5, Seq("_1"), "left_outer")
var  d79   = d78.join(d5, Seq("_1"), "left_outer")
var  d80   = d79.join(d5, Seq("_1"), "left_outer")
var  d81   = d80.join(d5, Seq("_1"), "left_outer")
var  d82   = d81.join(d5, Seq("_1"), "left_outer")
var  d83   = d82.join(d5, Seq("_1"), "left_outer")
var  d84   = d83.join(d5, Seq("_1"), "left_outer")
var  d85   = d84.join(d5, Seq("_1"), "left_outer")
var  d86   = d85.join(d5, Seq("_1"), "left_outer")
var  d87   = d86.join(d5, Seq("_1"), "left_outer")
var  d88   = d87.join(d5, Seq("_1"), "left_outer")
var  d89   = d88.join(d5, Seq("_1"), "left_outer")
var  d90   = d89.join(d5, Seq("_1"), "left_outer")
var  d91   = d90.join(d5, Seq("_1"), "left_outer")
var  d92   = d91.join(d5, Seq("_1"), "left_outer")
var  d93   = d92.join(d5, Seq("_1"), "left_outer")
var  d94   = d93.join(d5, Seq("_1"), "left_outer")
var  d95   = d94.join(d5, Seq("_1"), "left_outer")
var  d96   = d95.join(d5, Seq("_1"), "left_outer")
var  d97   = d96.join(d5, Seq("_1"), "left_outer")
var  d98   = d97.join(d5, Seq("_1"), "left_outer")
var  d99   = d98.join(d5, Seq("_1"), "left_outer")
var  d100   = d99.join(d5, Seq("_1"), "left_outer")
var  d101   = d100.join(d5, Seq("_1"), "left_outer")
var  d102   = d101.join(d5, Seq("_1"), "left_outer")
var  d103   = d102.join(d5, Seq("_1"), "left_outer")
var  d104   = d103.join(d5, Seq("_1"), "left_outer")
var  d105   = d104.join(d5, Seq("_1"), "left_outer")
var  d106   = d105.join(d5, Seq("_1"), "left_outer")
var  d107   = d106.join(d5, Seq("_1"), "left_outer")
var  d108   = d107.join(d5, Seq("_1"), "left_outer")
var  d109   = d108.join(d5, Seq("_1"), "left_outer")
var  d110   = d109.join(d5, Seq("_1"), "left_outer")
var  d111   = d110.join(d5, Seq("_1"), "left_outer")
var  d112   = d111.join(d5, Seq("_1"), "left_outer")
var  d113   = d112.join(d5, Seq("_1"), "left_outer")
var  d114   = d113.join(d5, Seq("_1"), "left_outer")
var  d115   = d114.join(d5, Seq("_1"), "left_outer")
var  d116   = d115.join(d5, Seq("_1"), "left_outer")
var  d117   = d116.join(d5, Seq("_1"), "left_outer")
var  d118   = d117.join(d5, Seq("_1"), "left_outer")
var  d119   = d118.join(d5, Seq("_1"), "left_outer")
var  d120   = d119.join(d5, Seq("_1"), "left_outer")
var  d121   = d120.join(d5, Seq("_1"), "left_outer")
var  d122   = d121.join(d5, Seq("_1"), "left_outer")
var  d123   = d122.join(d5, Seq("_1"), "left_outer")
var  d124   = d123.join(d5, Seq("_1"), "left_outer")
var  d125   = d124.join(d5, Seq("_1"), "left_outer")
var  d126   = d125.join(d5, Seq("_1"), "left_outer")
var  d127   = d126.join(d5, Seq("_1"), "left_outer")
var  d128   = d127.join(d5, Seq("_1"), "left_outer")
var  d129   = d128.join(d5, Seq("_1"), "left_outer")
var  d130   = d129.join(d5, Seq("_1"), "left_outer")
var  d131   = d130.join(d5, Seq("_1"), "left_outer")
var  d132   = d131.join(d5, Seq("_1"), "left_outer")
var  d133   = d132.join(d5, Seq("_1"), "left_outer")
var  d134   = d133.join(d5, Seq("_1"), "left_outer")
var  d135   = d134.join(d5, Seq("_1"), "left_outer")
var  d136   = d135.join(d5, Seq("_1"), "left_outer")
var  d137   = d136.join(d5, Seq("_1"), "left_outer")
var  d138   = d137.join(d5, Seq("_1"), "left_outer")
var  d139   = d138.join(d5, Seq("_1"), "left_outer")
var  d140   = d139.join(d5, Seq("_1"), "left_outer")
var  d141   = d140.join(d5, Seq("_1"), "left_outer")
var  d142   = d141.join(d5, Seq("_1"), "left_outer")
var  d143   = d142.join(d5, Seq("_1"), "left_outer")
var  d144   = d143.join(d5, Seq("_1"), "left_outer")
var  d145   = d144.join(d5, Seq("_1"), "left_outer")
var  d146   = d145.join(d5, Seq("_1"), "left_outer")
var  d147   = d146.join(d5, Seq("_1"), "left_outer")
var  d148   = d147.join(d5, Seq("_1"), "left_outer")
var  d149   = d148.join(d5, Seq("_1"), "left_outer")
var  d150   = d149.join(d5, Seq("_1"), "left_outer")
var  d151   = d150.join(d5, Seq("_1"), "left_outer")
var  d152   = d151.join(d5, Seq("_1"), "left_outer")
var  d153   = d152.join(d5, Seq("_1"), "left_outer")
var  d154   = d153.join(d5, Seq("_1"), "left_outer")
var  d155   = d154.join(d5, Seq("_1"), "left_outer")
var  d156   = d155.join(d5, Seq("_1"), "left_outer")
var  d157   = d156.join(d5, Seq("_1"), "left_outer")
var  d158   = d157.join(d5, Seq("_1"), "left_outer")
var  d159   = d158.join(d5, Seq("_1"), "left_outer")
var  d160   = d159.join(d5, Seq("_1"), "left_outer")
var  d161   = d160.join(d5, Seq("_1"), "left_outer")
var  d162   = d161.join(d5, Seq("_1"), "left_outer")
var  d163   = d162.join(d5, Seq("_1"), "left_outer")
var  d164   = d163.join(d5, Seq("_1"), "left_outer")
var  d165   = d164.join(d5, Seq("_1"), "left_outer")
var  d166   = d165.join(d5, Seq("_1"), "left_outer")
var  d167   = d166.join(d5, Seq("_1"), "left_outer")
var  d168   = d167.join(d5, Seq("_1"), "left_outer")
var  d169   = d168.join(d5, Seq("_1"), "left_outer")
var  d170   = d169.join(d5, Seq("_1"), "left_outer")
var  d171   = d170.join(d5, Seq("_1"), "left_outer")
var  d172   = d171.join(d5, Seq("_1"), "left_outer") 
var  d173   = d172.join(d5, Seq("_1"), "left_outer")
var  d174   = d173.join(d5, Seq("_1"), "left_outer")
var  d175   = d174.join(d5, Seq("_1"), "left_outer")
var  d176   = d175.join(d5, Seq("_1"), "left_outer")
var  d177   = d176.join(d5, Seq("_1"), "left_outer")
var  d178   = d177.join(d5, Seq("_1"), "left_outer")
var  d179   = d178.join(d5, Seq("_1"), "left_outer")
var  d180   = d179.join(d5, Seq("_1"), "left_outer")
var  d181   = d180.join(d5, Seq("_1"), "left_outer")
var  d182   = d181.join(d5, Seq("_1"), "left_outer")
var  d183   = d182.join(d5, Seq("_1"), "left_outer")
var  d184   = d183.join(d5, Seq("_1"), "left_outer")
var  d185   = d184.join(d5, Seq("_1"), "left_outer")
var  d186   = d185.join(d5, Seq("_1"), "left_outer")
var  d187   = d186.join(d5, Seq("_1"), "left_outer")
var  d188   = d187.join(d5, Seq("_1"), "left_outer")
var  d189   = d188.join(d5, Seq("_1"), "left_outer")
var  d190   = d189.join(d5, Seq("_1"), "left_outer")
var  d191   = d190.join(d5, Seq("_1"), "left_outer")
var  d192   = d191.join(d5, Seq("_1"), "left_outer")
var  d193   = d192.join(d5, Seq("_1"), "left_outer")
var  d194   = d193.join(d5, Seq("_1"), "left_outer")
var  d195   = d194.join(d5, Seq("_1"), "left_outer")
var  d196   = d195.join(d5, Seq("_1"), "left_outer")
var  d197   = d196.join(d5, Seq("_1"), "left_outer")
var  d198   = d197.join(d5, Seq("_1"), "left_outer")
var  d199   = d198.join(d5, Seq("_1"), "left_outer")
var  d200   = d199.join(d5, Seq("_1"), "left_outer")
var  d201   = d200.join(d5, Seq("_1"), "left_outer")
var  d202   = d201.join(d5, Seq("_1"), "left_outer")
var  d203   = d202.join(d5, Seq("_1"), "left_outer")
var  d204   = d203.join(d5, Seq("_1"), "left_outer")
var  d205   = d204.join(d5, Seq("_1"), "left_outer")
var  d206   = d205.join(d5, Seq("_1"), "left_outer")
var  d207   = d206.join(d5, Seq("_1"), "left_outer")
var  d208   = d207.join(d5, Seq("_1"), "left_outer")
var  d209   = d208.join(d5, Seq("_1"), "left_outer")
var  d210   = d209.join(d5, Seq("_1"), "left_outer")
var  d211   = d210.join(d5, Seq("_1"), "left_outer")
var  d212   = d211.join(d5, Seq("_1"), "left_outer")
var  d213   = d212.join(d5, Seq("_1"), "left_outer")
var  d214   = d213.join(d5, Seq("_1"), "left_outer")
var  d215   = d214.join(d5, Seq("_1"), "left_outer")  
var  d216   = d215.join(d5, Seq("_1"), "left_outer")
var  d217   = d216.join(d5, Seq("_1"), "left_outer")  
var  d218   = d217.join(d5, Seq("_1"), "left_outer")
var  d219   = d218.join(d5, Seq("_1"), "left_outer")
var  d220   = d219.join(d5, Seq("_1"), "left_outer")
var  d221   = d220.join(d5, Seq("_1"), "left_outer")
var  d222   = d221.join(d5, Seq("_1"), "left_outer")
var  d223   = d222.join(d5, Seq("_1"), "left_outer")
var  d224   = d223.join(d5, Seq("_1"), "left_outer")
var  d225   = d224.join(d5, Seq("_1"), "left_outer")
var  d226   = d225.join(d5, Seq("_1"), "left_outer")
var  d227   = d226.join(d5, Seq("_1"), "left_outer")
var  d228   = d227.join(d5, Seq("_1"), "left_outer")
var  d229   = d228.join(d5, Seq("_1"), "left_outer")
var  d230   = d229.join(d5, Seq("_1"), "left_outer")
var  d231   = d230.join(d5, Seq("_1"), "left_outer")
var  d232   = d231.join(d5, Seq("_1"), "left_outer")
var  d233   = d232.join(d5, Seq("_1"), "left_outer")
var  d234   = d233.join(d5, Seq("_1"), "left_outer")
var  d235   = d234.join(d5, Seq("_1"), "left_outer")
var  d236   = d235.join(d5, Seq("_1"), "left_outer")
var  d237   = d236.join(d5, Seq("_1"), "left_outer")
var  d238   = d237.join(d5, Seq("_1"), "left_outer")
var  d239   = d238.join(d5, Seq("_1"), "left_outer")
var  d240   = d239.join(d5, Seq("_1"), "left_outer")
var  d241   = d240.join(d5, Seq("_1"), "left_outer")
var  d242   = d241.join(d5, Seq("_1"), "left_outer")
var  d243   = d242.join(d5, Seq("_1"), "left_outer")
var  d244   = d243.join(d5, Seq("_1"), "left_outer")
var  d245   = d244.join(d5, Seq("_1"), "left_outer")
var  d246   = d245.join(d5, Seq("_1"), "left_outer")
var  d247   = d246.join(d5, Seq("_1"), "left_outer")
var  d248   = d247.join(d5, Seq("_1"), "left_outer")
var  d249   = d248.join(d5, Seq("_1"), "left_outer")
var  d250   = d249.join(d5, Seq("_1"), "left_outer")
var  d251   = d250.join(d5, Seq("_1"), "left_outer")
var  d252   = d251.join(d5, Seq("_1"), "left_outer")
var  d253   = d252.join(d5, Seq("_1"), "left_outer")
var  d254   = d253.join(d5, Seq("_1"), "left_outer")
var  d255   = d254.join(d5, Seq("_1"), "left_outer")
var  d256   = d255.join(d5, Seq("_1"), "left_outer")
var  d257   = d256.join(d5, Seq("_1"), "left_outer")
var  d258   = d257.join(d5, Seq("_1"), "left_outer")
var  d259   = d258.join(d5, Seq("_1"), "left_outer")
var  d260   = d259.join(d5, Seq("_1"), "left_outer")
var  d261   = d260.join(d5, Seq("_1"), "left_outer")
var  d262   = d261.join(d5, Seq("_1"), "left_outer")
var  d263   = d262.join(d5, Seq("_1"), "left_outer")
var  d264   = d263.join(d5, Seq("_1"), "left_outer")
var  d265   = d264.join(d5, Seq("_1"), "left_outer")
var  d266   = d265.join(d5, Seq("_1"), "left_outer")
var  d267   = d266.join(d5, Seq("_1"), "left_outer")
var  d268   = d267.join(d5, Seq("_1"), "left_outer")
var  d269   = d268.join(d5, Seq("_1"), "left_outer")
var  d270   = d269.join(d5, Seq("_1"), "left_outer")
var  d271   = d270.join(d5, Seq("_1"), "left_outer")
var  d272   = d271.join(d5, Seq("_1"), "left_outer")
var  d273   = d272.join(d5, Seq("_1"), "left_outer")
var  d274   = d273.join(d5, Seq("_1"), "left_outer")
var  d275   = d274.join(d5, Seq("_1"), "left_outer")
var  d276   = d275.join(d5, Seq("_1"), "left_outer")
var  d277   = d276.join(d5, Seq("_1"), "left_outer")
var  d278   = d277.join(d5, Seq("_1"), "left_outer")
var  d279   = d278.join(d5, Seq("_1"), "left_outer")
var  d280   = d279.join(d5, Seq("_1"), "left_outer")
var  d281   = d280.join(d5, Seq("_1"), "left_outer")
var  d282   = d281.join(d5, Seq("_1"), "left_outer")
var  d283   = d282.join(d5, Seq("_1"), "left_outer")
var  d284   = d283.join(d5, Seq("_1"), "left_outer")
var  d285   = d284.join(d5, Seq("_1"), "left_outer")
var  d286   = d285.join(d5, Seq("_1"), "left_outer")
var  d287   = d286.join(d5, Seq("_1"), "left_outer")
var  d288   = d287.join(d5, Seq("_1"), "left_outer")
var  d289   = d288.join(d5, Seq("_1"), "left_outer")
var  d290   = d289.join(d5, Seq("_1"), "left_outer")
var  d291   = d290.join(d5, Seq("_1"), "left_outer")
var  d292   = d291.join(d5, Seq("_1"), "left_outer")
var  d293   = d292.join(d5, Seq("_1"), "left_outer")
var  d294   = d293.join(d5, Seq("_1"), "left_outer")
var  d295   = d294.join(d5, Seq("_1"), "left_outer")
var  d296   = d295.join(d5, Seq("_1"), "left_outer")
var  d297   = d296.join(d5, Seq("_1"), "left_outer")
var  d298   = d297.join(d5, Seq("_1"), "left_outer")
var  d299   = d298.join(d5, Seq("_1"), "left_outer")
var  d300   = d299.join(d5, Seq("_1"), "left_outer")

not sure if this is a code design issue or configuration issue, I have experimented with numerous configurations including the following :

  spark-shell 
  --master yarn-client  
  --executor-memory 8g  
  --executor-cores 2 
  --driver-java-options 
  "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Xms1g  -XX:MaxNewSize=2g"  
  --conf spark.network.timeout=10000000  
  --conf spark.executor.heartbeatInterval=10000000 
  --conf spark.dynamicAllocation.enabled=true -i <program name> 
like image 201
John Daniels Avatar asked Mar 08 '23 06:03

John Daniels


1 Answers

you can do the join with a foldleft instead of doing it by manually. You don't need to use var here. Just use val. It's better to persist after each join in cae of failure if the join is heavy.

  val joinFunction: (DataFrame, DataFrame) => DataFrame = (accumulator, tableToJoin) => {
  val result = accumulator.join(tableToJoin, Seq("_1"), "left_outer").persist()
  accumulator.unpersist()
  Logger.getLogger(getClass).info(s"Joined table count: ${result.count()}")
  result
}

val sqlCtx = sqlContext
import sqlCtx.implicits._
val data = Seq((1,2))
val data2 = Seq((1,3))
val rdd1 = sc.parallelize(data)
val rdd2 = sc.parallelize(data2)
val d2 = rdd1.toDF()
val d5 = rdd2.toDF()
val d3: DataFrame = d2.join(d5, Seq("_1"), "left_outer")
val listDF  = Range(1, 10).map(x => d3).toList
val result: DataFrame =  listDF.reduce{joinFunction}
like image 94
firsni Avatar answered May 13 '23 20:05

firsni