Hunchly data forwarder that downloads YouTube videos and analyzes their frames for objects using the Imagga API.
# pip install flask pytube3 opencv-python imagehash requests
from flask import Flask, request
from PIL import Image
from pytube import YouTube
import cv2
import hashlib
import imagehash
import json
import os
import requests
import shutil
import threading
import urllib.parse

# set the IP address for it to listen on
LISTEN_IP = '172.16.206.1'

# Imagga API keys
api_key = 'IMAGGA_API_KEY'
api_secret = 'IMAGGA_SECRET_KEY'

# threshold for uniqueness in images:
# the higher the number, the more accurate, but the fewer matches;
# the lower the number, the more images get processed, but the more costly it is
threshold = 14

app = Flask(__name__)

if not os.path.exists("videodata"):
    os.mkdir("videos")
    os.mkdir("videodata")
    os.mkdir("videoframes")

#
# Analyze a video frame using Imagga.
#
def analyze_frame_imagga(video_frame_path):
    tags = []
    # close the file handle once the upload completes
    with open(video_frame_path, 'rb') as image_file:
        response = requests.post(
            'https://api.imagga.com/v2/tags',
            auth=(api_key, api_secret),
            files={'image': image_file})
    if response.status_code == 200:
        imagga_response = response.json()
        for result in imagga_response['result']['tags']:
            if result['confidence'] >= 25.00:
                # key is a language code, val is the actual value of the tag
                for key, val in result['tag'].items():
                    if val not in tags:
                        tags.append(val)
        print("[*] Found: {} in the video frame.".format(",".join(tags)))
    return tags
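
# For reference (not part of the original gist): the parsing loop above assumes
# the Imagga /v2/tags response is shaped roughly like this; tag names and
# confidence values here are hypothetical:
#
#   {
#     "result": {
#       "tags": [
#         {"confidence": 61.07, "tag": {"en": "mountain"}},
#         {"confidence": 28.42, "tag": {"en": "landscape"}}
#       ]
#     },
#     "status": {"text": "", "type": "success"}
#   }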
#
# Function to analyze the video frames for objects.
#
def analyze_video(video_path, video_hash):
    video_objects = []
    # open the video on disk
    analyzer = cv2.VideoCapture(video_path)
    print("[*] Analyzing video frames.")
    # get the number of frames and the frame rate
    total_frames = analyzer.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = analyzer.get(cv2.CAP_PROP_FPS)
    # walk the frames, sampling roughly twice per second, and only collect
    # relatively unique ones using a perceptual hash like TinEye or others use
    frame_index = 0
    unique_frames = []
    hashes = []
    while frame_index < int(total_frames):
        # seek to the sampled position and read a frame out
        analyzer.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
        val, frame = analyzer.read()
        if not val:
            break
        # OpenCV returns BGR; convert so the saved JPEGs have correct colors
        frame_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        frame_hash = imagehash.phash(frame_image)
        if frame_hash not in hashes:
            unique_frames.append((frame_index, frame_image))
            hashes.append(frame_hash)
        frame_index += int(fps / 2)
    index = 0
    previous_hash = None
    diff = None
    # walk the list of unique-looking frames and do a second pass on them:
    # only frames that differ from their predecessor by more than the
    # threshold get sent out for object detection
    for frame_index, frame in unique_frames:
        if previous_hash is None:
            previous_hash = imagehash.phash(frame)
        else:
            new_hash = imagehash.phash(frame)
            diff = new_hash - previous_hash
            previous_hash = new_hash
        if diff is None or diff > threshold:
            filename = "videoframes/{}_frame_{}.jpeg".format(video_hash, frame_index)
            with open(filename, "wb") as fd:
                frame.save(fd, "JPEG")
            index += 1
            # now check the image for objects
            frame_objects = analyze_frame_imagga(filename)
            for video_object in frame_objects:
                video_objects.append((frame_index, video_object))
    # return the list of detected objects
    return video_objects
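
# Illustrative note (not in the original script): subtracting two imagehash
# objects, as done above, yields the Hamming distance between the 64-bit
# perceptual hashes, so larger values mean more visually distinct frames, e.g.:
#
#   h1 = imagehash.phash(Image.open("frame_a.jpeg"))  # hypothetical files
#   h2 = imagehash.phash(Image.open("frame_b.jpeg"))
#   h1 - h2  # 0 = near-identical; above `threshold` = treat as a new scene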
#
# Helper function to hash the video in blocks.
#
def sum_video(filename):
    h = hashlib.sha256()
    with open(filename, 'rb') as f:
        for chunk in iter(lambda: f.read(128 * 8192), b''):
            h.update(chunk)
    return h.hexdigest()

#
# This function does the heavy lifting for the YouTube retrieval.
#
def process_url(hunchly_page):
    # now we run our enrichment plugins that we want
    if hunchly_page is None:
        return
    # check the URL for YouTube (note you want more variations/intelligence here)
    url_object = urllib.parse.urlparse(hunchly_page['page']['url'])
    if "youtube.com" in url_object.netloc:
        # now we want to download the video
        try:
            yt = YouTube(hunchly_page['page']['url'])
            ys = yt.streams.get_highest_resolution()
            # download it to disk
            ys.download(output_path="videos")
            # hash the video
            video_hash = sum_video("videos/{}".format(ys.default_filename))
            # move the file to be named by hash
            extension = ys.default_filename.split(".")[-1]
            shutil.move("videos/{}".format(ys.default_filename),
                        "videos/{}.{}".format(video_hash, extension))
            # store the closed captioning for searching later
            with open("videodata/{}-captions.txt".format(video_hash), "w") as fd:
                for caption in yt.caption_tracks:
                    fd.write(caption.generate_srt_captions())
            # analyze the video frames for objects
            video_objects = analyze_video("videos/{}.{}".format(video_hash, extension), video_hash)
            row = {}
            row['URL'] = hunchly_page['page']['url']
            row['Author'] = yt.author
            row['Date'] = str(yt.publish_date)
            row['ChannelID'] = yt.channel_id
            row['Description'] = yt.description.replace("\r", "").replace("\n", "")
            row['Keywords'] = yt.keywords
            row['Filesize'] = ys.filesize
            row['Hash'] = video_hash
            row['DetectedObjects'] = video_objects
            # store the JSON on disk for loading into BigQuery / Elasticsearch,
            # or store it in your database of choice and do whatever you want!
            with open("videodata/{}-metadata.json".format(video_hash), "w") as fd:
                json.dump(row, fd)
        except Exception as e:
            print("[!] Failed to process {}: {}".format(hunchly_page['page']['url'], e))
    return

#
# Define the route for the data forwarder to hit.
#
@app.route('/', methods=["POST", "GET"])
def newpage():
    hunchly_page = request.json
    # we spin up a separate thread so the Hunchly UI updates more quickly
    t = threading.Thread(target=process_url, args=(hunchly_page,))
    t.start()
    return "OK"

if __name__ == "__main__":
    if 'WINGDB_ACTIVE' in os.environ:
        app.debug = False
    app.run(host=LISTEN_IP)
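
To exercise the endpoint without Hunchly, you can POST a minimal payload containing the only field the script actually reads (page.url). A rough test sketch, assuming the server is running on LISTEN_IP with Flask's default port 5000 and using a hypothetical video URL:

import requests

# hypothetical payload; only 'page' -> 'url' is consumed by process_url()
payload = {"page": {"url": "https://www.youtube.com/watch?v=VIDEO_ID"}}
print(requests.post("http://172.16.206.1:5000/", json=payload).text)  # expect "OK"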
  