/**
 * This is a proof-of-concept for using ffmpeg as a HTTP video stream proxy
 * that can reduce ad volume.
 *
 * It only works on streams containing SCTE35 data packets.
 * You can check a stream using:
 *
 *     ffmpeg -hide_banner -i <stream-url> 2>&1 | grep scte_35
 *
 * Start the demo:
 *
 *     node app.js
 *
 * Then open http://localhost:3000 in chrome or whatever...
 *
 * The volume will be reduced at the start of each ad-break, and restored when
 * the break ends.
 *
 * NB: if using windows, run this in a WSL bash prompt because cmd.exe|powershell
 * doesn't support unix domain sockets.
 */

// Channel 9 Sydney - 720p HLS stream
const SOURCE_URL = 'https://9now-live.akamaized.net/hls/live/708951-b/ch9-syd/master1.m3u8';

const net = require('net');
const http = require('http');
const crypto = require('crypto');
const { spawn } = require('child_process');
const { Transform } = require('stream');

// Timestamps in a splice info section are 33-bit counters of a 90 kHz clock.
const PTS_TICKS_PER_SECOND = 90000;
// splice_command_type value of splice_insert().
const SPLICE_INSERT_COMMAND = 0x05;
// splice_command_length value meaning "length not specified" (SCTE 35, section 9.6).
const UNSPECIFIED_COMMAND_LENGTH = 0xfff;
// 3 header bytes + 11 fixed bytes + 2 descriptor_loop_length bytes + 4 CRC bytes.
const MIN_SECTION_BYTES = 20;
const CRC_BYTES = 4;

// How long ffmpeg must survive after spawn before it is considered started.
const FFMPEG_STARTUP_GRACE_MS = 150;
// How long to wait for ffmpeg to create its listening unix socket.
const SOCKET_READY_DELAY_MS = 500;

// https://www.scte.org/SCTEDocs/Standards/ANSI_SCTE%2035%202019r1.pdf
// Section 9.6, page 29
/**
 * Parsed representation of one SCTE 35 splice_info_section.
 *
 * Instances are created with {@link SpliceInfoSection.from}; every field is a
 * read-only property. Only the fields that are interesting for logging are
 * enumerable.
 */
class SpliceInfoSection {
  /**
   * Checks the CRC-32 (polynomial 0x04c11db7, initial value 0xffffffff, no
   * reflection, no final xor) of a complete section. Running the CRC over the
   * payload followed by its own checksum yields zero for an intact section.
   *
   * @param {Buffer|Uint8Array|number[]} buffer - complete section bytes, CRC included
   * @returns {boolean} true when the checksum matches (false for empty input)
   */
  static validate(buffer) {
    let crc = 0xffffffff;
    for (let i = 0; i < buffer.length; i++) {
      crc ^= buffer[i] << 24;
      for (let bit = 0; bit < 8; bit++) {
        crc = crc & 0x80000000 ? (crc << 1) ^ 0x04c11db7 : crc << 1;
      }
    }
    return crc === 0;
  }

