"""Download test files from Hetzner speed-test hosts and report the results.

For every host in ``hosts`` the script streams one test file (size chosen by
the ``sm`` / ``md`` / ``lg`` argument), measures throughput and ping latency,
and finally prints which location was best in each category.  Pass ``-v`` for
timestamped progress messages.
"""

import math
import platform
import subprocess
import sys
import threading
import time

try:
    import requests
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.util.retry import Retry
except ImportError:
    # main() reports this with an install hint instead of a bare traceback;
    # the pure helpers below stay importable without the dependency.
    requests = None

verbose = '-v' in sys.argv

SIZES = ("sm", "md", "lg")
CHUNK_SIZE = 64 * 1024          # bytes read per iteration while streaming
LATENCY_CHECK_INTERVAL = 5      # seconds between pings during a download
PING_TIMEOUT = 5                # seconds before a single ping is abandoned
REQUEST_TIMEOUT = 10            # seconds, passed to requests as `timeout`
SIZE_UNITS = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# NOTE(review): the banner below is reproduced exactly as found in this copy of
# the file; its original line breaks appear to have been lost - restore the
# multi-line art from the upstream script if it is available.
intro_art = ''' # # # # ###### ##### ###### # # ###### ##### # # # # # ## # # # # ####### ##### # # # # # ##### # # # # # # # # # # # ##### # # # # # # ## # # # # # ###### # ###### # # ###### # # ##### ####### # # ##### ###### ###### ##### # ###### #### ##### # # # # # # # # # # # ##### # # ##### ##### # # # ##### #### # # ##### # # # # # # # # # # # # # # # # # # # # ##### # ###### ###### ##### # ###### #### # '''
print(intro_art)

# Test-file URLs per host, keyed by size code.
hosts = {
    "fsn1-speed.hetzner.com": {
        "sm": "https://fsn1-speed.hetzner.com/100MB.bin",
        "md": "https://fsn1-speed.hetzner.com/1GB.bin",
        "lg": "https://fsn1-speed.hetzner.com/10GB.bin",
    },
    "hel1-speed.hetzner.com": {
        "sm": "https://hel1-speed.hetzner.com/100MB.bin",
        "md": "https://hel1-speed.hetzner.com/1GB.bin",
        "lg": "https://hel1-speed.hetzner.com/10GB.bin",
    },
    "speed.hetzner.de": {
        "sm": "https://speed.hetzner.de/100MB.bin",
        "md": "https://speed.hetzner.de/1GB.bin",
        "lg": "https://speed.hetzner.de/10GB.bin",
    },
    "ash.icmp.hetzner.com": {
        "sm": "http://ash.icmp.hetzner.com/100MB.bin",
        "md": "http://ash.icmp.hetzner.com/1GB.bin",
        "lg": "http://ash.icmp.hetzner.com/10GB.bin",
    },
    "hil.icmp.hetzner.com": {
        "sm": "http://hil.icmp.hetzner.com/100MB.bin",
        "md": "http://hil.icmp.hetzner.com/1GB.bin",
        "lg": "http://hil.icmp.hetzner.com/10GB.bin",
    },
}

# Human-readable location shown for each host.
loc_map = {
    "fsn1-speed.hetzner.com": "Falkenstein, Germany",
    "hel1-speed.hetzner.com": "Helsinki, Finland",
    "speed.hetzner.de": "Nuremberg, Germany",
    "ash.icmp.hetzner.com": "Ashburn, Virginia, USA",
    "hil.icmp.hetzner.com": "Hillsboro, Oregon, USA",
}


def convert_size(s):
    """Format a byte count *s* as a string with a 1024-based unit suffix.

    Values of zero or below give ``"0B"``.  The unit is clamped to the range
    ``B`` .. ``YB`` so fractional and extremely large values still format
    instead of selecting a wrong unit or raising ``IndexError``.
    """
    if s <= 0:
        return "0B"
    value = float(s)
    unit = 0
    # Repeated division avoids the rounding error of math.log(s, 1024).
    while value >= 1024 and unit < len(SIZE_UNITS) - 1:
        value /= 1024
        unit += 1
    return f"{round(value, 2)}{SIZE_UNITS[unit]}"


