package org.icgc.dcc.etl.staging.function;

import static com.google.common.base.Stopwatch.createStarted;
import static com.google.common.collect.Iterables.toArray;
import static org.icgc.dcc.common.core.util.FormatUtils.formatCount;
import static org.icgc.dcc.common.core.util.FormatUtils.formatPercent;
import static org.icgc.dcc.common.core.util.Splitters.TAB;

import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

import lombok.val;
import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

/**
 * Per-split function that lazily turns the raw {@code (key, line)} records of an input split into
 * {@code String[]} rows.
 * <p>
 * Each emitted row is the line prefixed with the project name (the name of the parent directory of
 * the split's file) and a tab, then split with {@code Splitters.TAB}. Records whose key equals
 * {@code 0} are treated as headers and skipped.
 */
@Slf4j
public class PreProcessLine implements
    Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<String[]>>, Serializable {

  /**
   * Constants.
   */
  // Number of lines read between two progress log statements
  private static final int LINE_STATUS_COUNT = 10 * 1000 * 1000;

  /**
   * Wraps the records of {@code split} in a lazy iterator of pre-processed rows.
   *
   * @param split the split being processed; cast to {@link FileSplit}, so any other split type
   * fails with a {@link ClassCastException}
   * @param it the raw records of the split
   * @return a lazy, non-removable iterator over the non-header lines of the split, each one
   * prefixed with the project name and split into its values
   */
  @Override
  public Iterator<String[]> call(final InputSplit split, final Iterator<Tuple2<LongWritable, Text>> it)
      throws Exception {
    val watch = createStarted();
    val fileSplit = (FileSplit) split;
    val length = fileSplit.getLength();
    val projectName = getProjectName(fileSplit);

    // Lazy iterator
    return new Iterator<String[]>() {

      /** Number of records read from the underlying iterator so far, headers included. */
      private long lineCount = 0;

      /** Look-ahead record that was read but not yet handed out, or {@code null} if there is none. */
      private Tuple2<LongWritable, Text> record;

      /**
       * Idempotent: the underlying iterator is only advanced while no look-ahead record is buffered,
       * so repeated calls never drop a line.
       */
      @Override
      public boolean hasNext() {
        while (record == null && it.hasNext()) {
          val candidate = it.next();
          lineCount++;

          if (lineCount % LINE_STATUS_COUNT == 0) {
            // NOTE(review): the percentage relates the record key to the split length; presumably the
            // key is a byte offset - confirm it is relative to the split and not to the whole file
            val offset = getOffset(candidate);
            val percent = offset * 1.0 / length;
            log.info("{}: Processed {} lines ({} %) in {}",
                fileSplit, formatCount(lineCount), formatPercent(percent), watch);
          }

          // Skip headers
          if (!isHeader(candidate)) {
            record = candidate;
          }
        }

        return record != null;
      }

      /**
       * @throws NoSuchElementException if the split has no more non-header lines
       */
      @Override
      public String[] next() {
        // Also covers callers that invoke next() without a preceding hasNext()
        if (!hasNext()) {
          throw new NoSuchElementException("No more lines in " + fileSplit);
        }

        // Consume the look-ahead so that the following hasNext() reads a fresh record
        val current = record;
        record = null;

        val prependedLine = projectName + "\t" + getLine(current);

        return toArray(TAB.split(prependedLine), String.class);
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("Cannot remove a transformed iterator");
      }

    };
  }

  /** A record is considered a header when its key is {@code 0}. */
  private static boolean isHeader(Tuple2<LongWritable, Text> record) {
    return getOffset(record) == 0;
  }

  /** Returns the numeric key of the record. */
  private static long getOffset(Tuple2<LongWritable, Text> record) {
    return record._1.get();
  }

  /** Returns the text value of the record as a string. */
  private static String getLine(Tuple2<LongWritable, Text> record) {
    return record._2.toString();
  }

  /** The project name is the name of the directory that contains the split's file. */
  private static String getProjectName(FileSplit fileSplit) {
    return fileSplit.getPath().getParent().getName();
  }

}