""" Based on Matt Brown's ALPR videos: https://www.youtube.com/watch?v=BQTy9XVeSaE This script can: 1. Grab public ALPRs from Shodan using the query below then save the IPs to a file. 2. Enumerate a file of ALPR IPs and attempt to find all cameras associated to the ALPR aggregator found on Shodan. Supply your paid account ($50) Shodan API key with the "SHODAN_API_KEY" environment variable. ### USE THIS SCRIPT AT YOUR OWN RISK ### ALPRs are often used by law enforcement. What you do with the information you find is at your own responsibility. https://www.shodan.io/search?query=port%3A8080+country%3AUS+http.html%3A%22PLease+contact+administrator+to+get+correct+stream+name%22 """ import re import os import sys from multiprocessing.pool import ThreadPool import requests import time BASE_API_URL = "https://api.shodan.io/shodan" ALPR_QUERY = 'port:8080 country:US http.html:"PLease contact administrator to get correct stream name"' MAX_CAM_NUMBER = 10 # the amount of cams to try for each IP def search(ip_path: str): IP_ADDRESSES: list[str] = [] def _request(page: int = 1) -> dict: print("[search] Requesting IP addresses from Shodan...") return requests.get(f"{BASE_API_URL}/host/search?query={ALPR_QUERY}&key={API_KEY}&page={page}").json() def _parse(matches: list[dict]): for match in matches: IP_ADDRESSES.append(match['ip_str']) # Make initial search res = _request(page=1) # Calculate pages total_results: int = res["total"] pages: int = (total_results // 100) + 1 print(f"[search] Parsing {len(res['matches'])} results on page 1...") _parse(res['matches']) for page in range(2, pages + 1): res = _request(page=page)["matches"] print(f"[search] Parsing {len(res)} results on page {page}...") _parse(res) time.sleep(3) print(f"[search] Done parsing. Writing to {ip_path}...") with open(ip_path, "a") as f: f.writelines([f"{ip}\n" for ip in IP_ADDRESSES]) print(f"[search] Done writing {len(IP_ADDRESSES)} IP addresses to {ip_path}.") def enum_cams(ip_list: list[str], http_path: str): all_results: list[str] = [] def _executor(ip: str) -> list[str]: http_cam_links: list[str] = [] print(f"[enum] {f'[{ip}]'.ljust(20)} New thread") for i in range(MAX_CAM_NUMBER + 1): print(f"[enum] {f'[{ip}]'.ljust(20)} Trying: cam{i}ir...") this_http = f"http://{ip}:8080/cam{i}ir" try: res = requests.get(this_http) if res.status_code == 200: print(f"[enum] {f'[{ip}]'.ljust(20)} Success: cam{i}ir") http_cam_links.append(this_http) else: print(f"[enum] {f'[{ip}]'.ljust(20)} Fail: cam{i}ir") except Exception as e: print(f"[enum] {f'[{ip}]'.ljust(20)} Error: {e}") print(f"[enum] Completed thread for IP address {ip}") return http_cam_links print( f"[enum] Starting thread pool for {len(ip_list)} IP addresses (attempting to find {MAX_CAM_NUMBER} cameras)...") pool = ThreadPool(10) results = pool.map(_executor, ip_list) print("[enum] Done enumerating all IP addresses in pool.") pool.close() for result in results: all_results.extend(result) print(f"[enum] Writing {len(all_results)} HTTP cam links to {http_path}...") with open(http_path, "a") as f: f.writelines([f"{r}\n" for r in all_results]) print(f"[enum] Done writing HTTP cam links.") if __name__ == "__main__": cwd = os.path.abspath(os.getcwd()) def _print_usage(): print("Usage: grab_alprs_from_shodan.py search|cams\n") print("- search: Search for ALPR IPs from Shodan.") print("- cams: Enumerate ALPR aggregator IPs and find all cameras.") exit(1) if len(sys.argv) <= 1: _print_usage() elif sys.argv[1] == "search": if not (API_KEY := os.environ.get('SHODAN_API_KEY')): raise OSError("SHODAN_API_KEY environment variable not set") # Ask the user where they want to store results ip_path = input(f"Path to save IP's to [{cwd}/ips.txt]: ") if not ip_path: ip_path = "ips.txt" search(ip_path) elif sys.argv[1] == "cams": ip_list: list[str] = [] ip_path = input(f"Path of saved IP addresses [{cwd}/ips.txt]: ") if not ip_path: ip_path = "ips.txt" if not os.path.exists(ip_path): raise FileNotFoundError(f"File '{ip_path}' not found.") # Read IP file and validate. Parse into list of IP addresses with open(ip_path, "r") as f: lines = f.readlines() for idx, line in enumerate(lines, start=1): if not re.compile(r"^((25[0-5]|(2[0-4]|1[0-9]|[1-9]|)[0-9])(\.(?!$)|$)){4}$").match(line): raise ValueError(f"Invalid IP address on line {idx}: {line}") else: ip_list.append(line.strip()) http_path = input(f"Path to save discovered cams to [{cwd}/cams.txt]: ") if not http_path: http_path = "cams.txt" enum_cams(ip_list, http_path) else: _print_usage()