- 
      
- 
        Save bugre/b80d261697004404dfd09cbee61ef237 to your computer and use it in GitHub Desktop. 
    This is a proof-of-concept for using ffmpeg as a HTTP video stream proxy that can reduce the volume of ad-breaks
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | /** | |
| * This is a proof-of-concept for using ffmpeg as a HTTP video stream proxy | |
| * that can reduce ad volume. | |
| * | |
| * It only works on streams containing SCTE35 data packets. | |
| * You can check a stream using: | |
| * | |
| * ffmpeg -hide_banner -i <SOURCE_URL> 2>&1 | grep scte_35 | |
| * | |
| * Start the demo: | |
| * | |
| * node app.js | |
| * | |
| * Then open http://localhost:3000 in chrome or whatever... | |
| * | |
| * The volume will be reduced at the start of each ad-break, and restored when | |
| * the break ends. | |
| * | |
| * NB: if using Windows, run this in a WSL bash prompt because cmd.exe and | |
| * PowerShell don't support unix domain sockets. | |
| */ | |
| // Channel 9 Sydney - 720p HLS stream | |
| const SOURCE_URL = 'https://9now-live.akamaized.net/hls/live/708951-b/ch9-syd/master1.m3u8' | |
| const net = require( 'net' ); | |
| const http = require( 'http' ); | |
| const crypto = require( 'crypto' ); | |
| const { spawn } = require( 'child_process' ); | |
| const { Transform } = require( 'stream' ); | |
| // https://www.scte.org/SCTEDocs/Standards/ANSI_SCTE%2035%202019r1.pdf | |
| // Section 9.6, page 29 | |
/**
 * A parsed SCTE-35 splice_info_section.
 *
 * Instances are created with SpliceInfoSection.from(). All fields are
 * read-only properties; only the ones most useful for logging are enumerable
 * (encrypted_packet, pts_adjustment and the main splice_insert fields).
 * Times are converted from 90 kHz ticks to seconds.
 *
 * Only the splice_insert command (type 0x05) is decoded; other commands are
 * skipped. Encrypted sections are not decrypted.
 */
class SpliceInfoSection {

  /**
   * Check the section checksum (CRC-32/MPEG-2: polynomial 0x04c11db7,
   * initial value 0xffffffff, no reflection, no final xor).
   *
   * @param {Buffer} buffer - complete section, including the trailing CRC
   * @returns {boolean} true when the CRC over the whole buffer is zero
   */
  static validate ( buffer ) {
    let crc = 0xffffffff;
    for ( let i = 0; i < buffer.length; i ++ ) {
      crc ^= buffer[ i ] << 24;
      for ( let k = 0; k < 8; k ++ ) {
        crc = ( crc & 0x80000000 ) ? ( crc << 1 ) ^ 0x04c11db7 : crc << 1;
      }
    }
    return crc === 0;
  }

