//// Usage
//
//   var queue = createMessageQueue({ messagesPerMinute: 60 })
//
//   queue.on('message', function(msg) {
//     console.log('message: ' + JSON.stringify(msg, null, 2))
//   })
//
//   queue.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })
//
//   // The queue owns a repeating one-second timer; stop it when you are done.
//   queue.stop()

//// Implementation

// The simplest thing I could think of. A rolling tally of # messages sent
// each second, spanning one minute total, stored in a queue. Every second
// we dequeue the oldest count and push a count of 0 which is then updated
// as messages come in. The total in the tally at any given time must not
// exceed the specified maximum number of messages to send per minute.
// The default is a maximum of 20 messages per minute.
//
// When that limit is reached messages will be queued until the queue
// reaches the maximum specified size, after which messages are ignored. By
// default up to 10 messages will be queued.
//
// There is a volume knob that goes from 0 to 10 where 0 is mute and 10 is
// maximum chattiness. You can retrieve and set the volume so an external
// algorithm could adjust the volume as necessary.

var EventEmitter = require('events').EventEmitter

// a couple of utils

// Copies the own enumerable properties of `b` onto `a` and returns `a`.
// A null or undefined `b` is treated as "nothing to copy".
function mixin(a, b) {
  for (var k in b) {
    if (Object.prototype.hasOwnProperty.call(b, k)) a[k] = b[k]
  }
  return a
}

// Restricts `n` to the inclusive range [min, max].
function clamp(n, min, max) {
  if (n > max) return max
  if (n < min) return min
  return n
}

// the good stuff

var MaxVolume = 10
  , SecondsPerWindow = 60 // number of one-second buckets in the rolling tally
  , DefaultOptions =
    { messagesPerMinute: 20 // send at most N messages per minute
    , queueSize: 10         // queue up to N messages before ignoring new ones
    }

// Creates a rate-limited message queue and starts it.
//
// options.messagesPerMinute - most messages emitted per rolling minute at
//                             full volume (default 20)
// options.queueSize         - most messages held back while rate limited;
//                             further messages are rejected (default 10)
//
// Returns an EventEmitter that emits 'message' with each message as it is
// released, and that has the methods enqueue, isEmpty, isFull, mute, unmute,
// volume, start and stop. The timer handle is exposed as `timeout` while the
// queue is running.
function createMessageQueue(options) {
  // Defaults go in first and the caller's values on top, into a fresh object
  // so the caller's options object is never modified.
  options = mixin(mixin({}, DefaultOptions), options)

  var absoluteMaxMessagesPerMinute = options.messagesPerMinute
    , maxQueueSize = options.queueSize
    , currentMaxMessagesPerMinute = absoluteMaxMessagesPerMinute
    , messagesSentThisMinute = 0
    , messagesSentEachSecond = [0] // newest bucket first. unshift adds, pop expires.
    , queue = []                   // FIFO. push is enqueue, shift is dequeue.
    , limited = false
    , volume = MaxVolume
    , preMuteVolume = volume       // used to restore the volume on unmute
      // Inherit from EventEmitter so that on() and emit() actually exist.
    , self = Object.create(EventEmitter.prototype)

  // Recomputes the volume-scaled budget and whether it has been used up.
  function updateLimit() {
    currentMaxMessagesPerMinute = Math.round((volume / MaxVolume) * absoluteMaxMessagesPerMinute)
    limited = messagesSentThisMinute >= currentMaxMessagesPerMinute
  }

  // Emits queued messages, oldest first, until the budget or the queue runs out.
  function drain() {
    updateLimit()
    while (!limited && queue.length > 0) {
      var msg = queue.shift()
      // Count before emitting so a listener that enqueues or throws cannot
      // push us past the limit.
      messagesSentEachSecond[0] += 1
      messagesSentThisMinute += 1
      updateLimit()
      self.emit('message', msg)
    }
  }

  // Advances the rolling window by one second and releases whatever now fits.
  function tick() {
    if (messagesSentEachSecond.length === SecondsPerWindow) {
      messagesSentThisMinute -= messagesSentEachSecond.pop()
    }
    messagesSentEachSecond.unshift(0)
    drain()
  }

  mixin(self,
    // Adds a message. Returns false if the queue is full or stopped (the
    // message is dropped), true if it was emitted or queued.
    { enqueue: function(msg) {
        if (!self.timeout || self.isFull()) return false
        queue.push(msg)
        drain()
        return true
      }

    , isEmpty: function() {
        return queue.length === 0
      }

    , isFull: function() {
        return queue.length >= maxQueueSize
      }

      // Sets the volume to 0, remembering the previous volume for unmute().
    , mute: function() {
        if (volume > 0) {
          preMuteVolume = volume
          volume = 0
        }
        updateLimit()
      }

      // (Re)starts the queue with a clean tally and the volume at maximum.
      // Throws an Error if the queue is already running.
    , start: function() {
        if (self.timeout) throw new Error('queue already started')
        limited = false
        volume = MaxVolume
        messagesSentThisMinute = 0
        messagesSentEachSecond = [0]
        // Must repeat: the window has to keep rolling every second.
        self.timeout = setInterval(tick, 1000)
      }

      // Stops the timer and discards any queued messages. Does nothing if the
      // queue is not running.
    , stop: function() {
        if (!self.timeout) return
        clearInterval(self.timeout)
        delete self.timeout
        queue = []
        messagesSentThisMinute = 0
        messagesSentEachSecond = [0]
      }

      // With no argument returns the current volume. With an argument sets
      // the volume, clamped to 0..MaxVolume, and returns nothing. Throws an
      // Error if the argument is not numeric.
    , volume: function(newVolume) {
        if (typeof newVolume === 'undefined') return volume
        var n = +newVolume
        if (isNaN(n)) throw new Error('volume does not go to 11')
        volume = clamp(n, 0, MaxVolume)
        // A higher volume may free up budget for messages already waiting.
        drain()
      }

      // Restores the volume that was in effect before mute().
    , unmute: function() {
        if (volume === 0) volume = preMuteVolume
        drain()
      }
    })

  EventEmitter.call(self)
  self.start()
  return self
}