const prom = require('prom-client'); const pm2 = require('pm2'); let pm2Bus; const REQ_TOPIC = 'get_prom_register'; function pm2exec(cmd, ...args) { return new Promise((resolve, reject) => { pm2[cmd](...args, (err, resp) => (err ? reject(err) : resolve(resp))); }); } function getOnlineInstances(instancesData) { return instancesData.filter(({ pm2_env }) => pm2_env.status === 'online'); } function getMainMetricsRegister(instancesData) { // don't use prom.register here because these metrics // will be summed in cluster mode! const registry = new prom.Registry(); const mainMetrics = [ { name: 'up', help: 'Is the process running' }, { name: 'cpu', help: 'Process cpu usage' }, { name: 'memory', help: 'Process memory usage' }, { name: 'heap_size', help: 'Process heap size' }, { name: 'used_heap_size', help: 'Process heap usage' }, { name: 'uptime', help: 'Process uptime' }, { name: 'instances', help: 'Process instances' }, { name: 'restarts', help: 'Process restarts' }, { name: 'loop_delay', help: 'Event Loop Latency' }, { name: 'loop_delay_p95', help: 'Event Loop Latency p95' }, ].reduce((acc, { name, help }) => { acc[name] = new prom.Gauge({ name, help, labelNames: ['name', 'instance'], registers: [registry], }); return acc; }, {}); instancesData.forEach(({ name, pm2_env, monit }) => { const conf = { name: name, instance: pm2_env.pm_id, }; const axm = pm2_env.axm_monitor; const values = { up: pm2_env.status === 'online' ? 1 : 0, cpu: monit.cpu, memory: monit.memory, heap_size: parseFloat(axm['Heap Size'].value) || null, used_heap_size: parseFloat(axm['Used Heap Size'].value) || null, uptime: Math.round((Date.now() - pm2_env.pm_uptime) / 1000), instances: pm2_env.instances || 1, restarts: pm2_env.unstable_restarts + pm2_env.restart_time, loop_delay: parseFloat(axm['Event Loop Latency'].value) || null, loop_delay_p95: parseFloat(axm['Event Loop Latency p95'].value) || null, }; Object.entries(values).forEach(([name, value]) => { if (value !== null) { mainMetrics[name].set(conf, value); } }); }); return registry; } function requestNeighboursData(instancesData, instancesToWait) { const targetInstanceId = Number(process.env.pm_id); const data = { topic: REQ_TOPIC, data: { targetInstanceId } }; Object.values(instancesData).forEach(({ pm_id }) => { if (pm_id !== targetInstanceId) { pm2exec('sendDataToProcessId', pm_id, data).catch(e => { instancesToWait.count--; console.error(`Failed to request metrics from instance #${pm_id}: ${e.message}`); }); } }); } function getCurrentRegistry(instancesData) { return prom.AggregatorRegistry.aggregate([ getMainMetricsRegister(instancesData).getMetricsAsJSON(), prom.register.getMetricsAsJSON(), ]); } async function getAggregatedRegistry(instancesData) { const onlineInstances = getOnlineInstances(instancesData); let instancesToWait = { count: onlineInstances.length }; const registryPromise = new Promise(async (resolve, reject) => { const registersList = []; const instanceId = Number(process.env.pm_id); const eventName = `process:${instanceId}`; let responcesCount = 1; let timeoutId; function sendResult() { pm2Bus.off(eventName); resolve(prom.AggregatorRegistry.aggregate(registersList)); } function kickNoResponseTimeout() { timeoutId = setTimeout(() => { console.warn( `Metrics were sent by timeout. No response from ${instancesToWait.count - responcesCount} instances.`, ); sendResult(); }, 1000); } try { registersList[instanceId] = getCurrentRegistry( instancesData, ).getMetricsAsJSON(); if (!pm2Bus) { pm2Bus = await pm2exec('launchBus'); } kickNoResponseTimeout(); pm2Bus.on(eventName, packet => { registersList[packet.data.instanceId] = packet.data.register; responcesCount++; clearTimeout(timeoutId); if (responcesCount === instancesToWait.count) { sendResult(); } else { kickNoResponseTimeout(); } }); } catch (e) { reject(e); } }); // this function must be called after the registryPromise declaration // because requests have to be sent after the listener was setup. requestNeighboursData(onlineInstances, instancesToWait); return registryPromise; } // Listener process.on('message', packet => { if (packet.topic === REQ_TOPIC) { process.send({ type: `process:${packet.data.targetInstanceId}`, data: { instanceId: Number(process.env.pm_id), register: prom.register.getMetricsAsJSON(), }, }); } }); (async () => await pm2exec('connect'))(); module.exports = async (req, res) => { let responceData; try { const instancesData = await pm2exec('list'); const register = getOnlineInstances(instancesData).length > 1 ? await getAggregatedRegistry(instancesData) : getCurrentRegistry(instancesData); responceData = register.metrics(); } catch (err) { console.error(`Failed to get metrics: ${err.message}`); } finally { res.set('Content-Type', prom.register.contentType); res.end(responceData); } };