
How to do CopyMerge in Hadoop 3.0?

Tags: java, hadoop

I know that Hadoop 2.7's FileUtil has the copyMerge function, which merges multiple files into a new one.

But copyMerge is no longer supported in the API as of version 3.0.

Any ideas on how to merge all files within a directory into a single new file in Hadoop 3.0?

asked Feb 04 '17 by Jeremy


3 Answers

The FileUtil#copyMerge method has been removed. See these issues for details on the change:

https://issues.apache.org/jira/browse/HADOOP-12967

https://issues.apache.org/jira/browse/HADOOP-11392

You can use getmerge instead.

Usage: hadoop fs -getmerge [-nl] <src> <localdst>

Takes a source directory and a destination file as input and concatenates the files in src into the destination local file. Optionally, -nl can be set to enable adding a newline character (LF) at the end of each file. -skip-empty-file can be used to avoid unwanted newline characters in the case of empty files.

Examples:

hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt

Exit Code: Returns 0 on success and non-zero on error.

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge
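
If you need the same merge from JVM code rather than from a shell, one option is to drive the same FsShell command programmatically via ToolRunner. A minimal sketch in Scala (the /src and /opt/output.txt paths are just the example paths from above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FsShell
import org.apache.hadoop.util.ToolRunner

object GetMergeExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Runs the equivalent of: hadoop fs -getmerge -nl /src /opt/output.txt
    val exitCode = ToolRunner.run(conf, new FsShell(conf),
      Array("-getmerge", "-nl", "/src", "/opt/output.txt"))
    // Same convention as the shell: 0 on success, non-zero on error.
    System.exit(exitCode)
  }
}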

answered Oct 31 '22 by ravi


Since FileUtil.copyMerge() has been deprecated and removed from the API starting in version 3, we can always re-implement it ourselves.

For reference, the original Java implementation can still be found in the Hadoop 2.x sources of org.apache.hadoop.fs.FileUtil.

Here is a Scala translation:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException

def copyMerge(
    srcFS: FileSystem, srcDir: Path,
    dstFS: FileSystem, dstFile: Path,
    deleteSource: Boolean, conf: Configuration
): Boolean = {

  if (dstFS.exists(dstFile)) {
    throw new IOException(s"Target $dstFile already exists")
  }

  // Source path is expected to be a directory:
  if (srcFS.getFileStatus(srcDir).isDirectory) {

    val outputFile = dstFS.create(dstFile)
    try {
      srcFS
        .listStatus(srcDir)
        // Sort by file name so parts are concatenated in a deterministic
        // order (part-00000, part-00001, ...):
        .sortBy(_.getPath.getName)
        // Concatenate plain files only, skipping any subdirectories:
        .collect {
          case status if status.isFile =>
            val inputFile = srcFS.open(status.getPath)
            // `false` tells copyBytes not to close the streams itself;
            // we close them explicitly in the finally blocks:
            try { IOUtils.copyBytes(inputFile, outputFile, conf, false) }
            finally { inputFile.close() }
        }
    } finally { outputFile.close() }

    if (deleteSource) srcFS.delete(srcDir, true) else true
  }
  else false
}
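
For completeness, a hypothetical call site (the paths are made up for illustration; source and destination use the same FileSystem here):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)

// Merge everything under /data/output into a single file,
// keeping the source directory around (deleteSource = false):
val merged = copyMerge(
  fs, new Path("/data/output"),
  fs, new Path("/data/output-merged.txt"),
  deleteSource = false, conf = conf
)
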
answered Oct 31 '22 by Xavier Guihot


I had the same question and had to re-implement copyMerge (in PySpark, though, using the same API calls as the original copyMerge).

I have no idea why there is no equivalent functionality in Hadoop 3; we need to merge files from an HDFS directory into a single HDFS file very often.

Here's the PySpark implementation I referenced above: https://github.com/Tagar/stuff/blob/master/copyMerge.py

answered Oct 31 '22 by Tagar