 

How to name file when saveAsTextFile in spark?

When saving an RDD as a text file in Spark 1.5.1, I use: rdd.saveAsTextFile('<directory>').

But if I want to find the file in that directory later, how do I give it the name I want?

Currently, I think it is named part-00000, which must be some default. How do I give it a name?

asked Nov 11 '15 by makansij

People also ask

How do I save an RDD file?

You can save an RDD using the saveAsObjectFile and saveAsTextFile methods, and read it back using the textFile and sequenceFile functions on SparkContext.
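For example, a minimal PySpark round trip (the /tmp path is just an illustration):

rdd = sc.parallelize(["line one", "line two"])
rdd.saveAsTextFile("/tmp/rdd_text")  # writes part-NNNNN files under /tmp/rdd_text

back = sc.textFile("/tmp/rdd_text")  # reads the directory back as an RDD of strings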

How do I write to HDFS in Spark?

To read a CSV or TSV file from HDFS, call spark.read.csv("path") with an HDFS path. To write a CSV file to HDFS, use the write() method of the Spark DataFrameWriter object.
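For example, a minimal sketch using the DataFrame API (the namenode host and paths are placeholders):

df = spark.read.csv("hdfs://namenode:8020/data/input.csv", header=True)
df.write.csv("hdfs://namenode:8020/data/output", header=True)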


2 Answers

The correct answer to this question is that saveAsTextFile does not allow you to name the actual file.

The reason is that the data is partitioned: Spark treats the path you pass to saveAsTextFile(...) as a directory and writes one part file per partition inside it.

You can call rdd.coalesce(1).saveAsTextFile('/some/path/somewhere') and it will create a single file /some/path/somewhere/part-00000, but you still cannot choose its name.

If you need more control than this, you will need to perform an actual file operation on your end after an rdd.collect().

Note that coalesce(1) funnels all of the data through a single executor, and rdd.collect() pulls it all to the driver, so you may run into memory issues either way. That's the risk you take.
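For example, a minimal sketch of that collect-and-write approach (the output path and filename are assumptions):

lines = rdd.collect()  # pulls the entire RDD to the driver
with open("/some/path/somewhere/my_output.txt", "w") as f:
    for line in lines:
        f.write(line + "\n")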

answered Oct 22 '22 by nod


As @nod said, it's not possible to name the file directly. However, you can rename the file right after it is written. An example using PySpark:

# use the classic FileOutputCommitter so output lands directly in the target path
sc._jsc.hadoopConfiguration().set(
    "mapred.output.committer.class",
    "org.apache.hadoop.mapred.FileOutputCommitter")

# access the Hadoop FileSystem API through the py4j JVM gateway
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("s3://{bucket_name}"), sc._jsc.hadoopConfiguration())

file_path = "s3://{bucket_name}/processed/source={source_name}/year={partition_year}/week={partition_week}/"
# remove data already stored, if necessary (True = recursive delete)
fs.delete(Path(file_path), True)

rdd.saveAsTextFile(
    file_path, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

# rename the created part file to the desired name
created_file_path = fs.globStatus(Path(file_path + "part*.gz"))[0].getPath()
fs.rename(
    created_file_path,
    Path(file_path + "{desired_name}.jl.gz"))
answered Oct 22 '22 by Juan Riaza