Skip to content

Instantly share code, notes, and snippets.

@sinkers
Created July 14, 2014 04:50
Show Gist options
  • Select an option

  • Save sinkers/d647a80fdb180b4cc3a6 to your computer and use it in GitHub Desktop.

Select an option

Save sinkers/d647a80fdb180b4cc3a6 to your computer and use it in GitHub Desktop.

Revisions

  1. sinkers created this gist Jul 14, 2014.
    180 changes: 180 additions & 0 deletions livestreammonitor.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,180 @@
    #!/usr/bin/python
    '''
    Script to monitor live streams and send an Amazon SNS if the stream is down (and possibly take restorative action)
    Future:
    Black detect: ffmpeg -i out.mp4 -vf blackdetect -f null -
    Note the following doesn't seem to fully work without looking at the debug logs
    On live ffmpeg -y -i rtmp://cp30129.live.edgefcs.net/live/videoops-videoops@50541 -vf blackdetect -t 10 -loglevel debug -f null -
    '''

    from boto import boto
    import sys
    import os
    from subprocess import PIPE, Popen
    from threading import Thread
    import time
    import logging

    try:
    from Queue import Queue, Empty
    except ImportError:
    from queue import Queue, Empty # python 3.x



    logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(message)s',
    handlers=[logging.FileHandler("encoder.log"),
    logging.StreamHandler()])

    logging.info("Starting")

    # You will need to go into your Amazon Console and create a new topic ARN for this
    AWS_TOPIC_ARN = ""
    AWS_KEY = ""
    AWS_SECRET = ""
    # Set how often you want to check in seconds
    SLEEP_TIME = 30
    FFPROBE = "/usr/local/bin/ffprobe"

    # List of streams to check
    CHECK_STREAMS = ['http://wowsyd.sinclairmediatech.com/live/canberra/playlist.m3u8', 'http://repackage.video.news.com.au/live/canberra/playlist.m3u8']

    ON_POSIX = 'posix' in sys.builtin_module_names

    def process_line(std, q):
    partialLine = ""
    tmpLines=[]
    end_of_message = False
    while (True):
    data = std.read(10)

    #print repr(data)

    #break when there is no more data
    if len(data) == 0:
    end_of_message = True

    #data needs to be added to previous line
    if ((not "\n" in data) and (not end_of_message)):
    partialLine += data
    #lines are terminated in this string
    else:
    tmpLines = []

    #split by \n
    split = data.split("\n")

    #add the rest of partial line to first item of array
    if partialLine != "":
    split[0] = partialLine + split[0]
    partialLine = ""

    #add every item apart from last to tmpLines array
    if len(split) > 1:
    for i in range(len(split)-1):
    tmpLines.append(split[i])

    #last item is '' if data string ends in \r
    #last line is partial, save for temporary storage
    if split[-1] != "":
    partialLine = split[-1]
    #last line is terminated
    else:
    tmpLines.append(split[-1])

    #print split[0]
    q.put(split[0])
    if (end_of_message):
    #print partialLine
    break

    def enqueue_output(stdout, queue):
    #for line in iter(stdout.readline, b''):
    # queue.put(line)
    process_line(stdout, queue)
    stdout.close()

    def run(q, ffcmd, thread_name):
    logging.info("Running " + ffcmd)
    p = Popen(ffcmd,shell=True, stderr=PIPE, stdin=PIPE, bufsize=1, close_fds=ON_POSIX)
    #q = Queue()
    #t = Thread(target=enqueue_output, args=(p.stdout, q))
    #t.daemon = True # thread dies with the program
    #t.start()
    t = Thread(target=enqueue_output, name=thread_name, args=(p.stderr, q))
    t.daemon = True
    t.start()
    return p

    def probe(stream):
    probeq = Queue()
    logging.info("Checking " + stream)
    probeproc = run(probeq, FFPROBE + " " + stream, "probethread")
    # Need to read from the queue until the queue is empty and process has exited
    while (True):
    #print "Queue not empty"
    # print probeq.join()
    try:
    line = probeq.get_nowait()
    logging.info(line)
    '''
    Possible error responses:
    Server error: Failed to play stream
    Input/output error
    Need a regex to match for video found!
    Sample: Stream #0:0: Video: h264 (Baseline), yuv420p, 640x360 [SAR 1:1 DAR 16:9], 655 kb/s, 25 tbr, 1k tbn, 50 tbc
    Should also look for audio
    '''

    if ("Stream #0:0: Video" in line):
    logging.info("Found stream " + stream)
    logging.info(line)
    return True
    if ("Stream #0:1: Video" in line):
    logging.info("Found stream " + stream)
    logging.info(line)
    return True
    elif ("error" in line):
    return False
    elif (probeproc.poll() > 0):
    return False

    # Make sure we don't spin out of control
    #time.sleep(1)

    except Empty:
    pass


    return False

    def send_message(msg):
    sns = boto.connect_sns(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
    print sns
    subj = "Problem with streaming"
    res = sns.publish(AWS_TOPIC_ARN, msg, subj)


    def live_probe():
    pass

    # Endless loop for monitoring

    while(True):
    for stream in CHECK_STREAMS:
    if (probe(stream)):
    pass
    else:
    logging.info("Error with " + stream + " sending alert")
    '''
    This needs to check if there has already been an error message sent
    If so it shouldn't send another one for x mins
    Also needs to send a message when the problem is cleared
    '''
    send_message(stream)

    time.sleep(SLEEP_TIME)