def get_latency(h):
    """Return the ping round-trip time to host *h* in milliseconds.

    One echo request is sent with the system ``ping`` command and the
    ``time=`` field of its output is parsed.  ``0`` means "not measured": it
    is returned on Windows, when ``ping`` is missing, fails or times out, and
    when the output has no parsable ``time=`` field.
    """
    if platform.system() == "Windows":
        return 0
    try:
        raw = subprocess.check_output(
            ["ping", "-c", "1", h],
            stderr=subprocess.DEVNULL,  # keep ping errors off the progress bar
            timeout=PING_TIMEOUT,
        )
    except (OSError, subprocess.SubprocessError):
        return 0
    for line in raw.decode("utf-8", errors="replace").splitlines():
        if "time=" in line:
            try:
                return float(line.split("time=")[1].split(" ")[0])
            except ValueError:
                return 0
    return 0


def _new_session():
    """Create a requests session that retries transient 5xx responses."""
    session = requests.Session()
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session


def downloadFile(u, hostname):
    """Stream the file at URL *u* and measure the transfer.

    The payload is discarded as it arrives.  While streaming, *hostname* is
    pinged every ``LATENCY_CHECK_INTERVAL`` seconds; pings that could not be
    measured are left out of the latency statistics.

    Returns ``None`` when the request fails or the status is not 200,
    otherwise the tuple
    ``(avg, elapsed_time, avg_latency, max_jitter, packet_loss)`` where

    * ``avg`` is total bytes / wall-clock seconds (bytes per second),
    * ``elapsed_time`` is the streaming time in seconds,
    * ``avg_latency`` is the mean of the ping samples in ms (0 if none),
    * ``max_jitter`` is the spread (max - min) of the ping samples in ms,
    * ``packet_loss`` is the percentage of ``Content-Length`` bytes that
      never arrived (0 when the server sent no length).
    """
    started = time.time()

    def log(message):
        if verbose:
            print(f"{time.time() - started:.2f}s: {message}")

    try:
        log(f"Attempting to download from {u}...")
        with _new_session() as session:
            headers = {'User-Agent': 'Mozilla/5.0'}
            log("Session initialized. Sending GET request...")
            with session.get(u, headers=headers, stream=True,
                             timeout=REQUEST_TIMEOUT) as r:
                if r.status_code != 200:
                    print(f"{time.time() - started:.2f}s: GET request failed with status code {r.status_code}")
                    return None
                log("GET request successful. Streaming content...")

                # The header may be absent (e.g. chunked transfer); 0 then
                # disables the progress bar and the loss calculation.
                tl = int(r.headers.get('content-length') or 0)
                log(f"Content length: {tl} bytes.")

                latencies = []
                dl = 0
                shown_pct = -1
                # Wall-clock timer: CPU time would ignore time spent waiting
                # on the network and wildly overstate the speed.
                stream_start = time.perf_counter()
                last_latency_check = stream_start
                for c in r.iter_content(CHUNK_SIZE):
                    dl += len(c)

                    # The ping runs inline, so it briefly pauses the transfer.
                    if time.perf_counter() - last_latency_check > LATENCY_CHECK_INTERVAL:
                        sample = get_latency(hostname)
                        if sample > 0:
                            latencies.append(sample)
                        last_latency_check = time.perf_counter()

                    if tl:
                        pct = min(100, int(100 * dl / tl))
                        if pct != shown_pct:  # redraw only on visible change
                            filled = pct // 2
                            sys.stdout.write(f"\r[{'#' * filled}{'.' * (50 - filled)}] {pct}%")
                            sys.stdout.flush()
                            shown_pct = pct
                elapsed_time = time.perf_counter() - stream_start

        avg = round(dl / elapsed_time, 2) if elapsed_time > 0 else 0
        avg_latency = round(sum(latencies) / len(latencies), 2) if latencies else 0
        max_jitter = max(latencies) - min(latencies) if latencies else 0
        packet_loss = max(0.0, (1 - dl / tl) * 100) if tl else 0
        log("Download completed.")
        return avg, elapsed_time, avg_latency, max_jitter, packet_loss
    except Exception as e:
        # Best effort per host: report the failure and let the caller move on.
        print(f"{time.time() - started:.2f}s: An error occurred: {e}")
        return None


