Created
October 18, 2024 17:11
-
-
Save jonaslejon/df39482b363d5ca997d3bb9a13ac3fbe to your computer and use it in GitHub Desktop.
Revisions
-
jonaslejon created this gist
Oct 18, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,105 @@ import os import gzip import orjson import argparse from colorama import init, Fore, Style # Initialize colorama for cross-platform color support init(autoreset=True) # Initialize counters for statistics total_sessions = 0 sessions_with_server_name = 0 sessions_without_server_name = 0 def read_gz_json(file_path): """ Reads and decompresses a .gz file and parses each line as JSON using orjson. """ try: with gzip.open(file_path, 'rb') as f: # Iteratively process each line of the file for line in f: # Parse the JSON data line by line json_data = orjson.loads(line) yield json_data except Exception as e: print(f"{Fore.RED}[ERROR] Failed to read {file_path}: {e}{Style.RESET_ALL}") return None def process_logs(directory, verbose=False, output_file=None): """ Recursively search for .log.gz files, filter ssl files, decompress, and process the JSON data. """ global total_sessions, sessions_with_server_name, sessions_without_server_name sessions_without_server_name_list = [] for root, _, files in os.walk(directory): for file in files: # Only process files with 'ssl' in the filename if file.endswith(".log.gz") and "ssl" in file: file_path = os.path.join(root, file) # Only print file processing info if verbose is enabled if verbose: print(f"{Fore.GREEN}[INFO] Processing file: {file_path}{Style.RESET_ALL}") # Read and process the JSON data from the .log.gz file for json_data in read_gz_json(file_path): if json_data is None: continue total_sessions += 1 # Check if 'server_name' is present if 'server_name' in json_data and json_data['server_name']: sessions_with_server_name += 1 else: sessions_without_server_name += 1 sessions_without_server_name_list.append(json_data) # Write sessions without server_name to a file if specified if output_file: with open(output_file, 'w') as f_out: for session in sessions_without_server_name_list: f_out.write(orjson.dumps(session).decode('utf-8') + '\n') print(f"{Fore.BLUE}Sessions without 'server_name' written to: {output_file}{Style.RESET_ALL}") def print_statistics(): """ Print the final statistics after all logs have been processed. """ if total_sessions == 0: print(f"{Fore.RED}[WARNING] No sessions found.{Style.RESET_ALL}") return # Calculate percentages percent_with_server_name = (sessions_with_server_name / total_sessions) * 100 percent_without_server_name = (sessions_without_server_name / total_sessions) * 100 # Print statistics with colors print(f"\n{Fore.CYAN}===== Processing Statistics ====={Style.RESET_ALL}") print(f"{Fore.GREEN}Total sessions processed: {total_sessions}{Style.RESET_ALL}") print(f"{Fore.GREEN}Sessions with 'server_name': {sessions_with_server_name} ({percent_with_server_name:.2f}%){Style.RESET_ALL}") print(f"{Fore.YELLOW}Sessions without 'server_name': {sessions_without_server_name} ({percent_without_server_name:.2f}%){Style.RESET_ALL}") if __name__ == "__main__": # Set up argument parsing parser = argparse.ArgumentParser(description="Process Zeek SSL logs for Encrypted SNI statistics.") parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output (print files being processed)") parser.add_argument("-d", "--directory", type=str, default="/usr/local/zeek/logs/", help="Directory to search for SSL logs (default: /usr/local/zeek/logs/)") parser.add_argument("-o", "--output", type=str, help="File to write sessions without 'server_name'") args = parser.parse_args() # Print the starting message print(f"{Fore.BLUE}Encrypted SNI statistics{Style.RESET_ALL}") print(f"{Fore.BLUE}Starting SSL log processing...{Style.RESET_ALL}") # Start processing from the specified directory process_logs(args.directory, verbose=args.verbose, output_file=args.output) # Print statistics after processing all files print_statistics() print(f"{Fore.BLUE}Processing complete!{Style.RESET_ALL}")