@mquinz
Forked from tispratik/parquet_columns_reader1.java
Last active August 29, 2015
Revisions

  1. @tispratik revised this gist Aug 29, 2014. 1 changed file with 2 additions and 1 deletion.
     3 changes: 2 additions & 1 deletion parquet_columns_reader1.java
     @@ -135,10 +135,11 @@ public int load(List<DimColumn> dimColumns, ColumnReadStoreImpl columnReadStore,
                index = startIndex;
                int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
                ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
     -          int definitionLevel = columnReader.getCurrentDefinitionLevel();

                // System.out.println(dc.getTotalSize() + " : " + columnReader.getTotalValueCount() + " - " + dc.getName());
                for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
     +              int definitionLevel = columnReader.getCurrentDefinitionLevel();
     +
                    if (definitionLevel == maxDefinitionLevel) {
                        switch (dc.getColumnDescriptor().getType()) {
                            case BINARY:
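     (The point of this revision: consume() advances the reader one value at a time, so getCurrentDefinitionLevel() must be re-read for each value. Reading it once before the loop classifies every value in the column chunk by the first value's definition level, mishandling nulls and non-nulls alike.)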
  2. @tispratik renamed this gist Aug 28, 2014. 1 changed file with 0 additions and 0 deletions.
     File renamed without changes.
  3. @tispratik created this gist Aug 28, 2014.
     194 changes: 194 additions & 0 deletions gistfile1.java
    package com.company.grid.lookup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;

import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Holds one column's values for the whole file, in a reflectively
// allocated array of the column's Java value type.
final class DimColumn {
    private volatile Object arrayList;
    private String name;
    private Class<?> typeOfArray;
    private int totalSize;
    private ColumnDescriptor columnDescriptor;

    DimColumn(ColumnDescriptor columnDescriptor, int totalSize) {
        this.columnDescriptor = columnDescriptor;
        this.name = columnDescriptor.getPath()[0];
        this.typeOfArray = columnDescriptor.getType().javaType;
        this.totalSize = totalSize;
        this.arrayList = Array.newInstance(typeOfArray, totalSize);
    }

    public ColumnDescriptor getColumnDescriptor() {
        return columnDescriptor;
    }

    public Object getArrayList() {
        return arrayList;
    }

    public String getName() {
        return name;
    }

    public int getTotalSize() {
        return totalSize;
    }
}
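// Aside (not in the original gist): java.lang.reflect.Array allocates true
// primitive arrays, which is what makes DimColumn's generic storage work:
//   Object ints = Array.newInstance(int.class, 4); // an int[], not an Integer[]
//   Array.set(ints, 0, 42);                        // boxes at the call boundary
//   int first = Array.getInt(ints, 0);             // unboxed read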

public class RfiParquetFileReader {
    ParquetMetadata metaData;
    MessageType schema;
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();

    public RfiParquetFileReader(String fileName) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", LocalFileSystem.class.getName());
        Path filePath = new Path(fileName);
        metaData = ParquetFileReader.readFooter(conf, filePath);
        schema = metaData.getFileMetaData().getSchema();

        List<BlockMetaData> blocks = metaData.getBlocks();
        // Size the column arrays for every row group, not just the first one;
        // load() fills them across all row groups.
        long totalRows = 0;
        for (BlockMetaData block : blocks) {
            totalRows += block.getRowCount();
        }
        int totalSize = (int) totalRows;
        List<ColumnDescriptor> columnDescriptors = schema.getColumns();
        List<DimColumn> dimColumns = makeDimColumns(columnDescriptors, totalSize);

        ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columnDescriptors);
        PageReadStore pageReadStore = fileReader.readNextRowGroup();
        int index = 0;
        while (pageReadStore != null) {
            ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
            index = load(dimColumns, columnReadStoreImpl, index);
            pageReadStore = fileReader.readNextRowGroup();
        }

        // Spot-check five random positions per column.
        Random rand = new Random();
        for (DimColumn dimColumn : dimColumns) {
            System.out.println(dimColumn.getName());
            for (int i = 0; i < 5; i++) {
                int sample = rand.nextInt(totalSize);
                System.out.println("Index: " + sample + " Value: " + Array.get(dimColumn.getArrayList(), sample));
            }
            System.out.println("--------");
        }
    }

    public String getSchema() {
        return schema.toString();
    }

    public static void main(String[] args) {
        String dirName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/";
        // String[] files = {"1.campaigns.parquet", "13.assigned_conversion.parquet"};
        String[] files = {"13.assigned_conversion.parquet"};

        try {
            long startTime = System.currentTimeMillis();
            for (String file : files) {
                new RfiParquetFileReader(dirName + file);
                System.out.println("========================================================================");
            }
            long endTime = System.currentTimeMillis();
            System.out.println("Time taken: " + (endTime - startTime) + "ms");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public ArrayList<DimColumn> makeDimColumns(List<ColumnDescriptor> columnDescriptors, int totalSize) {
        ArrayList<DimColumn> dimColumns = new ArrayList<DimColumn>();
        for (ColumnDescriptor columnDescriptor : columnDescriptors) {
            dimColumns.add(new DimColumn(columnDescriptor, totalSize));
        }
        return dimColumns;
    }

    public int load(List<DimColumn> dimColumns, ColumnReadStoreImpl columnReadStore, int startIndex) throws IOException {
        int index = startIndex;
        for (DimColumn dc : dimColumns) {
            index = startIndex;
            int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
            ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
            // As created, the definition level is read once here. Revision 1
            // (diff above) moves this read inside the loop, since consume()
            // advances the reader and the level changes per value.
            int definitionLevel = columnReader.getCurrentDefinitionLevel();

            // System.out.println(dc.getTotalSize() + " : " + columnReader.getTotalValueCount() + " - " + dc.getName());
            for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
                if (definitionLevel == maxDefinitionLevel) {
                    switch (dc.getColumnDescriptor().getType()) {
                        case BINARY:
                            String str = new String(columnReader.getBinary().getBytes(), "UTF-8");
                            System.out.println(index + " : " + dc.getName() + " : " + str);
                            Array.set(dc.getArrayList(), index, columnReader.getBinary()); break;
                        case BOOLEAN: Array.set(dc.getArrayList(), index, columnReader.getBoolean()); break;
                        case DOUBLE:  Array.set(dc.getArrayList(), index, columnReader.getDouble()); break;
                        case FLOAT:   Array.set(dc.getArrayList(), index, columnReader.getFloat()); break;
                        case INT32:   Array.set(dc.getArrayList(), index, columnReader.getInteger()); break;
                        case INT64:   Array.set(dc.getArrayList(), index, columnReader.getLong()); break;
                        // Caution: DimColumn sizes its array from the column's javaType,
                        // which is Binary for INT96, so storing a BigInteger here will
                        // fail at runtime with an IllegalArgumentException.
                        case INT96:   Array.set(dc.getArrayList(), index, binaryToBigInteger(columnReader.getBinary())); break;
                        // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
                    }
                }
                columnReader.consume();
                index += 1;
            }
        }
        // index starts at startIndex and advances once per value, so it now
        // points one past the last row written for this row group.
        return index;
    }

    // public static String binaryToString(Binary value) {
    //     byte[] data = value.getBytes();
    //     if (data == null) return null;
    //
    //     try {
    //         CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
    //         return buffer.toString();
    //     } catch (Throwable th) {
    //     }
    //
    //     return "";
    // }

    public static BigInteger binaryToBigInteger(Binary value) {
        byte[] data = value.getBytes();
        if (data == null) return null;

        return new BigInteger(data);
    }

    // No-op converters: ColumnReadStoreImpl requires a converter even though
    // values here are pulled directly through the ColumnReader API.
    private static final class DumpGroupConverter extends GroupConverter {
        @Override public void start() { }
        @Override public void end() { }
        @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
    }

    private static final class DumpConverter extends PrimitiveConverter {
        @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
    }
}
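For quick reuse, here is a distilled sketch of the same column-read pattern with the revision-1 fix applied (definition level read per value). It assumes the same pre-Apache parquet.* and Hadoop dependencies as the gist; the class name and the file path argument are placeholders, and only a few primitive types are handled.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;

import java.io.IOException;

// Placeholder class name; dumps every non-null value of every column.
public class ColumnDumpSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // path to a .parquet file
        ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
        MessageType schema = footer.getFileMetaData().getSchema();

        ParquetFileReader reader = new ParquetFileReader(conf, path, footer.getBlocks(), schema.getColumns());
        PageReadStore rowGroup;
        while ((rowGroup = reader.readNextRowGroup()) != null) {
            ColumnReadStoreImpl store = new ColumnReadStoreImpl(rowGroup, new NoOpGroupConverter(), schema);
            for (ColumnDescriptor column : schema.getColumns()) {
                ColumnReader cr = store.getColumnReader(column);
                for (long i = 0, n = cr.getTotalValueCount(); i < n; i++) {
                    // Revision-1 fix: read the definition level per value,
                    // because consume() advances the reader.
                    if (cr.getCurrentDefinitionLevel() == column.getMaxDefinitionLevel()) {
                        switch (column.getType()) {
                            case BINARY: System.out.println(new String(cr.getBinary().getBytes(), "UTF-8")); break;
                            case INT32:  System.out.println(cr.getInteger()); break;
                            case INT64:  System.out.println(cr.getLong()); break;
                            case DOUBLE: System.out.println(cr.getDouble()); break;
                            default:     break; // remaining types omitted for brevity
                        }
                    }
                    cr.consume();
                }
            }
        }
    }

    // Same no-op converters as the gist above.
    static final class NoOpGroupConverter extends GroupConverter {
        @Override public void start() { }
        @Override public void end() { }
        @Override public Converter getConverter(int fieldIndex) { return new NoOpConverter(); }
    }

    static final class NoOpConverter extends PrimitiveConverter {
        @Override public GroupConverter asGroupConverter() { return new NoOpGroupConverter(); }
    }
}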