def _print_summary(lat_res, spd_res, time_res, max_jitter_res, packet_loss_res):
    """Print the best location per category from the per-location results."""
    # A latency of 0 means the ping failed; rank measured hosts when any exist.
    measured = {loc: ms for loc, ms in lat_res.items() if ms > 0} or lat_res
    min_lat_loc = min(measured, key=measured.get)
    max_spd_loc = max(spd_res, key=spd_res.get)
    min_time_loc = min(time_res, key=time_res.get)
    min_jitter_loc = min(max_jitter_res, key=max_jitter_res.get)
    max_jitter_loc = max(max_jitter_res, key=max_jitter_res.get)

    print("\nResults:")
    for title, loc in (
        ("Lowest Latency", min_lat_loc),
        ("Highest Speed", max_spd_loc),
        ("Fastest Time", min_time_loc),
    ):
        print(f"{title}: {loc.ljust(25)}")
        print(f"(Latency: {lat_res[loc]:.3f}ms; Time: {time_res[loc]:.2f}s; Speed: {convert_size(spd_res[loc])}/s)")
    for title, loc in (
        ("Lowest Jitter", min_jitter_loc),
        ("Highest Jitter", max_jitter_loc),
    ):
        # Jitter is a time in milliseconds, not a byte count.
        print(f"{title}: {loc.ljust(25)} (Jitter: {max_jitter_res[loc]:.3f}ms; Packet Loss: {packet_loss_res[loc]:.2f}%)")


def main():
    """Run the speed test against every host and print a summary.

    The file size is taken from the first ``sm`` / ``md`` / ``lg`` argument on
    the command line; when none is given the user is prompted, and an invalid
    answer ends the program.
    """
    if requests is None:
        print("The 'requests' package is required: pip install requests")
        return

    sz = next((arg for arg in sys.argv[1:] if arg in SIZES), None)
    if sz is None:
        print("Usage: python3 hetzner-speedtest.py [sm] Small 100MB")
        print(" [md] Medium 1GB")
        print(" [lg] Large 10GB")
        sz = input("Enter size (sm, md, lg): ").strip()
        if sz not in SIZES:
            return

    lat_res = {}
    spd_res = {}
    time_res = {}
    max_jitter_res = {}
    packet_loss_res = {}

    for h, urls in hosts.items():
        loc = loc_map.get(h, h)
        print(f"\nTesting {loc}")
        download_result = downloadFile(urls[sz], h)
        if download_result is None:
            print(f"Could not complete the download for {loc}. Skipping...")
            continue

        avg_spd, elapsed_time, avg_latency, max_jitter, packet_loss = download_result
        lat_res[loc] = get_latency(h)
        spd_res[loc] = avg_spd
        time_res[loc] = elapsed_time
        max_jitter_res[loc] = max_jitter
        packet_loss_res[loc] = packet_loss

        print(f"\nSpeed: {convert_size(avg_spd)}/s")
        print(f"Time: {elapsed_time:.2f} seconds")
        print(f"Latency: {lat_res[loc]}ms")
        print(f"Avg Latency: {avg_latency}ms")
        print(f"Max Jitter: {max_jitter}ms")
        print(f"Packet Loss: {packet_loss:.2f}%")

    if not lat_res:
        print("No successful streams established to analyse.")
        return

    _print_summary(lat_res, spd_res, time_res, max_jitter_res, packet_loss_res)


if __name__ == "__main__":
    main()