"""Hunt through the binaries of a single Carbon Black Response server.

The script queries the ``/api/v1/binary`` search endpoint, flattens every
returned binary into one row and writes the rows to a CSV file.
"""
import argparse
import configparser
import csv
import os
import sys
from os.path import exists

try:
    import requests
except ImportError:
    # Keep the module importable; get_binaries() reports the missing package
    # with a readable message instead of a traceback at import time.
    requests = None

# Console output coloring. Makes any errors/warnings easier to identify.
# main() blanks all four values when --no-color is given.
err_Col = '\033[91m'
success_Col = '\033[92m'
warn_Col = '\033[93m'
no_col = '\033[0m'

# Raw strings: the art is full of backslashes that are not valid escape
# sequences, which newer Python versions warn about in normal literals.
# NOTE(review): the art appears flattened onto a single line in the copy under
# review; the text is reproduced exactly as found - restore the multi-line
# layout from a pristine copy if one is available.
birb = r""" _.----._ ,'.::.--..:._ /::/_,-;._.:._.;,-=_(.-' __ `._ ,;' _..-(((('' .,-'' `-._ _,'<.-'' _..``'.'`-'`. ` _.-((((_..--'' \ \ `.`. -' _.``' \ ` -------------------------------------------------- """  # noqa: E501

binary_hunt_ascii = r""" ____ _ _ _ _ _ _ _ _ | __ )(_)_ __ __ _ _ __ _ _ / \ _ __ __ _| |_ _ ___(_)___ | | | |_ _ _ __ | |_(_)_ __ __ _ | _ \| | '_ \ / _` | '__| | | | / _ \ | '_ \ / _` | | | | / __| / __| | |_| | | | | '_ \| __| | '_ \ / _` | | |_) | | | | | (_| | | | |_| | / ___ \| | | | (_| | | |_| \__ \ \__ \ | _ | |_| | | | | |_| | | | | (_| | |____/|_|_| |_|\__,_|_| \__, | /_/ \_\_| |_|\__,_|_|\__, |___/_|___/ |_| |_|\__,_|_| |_|\__|_|_| |_|\__, | |___/ |___/ |___/ """  # noqa: E501

# Column order of the CSV report. parse_results() emits exactly these keys and
# write_csv() uses them as the header, so the two cannot drift apart.
_CSV_FIELDS = [
    'shortname',
    'md5',
    'signature_status',
    'company_name',
    'observed_filename',
    'count_observed_filename',
    'original_filename',
    'internal_name',
    'file_desc',
    'server_added_timestamp',
    'digsig_publisher',
    'os_type',
    'host_count',
    'is_executable_image',
    'url',
]


def credential_file():
    """Return the path of an existing credentials.response file, or "".

    Three locations are checked: ``/etc/carbonblack``, a ``.carbonblack``
    directory next to this script, and a ``.carbonblack`` directory in the
    current working directory. When several exist, the later one in that list
    wins.
    """
    # abspath() guards against a relative __file__, whose dirname would be ""
    # and would turn the second candidate into a path under "/".
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [
        "/etc/carbonblack/credentials.response",
        os.path.join(script_dir, ".carbonblack", "credentials.response"),
        os.path.join(os.getcwd(), ".carbonblack", "credentials.response"),
    ]
    # Walk the list backwards and stop at the first hit: later entries keep
    # their precedence, while earlier entries are no longer discarded when a
    # later one is missing.
    for path in reversed(candidates):
        if exists(path):
            return path
    return ""


def config_reader(file_location):
    """Parse the .ini file at *file_location* and return the ConfigParser.

    ConfigParser.read() silently skips files it cannot open, so a wrong path
    yields an empty parser rather than an exception.
    """
    config = configparser.ConfigParser()
    config.read(file_location)
    return config


def get_binaries(base_url, auth_token, query="*", rows="100000"):
    """Run a binary search against Carbon Black and return the response.

    Args:
        base_url: Server URL without a trailing slash.
        auth_token: API token sent in the ``X-Auth-Token`` header.
        query: Search query; ``*`` matches everything.
        rows: Maximum number of results requested.

    Returns:
        The ``requests.Response`` of a successful (HTTP 200) search.

    Exits the process with an error message when the ``requests`` package is
    missing, the request cannot be made, or the status code is not 200.
    """
    if requests is None:
        sys.exit(
            err_Col
            + "[!][!] The 'requests' package is required to query Carbon Black. "
            + "Install it with: pip install requests"
            + no_col
        )

    binary_url = f"{base_url}/api/v1/binary"
    headers = {"X-Auth-Token": f"{auth_token}"}
    try:
        # Passing the query through params= lets requests URL-encode it, so
        # characters such as '&', '#' or '+' no longer corrupt the request.
        binaries = requests.get(
            binary_url,
            headers=headers,
            params={"q": query, "rows": rows},
        )
    except requests.RequestException as exc:
        sys.exit(
            err_Col
            + f"[!][!] We could not reach Carbon Black at {base_url}. "
            + f"This is the exception {exc}."
            + no_col
        )

    if binaries.status_code != 200:
        sys.exit(
            err_Col
            + "[!][!] We encountered a problem with the query to Carbon Black. "
            + f"The status code returned was {binaries.status_code}"
            + no_col
        )
    return binaries


