package com.company.grid.lookup_new;

import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Holds all values of one Parquet column in an array created via reflection,
 * with a component type matching the column's Java type.
 */
final class DimColumn {

    public volatile Object arrayList;
    public String name;
    public Class<?> typeOfArray;
    public int size;

    DimColumn(String name, Class<?> typeOfArray, int size) {
        this.name = name;
        this.typeOfArray = typeOfArray;
        this.size = size;
        this.arrayList = Array.newInstance(typeOfArray, size);
    }
}

/**
 * Reads every column of a Parquet file into in-memory arrays (one DimColumn per column)
 * and prints a few randomly sampled values from each column.
 */
public class RfiParquetFileReader {

    ParquetMetadata metaData;
    MessageType schema;

    private static final Charset UTF8 = Charset.forName("UTF-8");
    private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();

    public RfiParquetFileReader(String fileName) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", LocalFileSystem.class.getName());

        Path filePath = new Path(fileName);
        metaData = ParquetFileReader.readFooter(conf, filePath);
        schema = metaData.getFileMetaData().getSchema();

        // Sum the row counts of all row groups so the arrays are large enough
        // even when the file contains more than one block.
        List<BlockMetaData> blocks = metaData.getBlocks();
        long totalSize = 0;
        for (BlockMetaData block : blocks) {
            totalSize += block.getRowCount();
        }

        List<ColumnDescriptor> columns = schema.getColumns();
        List<DimColumn> dimColumns = new ArrayList<DimColumn>();

        // Load each column fully; the file is re-opened and scanned once per column.
        for (ColumnDescriptor columnDescriptor : columns) {
            System.out.println(columnDescriptor.toString());
            DimColumn dimColumn = new DimColumn(columnDescriptor.getPath()[0],
                    columnDescriptor.getType().javaType, (int) totalSize);
            int index = 0;

            ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, schema.getColumns());
            try {
                PageReadStore pageReadStore = fileReader.readNextRowGroup();
                while (pageReadStore != null) {
                    ColumnReadStoreImpl columnReadStoreImpl =
                            new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
                    index = load(columnReadStoreImpl, columnDescriptor, dimColumn, index);
                    pageReadStore = fileReader.readNextRowGroup();
                }
            } finally {
                fileReader.close();
            }
            dimColumns.add(dimColumn);
        }

        // Print five randomly sampled values from each column as a sanity check.
        Random rand = new Random();
        int index;
        for (DimColumn dimColumn : dimColumns) {
            System.out.println(dimColumn.name);
            for (int i = 0; i < 5; i++) {
                index = rand.nextInt((int) totalSize);
                System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.arrayList, index));
            }
            System.out.println("--------");
        }
    }

    public String getSchema() {
        return schema.toString();
    }

    public static void main(String[] args) {
        String fileName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/13.assigned_conversion.parquet";
        try {
            long startTime = System.currentTimeMillis();
            new RfiParquetFileReader(fileName);
            long endTime = System.currentTimeMillis();
            System.out.println("Time taken: " + (endTime - startTime) + "ms");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Copies every value of the given column from the current row group into the
     * DimColumn's backing array, starting at {@code index}; returns the next free index.
     */
    public static int load(ColumnReadStoreImpl columnReadStore, ColumnDescriptor column,
                           DimColumn dimColumn, int index)
            throws IOException {
        int maxDefinitionLevel = column.getMaxDefinitionLevel();
        ColumnReader columnReader = columnReadStore.getColumnReader(column);
        for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
            int definitionLevel = columnReader.getCurrentDefinitionLevel();
            // Skip nulls: their slot keeps the array's default value, but the index still advances.
            if (definitionLevel == maxDefinitionLevel) {
                switch (column.getType()) {
                    case BINARY:  Array.set(dimColumn.arrayList, index, columnReader.getBinary());  break;
                    case BOOLEAN: Array.set(dimColumn.arrayList, index, columnReader.getBoolean()); break;
                    case DOUBLE:  Array.set(dimColumn.arrayList, index, columnReader.getDouble());  break;
                    case FLOAT:   Array.set(dimColumn.arrayList, index, columnReader.getFloat());   break;
                    case INT32:   Array.set(dimColumn.arrayList, index, columnReader.getInteger()); break;
                    case INT64:   Array.set(dimColumn.arrayList, index, columnReader.getLong());    break;
                    // INT96 values arrive as Binary and the backing array's component type is Binary,
                    // so store the raw Binary here; binaryToBigInteger(...) can interpret it as a number.
                    case INT96:   Array.set(dimColumn.arrayList, index, columnReader.getBinary());  break;
                    // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
                }
            }
            columnReader.consume();
            index += 1;
        }
        return index;
    }

    // public static String binaryToString(Binary value) {
    //     byte[] data = value.getBytes();
    //     if (data == null) return null;
    //
    //     try {
    //         CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
    //         return buffer.toString();
    //     } catch (Throwable th) {
    //     }
    //
    //     return "";
    // }

    public static BigInteger binaryToBigInteger(Binary value) {
        byte[] data = value.getBytes();
        if (data == null) return null;
        return new BigInteger(data);
    }

    /** No-op converters: required by ColumnReadStoreImpl but unused when values are read directly. */
    private static final class DumpGroupConverter extends GroupConverter {
        @Override public void start() { }
        @Override public void end() { }
        @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
    }

    private static final class DumpConverter extends PrimitiveConverter {
        @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
    }
}