Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Write RDD as textfile using Apache Spark

I am exploring Spark for batch processing. I am running the spark on my local machine using standalone mode.

I am trying to convert the Spark RDD as single file [final output] using saveTextFile() method, but its not working.

For example if i have more than one partition how we can get one single file as final output.

Update:

I tried the below approaches, but i am getting null pointer exception.

person.coalesce(1).toJavaRDD().saveAsTextFile("C://Java_All//output");
person.repartition(1).toJavaRDD().saveAsTextFile("C://Java_All//output");

The exception is :

    15/06/23 18:25:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/06/23 18:25:27 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/06/23 18:25:27 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
15/06/23 18:25:27 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
15/06/23 18:25:27 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
15/06/23 18:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/06/23 18:25:27 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

15/06/23 18:25:27 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/06/23 18:25:27 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/06/23 18:25:27 INFO TaskSchedulerImpl: Cancelling stage 1
15/06/23 18:25:27 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at TestSpark.java:40) failed in 0.249 s
15/06/23 18:25:28 INFO DAGScheduler: Job 0 failed: saveAsTextFile at TestSpark.java:40, took 0.952286 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/23 18:25:28 INFO SparkContext: Invoking stop() from shutdown hook
15/06/23 18:25:28 INFO SparkUI: Stopped Spark web UI at http://10.37.145.179:4040
15/06/23 18:25:28 INFO DAGScheduler: Stopping DAGScheduler
15/06/23 18:25:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/06/23 18:25:28 INFO Utils: path = C:\Users\crh537\AppData\Local\Temp\spark-a52371d8-ae6a-4567-b759-0a6c66c1908c\blockmgr-4d17a5b4-c8f8-4408-af07-0e88239794e8, already present as root for deletion.
15/06/23 18:25:28 INFO MemoryStore: MemoryStore cleared
15/06/23 18:25:28 INFO BlockManager: BlockManager stopped
15/06/23 18:25:28 INFO BlockManagerMaster: BlockManagerMaster stopped
15/06/23 18:25:28 INFO SparkContext: Successfully stopped SparkContext
15/06/23 18:25:28 INFO Utils: Shutdown hook called

Regards, Shankar

like image 468
Shankar Avatar asked Jun 23 '15 04:06

Shankar


People also ask

How to read text file into RDD in spark?

1. Spark read text file into RDD We can read a single text file, multiple files and all files from a directory into Spark RDD by using below two functions that are provided in SparkContext class.

How do I create a RDD from a text file?

Text file RDDs can be created using SparkContext ’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc URI) and reads it as a collection of lines. Here is an example invocation:

How do I create a text file in spark?

Apache spark does support sequence files, textfiles, and any other Hadoop input format. We can create textfile RDDs by sparkcontext’s textfile method. This method uses the URL for the file (either a local path on the machine or database or a hdfs://, s3n://, etc URL). It also reads whole as a collection of lines.

What is RDD in Apache Spark?

Spark RDD are core abstraction of apache spark. RDD refers to Resilient Distributed Datasets. Generally, we consider it as a technological arm of apache spark, they are immutable in nature. It supports self-recovery, i.e. fault tolerance or resilient property of RDDs.


1 Answers

You can use coalesce method to save into a single file. This way your code will look like this:

val myFile = sc.textFile("file.txt")
val finalRdd = doStuff(myFile)
finalRdd.coalesce(1).saveAsTextFile("newfile")

There is also another method repartition to do the same thing, however it will cause a shuffle which is may be very expensive, while coalesce will try to avoid a shuffle.

like image 103
Maksud Avatar answered Oct 17 '22 09:10

Maksud