Skip to content

Instantly share code, notes, and snippets.

@murlock
Created December 27, 2022 20:42
Show Gist options
  • Save murlock/5a367c38946aceeecf8d65e23d0d213e to your computer and use it in GitHub Desktop.
Save murlock/5a367c38946aceeecf8d65e23d0d213e to your computer and use it in GitHub Desktop.
Script helper to convert MP4 to VP9
#!/usr/bin/env python3
from datetime import datetime, timedelta
import argparse
from io import FileIO
import json
import os
import os.path as osp
import subprocess
import sys
import time
from multiprocessing import Process
from progressbar import ProgressBar
import xattr
# When True, convert() only prints the ffmpeg command lines it would run.
DRY_RUN = False
# Extended attribute set on a source file after a conversion attempt produced
# a bigger file than the original, so that later runs skip that file.
# Its value is the quality (CRF) that was tried.
XATTR = "user.x-convert-vp9"
# Audio codec name shown in the warning printed by convert_file().
AUDIO = "aac"
# Audio codecs (ffprobe codec_name) that trigger that warning.
FORCE_AUDIO_CODEC = []  # e.g. ["wmapro"]
# Video codecs (ffprobe codec_name) that are candidates for conversion.
CONVERT_CODEC = ["h264", "mpeg4", "vc1", "wmv3"]
# Container formats (ffprobe format_name) that are accepted as input.
FORMAT = ["mov,mp4,m4a,3gp,3g2,mj2", "matroska,webm", "asf"]
# Manual recipe to convert only the audio track to AAC (forcing an mp4
# container, because AAC is not supported by wmv):
# ffmpeg -i input.wmv -c:v copy -c:a aac output.mp4
class ProbeException(Exception):
    """Raised when ffprobe fails or reports streams this script cannot use."""
def display_progressbar(prc: Process, frames: int, fname: str):
    """Mirror an ffmpeg ``-progress`` file into a console progress bar.

    Polls ``fname`` about every 0.1 s for as long as ``prc`` is alive and
    moves the bar to the most recent ``frame=<n>`` value that was written.

    Args:
        prc: process running the encode; polling stops once it has exited.
        frames: expected total number of frames, used as the bar maximum.
        fname: path of the progress file; it may not exist yet when the
            polling starts.
    """
    pb = ProgressBar(max_value=frames)
    stats = None
    try:
        while prc.is_alive():
            if stats is None:
                try:
                    stats = open(fname, "r")
                except FileNotFoundError:
                    # Not created yet: wait instead of spinning at 100% CPU.
                    time.sleep(0.1)
                    continue
            # Only the text appended since the previous poll is read; scan it
            # backwards so the newest frame counter wins.
            for line in reversed(stats.read().split("\n")):
                if not line.startswith("frame="):
                    continue
                try:
                    val = int(line.split("=")[1])
                except ValueError:
                    # Line still being written; try an older one.
                    continue
                # The real frame count may exceed the expected total; never
                # push the bar past the maximum it was created with.
                pb.update(min(val, frames))
                break
            time.sleep(0.1)
    finally:
        if stats is not None:
            stats.close()
    pb.finish()
def clean_file(fname: str):
    """Delete ``fname`` if it is present; a missing file is not an error.

    The removal is attempted directly (instead of checking for existence
    first) so that a file vanishing between the check and the unlink cannot
    raise.
    """
    try:
        os.unlink(fname)
    except FileNotFoundError:
        pass
def now():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S}"
def fsize(name):
    """Return the size in bytes of the file at ``name``."""
    return osp.getsize(name)