  /**
   * Parse a complete splice_info_section.
   *
   * @param {Buffer} buffer - exactly one section, from table_id to CRC_32
   * @returns {SpliceInfoSection} the parsed section
   * @throws {Error} on CRC mismatch, bad table id, bad lengths, or a command
   *   whose length cannot be determined
   * @throws {RangeError} when a field would be read past the end of buffer
   */
  static from ( buffer ) {
    if ( ! SpliceInfoSection.validate( buffer ) ) {
      throw new Error( 'invalid splice info section: CRC error' );
    }
    // 14 header bytes + descriptor_loop_length (2) + CRC_32 (4)
    if ( buffer.length < 20 ) {
      throw new Error( 'invalid splice info section: too short' );
    }

    const field = ( value, enumerable ) => ( { value, enumerable } );
    let offset = 0;

    // Reads a 33-bit tick count whose top bit is the low bit of the current
    // byte, followed by 32 more bits. Arithmetic (not `<<`) is used for the
    // top bit because JavaScript shifts are taken modulo 32.
    const readTicks33 = () => {
      const high = buffer[ offset ] & 0x01;
      const low = buffer.readUInt32BE( offset + 1 );
      offset += 5;
      return ( high * 0x100000000 + low ) / 90000;
    };

    // splice_time(): 5 bytes when time_specified_flag is set, else 1 byte.
    const readSpliceTime = () => {
      if ( ( buffer[ offset ] & 0x80 ) === 0x80 ) {
        return readTicks33();
      }
      offset += 1;
      return -1;
    };

    const table_id = buffer[ 0 ];
    const section_length = ( ( buffer[ 1 ] & 0x0f ) << 8 ) | buffer[ 2 ];
    if ( table_id !== 0xfc ) {
      throw new Error( 'invalid splice Info Section: bad table id' );
    }
    if ( section_length !== buffer.length - 3 ) {
      throw new Error( 'invalid splice Info Section: bad section length' );
    }

    offset = 4;
    const encryptionByte = buffer[ offset ];
    const pts_adjustment = readTicks33();
    const splice_command_length = ( ( buffer[ 11 ] & 0x0f ) << 8 ) | buffer[ 12 ];
    const splice_command_type = buffer[ 13 ];
    offset = 14;

    const result = Object.create( SpliceInfoSection.prototype, {
      table_id: field( table_id, false ),
      section_syntax_indicator: field( ( buffer[ 1 ] & 0x80 ) === 0x80, false ),
      private_indicator: field( ( buffer[ 1 ] & 0x40 ) === 0x40, false ),
      section_length: field( section_length, false ),
      protocol_version: field( buffer[ 3 ], false ),
      encrypted_packet: field( ( encryptionByte & 0x80 ) === 0x80, true ),
      encryption_algorithm: field( ( encryptionByte & 0x7e ) >> 1, false ),
      pts_adjustment: field( pts_adjustment, true ),
      cw_index: field( buffer[ 9 ], false ),
      tier: field( ( buffer[ 10 ] << 4 ) | ( ( buffer[ 11 ] & 0xf0 ) >> 4 ), false ),
      splice_command_length: field( splice_command_length, false ),
      splice_command_type: field( splice_command_type, false ),
    } );

    if ( splice_command_type === 0x05 ) {

      const splice_event_id = buffer.readUInt32BE( offset );
      offset += 4;
      const cancelled = ( buffer[ offset ++ ] & 0x80 ) === 0x80;
      Object.defineProperties( result, {
        splice_event_id: field( splice_event_id, true ),
        splice_event_cancel_indicator: field( cancelled, true ),
      } );

      // A cancelled event carries no further fields.
      if ( ! cancelled ) {
        const flags = buffer[ offset ++ ];
        const program_splice_flag = ( flags & 0x40 ) === 0x40;
        const duration_flag = ( flags & 0x20 ) === 0x20;
        const splice_immediate_flag = ( flags & 0x10 ) === 0x10;
        Object.defineProperties( result, {
          out_of_network_indicator: field( ( flags & 0x80 ) === 0x80, true ),
          program_splice_flag: field( program_splice_flag, false ),
          duration_flag: field( duration_flag, false ),
          splice_immediate_flag: field( splice_immediate_flag, false ),
        } );

        if ( program_splice_flag ) {
          if ( ! splice_immediate_flag ) {
            Object.defineProperty( result, 'splice_time', field( readSpliceTime(), true ) );
          }
        } else {
          const components = [];
          const componentCount = buffer[ offset ++ ];
          for ( let i = 0; i < componentCount; i ++ ) {
            const component = { tag: buffer[ offset ++ ] };
            // Per-component times are only present for non-immediate splices.
            if ( ! splice_immediate_flag ) {
              Object.defineProperty( component, 'splice_time', field( readSpliceTime(), true ) );
            }
            components.push( component );
          }
          Object.defineProperty( result, 'components', field( Object.freeze( components ), true ) );
        }

        if ( duration_flag ) {
          // break_duration(): auto_return is the top bit of the first byte.
          const auto_return = ( buffer[ offset ] & 0x80 ) === 0x80;
          Object.defineProperties( result, {
            auto_return: field( auto_return, true ),
            duration: field( readTicks33(), true ),
          } );
        }

        const unique_program_id = buffer.readUInt16BE( offset );
        offset += 2;
        Object.defineProperties( result, {
          unique_program_id: field( unique_program_id, true ),
          avail_num: field( buffer[ offset ++ ], false ),
          avails_expected: field( buffer[ offset ++ ], false ),
        } );
      }

    } else {

      console.error( 'Not a splice_insert command!' );
      // Skip the undecoded command so the descriptor loop is read from the
      // right place. 0xfff means "length not specified"; in that case only
      // commands with a known layout can be skipped.
      if ( splice_command_length !== 0xfff ) {
        offset += splice_command_length;
      } else if ( splice_command_type === 0x06 ) {
        readSpliceTime(); // time_signal is a bare splice_time()
      } else if ( splice_command_type !== 0x00 && splice_command_type !== 0x07 ) {
        // anything other than splice_null / bandwidth_reservation (both empty)
        throw new Error( 'invalid splice info section: unknown command length' );
      }

    }

    // descriptor_loop_length counts BYTES, not descriptors.
    const descriptor_loop_length = buffer.readUInt16BE( offset );
    offset += 2;
    const descriptorsEnd = offset + descriptor_loop_length;
    if ( descriptorsEnd > buffer.length - 4 ) {
      throw new Error( 'invalid splice info section: bad descriptor loop length' );
    }

    const descriptors = [];
    while ( offset < descriptorsEnd ) {
      const splice_descriptor_tag = buffer[ offset ++ ];
      const descriptor_length = buffer[ offset ++ ];
      const end = offset + descriptor_length;
      // descriptor_length includes the 4 identifier bytes
      if ( descriptor_length < 4 || end > descriptorsEnd ) {
        throw new Error( 'invalid splice info section: bad descriptor length' );
      }
      descriptors.push( {
        splice_descriptor_tag,
        descriptor_length,
        identifier: buffer.readUInt32BE( offset ),
        private_bytes: Array.from( buffer.subarray( offset + 4, end ) ),
      } );
      offset = end;
    }

    Object.defineProperties( result, {
      descriptors: field( Object.freeze( descriptors ), false ),
      // CRC_32 is always the last four bytes of the section.
      crc: field( buffer.readUInt32BE( buffer.length - 4 ), false ),
    } );

    return result;
  }

}
/**
 * Object-mode Transform that parses every written chunk with
 * SpliceInfoSection.from() and pushes the resulting object downstream.
 *
 * Each chunk is parsed on its own, so a chunk must hold exactly one complete
 * section. Chunks that fail to parse are logged and re-emitted, unparsed, as
 * an 'invalid' event; they never error the stream.
 */
