-
-
Save fjctp/63824517c1ce8b7040a5e9d6ee5397a0 to your computer and use it in GitHub Desktop.
Speedtest against Hetzner servers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys, requests, time, math, subprocess, platform, threading | |
| from requests.adapters import HTTPAdapter | |
| from requests.packages.urllib3.util.retry import Retry | |
| verbose = '-v' in sys.argv | |
| def downloadFile(u, hostname): | |
| try: | |
| if verbose: | |
| print(f"Attempting to download from {u}...") | |
| session = requests.Session() | |
| retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]) | |
| adapter = HTTPAdapter(max_retries=retry) | |
| session.mount('http://', adapter) | |
| session.mount('https://', adapter) | |
| headers = {'User-Agent': 'Mozilla/5.0'} | |
| if verbose: | |
| print("Session initialized. Sending GET request...") | |
| r = session.get(u, headers=headers, stream=True, timeout=10) | |
| if '-v' in sys.argv: | |
| pdb.set_trace() | |
| if r.status_code == 200: | |
| if verbose: | |
| print("GET request successful. Streaming content...") | |
| else: | |
| print(f"GET request failed with status code {r.status_code}") | |
| return None | |
| tl = int(r.headers.get('content-length')) | |
| if verbose: | |
| print(f"Content length: {tl} bytes.") | |
| pts = [] | |
| latencies = [] | |
| dl = 0 | |
| start_time = time.time() | |
| with open("/dev/null", 'wb') as f: | |
| st = time.process_time() | |
| for c in r.iter_content(1024): | |
| if verbose: | |
| print("Writing chunk to file...") | |
| dl += len(c) | |
| f.write(c) | |
| time_diff = time.process_time() - st | |
| if time_diff > 0: # prevent division by zero | |
| pts.append(dl // (time_diff)) | |
| latencies.append(get_latency(hostname)) | |
| p = int(50 * dl / tl) | |
| sys.stdout.write(f"\r[{'#' * p}{'.' * (50 - p)}] {int(100 * dl / tl)}%") | |
| sys.stdout.flush() | |
| end_time = time.time() | |
| elapsed_time = end_time - start_time | |
| avg = round(sum(pts) / len(pts), 2) if pts else 0 | |
| avg_latency = round(sum(latencies) / len(latencies), 2) if latencies else 0 | |
| max_jitter = max(latencies) - min(latencies) if latencies else 0 | |
| packet_loss = (1 - dl / tl) * 100 if tl else 0 | |
| if verbose: | |
| print(f"{time.time()-start_time:.2f}s: Download completed.") | |
| return avg, elapsed_time, avg_latency, max_jitter, packet_loss | |
| except Exception as e: | |
| print(f"An error occurred: {e}") | |
| return None | |
| intro_art = ''' | |
| # # | |
| # # ###### ##### ###### # # ###### ##### | |
| # # # # # ## # # # # | |
| ####### ##### # # # # # ##### # # | |
| # # # # # # # # # ##### | |
| # # # # # # ## # # # | |
| # # ###### # ###### # # ###### # # | |
| ##### ####### | |
| # # ##### ###### ###### ##### # ###### #### ##### | |
| # # # # # # # # # # # | |
| ##### # # ##### ##### # # # ##### #### # | |
| # ##### # # # # # # # # | |
| # # # # # # # # # # # # | |
| ##### # ###### ###### ##### # ###### #### # | |
| ''' | |
| print(intro_art) | |
| hosts = { | |
| "fsn1-speed.hetzner.com": {"sm": "https://fsn1-speed.hetzner.com/100MB.bin", "md": "https://fsn1-speed.hetzner.com/1GB.bin", "lg": "https://fsn1-speed.hetzner.com/10GB.bin"}, | |
| "hel1-speed.hetzner.com": {"sm": "https://hel1-speed.hetzner.com/100MB.bin", "md": "https://hel1-speed.hetzner.com/1GB.bin", "lg": "https://hel1-speed.hetzner.com/10GB.bin"}, | |
| "speed.hetzner.de": {"sm": "https://speed.hetzner.de/100MB.bin", "md": "https://speed.hetzner.de/1GB.bin", "lg": "https://speed.hetzner.de/10GB.bin"}, | |
| "ash.icmp.hetzner.com": {"sm": "http://ash.icmp.hetzner.com/100MB.bin", "md": "http://ash.icmp.hetzner.com/1GB.bin", "lg": "http://ash.icmp.hetzner.com/10GB.bin"}, | |
| "hil.icmp.hetzner.com": {"sm": "http://hil.icmp.hetzner.com/100MB.bin", "md": "http://hil.icmp.hetzner.com/1GB.bin", "lg": "http://hil.icmp.hetzner.com/10GB.bin"} | |
| } | |
| loc_map = { | |
| "fsn1-speed.hetzner.com": "Falkenstein, Germany", | |
| "hel1-speed.hetzner.com": "Helsinki, Finland", | |
| "speed.hetzner.de": "Nuremberg, Germany", | |
| "ash.icmp.hetzner.com": "Ashburn, Virginia, USA", | |
| "hil.icmp.hetzner.com": "Hillsboro, Oregon, USA" | |
| } | |
| def convert_size(s): | |
| if s <= 0: | |
| return "0B" | |
| i = int(math.floor(math.log(s, 1024))) | |
| return f"{round(s / math.pow(1024, i), 2)}{'B,KB,MB,GB,TB,PB,EB,ZB,YB'.split(',')[i]}" | |
| def get_latency(h): | |
| if platform.system() == "Windows": return 0 | |
| try: | |
| o = subprocess.check_output(["ping", "-c", "1", h]).decode("utf-8").split("\n") | |
| for l in o: | |
| if "time=" in l: return float(l.split("time=")[1].split(" ")[0]) | |
| except: return 0 | |
| def downloadFile(u, hostname): | |
| try: | |
| start_time = time.time() | |
| if verbose: | |
| print(f"{time.time()-start_time:.2f}s: Attempting to download from {u}...") | |
| session = requests.Session() | |
| retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]) | |
| adapter = HTTPAdapter(max_retries=retry) | |
| session.mount('http://', adapter) | |
| session.mount('https://', adapter) | |
| headers = {'User-Agent': 'Mozilla/5.0'} | |
| if verbose: | |
| print(f"{time.time()-start_time:.2f}s: Session initialized. Sending GET request...") | |
| r = session.get(u, headers=headers, stream=True, timeout=10) | |
| if r.status_code == 200: | |
| if verbose: | |
| print(f"{time.time()-start_time:.2f}s: GET request successful. Streaming content...") | |
| else: | |
| print(f"{time.time()-start_time:.2f}s: GET request failed with status code {r.status_code}") | |
| return None | |
| tl = int(r.headers.get('content-length')) | |
| if verbose: | |
| print(f"{time.time()-start_time:.2f}s: Content length: {tl} bytes.") | |
| pts = [] | |
| latencies = [] | |
| dl = 0 | |
| start_time = time.time() | |
| with open("/dev/null", 'wb') as f: | |
| st = time.process_time() | |
| last_latency_check = time.time() # Initialize the last latency check to the current time | |
| latency_check_interval = 5 # Set how often to check latency, in seconds | |
| for c in r.iter_content(1024): | |
| dl += len(c) | |
| f.write(c) | |
| time_diff = time.process_time() - st | |
| if time_diff > 0: #prevent division by zero | |
| pts.append(dl // time_diff) | |
| current_time = time.time() | |
| if current_time - last_latency_check > latency_check_interval: | |
| latencies.append(get_latency(hostname)) | |
| last_latency_check = current_time # Update the last latency check time | |
| p = int(50 * dl / tl) | |
| sys.stdout.write(f"\r[{'#' * p}{'.' * (50 - p)}] {int(100 * dl / tl)}%") | |
| sys.stdout.flush() | |
| end_time = time.time() | |
| elapsed_time = end_time - start_time | |
| avg = round(sum(pts) / len(pts), 2) if pts else 0 | |
| avg_latency = round(sum(latencies) / len(latencies), 2) if latencies else 0 | |
| max_jitter = max(latencies) - min(latencies) if latencies else 0 | |
| packet_loss = (1 - dl / tl) * 100 if tl else 0 | |
| if verbose: | |
| print(f"{time.time()-start_time:.2f}s: Download completed.") | |
| return avg, elapsed_time, avg_latency, max_jitter, packet_loss | |
| except Exception as e: | |
| print(f"{time.time()-start_time:.2f}s: An error occurred: {e}") | |
| return None | |
| def main(): | |
| sz = None | |
| if len(sys.argv) > 1 and sys.argv[1] in ["sm", "md", "lg"]: | |
| sz = sys.argv[1] | |
| if sz is None: | |
| print("Usage: python3 hetzner-speedtest.py [sm] Small 100MB") | |
| print(" [md] Medium 1GB") | |
| print(" [lg] Large 10GB") | |
| sz = input("Enter size (sm, md, lg): ") | |
| if sz not in ["sm", "md", "lg"]: | |
| return | |
| else: | |
| sz = sys.argv[1] | |
| lat_res = {} | |
| spd_res = {} | |
| time_res = {} | |
| avg_latency_res = {} | |
| max_jitter_res = {} | |
| packet_loss_res = {} | |
| traceroute_res = {} | |
| dns_resolution_res = {} | |
| dns_resolution_time_res = {} | |
| for h in hosts: | |
| loc = loc_map.get(h, h) | |
| print(f"\nTesting {loc}") | |
| f = hosts[h][sz] | |
| download_result = downloadFile(f, h) | |
| if download_result is not None: | |
| avg_spd, elapsed_time, avg_latency, max_jitter, packet_loss = download_result | |
| lat_res[loc] = get_latency(h) | |
| spd_res[loc] = avg_spd | |
| time_res[loc] = elapsed_time | |
| avg_latency_res[loc] = avg_latency | |
| max_jitter_res[loc] = max_jitter | |
| packet_loss_res[loc] = packet_loss | |
| print(f"\nSpeed: {convert_size(avg_spd)}/s") | |
| print(f"Time: {elapsed_time:.2f} seconds") | |
| print(f"Latency: {lat_res[loc]}ms") | |
| print(f"Avg Latency: {avg_latency}ms") | |
| print(f"Max Jitter: {max_jitter}ms") | |
| print(f"Packet Loss: {packet_loss:.2f}%") | |
| # Uncomment these lines if you implement traceroute and DNS resolution | |
| # print(f"Traceroute: \n{traceroute_result}") | |
| # print(f"DNS Resolution: {dns_resolution_result} (Time: {dns_resolution_time:.4f} seconds)") | |
| else: | |
| print(f"Could not complete the download for {loc}. Skipping...") | |
| if lat_res: | |
| min_lat_loc = min(lat_res, key=lat_res.get) | |
| max_spd_loc = max(spd_res, key=spd_res.get) | |
| min_time_loc = min(time_res, key=time_res.get) | |
| min_jitter_loc = min(max_jitter_res, key=max_jitter_res.get) | |
| max_jitter_loc = max(max_jitter_res, key=max_jitter_res.get) | |
| else: | |
| print("No successful streams established to analyse.") | |
| print(f"\nResults:") | |
| print(f"Lowest Latency: {min_lat_loc.ljust(25)}") | |
| print(f"Latency: {lat_res[min_lat_loc]:.3f}ms; Time: {time_res[min_lat_loc]:.2f}s; Speed: {convert_size(spd_res[min_lat_loc])}/s)") | |
| print(f"Highest Speed: {max_spd_loc.ljust(25)}") | |
| print(f"(Latency: {lat_res[max_spd_loc]:.3f}ms; Time: {time_res[max_spd_loc]:.2f}s; Speed: {convert_size(spd_res[max_spd_loc])}/s)") | |
| print(f"Fastest Time: {min_time_loc.ljust(25)}") | |
| print(f"(Latency: {lat_res[min_time_loc]:.3f}ms; Time: {time_res[min_time_loc]:.2f}s; Speed: {convert_size(spd_res[min_time_loc])}/s)") | |
| print(f"Lowest Jitter: {min_jitter_loc.ljust(25)} (Jitter: {convert_size(max_jitter_res[min_jitter_loc])}; Packet Loss: {packet_loss_res[min_jitter_loc]:.2f}%)") | |
| print(f"Highest Jitter: {max_jitter_loc.ljust(25)} (Jitter: {convert_size(max_jitter_res[max_jitter_loc])}; Packet Loss: {packet_loss_res[max_jitter_loc]:.2f}%)") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment