Skip to content

Instantly share code, notes, and snippets.

@Ecalose
Forked from nickfox-taterli/cf_speed.py
Created August 28, 2020 04:59
Show Gist options
  • Select an option

  • Save Ecalose/e4a4dbc78a1f65761d6d4b9dafffe994 to your computer and use it in GitHub Desktop.

Select an option

Save Ecalose/e4a4dbc78a1f65761d6d4b9dafffe994 to your computer and use it in GitHub Desktop.
用下载方法查找CF最快下载速度的IP
import requests
import threadpool
from datetime import datetime
g_cnt = 0
def requests_test(ip):
global g_cnt
g_cnt = g_cnt + 1
g_cnt_str = '[' + str(g_cnt) + ']'
try:
dl_speed = list()
for i in range(5):
headers = {
'Host': 'vpsro.imghost.tech'
}
response = requests.get("http://" + ip + "/LookingGlass/1GB.test", stream=True, headers=headers,
timeout=1)
dl = 0
dt = datetime.now()
# 每块16KB,越大分块可能越快.
for data in response.iter_content(chunk_size=16384):
dl += len(data)
# 大于10秒直接出结果,10秒内下载完你大概是火箭.
if (datetime.now() - dt).seconds > 10:
dl_speed.append(round((dl / 10) / 1024 / 1024, 2))
break
print(g_cnt_str + ip + '速度:' + str(dl_speed[0]) + ' MB/s | ' + str(dl_speed[1]) + ' MB/s | ' + str(
dl_speed[2]) + ' MB/s | ' + str(dl_speed[3]) + ' MB/s | ' + str(dl_speed[4]) + ' MB/s')
except requests.exceptions.ConnectionError:
print(g_cnt_str + '无法使用(失效):' + ip)
except requests.exceptions.ReadTimeout:
print(g_cnt_str + '无法使用(低速):' + ip)
except requests.exceptions.ChunkedEncodingError:
print(g_cnt_str + '无法使用(错误):' + ip)
except KeyboardInterrupt:
exit(0)
except:
print(g_cnt_str + '无法使用(未知):' + ip)
with open('cf_ips.txt') as f:
ips = f.read().split('\n')
# 要注意总带宽容量,每线程至少需要1Mbps才能消化.
pool = threadpool.ThreadPool(5) # 线程池设置,最多同时跑N个线程
tasks = threadpool.makeRequests(requests_test, ips)
[pool.putRequest(task) for task in tasks]
pool.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment