import abc
import json
import os
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import pandas as pd
import requests


def process_file(path: str) -> List[dict]:
    """Parse one NSG flow-log JSON file into a list of flow-tuple dicts."""
    if not path.endswith(".json"):
        raise ValueError("File must be in JSON format")
    with open(path) as f:
        data_dict = json.load(f)

    # Flatten the JSON structure and extract flow tuples.
    flow_data = []
    for record in data_dict['records']:
        version = record['properties']['Version']
        if not version or int(version) != 2:
            # Only version 2 flow tuples carry byte counts.
            continue
        for flow in record['properties']['flows']:
            if flow['rule'] != 'DefaultRule_AllowInternetOutBound':
                continue
            for subflow in flow['flows']:
                for tuple_str in subflow['flowTuples']:
                    tuple_data = tuple_str.split(',')
                    protocol, direction, decision, state = tuple_data[5:9]
                    # We only care about the end ('E') of TCP ('T') outbound ('O')
                    # connections that were accepted ('A'), because only those
                    # records carry the bytes-sent counters.
                    if protocol == 'T' and direction == 'O' and decision == 'A' and state == 'E':
                        flow_data.append({
                            'time': int(tuple_data[0]),  # Unix epoch seconds
                            'source_ip': tuple_data[1],
                            'dest_ip': tuple_data[2],
                            'source_port': int(tuple_data[3]),
                            'dest_port': int(tuple_data[4]),
                            'protocol': protocol,
                            'traffic_flow': direction,
                            'traffic_decision': decision,
                            'flow_state': state,
                            'packets_sent': int(tuple_data[9]) if tuple_data[9].isdigit() else 0,
                            'bytes_sent': int(tuple_data[10]) if tuple_data[10].isdigit() else 0,
                            'packets_received': int(tuple_data[11]) if tuple_data[11].isdigit() else 0,
                            'bytes_received': int(tuple_data[12]) if tuple_data[12].isdigit() else 0,
                        })
    return flow_data
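
# Illustrative sketch of the flow-log shape process_file() expects. The field
# values below are made up (documentation IPs, placeholder timestamp); real
# files come from Azure NSG flow logs, schema version 2:
#
# {
#   "records": [
#     {
#       "time": "2023-01-01T00:00:00.0000000Z",
#       "properties": {
#         "Version": 2,
#         "flows": [
#           {
#             "rule": "DefaultRule_AllowInternetOutBound",
#             "flows": [
#               {
#                 "flowTuples": [
#                   "1700000000,10.0.0.4,203.0.113.7,44931,443,T,O,A,E,25,3041,24,7580"
#                 ]
#               }
#             ]
#           }
#         ]
#       }
#     }
#   ]
# }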

def format_bytes(value: float) -> str:
    """
    Converts a byte value into a human-readable string with appropriate units.

    :param value: The size in bytes to be converted.
    :return: A human-readable string representing the size.
    """
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    size = float(value)
    for unit in units:
        if size < 1024:
            return f"{size:.4f} {unit}"
        size /= 1024.0
    return f"{size:.4f} {units[-1]}"


@dataclass
class IpGeo:
    status: str
    continent: Optional[str]
    country: Optional[str]
    region: Optional[str]


# Approximate egress rates in USD per GB, keyed by the continent the traffic
# originates from.
INTER_CONTINENTAL_COSTS_PER_GB = {
    "NA": 0.05,
    "EU": 0.05,
    "AS": 0.08,
    "OC": 0.08,
    "AF": 0.08,
    "SA": 0.16,
}

INTRA_CONTINENTAL_COSTS_PER_GB = {
    "NA": 0.02,
    "EU": 0.02,
    "AS": 0.08,
    "OC": 0.08,
    "ME": 0.08,
    "AF": 0.08,
    "SA": 0.16,
}

BYTES_IN_GB = 1024 * 1024 * 1024


def estimate_cost(home: IpGeo, traffic: pd.DataFrame, hours: float) -> Dict[str, Tuple[float, float]]:
    """
    Given the home location and per-destination traffic (with a 'continent'
    column), estimate the cost of the traffic observed over `hours` hours.

    Returns a dict of
    {'inter-continental': (existing_cost, predicted_daily_cost),
     'intra-continental': (existing_cost, predicted_daily_cost)}.
    """
    inter_cost = 0.0
    intra_cost = 0.0
    inter_rate = INTER_CONTINENTAL_COSTS_PER_GB[home.continent]
    intra_rate = INTRA_CONTINENTAL_COSTS_PER_GB[home.continent]
    for _, row in traffic.iterrows():
        gb_sent = row['bytes_sent'] / BYTES_IN_GB
        if row['continent'] != home.continent:
            inter_cost += gb_sent * inter_rate
        else:
            intra_cost += gb_sent * intra_rate
    # Extrapolate the observed window to a full day.
    daily_inter_cost = inter_cost / hours * 24
    daily_intra_cost = intra_cost / hours * 24
    return {'inter-continental': (inter_cost, daily_inter_cost),
            'intra-continental': (intra_cost, daily_intra_cost)}
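
# Worked example with the rates above: a home in "EU" sending 10 GB to another
# continent costs 10 * 0.05 = 0.50 USD, while 10 GB within Europe costs
# 10 * 0.02 = 0.20 USD. If that traffic was observed over a 6-hour window, the
# predicted daily figures are 0.50 / 6 * 24 = 2.00 USD and
# 0.20 / 6 * 24 = 0.80 USD respectively.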

class IpGeoCache:
    data: Dict[str, IpGeo]

    CACHE_PATH = "ip_cache.json"

    def __init__(self):
        self.data = {}

    def __getitem__(self, ip: str) -> IpGeo:
        return self.data[ip]

    def load(self) -> bool:
        if os.path.exists(self.CACHE_PATH):
            try:
                with open(self.CACHE_PATH) as f:
                    data = json.load(f)
                self.data = {ip: IpGeo(**geo) for ip, geo in data.items()}
                return True
            except Exception:
                pass
        return False

    def save(self) -> bool:
        try:
            with open(self.CACHE_PATH, "w") as f:
                json.dump({ip: geo.__dict__ for ip, geo in self.data.items()}, f)
            return True
        except Exception:
            return False

    def add_known(self, ip: str, geo: IpGeo) -> None:
        self.data[ip] = geo

    def add_unknown(self, ip: str) -> None:
        self.data[ip] = IpGeo("error", None, None, None)

    def is_known(self, ip: str) -> bool:
        return ip in self.data and self.data[ip].status == 'success'

    def is_unknown(self, ip: str) -> bool:
        return ip in self.data and self.data[ip].status != 'success'

    def unknown(self) -> List[str]:
        return [ip for ip, geo in self.data.items() if geo.status != 'success']

    def all_known(self, ips: List[str]) -> bool:
        return all(self.is_known(ip) for ip in ips)


class GeoService(abc.ABC):
    @abc.abstractmethod
    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        pass

    @abc.abstractmethod
    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        pass


class IpApiGeoService(GeoService):
    """Batch lookups against ip-api.com (the free tier is HTTP-only and rate-limited)."""

    cache: IpGeoCache
    batch_size: int = 100

    def __init__(self, cache: IpGeoCache) -> None:
        super().__init__()
        self.cache = cache

    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        results: Dict[str, IpGeo] = {}
        ok = True
        batch: List[dict] = []
        for ip in ips:
            if self.cache.is_known(ip):
                results[ip] = self.cache[ip]
                continue
            batch.append({"query": ip, "fields": "status,continentCode,countryCode,region,query"})
            if len(batch) == self.batch_size:
                ok = self._flush_batch(batch, results) and ok
                batch = []
        # Don't forget the final partial batch.
        if batch:
            ok = self._flush_batch(batch, results) and ok
        return results, ok

    def _flush_batch(self, batch: List[dict], results: Dict[str, IpGeo]) -> bool:
        try:
            response = requests.post("http://ip-api.com/batch", json=batch)
            response.raise_for_status()
            for idx, resp in enumerate(response.json()):
                ip = resp.get('query', batch[idx]['query'])
                geo = IpGeo(
                    status=resp['status'],
                    continent=resp.get('continentCode'),
                    country=resp.get('countryCode'),
                    region=resp.get('region'),
                )
                results[ip] = geo
                if geo.status == 'success':
                    self.cache.add_known(ip, geo)
                else:
                    self.cache.add_unknown(ip)
            time.sleep(4)  # stay under the free tier's ~15 batch requests per minute
            return True
        except Exception as e:
            print(f"Error fetching geo data: {e}")
            return False

    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        results, ok = self.get_geo_data_batch([ip])
        return results[ip], ok


class IpApi2GeoService(GeoService):
    """Per-IP lookups against ipapi.co."""

    cache: IpGeoCache

    def __init__(self, cache: IpGeoCache) -> None:
        super().__init__()
        self.cache = cache

    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        results: Dict[str, IpGeo] = {}
        ok = True
        for ip in ips:
            geo, single_ok = self.get_geo_data_single(ip)
            results[ip] = geo
            ok = ok and single_ok
        return results, ok

    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        try:
            if self.cache.is_known(ip):
                return self.cache[ip], True
            response = requests.get(f"https://ipapi.co/{ip}/json/")
            response.raise_for_status()
            resp_json = response.json()
            ip_geo = IpGeo(
                "success",
                resp_json['continent_code'],
                resp_json['country_code'],
                resp_json['region_code'],
            )
            self.cache.add_known(ip, ip_geo)
            return ip_geo, True
        except Exception:
            self.cache.add_unknown(ip)
            return IpGeo("error", None, None, None), False


def get_geo_data(ips: List[str]) -> Dict[str, IpGeo]:
    """Resolve geo data for the given IPs, trying each service until all are known."""
    cache = IpGeoCache()
    cache.load()
    services: List[GeoService] = [IpApi2GeoService(cache), IpApiGeoService(cache)]
    results: Dict[str, IpGeo] = {}
    for service in services:
        results, _ = service.get_geo_data_batch(ips)
        if cache.all_known(ips):
            break
    cache.save()
    unknown = [ip for ip in ips if not cache.is_known(ip)]
    if unknown:
        raise RuntimeError(
            f"Some services failed. There are {len(unknown)} unknown IPs. "
            f"Please review them manually.")
    return results


def main():
    folder = "data" if os.getcwd().endswith("nsg") else "nsg/data"
    # Traverse the directory and process each file. Note that the file structure
    # can be nested (h=19/m=32/macAddress=00-0D-3A-13-1E-8E/PT1H.json); we want
    # to process all "leaf" files.
    frames = []
    files_processed = 0
    print(os.getcwd())
    for root, _, files in os.walk(folder):
        for file in files:
            path = os.path.join(root, file)
            if path.endswith(".json"):
                data_tuples = process_file(path)
                print(f"Processed {len(data_tuples)} flow tuples from {path}")
                frames.append(pd.DataFrame(data_tuples))
                files_processed += 1
    if not frames:
        raise SystemExit(f"No flow-log JSON files found under {folder}")
    df = pd.concat(frames, ignore_index=True)
    print(f"Processed {files_processed} files and {len(df)} flow tuples in total")
    print(f"total bytes sent: {format_bytes(df['bytes_sent'].sum())}")

    # Summarize all traffic by destination IP.
    dest_ip_summary = df.groupby('dest_ip', as_index=False).agg({
        'bytes_sent': 'sum',
        'bytes_received': 'sum',
    })

    # Extend the summary with the geo data (continent, country, region).
    geo_data = get_geo_data(list(dest_ip_summary['dest_ip'].unique()))
    dest_ip_summary['continent'] = dest_ip_summary['dest_ip'].map(
        lambda x: geo_data[x].continent if geo_data[x].status == 'success' else None)
    dest_ip_summary['country'] = dest_ip_summary['dest_ip'].map(
        lambda x: geo_data[x].country if geo_data[x].status == 'success' else None)
    dest_ip_summary['region'] = dest_ip_summary['dest_ip'].map(
        lambda x: geo_data[x].region if geo_data[x].status == 'success' else None)

    # Flow-tuple timestamps are epoch seconds; guard against a zero-length window.
    hours = max((df['time'].max() - df['time'].min()) / 3600.0, 1.0)
    home = IpGeo(status="success", continent="EU", country="NL", region="CA")
    costs = estimate_cost(home, dest_ip_summary, hours)
    for kind, (existing, daily) in costs.items():
        print(f"{kind}: existing cost {existing:.2f} USD, predicted daily cost {daily:.2f} USD")

    # Summarize sent data by each geo column (by itself) and print each summary.
    for col in ['continent', 'country', 'region']:
        print(f"Summary by {col}")
        print(dest_ip_summary.groupby(col, as_index=False).agg({
            'bytes_sent': 'sum',
            'bytes_received': 'sum',
        }))


if __name__ == "__main__":
    main()
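
# Usage notes (paths match the defaults in main()):
#   - Drop NSG flow logs under ./data (or ./nsg/data when run from one level
#     up), preserving the nested h=HH/m=MM/macAddress=.../PT1H.json layout.
#   - Run the script directly; resolved IPs are cached in ip_cache.json, so
#     re-runs only query the geo services for new destination IPs.
#   - Sanity check for the formatter: format_bytes(1536) returns "1.5000 KB"
#     (1536 / 1024 = 1.5).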