// ****************************************************************************
// This code is designed to allow your ExpressJS app to stream uploads through
// ClamAV and on to S3 with full error-handling.
// -----
// You're going to need the following NPM modules installed for this to work:
//  - uuid
//  - busboy (pre-1.0 API: `new BusBoy(...)` and the 5-argument 'file' event)
//  - aws-sdk (v2)
//  - s3-upload-stream
//  - express
//  - clamscan
//  - filesize
// ****************************************************************************
const uuid = require('uuid/v4');
const BusBoy = require('busboy');
const AWS = require('aws-sdk');
const NodeClam = require('clamscan');
const filesize_module = require('filesize');

// Older `filesize` releases export the formatter function directly; newer ones
// export it as a named member (`{ filesize }`). Accept either shape.
const format_bytes = typeof filesize_module === 'function' ? filesize_module : filesize_module.filesize;

// Single place to configure the bucket: it is bound to the S3 client (used by
// deleteObject) and passed explicitly to the streaming upload.
const S3_BUCKET = 'BUCKET_NAME_HERE';

const ClamScan = new NodeClam().init({
    clamdscan: {
        socket: '/var/run/clamd.scan/clamd.sock',
        timeout: 300000,
        local_fallback: true,
    },
    preference: 'clamdscan'
});

// Attach a handler immediately so a failed init is logged instead of becoming
// an unhandled rejection at startup. Requests that later chain off `ClamScan`
// still observe the rejection and fail their own upload.
ClamScan.catch(err => console.error('ClamAV failed to initialize:', err));

const s3 = new AWS.S3({ params: { Bucket: S3_BUCKET } });
const s3_stream = require('s3-upload-stream')(s3);