def to_human(size):
    """Format a byte count with one decimal and a binary (1024-based) unit.

    Args:
        size: number of bytes (anything ``float()`` accepts).

    Returns:
        A string such as ``"1.5 KB"``. Values too large for the biggest
        known unit are still expressed in that unit instead of being
        reported as unknown.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = float(size)
    for unit in units[:-1]:
        if value < 1024:
            return "%3.1f %s" % (value, unit)
        value /= 1024
    return "%3.1f %s" % (value, units[-1])
def run_process(*args):
    """Run a command silently, with stdin/stdout/stderr tied to the null device.

    Args:
        *args: the first positional argument is the command, as a list of
            program and arguments; further arguments are ignored.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status.
    """
    # subprocess.DEVNULL avoids leaking a manually opened /dev/null handle
    # and gives the child a readable stdin.
    subprocess.check_call(
        args[0],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def _run_ffmpeg_pass(cmd, frames, progress_file):
    """Run one ffmpeg pass in a child process while showing a progress bar.

    Raises:
        subprocess.CalledProcessError: if the child process did not exit
            with status 0. An exception raised inside the child never
            reaches this process, so the exit code is the only signal.
    """
    clean_file(progress_file)
    prc = Process(target=run_process, args=(cmd, ))
    prc.start()
    display_progressbar(prc, frames, progress_file)
    prc.join()
    if prc.exitcode != 0:
        raise subprocess.CalledProcessError(prc.exitcode, cmd)


def convert(name_in, frames, name_out, quality=30, audio_convert=False):
    """Re-encode ``name_in`` to VP9 with a two-pass ffmpeg run.

    The result is written to a temporary ``~<name>`` file next to
    ``name_out``. If it is smaller than (or equal to) the input it is renamed
    to ``name_out`` and the input is deleted; if it is bigger it is discarded
    and the input is tagged with the ``XATTR`` extended attribute.

    Args:
        name_in: absolute path of the source video.
        frames: expected number of video frames (progress bar maximum).
        name_out: destination path; nothing is done if it already exists.
        quality: CRF value handed to libvpx-vp9.
        audio_convert: currently unused; the audio stream is always copied.

    Raises:
        ValueError: if ``name_in`` is not an absolute, normalized path.
        subprocess.CalledProcessError: if one of the ffmpeg passes fails; the
            input file is left untouched in that case.
    """
    if osp.abspath(name_in) != name_in:
        raise ValueError("name_in must be an absolute path: %r" % (name_in,))

    if osp.exists(name_out):
        print(now(), "Skip", name_in, "already converted")
        return

    dirname, name = osp.split(name_out)
    print(now(), "Convert", name_in, "(" + to_human(fsize(name_in)) + ")")

    name_tmp = osp.join(dirname, "~" + name)
    # Statistics file written by pass 1 in the current working directory.
    passlog = "ffmpeg2pass-0.log"
    tmp = os.path.join("/tmp", name + ".progress")

    video_opts = ["ffmpeg", "-i", name_in,
                  "-c:v", "libvpx-vp9", "-row-mt", "1", "-b:v", "0",
                  "-crf", str(quality)]
    progress_opts = ["-progress", tmp, "-stats_period", "1"]
    # Pass 1 only gathers statistics: no audio, output thrown away.
    pass1 = (video_opts + ["-pass", "1", "-an", "-f", "null"]
             + progress_opts + ["/dev/null"])
    pass2 = (video_opts + ["-pass", "2", "-c:a", "copy"]
             + progress_opts + [name_tmp])

    if DRY_RUN:
        # A dry run must not touch the filesystem, so it returns before any
        # leftover file is removed.
        print(now(), " ".join(pass1))
        print(now(), " ".join(pass2))
        return

    clean_file(name_tmp)
    clean_file(passlog)

    start = time.time()
    try:
        _run_ffmpeg_pass(pass1, frames, tmp)
        _run_ffmpeg_pass(pass2, frames, tmp)

        size_in = fsize(name_in)
        size_out = fsize(name_tmp)
        if size_out > size_in:
            print(now(), "!!!!!!!! Output video is bigger "
                  "than input video, removing vp9 media")
            os.unlink(name_tmp)
            # Remember the failed attempt so the file is skipped next time.
            xattr.setxattr(name_in, XATTR, str(quality).encode("utf-8"))
            return

        # display some stats
        diff = size_in - size_out
        percentage = 100.0 * diff / size_in if size_in else 0.0
        print("Converted in",
              timedelta(seconds=int(time.time() - start)),
              "space saved:", to_human(diff),
              "( %2.1f %%)" % percentage)

        # Only now that both passes succeeded is the original replaced.
        os.rename(name_tmp, name_out)
        os.unlink(name_in)
    except subprocess.CalledProcessError as ex:
        print(now(), "Error", ex)
        # The partial output may or may not exist, depending on which pass
        # failed.
        clean_file(name_tmp)
        raise
    except Exception as ex:
        print(now(), "Error", ex)
        raise
    finally:
        clean_file(tmp)
        clean_file(passlog)
def _frame_rate(rate):
    """Convert an ffprobe rate string such as ``30000/1001`` to a float.

    Returns 0.0 when the value is missing or cannot be parsed.
    """
    num, _, den = str(rate).partition("/")
    try:
        return float(num) / float(den or 1)
    except (ValueError, ZeroDivisionError):
        return 0.0


def _frame_count(video, container):
    """Return the frame count of a video stream, estimating it if needed.

    ``nb_frames`` is used when it is present and numeric. Otherwise the
    count is estimated as duration * average frame rate, taking the duration
    from the stream or, failing that, from the container.

    Raises:
        ProbeException: if no usable frame count can be derived.
    """
    try:
        return int(video["nb_frames"])
    except (KeyError, TypeError, ValueError):
        pass
    duration = video.get("duration", container.get("duration"))
    try:
        # +1 so that truncation never yields less than the real count.
        estimate = int(float(duration)
                       * _frame_rate(video.get("avg_frame_rate"))) + 1
    except (TypeError, ValueError, OverflowError):
        estimate = 0
    if estimate <= 1:
        raise ProbeException("unknown number of frames")
    return estimate


def get_codecs(name):
    """Probe ``name`` with ffprobe and describe its streams.

    Args:
        name: path of the media file.

    Returns:
        A tuple ``(video_codec, frame_count, format_name, audio_codec)``.
        ``audio_codec`` is the codec of the first audio stream, or None when
        the file has no audio stream.

    Raises:
        ProbeException: if ffprobe fails, if the file does not contain
            exactly one video stream, or if its frame count is unknown.
    """
    try:
        ret = subprocess.check_output(
            ["ffprobe", "-hide_banner",
             "-print_format", "json", "-show_format",
             "-show_streams",
             name], stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        msg = err.stderr
        if isinstance(msg, bytes):
            # File names echoed by ffprobe are not guaranteed to be UTF-8.
            msg = msg.decode("utf8", errors="replace")
        # Keep only the first line of the error output.
        msg = (msg or "").split("\n")[0]
        # remove filename if present
        msg = msg.replace(name, "")
        raise ProbeException(msg) from err

    ret = json.loads(ret)
    streams = ret.get("streams", [])

    # extract VIDEO codec
    videos = [x for x in streams if x.get("codec_type") == "video"]
    if not videos:
        raise ProbeException("no video streams")
    if len(videos) > 1:
        raise ProbeException("more than one video streams")

    # Any number of audio streams is accepted; only the first one's codec
    # is reported.
    audios = [x for x in streams if x.get("codec_type") == "audio"]
    audio_codec = audios[0].get("codec_name") if audios else None

    return (videos[0].get("codec_name"),
            _frame_count(videos[0], ret.get("format", {})),
            ret["format"]["format_name"],
            audio_codec)
def convert_directory(directory, recursive=False, force=False):
    """Convert every entry of ``directory``, in sorted name order.

    Args:
        directory: path of the directory to scan.
        recursive: when True, sub-directories are processed too; otherwise
            they are handed to convert_file(), which reports and skips them.
        force: forwarded to convert_file() for every file, including those
            found in sub-directories.
    """
    for entry in sorted(os.listdir(directory)):
        path = osp.join(directory, entry)
        if recursive and osp.isdir(path):
            # Propagate force as well, so nested files are treated like
            # top-level ones.
            convert_directory(path, recursive=recursive, force=force)
        else:
            convert_file(path, force=force)
def convert_file(name, force=False):
    """Convert a single file to VP9 if it qualifies.

    The file is skipped (with a message) when it is not a regular file, when
    ffprobe cannot describe it, when its video codec or container is not in
    the accepted lists, or when a previous attempt tagged it with ``XATTR``.
    The output is written next to the input as ``<stem>-vp9<ext>``.

    Args:
        name: path of the file; it is made absolute before use.
        force: when true, the video codec filter is bypassed.
    """
    name = osp.abspath(name)
    if not osp.isfile(name):
        print(now(), name, "is not a file")
        return

    try:
        codec, frames, fmt, audio = get_codecs(name)
    except ProbeException as err:
        print(now(), "Probably not a video, skipping", name, ":", err)
        return

    # NOTE(review): force is taken to bypass the codec filter only; the
    # format and xattr checks below always apply - confirm intended scope.
    if not force and codec not in CONVERT_CODEC:
        print(now(), "skip", name, "cause codec:", codec)
        return

    if audio in FORCE_AUDIO_CODEC:
        # Informational only: convert() is called without audio options.
        print(now(), "warning", name, "will convert audio to", AUDIO)

    if fmt not in FORMAT:
        print(now(), "skip", name, "cause format:", fmt)
        return

    # Attribute names may come back as bytes depending on the xattr binding
    # in use (assumption - verify); normalize so the marker is found either
    # way.
    attrs = {
        attr.decode("utf-8", "replace") if isinstance(attr, bytes) else attr
        for attr in xattr.listxattr(name)
    }
    if XATTR in attrs:
        # TODO check quality ?
        print(now(), "skip", name, "cause VP9 was bigger then original file")
        return

    # name is absolute, so the output path needs no further joining.
    stem, ext = osp.splitext(name)
    convert(name, frames, stem + "-vp9" + ext)
def main(params):
    """Process every path listed in ``params.entry``.

    Missing paths are reported and skipped, regular files go to
    convert_file() and everything else goes to convert_directory().
    ``params`` must expose ``entry``, ``force`` and ``recursive``.
    """
    for target in params.entry:
        if not osp.exists(target):
            print("%s not found, skip it" % target)
        elif osp.isfile(target):
            convert_file(target, force=params.force)
        else:
            convert_directory(target, recursive=params.recursive,
                              force=params.force)
def args(opts):
    """Parse the command-line arguments given in ``opts``.

    Returns the argparse namespace with the boolean flags ``recursive``,
    ``dry_run`` and ``force`` plus the list of positional ``entry`` paths.
    """
    parser = argparse.ArgumentParser(
        description="Convert video to VP9",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # All three switches are plain on/off flags that default to off.
    switches = (("--recursive", "-r"), ("--dry-run", "-d"), ("--force", "-f"))
    for long_name, short_name in switches:
        parser.add_argument(long_name, short_name, default=False,
                            action="store_true")
    parser.add_argument("entry", nargs="*", help="Entries to parse")
    return parser.parse_args(opts)
if __name__ == "__main__":
    # Parse the command line, publish the dry-run flag through the
    # module-level DRY_RUN switch read by convert(), then process the entries.
    opts = args(sys.argv[1:])
    DRY_RUN = opts.dry_run
    main(opts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment