Skip to content

Instantly share code, notes, and snippets.

@jonaslejon
Created October 18, 2024 17:11
Show Gist options
  • Select an option

  • Save jonaslejon/df39482b363d5ca997d3bb9a13ac3fbe to your computer and use it in GitHub Desktop.

Select an option

Save jonaslejon/df39482b363d5ca997d3bb9a13ac3fbe to your computer and use it in GitHub Desktop.

Revisions

  1. jonaslejon created this gist Oct 18, 2024.
    105 changes: 105 additions & 0 deletions esni-check.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,105 @@
    import os
    import gzip
    import orjson
    import argparse
    from colorama import init, Fore, Style

    # Initialize colorama for cross-platform color support
    init(autoreset=True)

    # Initialize counters for statistics
    total_sessions = 0
    sessions_with_server_name = 0
    sessions_without_server_name = 0

    def read_gz_json(file_path):
    """
    Reads and decompresses a .gz file and parses each line as JSON using orjson.
    """
    try:
    with gzip.open(file_path, 'rb') as f:
    # Iteratively process each line of the file
    for line in f:
    # Parse the JSON data line by line
    json_data = orjson.loads(line)
    yield json_data
    except Exception as e:
    print(f"{Fore.RED}[ERROR] Failed to read {file_path}: {e}{Style.RESET_ALL}")
    return None

    def process_logs(directory, verbose=False, output_file=None):
    """
    Recursively search for .log.gz files, filter ssl files, decompress, and process the JSON data.
    """
    global total_sessions, sessions_with_server_name, sessions_without_server_name

    sessions_without_server_name_list = []

    for root, _, files in os.walk(directory):
    for file in files:
    # Only process files with 'ssl' in the filename
    if file.endswith(".log.gz") and "ssl" in file:
    file_path = os.path.join(root, file)

    # Only print file processing info if verbose is enabled
    if verbose:
    print(f"{Fore.GREEN}[INFO] Processing file: {file_path}{Style.RESET_ALL}")

    # Read and process the JSON data from the .log.gz file
    for json_data in read_gz_json(file_path):
    if json_data is None:
    continue
    total_sessions += 1

    # Check if 'server_name' is present
    if 'server_name' in json_data and json_data['server_name']:
    sessions_with_server_name += 1
    else:
    sessions_without_server_name += 1
    sessions_without_server_name_list.append(json_data)

    # Write sessions without server_name to a file if specified
    if output_file:
    with open(output_file, 'w') as f_out:
    for session in sessions_without_server_name_list:
    f_out.write(orjson.dumps(session).decode('utf-8') + '\n')
    print(f"{Fore.BLUE}Sessions without 'server_name' written to: {output_file}{Style.RESET_ALL}")

    def print_statistics():
    """
    Print the final statistics after all logs have been processed.
    """
    if total_sessions == 0:
    print(f"{Fore.RED}[WARNING] No sessions found.{Style.RESET_ALL}")
    return

    # Calculate percentages
    percent_with_server_name = (sessions_with_server_name / total_sessions) * 100
    percent_without_server_name = (sessions_without_server_name / total_sessions) * 100

    # Print statistics with colors
    print(f"\n{Fore.CYAN}===== Processing Statistics ====={Style.RESET_ALL}")
    print(f"{Fore.GREEN}Total sessions processed: {total_sessions}{Style.RESET_ALL}")
    print(f"{Fore.GREEN}Sessions with 'server_name': {sessions_with_server_name} ({percent_with_server_name:.2f}%){Style.RESET_ALL}")
    print(f"{Fore.YELLOW}Sessions without 'server_name': {sessions_without_server_name} ({percent_without_server_name:.2f}%){Style.RESET_ALL}")

    if __name__ == "__main__":
    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Process Zeek SSL logs for Encrypted SNI statistics.")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output (print files being processed)")
    parser.add_argument("-d", "--directory", type=str, default="/usr/local/zeek/logs/", help="Directory to search for SSL logs (default: /usr/local/zeek/logs/)")
    parser.add_argument("-o", "--output", type=str, help="File to write sessions without 'server_name'")

    args = parser.parse_args()

    # Print the starting message
    print(f"{Fore.BLUE}Encrypted SNI statistics{Style.RESET_ALL}")
    print(f"{Fore.BLUE}Starting SSL log processing...{Style.RESET_ALL}")

    # Start processing from the specified directory
    process_logs(args.directory, verbose=args.verbose, output_file=args.output)

    # Print statistics after processing all files
    print_statistics()

    print(f"{Fore.BLUE}Processing complete!{Style.RESET_ALL}")