// ****************************************************************************
// Takes an end-user's Express upload stream and pipes it through ClamAV and on
// to S3, settling only after BOTH the S3 upload and the virus scan report back.
// -----
// NOTE(review): state (scan result, S3 key, file info) is shared per request,
// so this is designed for ONE file per request; multiple files would all
// write to the same S3 key. Use `max_files: 1` to enforce that.
// -----
// @param Object req   The Express Request object (multipart/form-data)
// @param Object res   The Express Response object (currently unused)
// @param Object opts  Overrides for the defaults declared below
// @return Promise     Resolves with {s3_details, file_info, fields}. Rejects
//                     with a user-facing Error when the S3 upload fails, the
//                     scan fails, the file is infected, the file type is not
//                     allowed, or the file exceeds max_file_size.
// ****************************************************************************
function pipe2s3(req, res, opts={}) {
    return new Promise((resolve, reject) => {
        const defaults = {
            s3_path: '', // Needs trailing slash if provided...
            s3_id: null,
            s3_acl: 'private',
            s3_metadata: {},
            max_file_size: 20 * 1024 * 1024, // 20 MB
            max_files: null, // FALSEY === No max number of files
            allowed_mimetypes: [], // FALSEY === Accept anything
            debug: true, // Set to `false` if you don't wanna see a bazillion messages in your logs
        };

        // Merge user options with defaults
        const options = {...defaults, ...opts};
        if (!options.s3_id) options.s3_id = `${options.s3_path}${uuid()}`;
        const debug_mode = options.debug;

        let file_size = 0;        // Largest byte count S3 has reported receiving
        let s3_details = null;    // S3 'uploaded' details, or the upload Error
        let scan_result = null;   // ClamAV 'scan-complete' result, or the scan Error
        let file_too_big = false; // Set when busboy truncated the file at max_file_size
        let completed = false;    // Ensures the outcome is decided only once
        let num_files = 0;
        const file_info = {};
        const fields = {};

        const generic_error = code => new Error(`There was an issue with your upload (Code: ${code}). Please try again. If you continue to experience issues, please contact Customer Support!`);
        const virus_error = () => new Error("The file you've uploaded contained a virus. Please scan your system immediately. If you feel this is in error, please contact Customer Support. Thank you!");
        // Busboy stops reading at the limit, so the real size of an oversized
        // file is unknown; only the maximum can be reported.
        const too_big_error = () => new Error(`The file you've provided is over the maximum ${format_bytes(options.max_file_size)} allowed.`);

        // Instantiate BusBoy for this request
        const busboy = new BusBoy({headers: req.headers, limits: { fileSize: options.max_file_size, files: options.max_files }});

        // Log an error under a fresh correlation code
        const log_error = (err) => {
            const code = uuid();
            console.error(`Error Code: ${code}: ${err}`, err);
        };

        // Remove the uploaded object from S3 (best effort: failures are only logged)
        const remove_s3_obj = async () => {
            try {
                const result = await s3.deleteObject({Bucket: S3_BUCKET, Key: options.s3_id}).promise();
                console.log(`S3 Object: "${options.s3_id}" was deleted due to a ClamAV error or virus detection.`, result);
            } catch (err) {
                log_error(err);
            }
        };

        // Called when the file has been uploaded to S3 and scanned (or straight
        // from busboy 'finish' when the request contained no files).
        const pipeline_complete = () => {
            if (completed) return;
            completed = true;

            if (debug_mode) console.log("Pipeline complete!", {s3_details, scan_result, file_info});

            // If the S3 upload threw an error
            if (s3_details instanceof Error) {
                log_error(s3_details);
                return reject(generic_error(1));
            }

            // The promise was already rejected when the limit was hit; make sure
            // the truncated object does not linger in S3.
            if (file_too_big) {
                void remove_s3_obj();
                return reject(too_big_error());
            }

            if (scan_result instanceof Error) {
                // A scan error that reports an infection must never be accepted
                if (scan_result.data && scan_result.data.is_infected) {
                    log_error(`Stream contained virus(es): ${scan_result.data.viruses}`);
                    void remove_s3_obj();
                    return reject(virus_error());
                }
                // ECONNRESET has been observed without a real failure and is
                // tolerated (cause still unclear). Any other scan error means
                // the file was not verified, so fail closed.
                if (scan_result.code !== 'ECONNRESET') {
                    log_error(scan_result);
                    void remove_s3_obj();
                    return reject(generic_error(2));
                }
            }
            // If the file is infected
            else if (scan_result && scan_result.is_infected === true) {
                console.log(`A virus (${(scan_result.viruses || []).join(', ')}) has been uploaded!`);
                void remove_s3_obj();
                return reject(virus_error());
            }
            // If we're unsure the file is infected, just note that in the logs
            else if (scan_result && scan_result.is_infected === null) {
                console.log(`There was an issue scanning the uploaded file... You might need to investigate manually: `, {s3_details, file_info});
            }
            // If the file uploaded just fine...
            else if (debug_mode) {
                console.log(`The file uploaded was just fine... Carrying on...`);
            }

            // S3 reports Location URI-encoded; decode it for callers, keeping the
            // raw value if it is not valid percent-encoding.
            if (s3_details && typeof s3_details.Location === 'string') {
                try {
                    s3_details.Location = decodeURIComponent(s3_details.Location);
                } catch (err) {
                    log_error(err);
                }
            }
            return resolve({s3_details, file_info, fields});
        };

        // Each half of the pipeline (S3 upload, virus scan) calls this when it
        // finishes; the outcome is decided once both have reported.
        const on_part_complete = () => {
            if (scan_result !== null && s3_details !== null) pipeline_complete();
        };

        // Wait for file(s)
        busboy.on('file', (fieldname, file, filename, encoding, mimetype) => {
            num_files++;

            // Make sure we're only allowing the specified type of file(s)
            if (Array.isArray(options.allowed_mimetypes) && options.allowed_mimetypes.length > 0 && !options.allowed_mimetypes.includes(mimetype)) {
                // Drain the rejected file so busboy can finish parsing the request
                file.resume();
                return reject(new Error("Invalid file type provided!"));
            }

            // busboy (<1.0) emits 'limit' on the FILE stream when it truncates at
            // limits.fileSize; the truncated remainder still flows to S3.
            file.on('limit', () => {
                file_too_big = true;
                console.log("File is too big...");
                reject(too_big_error());
            });

            // Strip non-ASCII, header/shell-hostile characters and leading dots/dashes
            const filename_ascii = filename.replace(/[^\x00-\x7F]/g, "").replace(/[,;'"\\\/`|><*:$]/g, "").replace(/^[.-]+(.*)/,"$1");

            // Update file info object
            file_info.filename = filename;
            file_info.encoding = encoding;
            file_info.mimetype = mimetype;
            file_info.filename_ascii = filename_ascii;

            // Configure the S3 streaming upload
            const upload = s3_stream.upload({
                Bucket: S3_BUCKET,
                Key: options.s3_id,
                ContentDisposition: `inline; filename="${filename_ascii}"`,
                ContentType: mimetype,
                ACL: options.s3_acl,
                Metadata: options.s3_metadata,
            });

            upload.maxPartSize(20 * 1024 * 1024); // 20 MB
            upload.concurrentParts(5);

            upload.on('error', err => {
                s3_details = err;
                on_part_complete();
            });

            // Do this whenever a chunk of the upload has been received by S3
            upload.on('part', details => {
                if (details.receivedSize > file_size) {
                    file_size = details.receivedSize;
                    file_info.filesize = file_size;
                }
                if (debug_mode) console.log("File uploading to S3: ", Math.round((details.uploadedSize / details.receivedSize) * 100) + `% (${details.uploadedSize} / ${details.receivedSize})`);
            });

            // When the file has been fully uploaded to S3
            upload.on('uploaded', details => {
                if (debug_mode) console.log("File Uploaded to S3: ", {details, filesize: file_size});
                s3_details = details;
                on_part_complete();
            });

            // Get instance of clamscan object
            ClamScan.then(clamscan => {
                const av = clamscan.passthrough();

                // If there's an error scanning the file
                av.on('error', error => {
                    scan_result = error;
                    on_part_complete();
                }).on('finish', () => {
                    if (debug_mode) console.log("All data has been sent to virus scanner");
                }).on('end', () => {
                    if (debug_mode) console.log("All data has been retrieved by ClamAV and sent on to the destination!");
                }).on('scan-complete', result => {
                    if (debug_mode) console.log("Scan Complete. Result: ", result);
                    scan_result = result;
                    on_part_complete();
                });

                // Pipe stream through ClamAV and on to S3
                file.pipe(av).pipe(upload);
            }).catch(e => {
                log_error(e);
                // Drain the file so the request does not stall behind an unread stream
                file.resume();
                reject(e);
            });

            if (debug_mode) console.log("Got a file stream!", filename);
        });

        // Malformed multipart bodies etc. surface here; without a listener the
        // 'error' event would be thrown.
        busboy.on('error', err => {
            log_error(err);
            reject(err);
        });

        // When busboy has sent the entire upload to ClamAV
        busboy.on('finish', () => {
            if (debug_mode) console.log("BusBoy has fully flushed to S3 Stream...");
            if (num_files === 0) pipeline_complete();
        });

        // Capture the non-file fields too...
        busboy.on('field', (fieldname, val) => {
            fields[fieldname] = val;
        });

        // Send request to busboy
        req.pipe(busboy);
    });
}