 

Why does join fail with "java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]"?

I am using Spark 1.5.

I have two dataframes of the form:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF has 766,151 records while linkPersonItemLessThan500DF has 26,694,353 records. Note that I use repartition(number) on linkPersonItemLessThan500DF since I intend to join these two later on. I follow the above with:

val userTripletRankDF = linkPersonItemLessThan500DF
  .join(libriFirstTable50Plus3DF, Seq("family_id"))
  .take(20)
  .foreach(println(_))

for which I am getting this output:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
  at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
  at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
  at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
  at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
  at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
  at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
  at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
  at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
  at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
  at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
  at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
  at $iwC$$iwC$$iwC.<init>(<console>:93)
  at $iwC$$iwC.<init>(<console>:95)
  at $iwC.<init>(<console>:97)
  at <init>(<console>:99)
  at .<init>(<console>:103)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

and I don't understand what the issue is. Is it as simple as increasing the waiting time? Is the join too intensive? Do I need more memory? Is the shuffling too intensive? Can anyone help?

asked Dec 13 '16 by Christos Hadjinikolis

People also ask

What is Java Util Concurrent TimeoutException?

java.util.concurrent.TimeoutException. Exception thrown when a blocking operation times out. Blocking operations for which a timeout is specified need a means to indicate that the timeout has occurred.
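
For illustration only (not from the question): a minimal Scala sketch of how this exception surfaces. scala.concurrent.Await.result throws java.util.concurrent.TimeoutException when the awaited Future does not complete in time, which is the same call visible near the top of the stack trace above.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// A Future that never completes, to force the timeout.
val never = Promise[Int]().future
try {
  Await.result(never, 2.seconds)   // blocks, then throws after 2 seconds
} catch {
  case e: java.util.concurrent.TimeoutException =>
    println(s"Timed out: ${e.getMessage}")
}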

What is spark SQL autoBroadcastJoinThreshold?

spark.sql.autoBroadcastJoinThreshold. 10485760 (10 MB) Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled.
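
As a hedged illustration (assuming a Spark 2.x SparkSession named spark, which is newer than the Spark 1.5 used in the question), the threshold can be raised or broadcasting disabled like this:

// Sketch only: -1 disables automatic broadcast joins; a positive value is a size cap in bytes.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
// On Spark 1.x the equivalent goes through the SQLContext:
// sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")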


1 Answer

This happens because Spark tries to do a Broadcast Hash Join and one of the DataFrames is very large, so broadcasting it takes longer than the default 300-second broadcast timeout.
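
If you want to confirm which strategy the planner picked, printing the physical plan is a cheap check; this sketch reuses the DataFrame names from the question and looks for BroadcastHashJoin versus a shuffle-based join in the output:

// Prints the physical plan instead of executing the join.
linkPersonItemLessThan500DF
  .join(libriFirstTable50Plus3DF, Seq("family_id"))
  .explain()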

You can:

  1. Set a higher spark.sql.broadcastTimeout to increase the timeout, e.g. spark.conf.set("spark.sql.broadcastTimeout", 36000)
  2. persist() both DataFrames, then Spark will use a Shuffle Join instead (see the sketch below)
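
Putting both options together, a minimal Scala sketch, assuming a Spark 1.5 shell where the SQLContext is available as sqlContext (the timeout value is in seconds):

// Option 1: give the broadcast more time (here 36000 s).
sqlContext.setConf("spark.sql.broadcastTimeout", "36000")
// On Spark 2.x the same setting can be changed via:
// spark.conf.set("spark.sql.broadcastTimeout", 36000)

// Option 2: persist both sides so the planner falls back to a shuffle join;
// alternatively, disable automatic broadcasting via the threshold.
libriFirstTable50Plus3DF.persist()
linkPersonItemLessThan500DF.persist()
// sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")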

PySpark

In PySpark, you can set the config when you build the SparkSession, in the following manner:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Your App") \
    .config("spark.sql.broadcastTimeout", "36000") \
    .getOrCreate()
answered by T. Gawęda