def parse_results(results, base_url, subdomain):
    """Flatten a binary-search response into a list of CSV row dicts.

    Args:
        results: Object whose ``json()`` returns a dict with a ``"results"``
            list of binary records (the response from get_binaries()).
        base_url: Server URL, used to build a link to each binary.
        subdomain: Value written to the ``shortname`` column.

    Returns:
        One dict per binary, keyed by the CSV column names.

    Only ``md5`` is required in a record; any other absent field becomes an
    empty string so one sparse record does not abort the whole export. The
    process exits with an error message if a record cannot be parsed.
    """
    rows = []
    for binary in results.json()["results"]:
        try:
            md5 = binary["md5"]
            observed = binary.get("observed_filename") or []
            # One filename per line, each newline-terminated; doubled
            # backslashes in the reported paths are collapsed to single ones.
            observed_filename_list = "".join(
                name.replace("\\\\", "\\") + "\n" for name in observed
            )
            rows.append({
                "shortname": subdomain,
                "md5": md5,
                "signature_status": binary.get("signed", ""),
                "company_name": binary.get("company_name", ""),
                "observed_filename": observed_filename_list,
                "count_observed_filename": len(observed),
                "original_filename": binary.get("original_filename", ""),
                "internal_name": binary.get("internal_name", ""),
                "file_desc": binary.get("file_desc", ""),
                "server_added_timestamp": binary.get("server_added_timestamp", ""),
                "digsig_publisher": binary.get("digsig_publisher", ""),
                "os_type": binary.get("os_type", ""),
                "host_count": binary.get("host_count", ""),
                "is_executable_image": binary.get("is_executable_image", ""),
                "url": f"{base_url}/#/binary/{md5}",
            })
        except (AttributeError, KeyError, TypeError) as e:
            sys.exit(
                err_Col
                + "[!] We encountered an issue trying to parse out your results. "
                + f"This is the exception {e}."
                + no_col
            )
    return rows


def write_csv(output, results):
    """Write *results* (row dicts from parse_results()) to the CSV *output*.

    The file is created or overwritten, encoded as UTF-8, and starts with a
    header row.
    """
    with open(output, 'w', encoding='UTF8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_FIELDS)
        writer.writeheader()
        writer.writerows(results)


def main():
    """Parse the command line, query Carbon Black and write the CSV report."""
    # The color names are module globals that the helper functions read too;
    # rebinding them only locally would leave the helpers' messages colored.
    global err_Col, success_Col, warn_Col, no_col

    parser = argparse.ArgumentParser(description="Script to hunt through binaries of a single Carbon Black Response customer.")
    parser.add_argument("--subdomain", "--d", type=str, required=False, help="Subdomain you would like to hunt through")
    parser.add_argument("--config", "--c", type=str, required=False, help="location of .ini file with the credentials required.")
    parser.add_argument("--output", "--o", required=False, help="Where do you want to store the CSV file of results?")
    parser.add_argument("--query", "--q", required=False, help="Specify a specific query you'd like to search. By default it'll return everything.")
    parser.add_argument("--rows", "--r", required=False, help="Specify the number of rows you'd like outputted. By default it'll return up to 100,000.")
    parser.add_argument("--no-color", action='store_true')
    parser.add_argument("--api", required=False, help="API Key for your Carbon Black Instance")
    parser.add_argument("--url", required=False, help="URL for your Carbon Black Response server")
    args = parser.parse_args()

    if args.no_color:
        # Empty strings, not the reset code: --no-color output should contain
        # no escape sequences at all.
        err_Col = success_Col = warn_Col = no_col = ""

    # display ascii art
    print(err_Col + birb)
    print(success_Col + binary_hunt_ascii + no_col)

    # An explicit --config wins; otherwise fall back to the standard locations.
    cred_file = args.config or credential_file()

    api = ""
    url = ""
    if not cred_file:
        # --api/--url (or the prompts) are only consulted when no credential
        # file is available, as before.
        print(warn_Col + "[!] We couldn't find a credential .ini file in any standard location." + no_col)
        api = args.api or input(warn_Col + "Please enter a valid API key: " + no_col)
        url = args.url or input(warn_Col + "Please enter a valid URL for your CB server (no trailing /): " + no_col)

    # checks to see if a subdomain was provided
    if cred_file:
        subdomain = args.subdomain or input(warn_Col + "[!] No subdomain provided. Please enter a subdomain that is included in your credential file: " + no_col)
    else:
        # Without a credential file the subdomain only labels the CSV rows.
        subdomain = args.subdomain or ""

    # check if output location was given
    if args.output:
        output = args.output
    else:
        output = os.path.join(os.getcwd(), "binary_analysis.csv")
        print(warn_Col + "[*] No output file provided. The output file will be in the current working directory and named binary_analysis.csv" + no_col)

    if api and url:
        # A trailing slash would produce "//api/v1/binary" in the request URL.
        base_url = url.strip().rstrip("/")
        auth_token = api.strip()
    elif not cred_file:
        sys.exit(err_Col + "[!][!] An API key and a URL are both required when no credential file is available." + no_col)
    else:
        # check the config file to make sure it has the appropriate subdomain and format
        config = config_reader(cred_file)
        try:
            base_url = config[subdomain]['url'].rstrip("/")
            auth_token = config[subdomain]['token']
        except KeyError:
            sys.exit(err_Col + "[!][!] The subdomain you provided is not in the credentials.response file. Please add it." + no_col)

    # Only forward the options the user actually set so get_binaries() keeps
    # its own defaults for the rest.
    search_options = {}
    if args.query:
        search_options["query"] = args.query
    if args.rows:
        search_options["rows"] = args.rows

    print(success_Col + "[*] Getting list of binaries from Carbon Black" + no_col)
    # get_binaries() exits on any failure, so the response below is a
    # successful one.
    binaries = get_binaries(base_url, auth_token, **search_options)

    result_count = len(binaries.json()['results'])
    print(success_Col + f"[*] There were a total of {result_count} results returned" + no_col)
    print(success_Col + f"[*] Parsing the results to {output}" + no_col)

    parsed_results = parse_results(binaries, base_url, subdomain)
    write_csv(output, parsed_results)


if __name__ == "__main__":
    sys.exit(main())