I am trying to read a file on a remote machine in Apache Spark (the Scala version) using FTP. So far, I have followed an example in Databricks' Learning Spark repo on GitHub. Using curl, I am able to download the file, so the path I use exists.
Below is a snippet of the code I am trying to execute:
val file = sc.textFile("ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt")
val fileDF = file.toDF()
fileDF.write.parquet("out")
After trying to execute a count on the DataFrame, I get the following stack trace (http://pastebin.com/YEq8c2Hf):
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#1L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#4L])
      +- Project
         +- Scan ExistingRDD[_1#0]
...
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt
I would take this to mean that the file is unreachable, but that contradicts the fact that I am able to retrieve the file via curl:
curl ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt
This prints the contents of the file to my terminal. I do not see what I am doing wrong in the Scala code. Is there an error in the code snippet I gave above, or is that code totally wrong?
Thanks in advance, Brecht
Note:
Specifying the whole path (/home/brecht-d-m/map/input.nt) also does not work (as expected, since this does not work in curl either: "server denied you to change to the given directory"). Trying this in Spark gives an IOException saying that seek is not supported (http://pastebin.com/b9EB9ru2).
Working locally (e.g. sc.textFile("/home/brecht-d-m/map/input.nt")) works perfectly.
The file's permissions are set to R+W for all users.
The file size (15MB) should not be a problem, and Spark should be able to handle much bigger files.
Software versions: Scala 2.11.7, Apache Spark 1.6.0, Java 1.8.0_74, Ubuntu 14.04.4
I was able to find a workaround, using the code snippet below:
import org.apache.spark.SparkFiles
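val dataSource = "ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt"
// Download the remote file
sc.addFile(dataSource)
// Look up the path where the downloaded copy was stored
val fileName = SparkFiles.get(dataSource.split("/").last)
// Load the downloaded copy into an RDD
val file = sc.textFile(fileName)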
With this, I am able to download the file over FTP (with the same URL as in the first code snippet). The workaround first downloads the file (via addFile). Next, I retrieve the path where the file was downloaded. Finally, I use that path to load the file into an RDD.
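Since the workaround depends on the last segment of the URL being the file name that SparkFiles.get looks up, it can help to check how the URL breaks down before handing it to Spark. Below is a minimal sketch in plain Scala (no Spark needed), assuming the URL has the usual user:pwd@host form. The object FtpUrlParts and the helper localNameOf are hypothetical names introduced only for this illustration; they are not part of Spark.

```scala
import java.net.URI

object FtpUrlParts {
  // Hypothetical helper: the last path segment of the URL, i.e. the
  // name the workaround passes to SparkFiles.get.
  def localNameOf(url: String): String = url.split("/").last

  def main(args: Array[String]): Unit = {
    // Assumed URL form: ftp://<user>:<password>@<host>/<path>
    val dataSource = "ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt"
    val uri = new URI(dataSource)

    println(uri.getScheme)           // ftp
    println(uri.getUserInfo)         // user:pwd
    println(uri.getHost)             // 192.168.1.5
    println(uri.getPath)             // /brecht-d-m/map/input.nt
    println(localNameOf(dataSource)) // input.nt
  }
}
```

If getHost comes back as null, the credentials and host were probably not separated the way java.net.URI expects, which is worth ruling out before looking for the problem in Spark itself.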