Created
December 27, 2022 20:42
-
-
Save murlock/5a367c38946aceeecf8d65e23d0d213e to your computer and use it in GitHub Desktop.
Script helper to convert MP4 to VP9
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from datetime import datetime, timedelta | |
| import argparse | |
| from io import FileIO | |
| import json | |
| import os | |
| import os.path as osp | |
| import subprocess | |
| import sys | |
| import time | |
| from multiprocessing import Process | |
| from progressbar import ProgressBar | |
| import xattr | |
# When true, convert() only prints the ffmpeg commands instead of running them.
DRY_RUN = False

# Extended attribute written on a source file whose VP9 encode turned out
# bigger than the original, so that later runs skip that file.
XATTR = "user.x-convert-vp9"

# Audio codec named in the "will convert audio to" warning.
AUDIO = "aac"

# Source audio codecs that trigger that warning, e.g. ["wmapro"].
FORCE_AUDIO_CODEC = []

# Source video codecs that are candidates for re-encoding.
CONVERT_CODEC = [
    "h264",
    "mpeg4",
    "vc1",
    "wmv3",
]

# Accepted containers, spelled as in ffprobe's "format_name" field.
FORMAT = [
    "mov,mp4,m4a,3gp,3g2,mj2",
    "matroska,webm",
    "asf",
]

# Manual recipe to convert only the audio to AAC; the output is forced to
# mp4 because aac is not supported by wmv:
#   ffmpeg -i input.wmv -c:v copy -c:a aac output.mp4
class ProbeException(Exception):
    """Raised when a file cannot be probed as a single-video media file."""
def display_progressbar(prc: Process, frames: int, fname: str):
    """Draw a progress bar for as long as the child process *prc* is alive.

    Progress is taken from the text file *fname*: the most recent
    ``frame=<n>`` line found in the newly written part of the file is used
    as the current value of the bar.

    Args:
        prc: child process being monitored.
        frames: total number of frames, used as the bar's maximum.
        fname: path of the progress file; it may not exist yet when this
            function is entered.
    """
    pb = ProgressBar(max_value=frames)
    stats = None
    try:
        while prc.is_alive():
            if stats is None:
                try:
                    stats = open(fname, "r")
                except FileNotFoundError:
                    # The progress file has not been created yet: wait a
                    # little instead of retrying in a tight loop.
                    time.sleep(0.1)
                    continue
            # read() resumes where the previous call stopped, so only the
            # lines appended since the last poll are scanned, newest first.
            for line in reversed(stats.read().split("\n")):
                key, sep, value = line.partition("=")
                # Skip half-written lines such as "frame=" with no digits.
                if key == "frame" and sep and value.strip().isdigit():
                    # Clamp to the announced total: the probed frame count
                    # may be slightly low, and progressbar2 is understood to
                    # reject values above max_value -- TODO confirm.
                    pb.update(min(int(value), frames))
                    break
            time.sleep(0.1)
    finally:
        # Always release the progress file handle, even on error.
        if stats is not None:
            stats.close()
    pb.finish()
def clean_file(fname: str):
    """Delete *fname* when it exists; do nothing otherwise."""
    if not osp.exists(fname):
        return
    os.unlink(fname)
def now():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def fsize(name):
    """Return the size in bytes of the file *name*, as reported by os.stat."""
    return os.stat(name).st_size