  /**
   * Parses one complete splice_info_section.
   *
   * Only splice_insert() commands are decoded; for any other command type the
   * command body is skipped. Splice descriptors are exposed on the
   * non-enumerable `descriptors` property.
   *
   * @param {Buffer} buffer - exactly one section, from table_id through CRC_32
   * @returns {SpliceInfoSection} the parsed section
   * @throws {Error} on CRC mismatch, truncated input, wrong table id,
   *   inconsistent section length or a malformed descriptor loop
   * @throws {RangeError} when a field read runs past the end of the buffer
   */
  static from(buffer) {
    if (!SpliceInfoSection.validate(buffer)) {
      throw new Error('invalid splice info section: CRC error');
    }
    if (buffer.length < MIN_SECTION_BYTES) {
      throw new Error('invalid splice Info Section: truncated');
    }

    let offset = 0;
    const readU8 = () => buffer[offset++];
    const readU16 = () => {
      const value = buffer.readUInt16BE(offset);
      offset += 2;
      return value;
    };
    const readU32 = () => {
      const value = buffer.readUInt32BE(offset);
      offset += 4;
      return value;
    };
    // 33-bit timestamp: bit 32 is the lowest bit of the first byte, the low 32
    // bits follow. Built arithmetically because JS shifts operate on 32 bits
    // only (`x << 32` is `x << 0`).
    const readSeconds33 = () => {
      const high = buffer[offset] & 0x01;
      const low = buffer.readUInt32BE(offset + 1);
      offset += 5;
      return (high * 0x100000000 + low) / PTS_TICKS_PER_SECOND;
    };
    // splice_time(): five bytes when time_specified_flag is set, otherwise a
    // single flag/reserved byte that still has to be consumed.
    const readSpliceTime = () => {
      if ((buffer[offset] & 0x80) === 0x80) return readSeconds33();
      offset += 1;
      return -1;
    };

    const result = Object.create(SpliceInfoSection.prototype);
    const define = (name, value, enumerable) =>
      Object.defineProperty(result, name, { value, enumerable });

    // --- fixed header ---
    define('table_id', readU8(), false);
    const sectionFlags = buffer[offset];
    define('section_syntax_indicator', (sectionFlags & 0x80) === 0x80, false);
    define('private_indicator', (sectionFlags & 0x40) === 0x40, false);
    define('section_length', readU16() & 0x0fff, false);
    define('protocol_version', readU8(), false);
    const encryptionFlags = buffer[offset];
    define('encrypted_packet', (encryptionFlags & 0x80) === 0x80, true);
    define('encryption_algorithm', (encryptionFlags & 0x7e) >> 1, false);
    define('pts_adjustment', readSeconds33(), true);
    define('cw_index', readU8(), false);
    // tier (12 bits) and splice_command_length (12 bits) share three bytes.
    define('tier', (buffer[offset] << 4) | (buffer[offset + 1] >> 4), false);
    offset += 1;
    define('splice_command_length', readU16() & 0x0fff, false);
    define('splice_command_type', readU8(), false);

    if (result.table_id !== 0xfc) {
      throw new Error('invalid splice Info Section: bad table id');
    }
    if (result.section_length !== buffer.length - 3) {
      throw new Error('invalid splice Info Section: bad section length');
    }

    // --- splice command ---
    let descriptorLoopLocated = true;
    if (result.splice_command_type === SPLICE_INSERT_COMMAND) {
      define('splice_event_id', readU32(), true);
      define('splice_event_cancel_indicator', (readU8() & 0x80) === 0x80, true);

      // A cancelled event carries nothing after the cancel indicator.
      if (!result.splice_event_cancel_indicator) {
        const insertFlags = readU8();
        define('out_of_network_indicator', (insertFlags & 0x80) === 0x80, true);
        define('program_splice_flag', (insertFlags & 0x40) === 0x40, false);
        define('duration_flag', (insertFlags & 0x20) === 0x20, false);
        define('splice_immediate_flag', (insertFlags & 0x10) === 0x10, false);

        if (result.program_splice_flag && !result.splice_immediate_flag) {
          define('splice_time', readSpliceTime(), true);
        }

        if (!result.program_splice_flag) {
          const components = [];
          const componentCount = readU8();
          for (let i = 0; i < componentCount; i++) {
            const component = { tag: readU8() };
            // Per-component times are only present for non-immediate splices.
            if (!result.splice_immediate_flag) {
              Object.defineProperty(component, 'splice_time', {
                value: readSpliceTime(),
                enumerable: true,
              });
            }
            components.push(component);
          }
          define('components', Object.freeze(components), true);
        }

        if (result.duration_flag) {
          define('auto_return', (buffer[offset] & 0x80) === 0x80, true);
          define('duration', readSeconds33(), true);
        }

        define('unique_program_id', readU16(), true);
        define('avail_num', readU8(), false);
        define('avails_expected', readU8(), false);
      }
    } else {
      console.error('Not a splice_insert command!');
      // Skip the undecoded command so the descriptor loop is read from the
      // right place; without a declared length it cannot be located.
      if (result.splice_command_length === UNSPECIFIED_COMMAND_LENGTH) {
        descriptorLoopLocated = false;
      } else {
        offset += result.splice_command_length;
      }
    }

    // --- descriptor loop ---
    const descriptors = [];
    if (descriptorLoopLocated) {
      // descriptor_loop_length counts bytes, not descriptors.
      const loopLength = readU16();
      const loopEnd = offset + loopLength;
      if (loopEnd > buffer.length - CRC_BYTES) {
        throw new Error('invalid splice Info Section: bad descriptor loop length');
      }
      while (offset < loopEnd) {
        const splice_descriptor_tag = readU8();
        const descriptor_length = readU8();
        const descriptorEnd = offset + descriptor_length;
        // descriptor_length covers the 4-byte identifier plus the private bytes.
        if (descriptor_length < 4 || descriptorEnd > loopEnd) {
          throw new Error('invalid splice Info Section: bad descriptor length');
        }
        descriptors.push({
          splice_descriptor_tag,
          descriptor_length,
          identifier: buffer.readUInt32BE(offset),
          private_bytes: Array.from(buffer.slice(offset + 4, descriptorEnd)),
        });
        offset = descriptorEnd;
      }
    }
    define('descriptors', Object.freeze(descriptors), false);

    // CRC_32 is always the final four bytes of the section.
    define('crc', buffer.readUInt32BE(buffer.length - CRC_BYTES), false);

    return result;
  }
}