class SpliceInfoStream extends Transform {

  /**
   * @param {object} [options] - Transform options; objectMode is always
   *   forced to true. The object passed in is not modified.
   */
  constructor ( options ) {
    // Copy instead of Object.assign()-ing onto the caller's object.
    super( { ...options, objectMode: true } );
  }

  /**
   * @param {Buffer} buffer - one complete splice_info_section
   * @param {string} encoding - unused
   * @param {Function} cb - called once the chunk has been handled
   */
  _transform ( buffer, encoding, cb ) {
    try {
      this.push( SpliceInfoSection.from( buffer ) );
    } catch ( err ) {
      console.error( 'SCTE35Stream:', err );
      this.emit( 'invalid', buffer );
    }
    cb();
  }

}
/**
 * Spawn the ffmpeg child process that proxies the stream.
 *
 * ffmpeg copies the video, re-encodes the audio through a `volume` filter,
 * serves the result as fragmented MP4 on `output` (listening), and writes the
 * raw data stream 0:2 to its stdout.
 *
 * The process is considered started when it is still alive 150 ms after
 * spawning.
 *
 * @param {string} input - source URL handed to `-i`
 * @param {string} output - ffmpeg output URL to listen on (e.g. `unix:/path`)
 * @returns {Promise<ChildProcess>} resolves with the running child process;
 *   rejects with an Error if it cannot be spawned or exits within 150 ms
 */
const createProcess = ( input, output ) => new Promise( ( resolve, reject ) => {

  const ffmpeg = spawn( 'ffmpeg', [
    '-i', input,
    '-hide_banner',
    '-loglevel', 'warning',
    '-threads', '2',
    '-analyzeduration', '5MB',
    '-probesize', '5MB',
    // video frames are not decoded
    '-map', '0:v', '-vcodec', 'copy', '-vsync', '0',
    // audio frames are decoded to alter the volume
    '-map', '0:a', '-filter:a', 'volume=1.0',
    '-acodec', 'aac', '-ab', '128k', '-ac', '2', '-async', '1',
    // send video and audio frames to unix domain socket
    '-f', 'mp4',
    // Flags must be joined with '+': a repeated -movflags replaces the
    // previous value. 'faststart' is left out because it was being
    // overridden anyway and needs a seekable output.
    '-movflags', 'frag_keyframe+empty_moov',
    '-listen', '1', output,
    // send scte 35 data packets to stdout
    '-map', '0:2', '-dcodec', 'copy',
    '-f', 'data', 'pipe:1'
  ] );

  const onError = err => {
    console.debug( 'ffmpeg error occurred' );
    clearTimeout( timer );
    ffmpeg.removeListener( 'exit', onExit );
    reject( new Error( 'Failed to create ffmpeg child process', { cause: err } ) );
  };

  const onExit = code => {
    console.debug( 'ffmpeg exit has occurred' );
    clearTimeout( timer );
    ffmpeg.removeListener( 'error', onError );
    // Drain whatever ffmpeg printed, then reject once stderr is closed so the
    // promise settles even when nothing was printed at all.
    let message = '';
    ffmpeg.stderr.on( 'data', chunk => {
      message += chunk.toString( 'utf8' );
    } );
    ffmpeg.stderr.once( 'close', () => {
      reject( new Error( message.trim() || `ffmpeg exited with code ${code}` ) );
    } );
  };

  ffmpeg.once( 'error', onError );
  ffmpeg.once( 'exit', onExit );

  // Still running after the grace period: hand the process to the caller and
  // detach only the listeners installed here.
  const timer = setTimeout( () => {
    ffmpeg.removeListener( 'error', onError );
    ffmpeg.removeListener( 'exit', onExit );
    resolve( ffmpeg );
  }, 150 );

} );
/**
 * HTTP request handler: starts one ffmpeg child process per request and
 * relays its MP4 output (served on a per-connection unix domain socket) to
 * the client. Parsed SCTE-35 sections from ffmpeg's stdout drive `volume`
 * filter commands written to ffmpeg's stdin.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 */
