var async = require('async'); var _ = require('underscore'); module.exports = { initialize: function(api, next){ function nullOrErrorResult(err, result) { return (err || result === null || result.length === 0); } //returns a function which takes a callback(err, data) //callback: // - err will be null or contain an error if one occurs // - data will contain an object with: // - 'timestamp': { class, queue, args } function getDelayedTaskSchedule(api) { return function(done) { var scheduler = api.resque.scheduler; var redis = scheduler.connection.redis; redis.zrangebyscore(scheduler.connection.key('delayed_queue_schedule'), '-inf', '+inf', function(err, items) { if (nullOrErrorResult(err, items)) { done(err, {}); } else { async.reduce(items, {}, function(memo, timestamp, callback) { var key = scheduler.connection.key('delayed:' + timestamp); redis.lrange(key, 0, -1, function(err, jobs) { if(err) { callback(err); } else { var jobList = jobs.map(JSON.parse); memo[timestamp] = jobList; callback(null, memo); } }); }, done); } }); }; } //returns a function which takes a callback(err, data) //callback: // - err will be null or contain an error if one occurs // - data will contain an object with: // - 'queuename': [{ class, args }, ... ] function getQueuedTasks(api) { return function(done) { var scheduler = api.resque.scheduler; var redis = scheduler.connection.redis; redis.smembers(scheduler.connection.key('queues'), function(err, queues) { if(nullOrErrorResult(err, queues)) { done(err, {}); } else { async.map(queues, function(queue, callback) { redis.lrange(scheduler.connection.key('queue:' + queue), 0, -1, function(err, items) { var result = { name: queue, items: [] }; if(!nullOrErrorResult(err, items)) { for(var i = 0; i < items.length; i++) { result.items.push(JSON.parse(items[i])); } } callback(err, result); }); }, function(err, queueObjects) { var result = {}; for(var i = 0; i < queueObjects.length; i ++) { var queueItem = queueObjects[i]; result[queueItem.name] = queueItem.items; } done(err, result); }); } }); }; } //returns a function which takes a callback(err, data) //callback: // - err will be null or contain an error if one occurs // - data will contain an object with: // - 'workername': 'idle' | { class, args } function getWorkerStatus(api) { return function(done) { var scheduler = api.resque.scheduler; var redis = scheduler.connection.redis; redis.smembers(scheduler.connection.key('workers'), function(err, workers) { if(nullOrErrorResult(err, workers)) { done(err, {}); } else { async.map(workers, function(workerName, callback) { redis.get(scheduler.connection.key('worker:' + workerName), function(err, statusString) { if(err) { callback(err); } else { var result = {name: workerName}; if(statusString == null) { result.status = 'idle'; } else { var status = JSON.parse(statusString); result.status = status.payload; } callback(null, result); } }); }, function(err, workerItems) { var result = {}; for(var i = 0; i < workerItems.length; i ++) { var workerItem = workerItems[i]; result[workerItem.name] = workerItem.status; } done(err, result); }); } }); }; } //returns a function which takes a callback(err, data) //callback: // - err will be null or contain an error if one occurs // - data will contain an object with: // - 'jobName': {status: 'scheduled', at: timestamp, in: timeToTimestamp} | // {status: 'running', running: 'workerName'} | // {status: 'enqueued', queue: 'queueName'} | // {status: 'dequeued'} function getStatusByJob(api) { return function(done) { var taskList = Object.keys(api.tasks.tasks); var result = {}; for(var i = 0; i < taskList.length; i++) { result[taskList[i]] = { status: 'dequeued' }; } async.parallel({ schedule: getDelayedTaskSchedule(api), queuedTasks: getQueuedTasks(api), workers: getWorkerStatus(api) }, function(err, results) { if(err) { done(err, {}); } else { _.each(results.schedule, function(jobs, timestamp) { var timeToTimestamp = timestamp - Math.round(new Date().getTime() / 1000); _.each(jobs, function(job) { result[job.class] = {status: 'scheduled', at: timestamp, in: timeToTimestamp}; }); }); _.each(results.queuedTasks, function(jobs, queue) { _.each(jobs, function(job) { result[job.class] = {status: 'enqueued', queue: queue}; }); }); _.each(results.workers, function(job, worker) { if(job === 'idle') { return; } else { result[job.class] = {status: 'running', running: worker}; } }); done(null, result); } }); }; } api.resqueHelper = { delayedSchedule: getDelayedTaskSchedule, queuedTasks: getQueuedTasks, workerStatus: getWorkerStatus, jobs: getStatusByJob }; next(); } };