dataFrame.coalesce(1).write().save("path")
sometimes writes only _SUCCESS and ._SUCCESS.crc files without the expected *.csv.gz, even for a non-empty input DataFrame
file save code:
private static void writeCsvToDirectory(Dataset<Row> dataFrame, Path directory) {
    dataFrame.coalesce(1)
            .write()
            .format("csv")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
            .mode(SaveMode.Overwrite)
            .save("file:///" + directory);
}
file get code:
static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        return stream.iterator().next();
    } catch (NoSuchElementException e) {
        throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
    }
}
file get error example:
java.lang.RuntimeException: directory /tmp/temp5889805853850415940 does not contain a file with glob *.csv.gz. Directory listing:
/tmp/temp5889805853850415940/_SUCCESS,
/tmp/temp5889805853850415940/._SUCCESS.crc
I rely on this expectation; can someone explain why it works this way?
The output file should (by logic, must) contain at least the header line and some data lines. But it does not exist at all.
A comment here was a bit misleading: according to the code on GitHub, this would happen only if the DataFrame is empty, and in that case it won't produce _SUCCESS files either. Considering that those files are present, the DataFrame is not empty and the writeCsvToDirectory from your code was triggered.
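If you want to rule the empty-input case out explicitly before writing, a minimal guard could look like this (a sketch, assuming Spark 2.4+ where Dataset.isEmpty() exists; directory stands for whatever path you pass to the writer):
// Hypothetical guard: fail fast instead of silently producing a directory
// that contains only _SUCCESS marker files.
if (dataFrame.isEmpty()) {
    throw new IllegalStateException("Refusing to write an empty DataFrame to " + directory);
}
writeCsvToDirectory(dataFrame, directory);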
I have a couple of questions: does the _SUCCESS file get updated? My two main suspects are:
coalesce(1) - if you have a lot of data, this might fail
SaveMode.Overwrite - I have a feeling that those _SUCCESS files are left in that folder from previous runs
It depends on the storage you choose for writing your CSV file. If you write to HDFS, everything is fine. But whenever you decide to write to your local file system, you must be aware that nothing will be written to the driver's local file system; your data will be in the workers' file systems, and you should look for it in the workers' storage.
Two solutions:
set the master to local[NUMBER_OF_CORES], e.g. submit your job with the --master local[10] config (see the sketch below)
write your data to a distributed file system like S3, HDFS, ...
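A minimal sketch of the first option, assuming you build the session yourself (the app name is illustrative, not from the original post):
SparkSession spark = SparkSession.builder()
        .appName("csv-export")  // hypothetical name
        .master("local[10]")    // same effect as submitting with --master local[10]
        .getOrCreate();
// With a local master, the driver and the "workers" share one JVM/host,
// so file:/// output lands on the machine you are inspecting.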
My own solution solved this problem. I replaced .save("file://...") with hadoopFileSystem.copyToLocalFile. The thing is that .save("file://...") works as expected only with SparkSession.builder().master("local"), where hdfs:// is emulated by the master's file://. I may be wrong in theory, but it works.
static Path writeCsvToTemporaryDirectory(Dataset<Row> dataFrame) throws IOException {
    String temporaryDirectoryName = getTemporaryDirectoryName();
    writeCsvToDirectory(dataFrame, temporaryDirectoryName);
    return Paths.get(temporaryDirectoryName);
}

static void writeCsvToDirectory(Dataset<Row> dataFrame, String directory) throws IOException {
    // Write through Spark's default file system (hdfs:// on a real cluster).
    dataFrame.coalesce(1)
            .write()
            .option("header", "true")
            .option("delimiter", "\t")
            .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
            .mode(SaveMode.Overwrite)
            .csv(directory);

    // Copy the result to the driver's local disk; the first argument
    // (delSrc = true) removes the source from the cluster file system.
    FileSystem hadoopFileSystem = FileSystem.get(sparkContext.hadoopConfiguration());
    hadoopFileSystem.copyToLocalFile(true,
            new org.apache.hadoop.fs.Path(directory),
            new org.apache.hadoop.fs.Path(directory));
}
static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        return stream.iterator().next();
    } catch (NoSuchElementException e) {
        throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
    }
}
Path temporaryDirectory = writeCsvToTemporaryDirectory(dataFrame);
Path temporaryFile = DataFrameIOUtils.getTemporaryCsvFile(temporaryDirectory);
try {
    // get() (rather than join()) throws the checked exceptions caught below
    return otherStorage.upload(temporaryFile, name, fields).get();
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    removeTemporaryDirectory(temporaryDirectory);
}