
Revisions

  1. kaustavha revised this gist Feb 20, 2016. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions README.md
    @@ -69,9 +69,9 @@ Mode: 2
    12 workers: `~70k TPS`
    24 workers: `~57k TPS`

    ### Using the High level consumer and variable worker counts:
    6 workers: `~16k TPS`
    12 workers: `~40k TPS`
    32 workers: `~45k TPS`
    64 workers: `~55k TPS`
    96 workers: `~50k TPS`
  2. kaustavha revised this gist Feb 20, 2016. 1 changed file with 46 additions and 0 deletions.
    46 changes: 46 additions & 0 deletions README.md
    @@ -29,3 +29,49 @@ node kafka_consumer_worker.js
    https://github.com/kaustavha/kafka-node--light
    Then change the require statements from `require('kafka-node')` to `require('kafka-node--light')`.
    Note that if you're using snappy compression in Kafka, this won't be able to decompress your messages, and you'll see lower throughput, since messages are chunked and compressed together.

    ## Results:

    All these benchmarks were run on a server with 12 AMD Opteron(tm) 4184 cores (800 MHz), 64 GB of RAM, and 64 Kafka partitions.

    ```
    Processed 407402 messages
    Start Time 1452886344.33
    Elapsed Time 60.134000062942505
    TPS: 6774.902710173456
    Total metrics processed: 2076641
    Mean metric count per obv: 5.097277382045252
    Mode: 2
    ```
    ```
    Processed 411449 messages
    Start Time 1452886457.638
    Elapsed Time 60.14800000190735
    TPS: 6840.6098288713265
    Total metrics processed: 2097194
    Mean metric count per obv: 5.097093442929744
    Mode: 2
    ```
    ```
    Processed 474156 messages
    Start Time 1452886560.338
    Elapsed Time 60.01300001144409
    TPS: 7900.888139396156
    Total metrics processed: 2414438
    Mean metric count per obv: 5.092075182007609
    Mode: 2
    ```
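    TPS in each run is simply messages processed divided by elapsed time, e.g. 407402 / 60.134 ≈ 6775 for the first run above.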

    ### Using a simple consumer and variable worker counts:
    2 workers: `~16k TPS`
    4 workers: `~25k TPS`
    6 workers: `~40k TPS`
    12 workers: `~70k TPS`
    24 workers: `~57k TPS`

    ### Using the High level consumer and variable worker counts:
    6 workers: `~16k TPS`
    12 workers: `~40k TPS`
    32 workers: `~45k TPS`
    64 workers: `~55k TPS`
    96 workers: `~50k TPS`
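
    (Simple-consumer throughput peaks around 12 workers, matching the 12 cores on the benchmark host; the high level consumer needs many more workers to get close to the same throughput.)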
  3. kaustavha created this gist Feb 19, 2016.
    31 changes: 31 additions & 0 deletions README.md
    @@ -0,0 +1,31 @@
    # Kafka-node consumer benchmark

    This code can be used to benchmark consumer throughput for a Kafka cluster.
    Args:
    ```
    groupId -- (str) Kafka consumer group id, default: bench
    concurrency -- (int) number of worker processes to spawn, defaults to the number of CPUs on the current host
    duration -- (int) how long to run the benchmark, in milliseconds, default: 20000 (20s)
    topic -- (str) the Kafka topic to consume from, defaults to observations.json
    zk -- (str) ZooKeeper URL, defaults to localhost:2181
    useHLC -- (bool) whether to use a simple consumer (false) or a high level consumer (true); see the Kafka docs for an explanation. Defaults to false
    ```
    E.g.:
    ```
    node master.js --groupId=test --concurrency=2 --duration=60000 --topic=test --zk=localhost:2181/zkchroot --useHLC=true
    ```
    Or, for just a single worker:
    ```
    node kafka_consumer_worker.js
    ```

    ## Prereqs:
    You need a few libs:
    ```
    npm install kafka-node optimist
    ```
    If you get an error installing kafka-node due to a failure to compile its
    snappy dependency, you can replace it with kafka-node--light, which removes the dependency on snappy:
    https://github.com/kaustavha/kafka-node--light
    Then change the require statements from `require('kafka-node')` to `require('kafka-node--light')`.
    Note that if you're using snappy compression in Kafka, this won't be able to decompress your messages, and you'll see lower throughput, since messages are chunked and compressed together.
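    If you don't want to edit every require site, a minimal shim sketch (assuming both packages expose the same API, as the fork above intends):
    ```
    // A minimal sketch: prefer kafka-node, fall back to kafka-node--light
    // if kafka-node (and its native snappy dependency) failed to install.
    var kafka;
    try {
      kafka = require('kafka-node');
    } catch (e) {
      kafka = require('kafka-node--light'); // same API, no snappy decompression
    }
    module.exports = kafka;
    ```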
    122 changes: 122 additions & 0 deletions kafka_consumer_worker.js
    @@ -0,0 +1,122 @@
    'use strict';

    var kafka = require('kafka-node');
    var Offset = kafka.Offset;
    var Client = kafka.Client;
    var argv = require('optimist').argv;
    var cluster = require('cluster');
    var os = require('os');

    function createKafkaConsumer(config) {
      // Pick the consumer type; config.useHLC may arrive as a boolean or the string 'true'
      var Consumer = (config.useHLC === true || config.useHLC === 'true') ? kafka.HighLevelConsumer : kafka.Consumer;

      if (cluster.isWorker) {
        var worker = cluster.worker ? 'consumer' + cluster.worker.id + '-' + os.hostname() : 'defaultconsumer-' + os.hostname();
        config.worker = worker;
      } else {
        if (config.useHLC) console.log('Using a high level consumer');
        console.log('Using options: ');
        console.log(' topic: ' + config.topic + '\n timeout duration: ' + config.duration + '\n zk url: ' + config.zk);
      }

      var client = new Client(config.zk);
      var payloads = [
          {
            topic: config.topic,
            offset: 0
          }
        ],
        options = {
          autoCommit: false,
          groupId: config.groupId,
          fromOffset: true // needed to actually start from the 0th offset
        };

      var consumer = new Consumer(client, payloads, options);
      var offset = new Offset(client);
      var metricsCount = [];
      var message_count = 0;
      var start_time = Date.now() / 1000;

      function stop() {
        consumer.pause();
        var end_time = Date.now() / 1000;

        // Optionally calculate per-message metric counts and their mode
        // (uncomment here and in the 'message' handler below to enable)
        // var total = 0;
        // var modeMath = {};
        // metricsCount.forEach(function(cnt) {
        //   total += cnt;
        //   if (modeMath[cnt]) {
        //     modeMath[cnt] += 1;
        //   } else {
        //     modeMath[cnt] = 1;
        //   }
        // });
        // var max = 0, mode;
        // Object.keys(modeMath).forEach(function(key) {
        //   if (modeMath[key] > max) {
        //     max = modeMath[key];
        //     mode = key;
        //   }
        // });

        var results = {
          'TPS': (message_count / (end_time - start_time)),
          'Messages Processed': message_count,
          'Start Time': start_time,
          'Elapsed Time': (end_time - start_time)
          // 'Total metrics processed': total,
          // 'Mean metric count per obv': (total / message_count),
          // 'Mode': mode
        };

        // Report to the master if we're a worker, else console.log
        if (cluster.isWorker) {
          process.send(results);
        } else {
          for (var resKey in results) {
            console.log(resKey + ' : ' + results[resKey]);
          }
        }

        process.exit(0);
      }

      process.on('SIGINT', stop);
      setTimeout(stop, config.duration); // duration is in milliseconds

      // Handle consumer events
      consumer.on('message', function (message) {
        if (message_count < 1) {
          console.log('Started dequeuing messages after ' + ((Date.now() / 1000) - start_time));
        }
        message_count++;
        // var valueObj = JSON.parse(message.value);
        // metricsCount.push(Object.keys(valueObj.metrics).length);
      });

      consumer.on('error', function (err) {
        console.log('error', err);
      });

      /*
       * If the consumer gets an `offsetOutOfRange` event, fetch data from the smallest (oldest) offset
       */
      consumer.on('offsetOutOfRange', function (topic) {
        console.log('\n \n offset out of range exception \n \n');
        topic.maxNum = 2;
        offset.fetch([topic], function (err, offsets) {
          var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
          consumer.setOffset(topic.topic, topic.partition, min);
        });
      });
    }

    if (require.main === module) {
      createKafkaConsumer({
        'duration': argv.duration || 20 * 1000, // milliseconds
        'topic': argv.topic || 'observations.json',
        'zk': argv.zk || 'localhost:2181',
        'groupId': argv.groupId || 'bench',
        // accept --useHLC=true like master.js does (optimist may hand us a boolean or the string 'true')
        'useHLC': argv.useHLC === true || argv.useHLC === 'true'
      });
    } else {
      module.exports = createKafkaConsumer;
    }
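
    When imported instead of run directly, the file exports `createKafkaConsumer`; a minimal usage sketch mirroring what master.js below does (same config keys as the CLI args):
    ```
    var createKafkaConsumer = require('./kafka_consumer_worker.js');

    // Run a single consumer for 20s against a local ZooKeeper
    createKafkaConsumer({
      duration: 20 * 1000,        // milliseconds
      topic: 'observations.json',
      zk: 'localhost:2181',
      groupId: 'bench',
      useHLC: false               // simple consumer
    });
    ```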
    64 changes: 64 additions & 0 deletions master.js
    @@ -0,0 +1,64 @@
    var cluster = require('cluster');
    var os = require('os');
    var ingestionWorkers = [];
    var total = 0;
    var start_time;
    var finished_workers = 0;

    // Handle some args
    var argv = require('optimist').argv;
    var config = {
      'groupId': argv.groupId || 'bench',
      'concurrency': argv.concurrency || os.cpus().length,
      'duration': argv.duration || 20 * 1000, // milliseconds
      'topic': argv.topic || 'observations.json',
      'zk': argv.zk || 'localhost:2181',
      // optimist may hand us a boolean or the string 'true'
      'useHLC': argv.useHLC === true || argv.useHLC === 'true'
    };

    function printObj(obj) {
      for (var k in obj) {
        console.log(k + ' : ' + obj[k]);
      }
    }

    if (cluster.isMaster) {
      // Tell us about our config
      console.log('Config: ');
      printObj(config);

      // Fork workers.
      for (var i = 0; i < config.concurrency; i++) {
        // Time how long it takes for all the workers to finish;
        // this should be ~20s or whatever duration is set in the worker benchmark
        if (i === 0) {
          start_time = Date.now();
        }
        var worker = cluster.fork();
        ingestionWorkers.push(worker);

        // Bind a message handler to each worker to aggregate results
        worker.on('message', function (results) {
          total += results['Messages Processed'];
          console.log('Results from worker: ');
          printObj(results);
        });
      }

      cluster.on('exit', function (worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
        finished_workers++;
        if (finished_workers === config.concurrency) {
          var duration = ((Date.now() - start_time) / 1000);
          console.log('All workers finished: ' + finished_workers);
          console.log('Runtime: ' + duration);
          console.log('Total messages: ' + total);
          console.log('TPS: ' + (total / duration));
        }
      });
    } else {
      require('./kafka_consumer_worker.js')(config);
    }