#!/usr/bin/env python3 import os import sys class BTFailure(Exception): pass def decode_int(x, f): f += 1 new_f = x.index(ord("e"), f) n = int(x[f:new_f]) if x[f] == ord("-"): if x[f + 1] == ord("0"): raise ValueError elif x[f] == ord("0") and new_f != f + 1: raise ValueError return n, new_f + 1 def decode_bytes(x, f): colon = x.index(ord(":"), f) n = int(x[f:colon]) if x[f] == ord("0") and colon != f + 1: raise ValueError colon += 1 return x[colon:colon + n], colon + n def decode_string(x, f): r, l = decode_bytes(x, f) return str(r, "utf-8"), l def decode_list(x, f): r, f = [], f + 1 while x[f] != ord("e"): v, f = decode_func[x[f]](x, f) r.append(v) return r, f + 1 def decode_dict(x, f): r, f = {}, f + 1 while x[f] != ord("e"): k, f = decode_string(x, f) r[k], f = decode_func[x[f]](x, f) return r, f + 1 decode_func = { ord("l"): decode_list, ord("d"): decode_dict, ord("i"): decode_int, ord("0"): decode_bytes, ord("1"): decode_bytes, ord("2"): decode_bytes, ord("3"): decode_bytes, ord("4"): decode_bytes, ord("5"): decode_bytes, ord("6"): decode_bytes, ord("7"): decode_bytes, ord("8"): decode_bytes, ord("9"): decode_bytes } def bdecode(x): try: r, l = decode_func[x[0]](x, 0) except (IndexError, KeyError, ValueError): raise BTFailure("not a valid bencoded string") if l != len(x): raise BTFailure("invalid bencoded value (data after valid prefix)") return r def encode_int(x): result = bytearray() result.append(ord("i")) result.extend(bytearray(str(x), "utf-8")) result.append(ord("e")) return result def encode_bool(x): if x: return encode_int(1) else: return encode_int(0) def encode_bytes(x): result = bytearray() result.extend(bytearray("{}:".format(len(x)), "utf-8")) result.extend(x) return result def encode_string(x): return encode_bytes(bytearray(x, "utf-8")) def encode_list(x): result = bytearray() result.append(ord("l")) for i in x: result.extend(encode_func[type(i)](i)) result.append(ord("e")) return result def encode_dict(x): result = bytearray() result.append(ord("d")) for k, v in sorted(x.items()): result.extend(encode_string(str(k))) result.extend(encode_func[type(v)](v)) result.append(ord("e")) return result encode_func = { int: encode_int, bool: encode_bool, bytes: encode_bytes, str: encode_string, list: encode_list, tuple: encode_list, dict: encode_dict } def bencode(x): return encode_func[type(x)](x) class Main(object): @staticmethod def read_content(file_path): with open(file_path, 'rb') as f: return bdecode(f.read()) @classmethod def process(cls, base_path, content, force): resume_path = "{}{}.fastresume".format(base_path, content) resume_data = cls.read_content(resume_path) save_path = resume_data['save_path'] if force: torrent_path = "{}{}.torrent".format(base_path, content) torrent_data = cls.read_content(torrent_path) if 'pieces' not in resume_data: piece_length = len(torrent_data['info']['pieces']) / 20 resume_data['pieces'] = [1 for _ in range(int(piece_length))] if 'mapped_files' not in resume_data: if resume_data['qBt-hasRootFolder']: base_path = torrent_data['info']['name'] else: base_path = b'' files = torrent_data['info'].get('files') if not files: files = [{ 'path': [torrent_data['info']['name']], }] resume_data['mapped_files'] = [ os.path.join(base_path, os.path.join(*f['path'])) for f in files ] elif any(p == 0 for p in resume_data.get('pieces', [0])): print('Do not support incomplete torrent.') exit(1) resume_data['seed_mode'] = 1 resume_data['paused'] = 1 def get_file_state(file_path): size = os.path.getsize(file_path) mtime = int(os.path.getmtime(file_path)) return size, mtime resume_data['file sizes'] = [ get_file_state(os.path.join(save_path, f)) for f in resume_data['mapped_files'] ] return cls.write_content(resume_path, bencode(resume_data)) @staticmethod def write_content(file_path, content): os.rename(file_path, file_path + '.bak') with open(file_path, 'wb') as f: f.write(content) @classmethod def main(cls): if len(sys.argv) != 2: print('%s torrent_hash' % sys.argv[0]) exit(1) base_path = os.path.expanduser('~/.local/share/data/qBittorrent/BT_backup/') if os.path.exists(base_path + 'session.lock'): print('Please quit qBittorrent at first.') exit(1) torrent_hash = sys.argv[1] if torrent_hash[0] == '@': torrent_hash = torrent_hash[1:] force = True else: force = False cls.process(base_path, torrent_hash, force) if __name__ == '__main__': Main.main()