'use strict';

var kafka = require('kafka-node');
var Offset = kafka.Offset;
var Client = kafka.Client;
var argv = require('optimist').argv;
var cluster = require('cluster');
var os = require('os');
var fs = require('fs');

/**
 * Runs a Kafka consumer benchmark: consumes messages from `config.topic`
 * starting at offset 0 for `config.duration` milliseconds, then reports
 * throughput (TPS) — to the cluster master via `process.send` when running
 * as a worker, or to the console otherwise.
 *
 * @param {Object} config
 * @param {string} config.topic     - Kafka topic to consume.
 * @param {string} config.zk        - ZooKeeper connection string.
 * @param {string} config.groupId   - Consumer group id.
 * @param {number} config.duration  - Benchmark duration in milliseconds.
 * @param {boolean|string} config.useHLC - Use kafka.HighLevelConsumer when
 *   `true` or the string 'true'; plain Consumer otherwise.
 */
function createKafkaConsumer(config) {
  // Accept both boolean true and the string 'true' (CLI args arrive as strings).
  var useHLC = config.useHLC === true || config.useHLC === 'true';
  var Consumer = useHLC ? kafka.HighLevelConsumer : kafka.Consumer;

  if (cluster.isWorker) {
    // Tag this worker so the master can attribute the results it reports back.
    config.worker = cluster.worker
      ? 'consumer' + cluster.worker.id + '-' + os.hostname()
      : 'defaultconsumer-' + os.hostname();
  } else {
    if (useHLC) console.log('Using a high level consumer');
    console.log('Using options: ');
    console.log(
      ' topic: ' + config.topic +
      '\n timeout duration: ' + config.duration +
      '\n zk url: ' + config.zk
    );
  }

  var client = new Client(config.zk);
  var payloads = [{ topic: config.topic, offset: 0 }];
  var options = {
    autoCommit: false,
    groupId: config.groupId,
    fromOffset: true // needed to actually start from 0th offset
  };
  var consumer = new Consumer(client, payloads, options);
  var offset = new Offset(client);

  var messageCount = 0;
  var startTime = Date.now() / 1000; // seconds (fractional)

  /**
   * Stops consuming, reports throughput results, and exits the process.
   * Invoked on SIGINT or when the benchmark duration elapses.
   */
  function stop() {
    consumer.pause();
    var endTime = Date.now() / 1000;
    var elapsed = endTime - startTime;

    var results = {
      'TPS': messageCount / elapsed,
      'Messages Processed': messageCount,
      'Start Time': startTime,
      'Elapsed Time': elapsed
    };

    // Report to master if we're a worker, else console.log
    if (cluster.isWorker) {
      process.send(results);
    } else {
      Object.keys(results).forEach(function (key) {
        console.log(key + ' : ' + results[key]);
      });
    }
    process.exit(0);
  }

  process.on('SIGINT', stop);
  setTimeout(stop, config.duration);

  // Handle consumer events
  consumer.on('message', function (message) {
    if (messageCount < 1) {
      // First message seen — log the startup latency in seconds.
      console.log('Started dequeing messages after ' + ((Date.now() / 1000) - startTime));
    }
    messageCount++;
  });

  consumer.on('error', function (err) {
    console.log('error', err);
  });

  /*
   * If consumer gets an `offsetOutOfRange` event, fetch data from the
   * smallest (oldest) available offset and resume from there.
   */
  consumer.on('offsetOutOfRange', function (topic) {
    console.log('\n \n offset out of range exception \n \n');
    topic.maxNum = 2;
    offset.fetch([topic], function (err, offsets) {
      if (err) {
        // Without this guard, indexing `offsets` below would throw on failure.
        console.log('error', err);
        return;
      }
      var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
      consumer.setOffset(topic.topic, topic.partition, min);
    });
  });
}

if (require.main === module) {
  createKafkaConsumer({
    duration: argv.duration || 20 * 1000,
    topic: argv.topic || 'observations.json',
    zk: argv.zk || 'localhost:2181',
    groupId: argv.groupId || 'bench',
    useHLC: argv.ctype === 'HLC'
  });
} else {
  module.exports = createKafkaConsumer;
}