Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why does DataFrame.saveAsTable("df") save table to different HDFS host?

I have configured Hive (1.13.1) with Spark (1.4.0) and I am able to access all the Databases and Table from hive and my warehouse directory is hdfs://192.168.1.17:8020/user/hive/warehouse

But when, I am trying to save a Dataframe through Spark-Shell (using master) into Hive using df.saveAsTable("df") function, I got this error.

15/07/03 14:48:59 INFO audit: ugi=user  ip=unknown-ip-addr  cmd=get_database: default   
15/07/03 14:48:59 INFO HiveMetaStore: 0: get_table : db=default tbl=df
15/07/03 14:48:59 INFO audit: ugi=user  ip=unknown-ip-addr  cmd=get_table : db=default tbl=df   
java.net.ConnectException: Call From bdiuser-Vostro-3800/127.0.1.1 to 192.168.1.19:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
    at org.apache.hadoop.ipc.Client.call(Client.java:1414)
    at org.apache.hadoop.ipc.Client.call(Client.java:1363)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
    at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:78)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
    at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:239)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1517)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC.<init>(<console>:37)
    at $iwC.<init>(<console>:39)
    at <init>(<console>:41)
    at .<init>(<console>:45)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:604)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:699)
    at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462)
    at org.apache.hadoop.ipc.Client.call(Client.java:1381)
    ... 86 more

When, I go through this error, I found that program tried different host for HDFS connection to save table.

And i also tried with different worker's spark-shell, I got same error.

like image 870
Kaushal Avatar asked Jul 03 '15 14:07

Kaushal


1 Answers

Please find the example below:

val options = Map("path" -> hiveTablePath)
result.write.format("orc").partitionBy("partitiondate").options(options).mode(SaveMode.Append).saveAsTable(hiveTable)

I have explained this a little bit more in my blog.

like image 165
Deepika Khera Avatar answered Sep 30 '22 03:09

Deepika Khera