Skip to content

Instantly share code, notes, and snippets.

@tispratik
Created August 28, 2014 19:43
Show Gist options
  • Save tispratik/f7a66f6a40b7ae3b98ad to your computer and use it in GitHub Desktop.

Revisions

  1. tispratik created this gist Aug 28, 2014.
    161 changes: 161 additions & 0 deletions parquet_columns_reader2.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,161 @@
    package com.company.grid.lookup_new;

    import parquet.column.ColumnDescriptor;
    import parquet.column.ColumnReader;
    import parquet.column.impl.ColumnReadStoreImpl;
    import parquet.column.page.PageReadStore;
    import parquet.hadoop.ParquetFileReader;
    import parquet.hadoop.metadata.BlockMetaData;
    import parquet.hadoop.metadata.ParquetMetadata;
    import parquet.io.api.Binary;
    import parquet.io.api.Converter;
    import parquet.io.api.GroupConverter;
    import parquet.io.api.PrimitiveConverter;
    import parquet.schema.MessageType;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.lang.reflect.Array;
    import java.math.BigInteger;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.conf.Configuration;

    /**
     * One fully-materialized Parquet column: a primitive/object array of {@code size}
     * slots, created reflectively so a single field can hold int[], double[], Binary[], etc.
     *
     * <p>All fields are write-once and therefore {@code final}. The original declared
     * {@code arrayList} as {@code volatile}, which buys nothing for a field assigned
     * exactly once in the constructor — {@code final} provides safe publication instead.
     */
    final class DimColumn {
        /** Backing array; actual runtime type is {@code typeOfArray[]}. Read via {@link java.lang.reflect.Array#get}. */
        public final Object arrayList;
        /** Column name (first path element of the Parquet column descriptor). */
        public final String name;
        /** Component type of {@link #arrayList}. */
        public final Class<?> typeOfArray;
        /** Number of slots in {@link #arrayList}. */
        public final int size;

        /**
         * @param name        column name
         * @param typeOfArray component type for the backing array
         * @param size        number of rows to allocate
         */
        DimColumn(String name, Class<?> typeOfArray, int size) {
            this.name = name;
            this.typeOfArray = typeOfArray;
            this.size = size;
            this.arrayList = Array.newInstance(typeOfArray, size);
        }
    }

    /**
     * Loads every column of a Parquet file into in-memory arrays ({@link DimColumn})
     * and prints five randomly sampled values per column as a spot check.
     *
     * <p>Fixes over the original:
     * <ul>
     *   <li>Column arrays were sized from the FIRST row group only
     *       ({@code blocks.get(0).getRowCount()}) while the load loop consumes ALL
     *       row groups — an {@code ArrayIndexOutOfBoundsException} on any
     *       multi-row-group file. Row counts are now summed over all blocks.</li>
     *   <li>The per-column {@link ParquetFileReader} was never closed (leaked one
     *       open stream per column); it is now closed in a finally block.</li>
     *   <li>Unused {@code UTF8}/{@code UTF8_DECODER} fields and commented-out code removed.</li>
     * </ul>
     */
    public class RfiParquetFileReader {
        ParquetMetadata metaData;
        MessageType schema;

        /**
         * Reads the footer/schema of {@code fileName}, materializes every column,
         * then prints 5 random samples per column.
         *
         * @param fileName path to a local or HDFS Parquet file
         * @throws IOException if the footer or any row group cannot be read
         */
        public RfiParquetFileReader(String fileName) throws IOException {
            Configuration conf = new Configuration();
            conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
            conf.set("fs.file.impl", LocalFileSystem.class.getName());
            Path filePath = new Path(fileName);
            metaData = ParquetFileReader.readFooter(conf, filePath);
            schema = metaData.getFileMetaData().getSchema();

            List<BlockMetaData> blocks = metaData.getBlocks();
            // Sum over ALL row groups: load() below walks every row group, so sizing
            // the arrays from blocks.get(0) alone overflows on multi-row-group files.
            long totalSize = 0;
            for (BlockMetaData block : blocks) {
                totalSize += block.getRowCount();
            }

            List<ColumnDescriptor> columns = schema.getColumns();
            List<DimColumn> dimColumns = new ArrayList<DimColumn>(columns.size());

            for (ColumnDescriptor columnDescriptor : columns) {
                System.out.println(columnDescriptor.toString());
                DimColumn dimColumn = new DimColumn(columnDescriptor.getPath()[0],
                        columnDescriptor.getType().javaType, (int) totalSize);

                // NOTE(review): this re-reads the whole file once per column (the
                // reader decodes every page of every row group each pass). Kept
                // as-is to preserve output ordering, but a single pass that fills
                // all columns per row group would be far cheaper.
                int index = 0;
                ParquetFileReader fileReader =
                        new ParquetFileReader(conf, filePath, blocks, schema.getColumns());
                try {
                    PageReadStore pageReadStore = fileReader.readNextRowGroup();
                    while (pageReadStore != null) {
                        ColumnReadStoreImpl columnReadStoreImpl =
                                new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
                        index = load(columnReadStoreImpl, columnDescriptor, dimColumn, index);
                        pageReadStore = fileReader.readNextRowGroup();
                    }
                } finally {
                    fileReader.close(); // was leaked in the original
                }
                dimColumns.add(dimColumn);
            }

            // Spot check: 5 random samples per column.
            Random rand = new Random();
            for (DimColumn dimColumn : dimColumns) {
                System.out.println(dimColumn.name);
                for (int i = 0; i < 5; i++) {
                    int index = rand.nextInt((int) totalSize);
                    System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.arrayList, index));
                }
                System.out.println("--------");
            }
        }

        /** @return the Parquet schema rendered as a string. */
        public String getSchema() {
            return schema.toString();
        }

        public static void main(String[] args) {
            String fileName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/13.assigned_conversion.parquet";
            try {
                long startTime = System.currentTimeMillis();
                new RfiParquetFileReader(fileName);
                long endTime = System.currentTimeMillis();
                System.out.println("Time taken: " + (endTime - startTime) + "ms");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Copies one row group's worth of values for {@code column} into
         * {@code dimColumn.arrayList}, starting at {@code index}.
         *
         * <p>Rows whose definition level is below the max (i.e. nulls) still advance
         * {@code index}, leaving the array slot at its default value — this keeps
         * row positions aligned across columns.
         *
         * @param index next write position in the destination array
         * @return the write position after consuming this row group
         */
        public static int load(ColumnReadStoreImpl columnReadStore, ColumnDescriptor column,
                               DimColumn dimColumn, int index) throws IOException {
            int maxDefinitionLevel = column.getMaxDefinitionLevel();
            ColumnReader columnReader = columnReadStore.getColumnReader(column);

            for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
                int definitionLevel = columnReader.getCurrentDefinitionLevel();

                if (definitionLevel == maxDefinitionLevel) {
                    switch (column.getType()) {
                        case BINARY: Array.set(dimColumn.arrayList, index, columnReader.getBinary()); break;
                        case BOOLEAN: Array.set(dimColumn.arrayList, index, columnReader.getBoolean()); break;
                        case DOUBLE: Array.set(dimColumn.arrayList, index, columnReader.getDouble()); break;
                        case FLOAT: Array.set(dimColumn.arrayList, index, columnReader.getFloat()); break;
                        case INT32: Array.set(dimColumn.arrayList, index, columnReader.getInteger()); break;
                        case INT64: Array.set(dimColumn.arrayList, index, columnReader.getLong()); break;
                        case INT96: Array.set(dimColumn.arrayList, index, binaryToBigInteger(columnReader.getBinary())); break;
                    }
                }
                columnReader.consume();
                index += 1;
            }
            return index;
        }

        /**
         * Interprets an INT96 binary value as a big-endian two's-complement integer.
         *
         * <p>NOTE(review): Parquet INT96 timestamps are stored little-endian; if these
         * values are timestamps the byte order here may be reversed — confirm against
         * the writer before relying on the magnitude.
         *
         * @return the value, or {@code null} if the binary has no backing bytes
         */
        public static BigInteger binaryToBigInteger(Binary value) {
            byte[] data = value.getBytes();
            if (data == null) return null;

            return new BigInteger(data);
        }

        /** No-op converter tree required by {@link ColumnReadStoreImpl}; records nothing. */
        private static final class DumpGroupConverter extends GroupConverter {
            @Override public void start() { }
            @Override public void end() { }
            @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
        }

        /** No-op primitive converter paired with {@link DumpGroupConverter}. */
        private static final class DumpConverter extends PrimitiveConverter {
            @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
        }
    }