
How to use PathFilter in Apache Spark?

I have a simple file filter that selects files from a particular date. In Hadoop I would set the PathFilter class on the InputFormat via setInputPathFilter. How can I do the same in Spark?

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FilesFilter extends Configured implements PathFilter {

    @Override
    public boolean accept(Path path) {
        FileSystem fs;
        try {
            // Configured supplies the job configuration set by Hadoop,
            // from which the FileSystem for this path can be obtained
            fs = path.getFileSystem(getConf());
            // Always accept directories so listing can descend into them
            if (fs.isDirectory(path))
                return true;
        } catch (IOException e1) {
            e1.printStackTrace();
            return false;
        }

        // Keep only files last modified on this date
        String fileDate = "01.30.2015";
        SimpleDateFormat sdf = new SimpleDateFormat("MM.dd.yyyy");
        Date date;
        try {
            date = sdf.parse(fileDate);
        } catch (ParseException e1) {
            e1.printStackTrace();
            return false; // bail out instead of dereferencing a null date
        }

        // Convert both timestamps to whole days since the epoch and compare
        long targetDay = date.getTime() / (1000 * 3600 * 24);
        try {
            FileStatus file = fs.getFileStatus(path);
            long fileDay = file.getModificationTime() / (1000 * 3600 * 24);
            return fileDay == targetDay;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}

1 Answer

Use this:

sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[TmpFileFilter], classOf[PathFilter])

Here is my TmpFileFilter.scala, which skips .tmp files:

import org.apache.hadoop.fs.{Path, PathFilter}

class TmpFileFilter extends PathFilter {
  override def accept(path : Path): Boolean = !path.getName.endsWith(".tmp")
}
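
Once the class is registered on the Hadoop configuration, file-based RDDs created afterwards should pick it up while FileInputFormat lists the input files. A minimal sketch of the wiring (hdfs:///data/input is a placeholder path):

import org.apache.hadoop.fs.PathFilter

// Register the filter first, then create the RDD; the filter runs
// during input listing, so .tmp files never become splits.
sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
  classOf[TmpFileFilter], classOf[PathFilter])
val lines = sc.textFile("hdfs:///data/input") // placeholder path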

You can define your own PathFilter.
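
For the date-based filter in the question, the same registration works. Here is a sketch of an equivalent Scala filter (DateFilter is an illustrative name, and the hard-coded date mirrors the question; Hadoop instantiates the class by reflection, so it needs a no-arg constructor):

import java.text.SimpleDateFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}

// Illustrative port of the question's Java filter: keep directories plus
// files last modified on 01.30.2015.
class DateFilter extends PathFilter {
  private val targetDay =
    new SimpleDateFormat("MM.dd.yyyy").parse("01.30.2015").getTime / (1000L * 3600 * 24)

  override def accept(path: Path): Boolean = {
    val fs = path.getFileSystem(new Configuration())
    if (fs.isDirectory(path)) {
      true // descend into directories so their files are still examined
    } else {
      // Compare the file's modification day against the target day
      fs.getFileStatus(path).getModificationTime / (1000L * 3600 * 24) == targetDay
    }
  }
}

Register it the same way, with classOf[DateFilter] in place of classOf[TmpFileFilter].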