def to_human(size):
    """Format a byte count with a binary (1024-based) unit suffix.

    The value is divided by 1024 until its magnitude drops below 1024 or
    the largest known unit is reached, then rendered with one decimal.

    Args:
        size: number of bytes (anything accepted by ``float``).

    Returns:
        A string such as ``"1.5 KB"``.

    >>> to_human(1536)
    '1.5 KB'
    >>> to_human(1024 ** 4)
    '1.0 TB'
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    value = float(size)
    for unit in units[:-1]:
        # Compare the magnitude so negative sizes are scaled as well.
        if abs(value) < 1024:
            return "%3.1f %s" % (value, unit)
        value /= 1024
    # Anything larger is still expressed in the biggest unit instead of
    # giving up on the conversion.
    return "%3.1f %s" % (value, units[-1])
def run_process(*args):
    """Run a command with stdin, stdout and stderr attached to the null device.

    Args:
        *args: only the first positional argument is used; it is the
            command, given as an argv list, handed to subprocess.check_call.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero
            status.
    """
    # subprocess.DEVNULL avoids keeping a file object open on /dev/null and
    # gives the child a readable stdin.
    subprocess.check_call(
        args[0],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def _run_ffmpeg_pass(command, frames, progress_file):
    """Run one ffmpeg pass in a child process while showing its progress.

    Raises subprocess.CalledProcessError when the child exits with a
    non-zero status (an exception raised inside the child is not propagated
    to the parent, only its exit code is).
    """
    clean_file(progress_file)
    prc = Process(target=run_process, args=(command, ))
    prc.start()
    display_progressbar(prc, frames, progress_file)
    prc.join()
    if prc.exitcode != 0:
        raise subprocess.CalledProcessError(prc.exitcode, command)


def convert(name_in, frames, name_out, quality=30, audio_convert=False):
    """Re-encode *name_in* to VP9 with a two-pass ffmpeg run.

    The encode is written to a "~"-prefixed temporary file next to
    *name_out*. When the result is not bigger than the source, it is renamed
    to *name_out* and the source is deleted. Otherwise the result is removed
    and the source is tagged with the XATTR extended attribute (its value is
    the quality that was tried).

    With DRY_RUN set, the two ffmpeg command lines are printed and nothing
    is executed or deleted.

    Args:
        name_in: absolute path of the source video.
        frames: total frame count, used as the progress bar maximum.
        name_out: destination path; nothing is done if it already exists.
        quality: value given to ffmpeg's ``-crf`` option.
        audio_convert: accepted for interface compatibility but currently
            ignored; the audio is always stream-copied.

    Raises:
        subprocess.CalledProcessError: one of the ffmpeg passes failed.
        Exception: any other error is logged and re-raised.
    """
    assert osp.abspath(name_in) == name_in
    if osp.exists(name_out):
        print(now(), "Skip", name_in, "already converted")
        return

    dirname, name = osp.split(name_out)
    size_in = fsize(name_in)
    print(now(), "Convert", name_in, "(" + to_human(size_in) + ")")

    name_tmp = osp.join(dirname, "~" + name)
    # Pass log written by the first pass, relative to the working directory.
    pass_log = "ffmpeg2pass-0.log"
    tmp = os.path.join("/tmp", name + ".progress")

    video_opts = ["ffmpeg", "-i", name_in,
                  "-c:v", "libvpx-vp9", "-row-mt", "1", "-b:v", "0",
                  "-crf", str(quality)]
    progress_opts = ["-progress", tmp, "-stats_period", "1"]
    # First pass only gathers statistics: no audio, output thrown away.
    pass1 = (video_opts + ["-pass", "1", "-an", "-f", "null"]
             + progress_opts + ["/dev/null"])
    pass2 = (video_opts + ["-pass", "2", "-c:a", "copy"]
             + progress_opts + [name_tmp])

    if DRY_RUN:
        print(now(), " ".join(pass1))
        print(now(), " ".join(pass2))
        return

    # Leftovers of an interrupted run must not be mistaken for fresh output.
    clean_file(name_tmp)
    clean_file(pass_log)

    start = time.time()
    try:
        _run_ffmpeg_pass(pass1, frames, tmp)
        _run_ffmpeg_pass(pass2, frames, tmp)

        size_out = fsize(name_tmp)
        if size_out > size_in:
            print(now(), "!!!!!!!! Output video is bigger "
                  "than input video, removing vp9 media")
            os.unlink(name_tmp)
            xattr.setxattr(name_in, XATTR, str(quality).encode("utf-8"))
            return

        # display some stats
        saved = size_in - size_out
        percentage = 100.0 * saved / size_in if size_in else 0.0
        print("Converted in",
              timedelta(seconds=int(time.time() - start)),
              "space saved:", to_human(saved),
              "( %2.1f %%)" % percentage)

        # Publish the result first, then drop the source.
        os.rename(name_tmp, name_out)
        os.unlink(name_in)
    except subprocess.CalledProcessError as ex:
        print(now(), "Error", ex)
        # The temporary file may not exist if the first pass failed.
        clean_file(name_tmp)
        raise
    except Exception as ex:
        print(now(), "Error", ex)
        raise
    finally:
        # Scratch files are useless once the attempt is over, whatever
        # its outcome.
        clean_file(tmp)
        clean_file(pass_log)
def _frame_count(stream, container):
    """Return the frame count of a video stream, estimating it if needed."""
    try:
        return int(stream["nb_frames"])
    except (KeyError, TypeError, ValueError):
        pass
    # ffprobe does not report "nb_frames" for every container (presumably
    # the case for matroska -- TODO confirm): fall back to
    # duration * average frame rate.
    try:
        duration = float(stream.get("duration") or container["duration"])
        num, _, den = stream["avg_frame_rate"].partition("/")
        fps = float(num) / float(den or 1)
    except (AttributeError, KeyError, TypeError, ValueError,
            ZeroDivisionError) as err:
        raise ProbeException("unable to determine frame count") from err
    if duration <= 0 or fps <= 0:
        raise ProbeException("unable to determine frame count")
    # Round up so the estimate is not lower than the real count.
    return int(duration * fps) + 1


def get_codecs(name):
    """Probe *name* with ffprobe and describe its single video stream.

    Args:
        name: path of the media file to inspect.

    Returns:
        A tuple ``(video_codec, frames, format_name, audio_codec)`` where
        *frames* is an int and *audio_codec* is the codec of the first audio
        stream, or None when the file has no audio stream.

    Raises:
        ProbeException: ffprobe failed, the file does not contain exactly
            one video stream, or its frame count cannot be determined.
    """
    try:
        raw = subprocess.check_output(
            ["ffprobe", "-hide_banner",
             "-print_format", "json", "-show_format",
             "-show_streams",
             name], stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        msg = err.stderr or ""
        if isinstance(msg, bytes):
            # File names are not guaranteed to be valid UTF-8.
            msg = msg.decode("utf8", errors="replace")
        # Keep the first line only, and remove filename if present.
        msg = msg.split("\n")[0].replace(name, "")
        raise ProbeException(msg) from err

    info = json.loads(raw)
    streams = info.get("streams", [])
    container = info.get("format", {})

    # extract VIDEO codec
    videos = [s for s in streams if s.get("codec_type") == "video"]
    if not videos:
        raise ProbeException("no video streams")
    if len(videos) > 1:
        raise ProbeException("more than one video streams")

    # Zero or several audio streams are tolerated; only the first counts.
    audios = [s for s in streams if s.get("codec_type") == "audio"]
    audio_codec = audios[0].get("codec_name") if audios else None

    video = videos[0]
    return (video.get("codec_name"),
            _frame_count(video, container),
            container.get("format_name"),
            audio_codec)
def convert_directory(directory, recursive=False, force=False):
    """Convert the entries of *directory*, in sorted name order.

    Args:
        directory: path of the directory to scan.
        recursive: when true, sub-directories are scanned as well;
            otherwise they are handed to convert_file like any other entry.
        force: forwarded to convert_file for every entry, including those
            found in sub-directories.
    """
    for entry in sorted(os.listdir(directory)):
        path = osp.join(directory, entry)
        if recursive and osp.isdir(path):
            # Keep the caller's force flag all the way down the tree.
            convert_directory(path, recursive=recursive, force=force)
        else:
            convert_file(path, force=force)
def convert_file(name, force=False):
    """Probe *name* and convert it to VP9 unless a skip rule applies.

    The result is written next to the source as ``<stem>-vp9<ext>``.

    Args:
        name: path of the candidate file; it is made absolute first.
        force: when true, the skip rules below are bypassed.
    """
    name = osp.abspath(name)
    if not osp.isfile(name):
        print(now(), name, "is not a file")
        return

    try:
        codec, frames, fmt, audio = get_codecs(name)
    except ProbeException as err:
        print(now(), "Probably not a video, skipping", name, ":", err)
        return

    # NOTE(review): indentation was lost in the reviewed copy of this file,
    # so the extent of the "force" guard is an assumption (all skip rules)
    # -- confirm against the author's original.
    if force is False:
        if codec not in CONVERT_CODEC:
            print(now(), "skip", name, "cause codec:", codec)
            return
        if audio in FORCE_AUDIO_CODEC:
            print(now(), "warning", name, "will convert audio to", AUDIO)
        if fmt not in FORMAT:
            print(now(), "skip", name, "cause format:", fmt)
            return
        # A previous attempt tagged this file because its encode was bigger.
        if XATTR in xattr.listxattr(name):
            # TODO check quality ?
            print(now(), "skip", name,
                  "cause VP9 was bigger then original file")
            return

    stem, ext = osp.splitext(name)
    output = osp.join(osp.dirname(name), "{}-vp9{}".format(stem, ext))
    convert(name, frames, output)
def main(params):
    """Convert every entry listed in the parsed command line *params*.

    Regular files go to convert_file, any other existing path goes to
    convert_directory, and missing paths are reported and skipped.
    """
    for entry in params.entry:
        if osp.isfile(entry):
            convert_file(entry, force=params.force)
        elif osp.exists(entry):
            convert_directory(entry, recursive=params.recursive,
                              force=params.force)
        else:
            print("%s not found, skip it" % entry)
def args(opts):
    """Parse the command line arguments *opts* and return the namespace.

    Recognised options are the boolean switches --recursive/-r, --dry-run/-d
    and --force/-f, followed by any number of positional entries.
    """
    parser = argparse.ArgumentParser(
        description="Convert video to VP9",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    switches = (
        ("--recursive", "-r"),
        ("--dry-run", "-d"),
        ("--force", "-f"),
    )
    for long_name, short_name in switches:
        parser.add_argument(long_name, short_name, default=False,
                            action="store_true")
    parser.add_argument("entry", nargs="*", help="Entries to parse")
    return parser.parse_args(opts)
if __name__ == "__main__":
    opts = args(sys.argv[1:])
    # convert() reads the module-level DRY_RUN flag, so publish the command
    # line choice before any work starts.
    DRY_RUN = opts.dry_run
    main(opts)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment