Revisions
kaustavha revised this gist on Feb 20, 2016 (1 changed file with 6 additions and 6 deletions).

The hunk `@@ -69,9 +69,9 @@` updates the throughput figures at the tail of the README's Results section; after the change those lines read:

12 workers: `~70k TPS`
24 workers: `~57k TPS`

### Using the High level consumer and variable worker counts:
6 workers: `~16k TPS`
12 workers: `~40k TPS`
32 workers: `~45k TPS`
64 workers: `~55k TPS`
96 workers: `~50k TPS`
kaustavha revised this gist on Feb 20, 2016 (1 changed file with 46 additions and 0 deletions). This revision appends a Results section to the README:

## Results:
All these benchmarks were run on a server with 12 AMD Opteron(tm) Processor 4184 (800MHz) cores, 64 GB of RAM, and 64 Kafka partitions.

```
Processed 407402 messages
Start Time 1452886344.33
Elapsed Time 60.134000062942505
TPS: 6774.902710173456
Total metrics processed: 2076641
Mean metric count per obv: 5.097277382045252
Mode: 2
```

```
Processed 411449 messages
Start Time 1452886457.638
Elapsed Time 60.14800000190735
TPS: 6840.6098288713265
Total metrics processed: 2097194
Mean metric count per obv: 5.097093442929744
Mode: 2
```

```
Processed 474156 messages
Start Time 1452886560.338
Elapsed Time 60.01300001144409
TPS: 7900.888139396156
Total metrics processed: 2414438
Mean metric count per obv: 5.092075182007609
Mode: 2
```

### Using a simple consumer and variable worker counts:
2 workers: `~16k TPS`
4 workers: `~25k TPS`
6 workers: `~40k TPS`
12 workers: `~70k TPS`
24 workers: `~57k TPS`

### Using the High level consumer and variable worker counts:
6 workers: `~16k TPS`
12 workers: `~40k TPS`
32 workers: `~45k TPS`
64 workers: `~55k TPS`
96 workers: `~50k TPS`
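The reported `TPS` is simply messages processed divided by elapsed seconds, which is exactly the calculation `stop()` performs in kafka_consumer_worker.js below. A quick sanity check against the first raw run:

```
// Reproduces the TPS figure of the first raw run above.
var tps = 407402 / 60.134000062942505;
console.log(tps); // ~6774.90, matching "TPS: 6774.902710173456"
```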
kaustavha created this gist on Feb 19, 2016. The original revision contains the README plus two files, kafka_consumer_worker.js and master.js:
# Kafka-node consumer benchmark

This code can be used to benchmark throughput for a Kafka cluster.

Args:
```
groupId -- (str) kafka consumer group id, default: bench
concurrency -- (int) number of worker processes to spawn, defaults to the number of CPUs on the current host
duration -- (int) how long to run the benchmark for, in milliseconds (it is passed straight to setTimeout), default: 20000 (20s)
topic -- (str) the kafka topic to consume from, defaults to observations.json
zk -- (str) zookeeper url, defaults to localhost:2181
useHLC -- (bool) whether to use a simple consumer or a high level consumer; see the kafka docs for an explanation. Defaults to false
```

E.g.:
```
node master.js --groupId=test --concurrency=2 --duration=60000 --topic=test --zk=localhost:2181/zkchroot --useHLC=true
```

Or for just a single worker:
```
node kafka_consumer_worker.js
```

## Prereqs:
You need a few libs:
```
npm install kafka-node optimist
```

If you get an error installing kafka-node because its snappy dependency fails to compile, you can replace it with kafka-node--light, which removes the dependency on snappy: https://github.com/kaustavha/kafka-node--light

Then swap the require statements from `require('kafka-node')` to `require('kafka-node--light')`, as sketched below. Note that if you're using snappy compression in Kafka, this build won't be able to decompress your messages and you'll see lower throughput, since messages are chunked and compressed together.
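A minimal sketch of that swap; the try/catch fallback is a convenience added here for illustration, not part of the original worker code:

```
// Prefer the full client; fall back to the snappy-free build when the
// native snappy bindings failed to compile during `npm install`.
var kafka;
try {
  kafka = require('kafka-node');
} catch (e) {
  kafka = require('kafka-node--light'); // cannot decompress snappy-compressed messages
}
```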
kafka_consumer_worker.js:

```
'use strict';
var kafka = require('kafka-node');
var Offset = kafka.Offset;
var Client = kafka.Client;
var argv = require('optimist').argv;
var cluster = require('cluster');
var os = require('os');
var fs = require('fs');

function createKafkaConsumer(config) {
  // Pick the consumer implementation based on the useHLC flag.
  var Consumer = (config['useHLC'] == true || config['useHLC'] == 'true') ?
    kafka.HighLevelConsumer : kafka.Consumer;

  if (cluster.isWorker) {
    var worker = cluster.worker ?
      'consumer' + cluster.worker.id + '-' + os.hostname() :
      'defaultconsumer-' + os.hostname();
    config.worker = worker;
  } else {
    if (config['useHLC']) console.log('Using a high level consumer');
    console.log('Using options: ');
    console.log(' topic: ' + config.topic +
      '\n timeout duration: ' + config['duration'] +
      '\n zk url: ' + config.zk);
  }

  var client = new Client(config.zk);
  var payloads = [{ topic: config.topic, offset: 0 }],
      options = {
        autoCommit: false,
        groupId: config.groupId,
        fromOffset: true // needed to actually start from 0th offset
      };
  var consumer = new Consumer(client, payloads, options);
  var offset = new Offset(client);
  var metricsCount = [];
  var message_count = 0;
  var start_time = Date.now() / 1000;

  function stop() {
    consumer.pause();
    var end_time = Date.now() / 1000;
    // Calculate total metrics
    // var total = 0;
    // var modeMath = {};
    // metricsCount.forEach(function(cnt) {
    //   total += cnt;
    //   if (modeMath[cnt]) {
    //     modeMath[cnt] += 1;
    //   } else {
    //     modeMath[cnt] = 1;
    //   }
    // });
    // var max = 0, mode;
    // Object.keys(modeMath).forEach(function(key) {
    //   if (modeMath[key] > max) {
    //     max = modeMath[key];
    //     mode = key;
    //   }
    // });
    var results = {
      'TPS': (message_count / (end_time - start_time)),
      'Messages Processed': message_count,
      'Start Time': start_time,
      'Elapsed Time': (end_time - start_time)
      // 'Total metrics processed': total,
      // 'Mean metric count per obv': (total / message_count),
      // 'Mode': +mode
    };
    // Report to master if we're a worker, else console.log
    if (cluster.isWorker) {
      process.send(results);
    } else {
      for (var resKey in results) console.log(resKey + ' : ' + results[resKey]);
    }
    process.exit(0);
  }

  process.on('SIGINT', stop);
  setTimeout(stop, config['duration']);

  // Handle consumer events
  consumer.on('message', function (message) {
    if (message_count < 1) {
      console.log('Started dequeuing messages after ' + ((Date.now() / 1000) - start_time));
    }
    message_count++;
    // var valueObj = JSON.parse(message.value);
    // metricsCount.push(Object.keys(valueObj.metrics).length);
  });

  consumer.on('error', function (err) {
    console.log('error', err);
  });

  /*
   * If the consumer gets an `offsetOutOfRange` event, fetch data from the smallest (oldest) offset
   */
  consumer.on('offsetOutOfRange', function (topic) {
    console.log('\n \n offset out of range exception \n \n');
    topic.maxNum = 2;
    offset.fetch([topic], function (err, offsets) {
      var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
      // console.log(min);
      consumer.setOffset(topic.topic, topic.partition, min);
    });
  });
}

// Run standalone when invoked directly, otherwise export for master.js.
require.main == module ?
  createKafkaConsumer({
    'duration': argv.duration || 20 * 1000,
    'topic': argv.topic || 'observations.json',
    'zk': argv.zk || 'localhost:2181',
    'groupId': argv.groupId || 'bench',
    'useHLC': argv.useHLC ? true : false
  }) :
  module.exports = createKafkaConsumer;
```
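Because the file exports `createKafkaConsumer` when required, which is exactly how master.js drives it, a single benchmark can also be kicked off programmatically. A minimal sketch, using the same config keys and the defaults listed above:

```
// Ten-second standalone run against a local broker; the keys mirror the
// config object master.js builds from its command-line flags.
var createKafkaConsumer = require('./kafka_consumer_worker.js');
createKafkaConsumer({
  duration: 10 * 1000,        // milliseconds, passed straight to setTimeout
  topic: 'observations.json',
  zk: 'localhost:2181',
  groupId: 'bench',
  useHLC: false               // true selects kafka.HighLevelConsumer
});
```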
master.js:

```
var cluster = require('cluster');
var os = require('os');
var wrk = 0;
var ingestionWorkers = [];
var total = 0;
var start_time;
var finished_workers = 0;
var fs = require('fs');

// handle some args
var argv = require('optimist').argv;
var config = {
  'groupId': argv.groupId || 'bench',
  'concurrency': argv.concurrency || os.cpus().length,
  'duration': argv.duration || 20 * 1000,
  'topic': argv.topic || 'observations.json',
  'zk': argv.zk || 'localhost:2181',
  'useHLC': argv.useHLC ? true : false
};

function printObj(obj) {
  for (var k in obj) {
    console.log(k + ' : ' + obj[k]);
  }
}

if (cluster.isMaster) {
  // Tell us about our config
  console.log('Config: ');
  printObj(config);
  // create a file to write logs to
  // Fork workers.
  for (var i = 0; i < config.concurrency; i++) {
    // Time how long it takes for all the workers to finish
    // This should be 20s or w/e is set in the slave benchmark
    if (i == 0) {
      start_time = Date.now();
    }
    var worker = cluster.fork();
    ingestionWorkers.push(worker);
    // bind message handler to each worker
    worker.on('message', function (results) {
      total += results['Messages Processed'];
      console.log('Results from slave: ');
      printObj(results);
    });
  }
  cluster.on('exit', function (worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
    finished_workers++;
    if (finished_workers == config.concurrency) {
      var duration = ((Date.now() - start_time) / 1000);
      console.log('All workers finished: ' + finished_workers);
      console.log('Runtime: ' + duration);
      console.log('Total messages: ' + total);
      console.log('TPS: ' + (total / duration));
    }
  });
} else {
  // Workers just run the consumer benchmark with the shared config.
  require('./kafka_consumer_worker.js')(config);
}
```
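For example, the 12-worker simple-consumer figure in the Results section would come from an invocation along these lines (the topic and zookeeper values shown here are the defaults, not necessarily the ones used for the published runs):

```
node master.js --groupId=bench --concurrency=12 --duration=60000 --topic=observations.json --zk=localhost:2181
```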