I know Hadoop version 2.7's FileUtil has the copyMerge function that merges multiple files into a new one. But the copyMerge function is no longer supported per the API in version 3.0.
Any ideas on how to merge all files within a directory into a single new file in Hadoop 3.0?
The FileUtil#copyMerge method has been removed. See these JIRAs for details on the change:
https://issues.apache.org/jira/browse/HADOOP-12967
https://issues.apache.org/jira/browse/HADOOP-11392
You can use getmerge.
Usage: hadoop fs -getmerge [-nl] <src> <localdst>
Takes a source directory and a destination file as input and concatenates files in src into the destination local file. Optionally -nl can be set to enable adding a newline character (LF) at the end of each file. -skip-empty-file can be used to avoid unwanted newline characters in case of empty files.
Examples:
hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt
Exit Code: Returns 0 on success and non-zero on error.
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge
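Note that getmerge writes the merged output to the local filesystem; if you ultimately need the merged file back in HDFS, you can follow it with a put. A minimal sketch (the paths below are just illustrative):
hadoop fs -getmerge -nl /src /tmp/merged.txt
hadoop fs -put /tmp/merged.txt /dst/merged.txt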
Since FileUtil.copyMerge() has been deprecated and removed from the API starting in version 3, we can always re-implement it ourselves. The original Java implementation from previous versions can serve as a reference. Here is a Scala translation:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException
def copyMerge(
    srcFS: FileSystem, srcDir: Path,
    dstFS: FileSystem, dstFile: Path,
    deleteSource: Boolean, conf: Configuration
): Boolean = {

  if (dstFS.exists(dstFile)) {
    throw new IOException(s"Target $dstFile already exists")
  }

  // Source path is expected to be a directory:
  if (srcFS.getFileStatus(srcDir).isDirectory) {
    val outputFile = dstFS.create(dstFile)
    try {
      // Concatenate every regular file in the source directory,
      // in lexicographic order of file name (e.g. part-00000, part-00001, ...)
      srcFS
        .listStatus(srcDir)
        .sortBy(_.getPath.getName)
        .collect {
          case status if status.isFile =>
            val inputFile = srcFS.open(status.getPath)
            try { IOUtils.copyBytes(inputFile, outputFile, conf, false) }
            finally { inputFile.close() }
        }
    } finally { outputFile.close() }

    // Optionally remove the source directory once the merge has completed
    if (deleteSource) srcFS.delete(srcDir, true) else true
  }
  else false
}
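For completeness, here is a minimal sketch of how the re-implemented copyMerge might be called; the paths and the use of the default FileSystem from the Configuration are assumptions for illustration, not part of the original answer:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Assumes fs.defaultFS in the Configuration points at the target HDFS cluster
val conf = new Configuration()
val fs = FileSystem.get(conf)

// Hypothetical paths: merge all part files under /data/parts into /data/merged.txt
val merged = copyMerge(
  fs, new Path("/data/parts"),
  fs, new Path("/data/merged.txt"),
  deleteSource = false, conf
)
println(s"Merge succeeded: $merged")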
I had the same question and had to re-implement copyMerge (in PySpark, though, using the same API calls as the original copyMerge).
I have no idea why there is no equivalent functionality in Hadoop 3. We very often have to merge files from an HDFS directory into a single HDFS file.
Here's the PySpark implementation I referenced above: https://github.com/Tagar/stuff/blob/master/copyMerge.py