Hunchly data forwarder that downloads YouTube videos and analyzes their frames for objects using the Imagga API.
# pip install flask pytube3 opencv-python pillow imagehash requests
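# To exercise the endpoint without Hunchly you can POST a payload by hand.
# The shape below is inferred from how process_url() reads the JSON
# (hunchly_page['page']['url']); Hunchly's real forwarder may send more fields.
# app.run() at the bottom uses Flask's default port, 5000:
#
#   curl -X POST http://172.16.206.1:5000/ \
#        -H "Content-Type: application/json" \
#        -d '{"page": {"url": "https://www.youtube.com/watch?v=VIDEO_ID"}}'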
from flask import Flask, request
from PIL import Image
from pytube import YouTube
import cv2
import hashlib
import imagehash
import json
import os
import requests
import shutil
import threading
import urllib
# set the IP address for it to listen on
LISTEN_IP = '172.16.206.1'
# Imagga API keys
api_key = 'IMAGGA_API_KEY'
api_secret = 'IMAGGA_SECRET_KEY'
# threshold for uniqueness between video frames:
# the higher the number, the fewer frames are treated as unique (cheaper, but you may miss objects);
# the lower the number, the more frames will get processed, which is more costly.
threshold = 14
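# For context: imagehash.phash() produces a 64-bit hash by default, and
# subtracting two hashes gives their Hamming distance (0-64). That distance
# is what gets compared against `threshold` in analyze_video() below.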
app = Flask(__name__)
# create the working directories on first run
for directory in ("videos", "videodata", "videoframes"):
    if not os.path.exists(directory):
        os.mkdir(directory)
#
# Analyze video frame using Imagga.
#
def analyze_frame_imagga(video_frame_path):
    tags = []
    # use a context manager so the frame file handle is always closed
    with open(video_frame_path, 'rb') as image_file:
        response = requests.post(
            'https://api.imagga.com/v2/tags',
            auth=(api_key, api_secret),
            files={'image': image_file})
    if response.status_code == 200:
        imagga_response = response.json()
        for result in imagga_response['result']['tags']:
            if result['confidence'] >= 25.00:
                # key is a language code, val is the actual value of the tag
                for key, val in result['tag'].items():
                    if val not in tags:
                        tags.append(val)
    print("[*] Found: {} in the video frame.".format(",".join(tags)))
    return tags
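# For reference, a successful /v2/tags response parsed above looks roughly
# like this (abridged and illustrative; see Imagga's API docs for the full schema):
#
#   {"result": {"tags": [{"confidence": 61.9, "tag": {"en": "mountain"}}, ...]},
#    "status": {"text": "", "type": "success"}}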
#
# Function to analyze the video frames for objects.
#
def analyze_video(video_path, video_hash):
    video_objects = []
    # open the video on disk
    analyzer = cv2.VideoCapture(video_path)
    print("[*] Analyzing video frames.")
    # get the number of frames and the frame rate
    total_frames = analyzer.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = analyzer.get(cv2.CAP_PROP_FPS)
    # first pass: sample roughly two frames per second, keeping only the
    # relatively unique ones via a perceptual hash (as TinEye and others use)
    frame_index = 0
    unique_frames = []
    hashes = []
    while frame_index < int(total_frames):
        # seek to the sample position and read the frame out
        analyzer.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
        val, frame = analyzer.read()
        if not val:
            break
        # OpenCV hands back BGR; convert to RGB before building the PIL image
        frame_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        frame_hash = imagehash.phash(frame_image)
        if frame_hash not in hashes:
            unique_frames.append((frame_index, frame_image))
            hashes.append(frame_hash)
        frame_index += int(fps / 2)
    index = 0
    previous_hash = None
    diff = None
    # second pass: only frames that differ from their predecessor by more
    # than the threshold get written to disk and checked for objects
    for frame_index, frame in unique_frames:
        if previous_hash is None:
            previous_hash = imagehash.phash(frame)
        else:
            new_hash = imagehash.phash(frame)
            diff = new_hash - previous_hash
            previous_hash = new_hash
        if diff is None or diff > threshold:
            filename = "videoframes/{}_frame_{}.jpeg".format(video_hash, frame_index)
            with open(filename, "wb") as fd:
                frame.save(fd, "JPEG")
            index += 1
            # now check the image for objects
            frame_objects = analyze_frame_imagga(filename)
            for video_object in frame_objects:
                video_objects.append((frame_index, video_object))
    # return the list of detected objects
    return video_objects
#
# Helper function to hash the video in blocks.
#
def sum_video(filename):
    h = hashlib.sha256()
    with open(filename, 'rb') as f:
        # read in 1 MiB chunks (128 * 8192 bytes) so large videos don't blow up memory
        for chunk in iter(lambda: f.read(128 * 8192), b''):
            h.update(chunk)
    return h.hexdigest()
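# This yields the same digest as running `sha256sum <file>` against the video.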
#
# This function does the heavy lifting for the YouTube retrieval.
#
def process_url(hunchly_page):
    # now we run our enrichment plugins that we want
    if hunchly_page is None:
        return
    # check the URL for YouTube (note you want more variations/intelligence here)
    url_object = urllib.parse.urlparse(hunchly_page['page']['url'])
    if "youtube.com" in url_object.netloc:
        # now we want to download the video
        try:
            yt = YouTube(hunchly_page['page']['url'])
            ys = yt.streams.get_highest_resolution()
            # download it to disk
            ys.download(output_path="videos")
            # hash the video
            video_hash = sum_video("videos/{}".format(ys.default_filename))
            # move the file to be named by hash
            extension = ys.default_filename.split(".")[-1]
            shutil.move("videos/{}".format(ys.default_filename), "videos/{}.{}".format(video_hash, extension))
            # store the closed captioning for searching later
            with open("videodata/{}-captions.txt".format(video_hash), "w") as fd:
                for caption in yt.caption_tracks:
                    fd.write(caption.generate_srt_captions())
            # analyze the video frames for objects
            video_objects = analyze_video("videos/{}.{}".format(video_hash, extension), video_hash)
            row = {}
            row['URL'] = hunchly_page['page']['url']
            row['Author'] = yt.author
            row['Date'] = str(yt.publish_date)
            row['ChannelID'] = yt.channel_id
            row['Description'] = yt.description.replace("\r", "").replace("\n", "")
            row['Keywords'] = yt.keywords
            row['Filesize'] = ys.filesize
            row['Hash'] = video_hash
            row['DetectedObjects'] = video_objects
            # store the JSON on disk for loading into BigQuery / Elasticsearch,
            # or into your database of choice
            with open("videodata/{}-metadata.json".format(video_hash), "w") as fd:
                json.dump(row, fd)
        except Exception as e:
            # don't let a single bad video kill the worker thread, but do log it
            print("[!] Failed to process {}: {}".format(hunchly_page['page']['url'], e))
    return
#
# Define the route for the data forwarder to hit.
#
@app.route('/', methods=["POST", "GET"])
def newpage():
    hunchly_page = request.json
    # we spin up a separate thread so the Hunchly UI updates more quickly
    t = threading.Thread(target=process_url, args=(hunchly_page,))
    t.start()
    return "OK"
if __name__ == "__main__":
    if 'WINGDB_ACTIVE' in os.environ:
        app.debug = False
    app.run(host=LISTEN_IP)
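# After a successful run, each processed video leaves these artifacts on disk
# (paths taken from the code above):
#   videos/<sha256>.<ext>               - the downloaded video, renamed to its hash
#   videodata/<sha256>-captions.txt     - concatenated SRT caption tracks
#   videodata/<sha256>-metadata.json    - metadata plus (frame_index, tag) pairs
#   videoframes/<sha256>_frame_<N>.jpeg - the unique frames sent to Imagga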