const app = ( req, res ) => {

  const { remoteAddress, remotePort } = req.socket;
  console.log( 'new request', remoteAddress, remotePort );

  // create a hash from the remote conn. info to make a unique socket name
  const md5 = crypto.createHash( 'md5' );
  md5.update( remoteAddress + remotePort );
  const socketPath = `/tmp/adsoft.${md5.digest('hex')}.sock`;

  // Report a failure to the client; once streaming has begun all that can be
  // done is to drop the connection.
  const fail = ( status, err ) => {
    if ( res.headersSent ) {
      res.destroy();
      return;
    }
    res.statusCode = status;
    res.end( err.toString() );
  };

  // create ffmpeg child process
  createProcess( SOURCE_URL, 'unix:' + socketPath ).then( ffmpeg => {

    // The client may already have gone away while ffmpeg was starting.
    if ( req.socket.destroyed ) {
      ffmpeg.kill();
      return;
    }

    let socket = null;
    let connectTimer = null;

    ffmpeg.once( 'exit', () => console.debug( `ffmpeg child process ${ffmpeg.pid} has exited` ) );
    ffmpeg.stderr.on( 'data', buffer => console.debug( buffer.toString( 'utf8' ) ) );
    // Writing to stdin after ffmpeg has exited must not crash the server.
    ffmpeg.stdin.on( 'error', err => console.debug( 'ffmpeg stdin error:', err.message ) );

    const scte35Stream = new SpliceInfoStream();
    scte35Stream.on( 'data', info => {
      console.log( 'scte35 info:', info );
      // Only a (non-cancelled) splice_insert says whether we are entering or
      // leaving a break; anything else must not touch the volume.
      if ( typeof info.out_of_network_indicator !== 'boolean' ) {
        return;
      }
      // set volume of stream
      ffmpeg.stdin.write( `Cvolume ${info.pts_adjustment} volume ${info.out_of_network_indicator?0.25:1.0}\n` );
    } );
    ffmpeg.stdout.pipe( scte35Stream );

    // Release everything as soon as the client is gone, even if that happens
    // before the socket connection below has been made.
    res.once( 'close', () => {
      console.debug( 'connection closed' );
      clearTimeout( connectTimer );
      if ( socket ) socket.destroy();
      ffmpeg.kill();
    } );
    res.once( 'finish', () => {
      console.debug( 'response finished' );
      if ( socket ) socket.destroy();
    } );

    // wait for ffmpeg to create the socket file...
    connectTimer = setTimeout( () => {
      socket = net.createConnection( socketPath, () => {
        console.debug( 'connected to domain socket' );
        res.setHeader( 'Content-Type', 'video/mp4' );
        socket.pipe( res );
      } );
      socket.once( 'error', err => {
        console.error( 'unix socket error:', err );
        ffmpeg.kill();
        // don't leave the client hanging
        fail( 502, err );
      } );
    }, 500 );

  } ).catch( err => {
    console.error( err );
    fail( 500, err );
  } );

}
// Serve the proxy on port 3000 on all IPv4 interfaces.
const server = http.createServer( app );

function announceListening () {
  console.log( 'Listening...' );
}

server.listen( 3000, '0.0.0.0', announceListening );
| // | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment