package org.apache.phoenix.example.job; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.phoenix.mapreduce.PhoenixOutputFormat; import org.apache.phoenix.mapreduce.bean.StockBean; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.mapreduce.example.StockWritable; public class StockComputationJob { protected final static String zkUrl = "sandbox.hortonworks.com:2181"; public static class StockMapper extends Mapper { @Override protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException { final StockBean bean = stockWritable.getStockBean(); double[] recordings = bean.getRecordings(); if(recordings.length == 0) { return; } double sum = 0.0; for(double recording: recordings) { sum += recording; } double avg = sum / recordings.length; bean.setAverage(avg); stockWritable.setStockBean(bean); context.write(NullWritable.get(), stockWritable); } } public static void main(String[] args) throws Exception { final Configuration conf = new Configuration(); HBaseConfiguration.addHbaseResources(conf); conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl); final Job job = Job.getInstance(conf, "stock-stats-job"); final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCKS "; PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCKS", selectQuery); PhoenixMapReduceUtil.setOutput(job, "STOCKS", "STOCK_NAME,RECORDING_YEAR,RECORDINGS_AVG"); job.setMapperClass(StockMapper.class); job.setOutputFormatClass(PhoenixOutputFormat.class); job.setNumReduceTasks(0); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(StockWritable.class); TableMapReduceUtil.addDependencyJars(job); job.waitForCompletion(true); } }