import asyncio import httpx import ipaddress import orjson import re import sys import urllib.parse from datetime import datetime, timezone, timedelta async def hookRaiseForStatus(r: httpx.Response): r.raise_for_status() s = httpx.AsyncClient( http2=True, timeout=30, follow_redirects=True, event_hooks={ 'response': [ hookRaiseForStatus, ], }, ) def cleanText(content: str): content = orjson.loads(content) content = content['commands'][0]['mutations'][0]['s'] content = urllib.parse.unquote(content) content = re.sub(r'%u([\dA-F]{1,4})', lambda m: chr(int(m[1], 16)), content) content = re.sub(r'\\x([\dA-F]{1,2})', lambda m: chr(int(m[1], 16)), content) content = content.replace('\r', '\n') content = content.replace('\x07', '') content = content.replace('\x06', '') return content async def getGeneralList(): result = '#\n# Source: https://docs.qq.com/doc/DQnJBTGJjSFZBR2JW\n' r = await s.get( 'https://docs.qq.com/dop-api/opendoc', params={'id': 'DQnJBTGJjSFZBR2JW'}, headers={'Referer': 'https://docs.qq.com/'}, ) content = cleanText(urllib.parse.unquote(max(r.text.splitlines(), key=len))) if m := re.search(r'规则更新于(\d+)年(\d+)月(\d+)日(\d+)时', content): d = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4)), tzinfo=timezone(timedelta(hours=8))).astimezone(timezone.utc) result += f'# Update: {d.isoformat().replace("+00:00", "Z")}\n#\n' if m := re.search(r'\x1e\x1c((?:[\da-f:\.\-]+?\n|#.*?\n)+?)\x1d', content): result += m.group(1) return result async def getTransactionList(): result = '#\n# Source: https://docs.qq.com/doc/DQmpBZmNMRFpiZUhZ\n' r = await s.get( 'https://docs.qq.com/dop-api/opendoc', params={'id': 'DQmpBZmNMRFpiZUhZ'}, headers={'Referer': 'https://docs.qq.com/'}, ) content = cleanText(urllib.parse.unquote(max(r.text.splitlines(), key=len))) if m := re.search(r'规则更新于(\d+)年(\d+)月(\d+)日(\d+)时', content): d = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4)), tzinfo=timezone(timedelta(hours=8))).astimezone(timezone.utc) result += f'# Update: {d.isoformat().replace("+00:00", "Z")}\n#\n' if m := re.search(r'\x1e\x1c([\da-f:\.\-\n]+?)\x1d', content): result += m.group(1) return result async def getBTNList(): result = '#\n# Source: https://raw.githubusercontent.com/PBH-BTN/BTN-Collected-Rules/main/combine/all.txt\n' r = await s.get('https://api.github.com/repos/PBH-BTN/BTN-Collected-Rules/commits', params={'path': 'combine/all.txt'}) r = await s.get(r.json()[0]['url']) result += f'# Update: {r.json()["commit"]["author"]["date"]}\n# Note: IPv6 single addresses are transformed into /64 CIDRs.\n#\n' r = await s.get('https://raw.githubusercontent.com/PBH-BTN/BTN-Collected-Rules/main/combine/all.txt') t = set() for line in r.text.splitlines(): line = line.strip() if not line or line.startswith('#'): continue try: address = ipaddress.ip_address(line) if isinstance(address, ipaddress.IPv6Address): network = ipaddress.IPv6Network(address).supernet(new_prefix=64) rule = f'{network[0]}-{network[-1]}\n' else: rule = f'{address}-{address}\n' except ValueError: try: network = ipaddress.ip_network(line) rule = f'{network[0]}-{network[-1]}\n' except ValueError: continue if rule in t: continue t.add(rule) result += rule return result async def main(): blacklist = ''.join(await asyncio.gather( getGeneralList(), getTransactionList(), getBTNList(), )) blacklistCIDR = '' for line in blacklist.splitlines(): line = line.strip() if not line or line.startswith('#'): blacklistCIDR += line + '\n' continue if m := re.match(r'([\da-f:\.]+)-([\da-f:\.]+)', line): ipRangeStart = ipaddress.ip_address(m.group(1)) ipRangeEnd = ipaddress.ip_address(m.group(2)) else: continue if ipRangeStart == ipRangeEnd: blacklistCIDR += str(ipRangeStart) + '\n' else: if ipRangeStart > ipRangeEnd: ipRangeStart, ipRangeEnd = ipRangeEnd, ipRangeStart for cidr in ipaddress.summarize_address_range(ipRangeStart, ipRangeEnd): blacklistCIDR += str(cidr) + '\n' with open(sys.argv[1], 'w', encoding='utf-8') as f: f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n') f.write(f'# 爬取时间:{datetime.now().replace(microsecond=0).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")}\n') f.write('# 爬取用源代码:https://gist.github.com/TransparentLC/b6b07a8db34c4a6006179a6960e15e69\n') f.write('# CIDR版:https://i.akarin.dev/bittorrent-ip-blacklist-cidr.txt\n') f.write('#\n') f.write('# qBittorrent用户可以使用以下命令自动下载和加载IP黑名单(需要开启Web UI):\n') f.write('# curl --http3 --compressed --output /path/to/your/ip-filter.dat https://i.akarin.dev/bittorrent-ip-blacklist.txt\n') f.write('# curl --http3 --compressed --data "json={\\"ip_filter_enabled\\":false}" http://your.web.ui/api/v2/app/setPreferences\n') f.write('# curl --http3 --compressed --data "json={\\"ip_filter_enabled\\":true}" http://your.web.ui/api/v2/app/setPreferences\n') f.write('#\n') f.write('# 请在这里反馈你遇到的无限下载IP地址(由Bilibili@Damn☆You整理和维护):\n') f.write('# https://t.bilibili.com/886509991839662137\n') f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n') f.write(blacklist) with open(sys.argv[2], 'w', encoding='utf-8') as f: f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n') f.write(f'# 爬取时间:{datetime.now().replace(microsecond=0).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")}\n') f.write('# 爬取用源代码:https://gist.github.com/TransparentLC/b6b07a8db34c4a6006179a6960e15e69\n') f.write('# IP范围版:https://i.akarin.dev/bittorrent-ip-blacklist.txt\n') f.write('#\n') f.write('# 本规则可以配合qBittorrent-ClientBlocker的ipBlockListURL选项使用。\n') f.write('#\n') f.write('# 请在这里反馈你遇到的无限下载IP地址(由Bilibili@Damn☆You整理和维护):\n') f.write('# https://t.bilibili.com/886509991839662137\n') f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n') f.write(blacklistCIDR) if __name__ == '__main__': asyncio.run(main())