/**
 * Object-mode transform that turns raw section buffers into
 * {@link SpliceInfoSection} objects.
 *
 * Each incoming chunk is parsed as one complete section. Chunks that fail to
 * parse are logged and re-emitted through an `'invalid'` event instead of
 * erroring the stream.
 */
class SpliceInfoStream extends Transform {
  /**
   * @param {object} [options] - stream options; `objectMode` is always forced
   *   to true. The passed object is not modified.
   */
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _transform(buffer, encoding, cb) {
    try {
      this.push(SpliceInfoSection.from(buffer));
    } catch (err) {
      console.error('SCTE35Stream:', err);
      this.emit('invalid', buffer);
    }
    cb();
  }
}

/**
 * Spawns ffmpeg to remux `input` to fragmented MP4 on `output` while writing
 * the stream mapped as `0:2` to its stdout.
 *
 * The process is treated as started when it is still alive after
 * FFMPEG_STARTUP_GRACE_MS.
 *
 * @param {string} input - URL handed to ffmpeg's `-i`
 * @param {string} output - ffmpeg output URL (the caller passes `unix:<path>`)
 * @returns {Promise<import('child_process').ChildProcess>} resolves with the
 *   running child; rejects with an Error when the spawn fails or ffmpeg exits
 *   during the grace period (the message is ffmpeg's stderr output when any)
 */
const createProcess = (input, output) => new Promise((resolve, reject) => {
  // NOTE(review): '-movflags' is given three times below; confirm ffmpeg
  // combines the values rather than keeping only the last one.
  const ffmpeg = spawn('ffmpeg', [
    '-i', input,
    '-hide_banner',
    '-loglevel', 'warning',
    '-threads', '2',
    '-analyzeduration', '5MB',
    '-probesize', '5MB',

    // video frames are not decoded
    '-map', '0:v',
    '-vcodec', 'copy',
    '-vsync', '0',

    // audio frames are decoded to alter the volume
    '-map', '0:a',
    '-filter:a', 'volume=1.0',
    '-acodec', 'aac',
    '-ab', '128k',
    '-ac', '2',
    '-async', '1',

    // send video and audio frames to unix domain socket
    // '-f', 'mpegts',
    '-f', 'mp4',
    '-movflags', 'faststart',
    '-movflags', 'empty_moov',
    '-movflags', 'frag_keyframe',
    '-listen', '1',
    output,

    // send scte 35 data packets to stdout
    '-map', '0:2',
    '-dcodec', 'copy',
    '-f', 'data',
    'pipe:1',
  ]);

  const timer = setTimeout(() => {
    ffmpeg.removeAllListeners('error');
    ffmpeg.removeAllListeners('exit');
    resolve(ffmpeg);
  }, FFMPEG_STARTUP_GRACE_MS);

  ffmpeg.once('error', (err) => {
    console.debug('ffmpeg error occurred');
    clearTimeout(timer);
    reject(new Error('Failed to create ffmpeg child process', { cause: err }));
  });

  ffmpeg.once('exit', (code) => {
    console.debug('ffmpeg exit has occurred');
    clearTimeout(timer);
    ffmpeg.removeAllListeners('error');

    // Collect everything ffmpeg wrote to stderr and reject once the stream is
    // drained, so the promise settles even when nothing was written at all.
    const chunks = [];
    const fail = () => {
      const message = Buffer.concat(chunks).toString('utf8').trim();
      reject(new Error(message || `ffmpeg exited early with code ${code}`));
    };
    if (ffmpeg.stderr.destroyed) {
      fail();
      return;
    }
    ffmpeg.stderr.on('data', (chunk) => chunks.push(chunk));
    ffmpeg.stderr.once('close', fail);
  });
});

