I am encountering an error while trying to read a file from HDFS into Spark. The file README.md is present in HDFS:
[spark@osboxes hadoop]$ hdfs dfs -ls README.md
16/02/26 00:29:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r-- 1 spark supergroup 4811 2016-02-25 23:38 README.md
In the Spark shell, I ran:
scala> val readme = sc.textFile("hdfs://localhost:9000/README.md")
readme: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:27
scala> readme.count
16/02/26 00:25:26 DEBUG BlockManager: Getting local block broadcast_4
16/02/26 00:25:26 DEBUG BlockManager: Level for block broadcast_4 is StorageLevel(true, true, false, true, 1)
16/02/26 00:25:26 DEBUG BlockManager: Getting block broadcast_4 from memory
16/02/26 00:25:26 DEBUG HadoopRDD: Creating new JobConf and caching it for later re-use
16/02/26 00:25:26 DEBUG Client: The ping interval is 60000 ms.
16/02/26 00:25:26 DEBUG Client: Connecting to localhost/127.0.0.1:9000
16/02/26 00:25:26 DEBUG Client: IPC Client (648679508) connection to localhost/127.0.0.1:9000 from spark: starting, having connections 1
16/02/26 00:25:26 DEBUG Client: IPC Client (648679508) connection to localhost/127.0.0.1:9000 from spark sending #4
16/02/26 00:25:26 DEBUG Client: IPC Client (648679508) connection to localhost/127.0.0.1:9000 from spark got value #4
16/02/26 00:25:26 DEBUG ProtobufRpcEngine: Call: getFileInfo took 6ms
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/README.md
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC.<init>(<console>:45)
at $iwC.<init>(<console>:47)
at <init>(<console>:49)
at .<init>(<console>:53)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
scala> 16/02/26 00:25:36 DEBUG Client: IPC Client (648679508) connection to localhost/127.0.0.1:9000 from spark: closed
16/02/26 00:25:36 DEBUG Client: IPC Client (648679508) connection to localhost/127.0.0.1:9000 from spark: stopped, remaining connections 0
In core-site.xml, I have the below entry:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
and hdfs-site.xml has the below detail:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
Am I missing something here? My OS is CentOS Linux release 7.2.1511 (Core), Hadoop is 2.7.2, and Spark is 1.6.0-bin-hadoop2.6.
This happens due to the internal mapping between directories.
First, go to the directory where your file (README.md) is kept and run the command:
df -k .
You will get the actual mount point of the directory, for example /xyz.
Now, try to find your file (README.md) within this mount point, for example /xyz/home/omi/myDir/README.md.
Use this path in your code:
val readme = sc.textFile("/xyz/home/omi/myDir/README.md")
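A small caveat, assuming fs.defaultFS still points at hdfs://localhost:9000 as in the question: a scheme-less path is resolved against HDFS, so an explicit file:// prefix is usually needed if the intent is to read from the local filesystem. A minimal sketch, reusing the example path above:
// file:// forces Spark to read from the local filesystem instead of the configured HDFS default
val readme = sc.textFile("file:///xyz/home/omi/myDir/README.md")
readme.count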
By default, hdfs dfs -ls will list your user home folder on HDFS, not the root of HDFS. You can easily verify this by comparing the output of hdfs dfs -ls and hdfs dfs -ls /. When you use the full HDFS URL, you're using an absolute path, and it's not finding your file (because it's located in your user home folder). When you use a relative path, the problem is gone :)
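For example, assuming the file was uploaded as the spark user (so the HDFS home folder is /user/spark), either of these should find it:
// relative path: resolved against the HDFS home folder, /user/spark here
val readme = sc.textFile("README.md")
// fully qualified absolute path to the same file
val readme2 = sc.textFile("hdfs://localhost:9000/user/spark/README.md")
readme.count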
You may want to know that hdfs dfs -put will also use your HDFS home folder as the default destination for the file, not the root of HDFS.
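For instance (again assuming /user/spark as the home folder), the destination argument decides where the file lands:
hdfs dfs -put README.md README.md     # relative destination: ends up as /user/spark/README.md
hdfs dfs -put README.md /README.md    # absolute destination: ends up at the HDFS root, matching hdfs://localhost:9000/README.md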