import org.apache.hadoop.fs.*; import org.apache.hadoop.mapred.InputPathProcessor; import org.apache.hadoop.mapred.JobConf; import java.util.List; //Supporting class for listLocatedStatus class MultiPathFilter implements PathFilter { protected static final PathFilter hiddenFileFilter = new PathFilter(){ public boolean accept(Path p){ String name = p.getName(); return !name.startsWith("_") && !name.startsWith("."); } }; private List filters; public MultiPathFilter(List filters) { this.filters = filters; } public boolean accept(Path path) { for (PathFilter filter : filters) { if (!filter.accept(path)) { return false; } } return true; } public static PathFilter getHiddenFileFilter() { return hiddenFileFilter; } } protected FileStatus[] listStatus(Path path) throws IOException { List inputPaths = new ArrayList(); inputPaths.add(path); LOG.info("Input paths to process:" + inputPaths.size()); // creates a MultiPathFilter with the hiddenFileFilter and the // user provided one (if any). List filters = new ArrayList(); filters.add(MultiPathFilter.getHiddenFileFilter()); final PathFilter inputFilter = new MultiPathFilter(filters); JobConf job = new JobConf(); //Turning on optimization job.setInt("fs.s3.inputpathprocessor.minsize", 0); LOG.info("S3 optimization turned ON"); InputPathProcessor ipp = new InputPathProcessor(job, inputFilter, inputPaths); LOG.info("InputPathProcessor initialized"); long t1 = System.nanoTime(); ipp.compute(); LOG.info("computeLocatedFileStatus took " + (System.nanoTime() - t1) / Math.pow(10, 9)); List result = ipp.getLocatedFileStatus(); LOG.info("Total result paths to process : " + result.size()); return result.toArray(new LocatedFileStatus[result.size()]); }