/**
 * HTTP request handler: starts a dedicated ffmpeg process for the request,
 * pipes its MP4 output to the response and adjusts the audio volume whenever
 * a splice_insert section is received.
 *
 * @param {import('http').IncomingMessage} req
 * @param {import('http').ServerResponse} res
 */
const app = (req, res) => {
  const { remoteAddress, remotePort } = res.socket;
  console.log('new request', remoteAddress, remotePort);

  // create a hash from the remote conn. info to make a unique socket name
  const md5 = crypto.createHash('md5');
  md5.update(`${remoteAddress}${remotePort}`);
  const socketPath = `/tmp/adsoft.${md5.digest('hex')}.sock`;

  // The client may go away while ffmpeg is still starting up.
  let clientGone = false;
  res.once('close', () => {
    clientGone = true;
  });

  // create ffmpeg child process
  createProcess(SOURCE_URL, `unix:${socketPath}`).then((ffmpeg) => {
    ffmpeg.once('exit', () => console.debug(`ffmpeg child process ${ffmpeg.pid} has exited`));
    // createProcess removed its own 'error' listener; without one a late
    // process error would be thrown as an uncaught exception.
    ffmpeg.on('error', (err) => console.error('ffmpeg process error:', err));
    ffmpeg.stderr.on('data', (buffer) => console.debug(buffer.toString('utf8')));
    // Writing a command to an ffmpeg that has already exited must not crash.
    ffmpeg.stdin.on('error', (err) => console.error('ffmpeg stdin error:', err));

    if (clientGone) {
      ffmpeg.kill();
      return;
    }

    const scte35Stream = new SpliceInfoStream();
    scte35Stream.on('data', (info) => {
      console.log('scte35 info:', info);

      // Only a non-cancelled splice_insert says whether we are in an ad break;
      // other sections must not reset the volume.
      if (typeof info.out_of_network_indicator !== 'boolean') return;

      // set volume of stream
      ffmpeg.stdin.write(`Cvolume ${info.pts_adjustment} volume ${info.out_of_network_indicator ? 0.25 : 1.0}\n`);
    });
    ffmpeg.stdout.pipe(scte35Stream);

    let socket = null;

    // wait for ffmpeg to create the socket file...
    const connectTimer = setTimeout(() => {
      socket = net.createConnection(socketPath, () => {
        console.debug('connected to domain socket');
        res.setHeader('Content-Type', 'video/mp4');
        socket.pipe(res);
      });

      socket.on('error', (err) => {
        console.error('unix socket error:', err);
        ffmpeg.kill();
        // Do not leave the client hanging on a stream that will never arrive.
        if (!res.headersSent) res.statusCode = 502;
        res.end();
      });
    }, SOCKET_READY_DELAY_MS);

    res.on('close', () => {
      console.debug('connection closed');
      clearTimeout(connectTimer);
      if (socket) socket.destroy();
      // Nobody is watching any more: stop transcoding.
      ffmpeg.kill();
    });

    res.on('finish', () => {
      console.debug('response finished');
      if (socket) socket.destroy();
    });
  }).catch((err) => {
    console.error(err);
    if (!res.headersSent) res.statusCode = 500;
    res.end(err.toString());
  });
};

// Only bind the port when run directly (`node app.js`), so that the classes
// above can be loaded without starting a server.
const isMainModule = typeof module !== 'undefined' && require.main === module;

const server = isMainModule
  ? http.createServer(app).listen(3000, '0.0.0.0', () => {
    console.log('Listening...');
  })
  : null;

if (typeof module !== 'undefined') {
  module.exports = { SpliceInfoSection, SpliceInfoStream, createProcess, app, server };
}