Skip to content

Instantly share code, notes, and snippets.

@kakopappa
Forked from samsonjs/rate-limiting-queue.js
Created August 23, 2023 02:32
Show Gist options
  • Select an option

  • Save kakopappa/bf757cedb7f681206e078a122969f43c to your computer and use it in GitHub Desktop.

Select an option

Save kakopappa/bf757cedb7f681206e078a122969f43c to your computer and use it in GitHub Desktop.

Revisions

  1. @samsonjs samsonjs revised this gist May 25, 2011. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions rate-limiting-queue.js
    Original file line number Diff line number Diff line change
    @@ -1,12 +1,12 @@
    //// Usage

    var messageBucket = createBucket({ messagesPerMinute: 5 })
    var queue = createMessageQueue({ messagesPerMinute: 60 })

    messageBucket.on('message', function(msg) {
    queue.on('message', function(msg) {
    console.log('message: ' + JSON.stringify(msg, null, 2))
    })

    messageBucket.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })
    queue.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })



  2. @samsonjs samsonjs renamed this gist May 25, 2011. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  3. @samsonjs samsonjs revised this gist May 25, 2011. 1 changed file with 4 additions and 4 deletions.
    8 changes: 4 additions & 4 deletions rate-limited-queue.js
    Original file line number Diff line number Diff line change
    @@ -17,11 +17,11 @@ messageBucket.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })
    // we dequeue the oldest count and push a count of 0 which is then updated
    // as messages come in. The total in the queue at any given time must be
    // less than the specified maximum number of messages to send per minute.
    // The default is a maximum of 2 messages per minute.
    // The default is a maximum of 20 messages per minute.
    //
    // When that limit is reached messages will be queued until the queue
    // reaches the maximum specified size, after which messages are ignored. By
    // default up to 4 messages will be queued.
    // default up to 10 messages will be queued.
    //
    // There is a volume knob that goes from 0 to 10 where 0 is mute and 10 is
    // maximum chattiness. You can retrieve and set the volume so an external
    @@ -44,8 +44,8 @@ function clamp(n, min, max) {
    // the good stuff

    var MaxVolume = 10
    , DefaultOptions = { messagesPerMinute: 2 // send at most 1 message every N minutes
    , queueSize: 4 // queue up to N messages before ignoring new ones
    , DefaultOptions = { messagesPerMinute: 20 // send at most 1 message every N minutes
    , queueSize: 10 // queue up to N messages before ignoring new ones
    }

    function createMessageQueue(options) {
  4. @samsonjs samsonjs revised this gist May 25, 2011. 1 changed file with 7 additions and 4 deletions.
    11 changes: 7 additions & 4 deletions rate-limited-queue.js
    Original file line number Diff line number Diff line change
    @@ -41,7 +41,10 @@ function clamp(n, min, max) {
    return n
    }

    var DefaultOptions = { messagesPerMinute: 2 // send at most 1 message every N minutes
    // the good stuff

    var MaxVolume = 10
    , DefaultOptions = { messagesPerMinute: 2 // send at most 1 message every N minutes
    , queueSize: 4 // queue up to N messages before ignoring new ones
    }

    @@ -55,7 +58,7 @@ function createMessageQueue(options) {
    , messagesSentEachSecond = [0] // flows left -> right. unshift is enqueue, pop is dequeue.
    , queue = []
    , limited = false
    , volume = 10
    , volume = MaxVolume
    , preMuteVolume = volume // used to restore the volume on unmute

    function consume() {
    @@ -67,7 +70,7 @@ function createMessageQueue(options) {
    }

    function limit() {
    currentMaxMessagesPerMinute = volume * absoluteMaxMessagesPerMinute
    currentMaxMessagesPerMinute = Math.round((volume / MaxVolume) * absoluteMaxMessagesPerMinute)
    limited = messagesSentThisMinute >= currentMaxMessagesPerMinute
    if (!limited) consume()
    }
    @@ -122,7 +125,7 @@ function createMessageQueue(options) {
    if (typeof newVolume !== 'undefined') {
    var n = +newVolume
    if (typeof n !== 'number' || isNaN(n)) throw 'volume does not go to 11'
    volume = clamp(n, 0, 10)
    volume = clamp(n, 0, MaxVolume)
    }
    else {
    return volume
  5. @samsonjs samsonjs revised this gist May 25, 2011. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion rate-limited-queue.js
    Original file line number Diff line number Diff line change
    @@ -121,7 +121,7 @@ function createMessageQueue(options) {
    , volume: function(newVolume) {
    if (typeof newVolume !== 'undefined') {
    var n = +newVolume
    if (typeof n !== 'number' || !isNaN(n)) throw 'volume does not go to 11'
    if (typeof n !== 'number' || isNaN(n)) throw 'volume does not go to 11'
    volume = clamp(n, 0, 10)
    }
    else {
  6. @samsonjs samsonjs revised this gist May 25, 2011. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion rate-limited-queue.js
    Original file line number Diff line number Diff line change
    @@ -6,7 +6,7 @@ messageBucket.on('message', function(msg) {
    console.log('message: ' + JSON.stringify(msg, null, 2))
    })

    messageBucket.queue({ to: 'avian', from: 'sjs', body: 'cool story bro' })
    messageBucket.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })



  7. @samsonjs samsonjs revised this gist May 25, 2011. 1 changed file with 43 additions and 3 deletions.
    46 changes: 43 additions & 3 deletions rate-limited-queue.js
    Original file line number Diff line number Diff line change
    @@ -22,26 +22,41 @@ messageBucket.queue({ to: 'avian', from: 'sjs', body: 'cool story bro' })
    // When that limit is reached messages will be queued until the queue
    // reaches the maximum specified size, after which messages are ignored. By
    // default up to 4 messages will be queued.
    //
    // There is a volume knob that goes from 0 to 10 where 0 is mute and 10 is
    // maximum chattiness. You can retrieve and set the volume so an external
    // algorithm could adjust the volume as necessary.

    var EventEmitter = require('events').EventEmitter

    // a couple of utils

    function mixin(a, b) {
    for (var k in b) a[k] = b[k]
    }

    function clamp(n, min, max) {
    if (n > max) return max
    if (n < min) return min
    return n
    }

    var DefaultOptions = { messagesPerMinute: 2 // send at most 1 message every N minutes
    , queueSize: 4 // queue up to N messages before ignoring new ones
    }

    function createMessageQueue(options) {
    options = mixin(options || {}, DefaultOptions)

    var maxMessagesPerMinute = options.messagesPerMinute
    var absoluteMaxMessagesPerMinute = options.messagesPerMinute
    , maxQueueSize = options.queueSize
    , queue = []
    , currentMaxMessagesPerMinute = absoluteMaxMessagesPerMinute
    , messagesSentThisMinute = 0
    , messagesSentEachSecond = [0] // flows left -> right. unshift is enqueue, pop is dequeue.
    , queue = []
    , limited = false
    , volume = 10
    , preMuteVolume = volume // used to restore the volume on unmute

    function consume() {
    if (self.isEmpty()) return
    @@ -52,7 +67,8 @@ function createMessageQueue(options) {
    }

    function limit() {
    limited = messagesSentThisMinute >= maxMessagesPerMinute
    currentMaxMessagesPerMinute = volume * absoluteMaxMessagesPerMinute
    limited = messagesSentThisMinute >= currentMaxMessagesPerMinute
    if (!limited) consume()
    }

    @@ -72,9 +88,17 @@ function createMessageQueue(options) {
    return queue.length >= maxQueueSize
    }

    , mute: function() {
    if (volume > 0) {
    preMuteVolume = volume
    volume = 0
    }
    }

    , start: function() {
    if (self.timeout) throw 'queue already started'
    limited = false
    volume = 10
    messagesSentThisMinute = 0
    messagesSentEachSecond = [0]
    self.timeout = setTimeout(function() {
    @@ -93,6 +117,22 @@ function createMessageQueue(options) {
    queue = []
    messagesSentEachSecond = null
    }

    , volume: function(newVolume) {
    if (typeof newVolume !== 'undefined') {
    var n = +newVolume
    if (typeof n !== 'number' || !isNaN(n)) throw 'volume does not go to 11'
    volume = clamp(n, 0, 10)
    }
    else {
    return volume
    }
    }

    , unmute: function() {
    if (volume === 0) volume = preMuteVolume
    }

    }

    EventEmitter.call(self)
  8. @samsonjs samsonjs revised this gist May 25, 2011. 1 changed file with 2 additions and 3 deletions.
    5 changes: 2 additions & 3 deletions rate-limited-queue.js
    Original file line number Diff line number Diff line change
    @@ -6,7 +6,7 @@ messageBucket.on('message', function(msg) {
    console.log('message: ' + JSON.stringify(msg, null, 2))
    })

    messageBucket.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })
    messageBucket.queue({ to: 'avian', from: 'sjs', body: 'cool story bro' })



    @@ -40,15 +40,14 @@ function createMessageQueue(options) {
    , maxQueueSize = options.queueSize
    , queue = []
    , messagesSentThisMinute = 0
    , messagesSentEachSecond = [0] // left -> right, unshift is enqueue, pop is dequeue
    , messagesSentEachSecond = [0] // flows left -> right. unshift is enqueue, pop is dequeue.
    , limited = false

    function consume() {
    if (self.isEmpty()) return
    self.emit('message', queue.pop())
    messagesSentEachSecond[0] += 1
    messagesSentThisMinute += 1
    if (self.isEmpty()) self.emit('empty')
    limit()
    }

  9. @samsonjs samsonjs created this gist May 25, 2011.
    102 changes: 102 additions & 0 deletions rate-limited-queue.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,102 @@
    //// Usage

    var messageBucket = createBucket({ messagesPerMinute: 5 })

    messageBucket.on('message', function(msg) {
    console.log('message: ' + JSON.stringify(msg, null, 2))
    })

    messageBucket.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })



    //// Implementation

    // The simplest thing I could think of. A rolling tally of # messages sent
    // each second, spanning one minute total, stored in a queue. Every second
    // we dequeue the oldest count and push a count of 0 which is then updated
    // as messages come in. The total in the queue at any given time must be
    // less than the specified maximum number of messages to send per minute.
    // The default is a maximum of 2 messages per minute.
    //
    // When that limit is reached messages will be queued until the queue
    // reaches the maximum specified size, after which messages are ignored. By
    // default up to 4 messages will be queued.

    var EventEmitter = require('events').EventEmitter

    function mixin(a, b) {
    for (var k in b) a[k] = b[k]
    }

    var DefaultOptions = { messagesPerMinute: 2 // send at most 1 message every N minutes
    , queueSize: 4 // queue up to N messages before ignoring new ones
    }

    function createMessageQueue(options) {
    options = mixin(options || {}, DefaultOptions)

    var maxMessagesPerMinute = options.messagesPerMinute
    , maxQueueSize = options.queueSize
    , queue = []
    , messagesSentThisMinute = 0
    , messagesSentEachSecond = [0] // left -> right, unshift is enqueue, pop is dequeue
    , limited = false

    function consume() {
    if (self.isEmpty()) return
    self.emit('message', queue.pop())
    messagesSentEachSecond[0] += 1
    messagesSentThisMinute += 1
    if (self.isEmpty()) self.emit('empty')
    limit()
    }

    function limit() {
    limited = messagesSentThisMinute >= maxMessagesPerMinute
    if (!limited) consume()
    }

    var self = {
    enqueue: function(msg) {
    if (self.isFull()) return false
    queue.push(msg)
    if (!limited) consume()
    return true
    }

    , isEmpty: function() {
    return queue.length === 0
    }

    , isFull: function() {
    return queue.length >= maxQueueSize
    }

    , start: function() {
    if (self.timeout) throw 'queue already started'
    limited = false
    messagesSentThisMinute = 0
    messagesSentEachSecond = [0]
    self.timeout = setTimeout(function() {
    if (messagesSentEachSecond.length === 60) {
    messagesSentThisMinute -= messagesSentEachSecond.pop()
    }
    messagesSentEachSecond.unshift(0)
    limit()
    }, 1000)
    }

    , stop: function() {
    if (!self.timeout) return
    clearTimeout(self.timeout)
    delete self.timeout
    queue = []
    messagesSentEachSecond = null
    }
    }

    EventEmitter.call(self)
    self.start()
    return self
    }