Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save vector-sec/ce3e6025850683ece22db91b06b14b42 to your computer and use it in GitHub Desktop.
Save vector-sec/ce3e6025850683ece22db91b06b14b42 to your computer and use it in GitHub Desktop.

Revisions

  1. @rc-abodkins rc-abodkins revised this gist Jul 2, 2021. 1 changed file with 0 additions and 3 deletions.
    3 changes: 0 additions & 3 deletions binary_analysis_hunting.py
    Original file line number Diff line number Diff line change
    @@ -88,10 +88,7 @@ def get_binaries(base_url, auth_token, query="*", rows="100000"):
    def parse_results(results, base_url, subdomain):

    results_dict = []
    #for binary in results:
    # print(binary)
    for binary in results.json()["results"]:
    #for detection in detection_dict[subdomain].values():
    try:
    deets = dict()

  2. @rc-abodkins rc-abodkins revised this gist Jul 2, 2021. 1 changed file with 33 additions and 15 deletions.
    48 changes: 33 additions & 15 deletions binary_analysis_hunting.py
    Original file line number Diff line number Diff line change
    @@ -123,7 +123,7 @@ def parse_results(results, base_url, subdomain):

    results_dict.append(deets)
    except Exception as e:
    sys.exit(err_Col + f"[!] We encountered an issue trying to parse out your results. This is the error encountered {e}." + no_col)
    sys.exit(err_Col + f"[!] We encountered an issue trying to parse out your results. This is the exception {e}." + no_col)

    return results_dict

    @@ -146,6 +146,8 @@ def main():
    parser.add_argument("--query", "--q", required=False, help="Specify a specific query you'd like to search. By default it'll return everything.")
    parser.add_argument("--rows", "--r", required=False, help="Specify the number of rows you'd like outputted. By default it'll return up to 100,000.")
    parser.add_argument("--no-color", action='store_true')
    parser.add_argument("--api", required=False, help="API Key for your Carbon Black Instance")
    parser.add_argument("--url", required=False, help="URL for your Carbon Black Response server")

    args = parser.parse_args()

    @@ -163,19 +165,34 @@ def main():
    print(err_Col + birb)
    print(success_Col + binary_hunt_ascii + no_col)

    #get credential file
    cred_file = ""
    api = ""
    url = ""



    if args.config:
    cred_file = args.config
    elif credential_file() != "":
    cred_file = credential_file()
    else:
    cred_file = input(warn_Col + "[!] We couldn't find a credential .ini file in any standard location. Please enter a valid configuration file path:" + no_col)
    print(warn_Col + "[!] We couldn't find a credential .ini file in any standard location.")
    if args.api:
    api = args.api
    else:
    api = input(warn_Col + "Please enter a valid API key: " + no_col)
    if args.url:
    url = args.url
    else:
    url = input(warn_Col + "Please enter a valid URL for your CB server (no trailing /): " + no_col)

    #checks to see if a subdomain was provided
    if args.subdomain:
    if args.subdomain and cred_file:
    subdomain = args.subdomain
    else:
    elif cred_file:
    subdomain = input(warn_Col + "[!] No subdomain provided. Please enter a subdomain that is included in your credential file: " + no_col)
    else:
    subdomain = ""

    #check if output location was given
    if args.output:
    @@ -184,16 +201,18 @@ def main():
    output = os.getcwd() + "/binary_analysis.csv"
    print(warn_Col + "[*] No output file provided. The output file will be in the current working directory and named binary_analysis.csv" + no_col)

    if api and url:
    base_url = url.rstrip()
    auth_token = api.rstrip()
    else:
    #check the config file to make sure it has the appropriate subdomain and format
    config = config_reader(cred_file)


    #check the config file to make sure it has the appropriate subdomain and format
    config = config_reader(cred_file)

    try:
    base_url = config[subdomain]['url']
    auth_token = config[subdomain]['token']
    except Exception as e:
    sys.exit(err_Col + "[!][!] The subodmain you provided is not in the credentials.response file. Please add it.")
    try:
    base_url = config[subdomain]['url']
    auth_token = config[subdomain]['token']
    except Exception as e:
    sys.exit(err_Col + "[!][!] The subodmain you provided is not in the credentials.response file. Please add it.")

    if args.query and args.rows:
    binaries = get_binaries(base_url, auth_token, query=args.query, rows=args.rows)
    @@ -216,4 +235,3 @@ def main():

    if __name__ == "__main__":
    sys.exit(main())

  3. @rc-abodkins rc-abodkins revised this gist Jul 2, 2021. No changes.
  4. @rc-abodkins rc-abodkins created this gist Jul 2, 2021.
    219 changes: 219 additions & 0 deletions binary_analysis_hunting.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,219 @@
    import argparse
    import os
    import configparser
    import csv
    import sys
    from os.path import exists
    import requests

    #Console Output coloring. Makes knowing if you have any errors/ warnings easier to identify
    err_Col = '\033[91m'
    success_Col = '\033[92m'
    warn_Col = '\033[93m'
    no_col = '\033[0m'


    birb="""
    _.----._
    ,'.::.--..:._
    /::/_,-<o)::;_`-._
    ::::::::`-';'`,--`-`
    ;::;'|::::,','
    ,'::/ ;:::/, :.
    /,':/ /::;' \ ':\\
    :'.:: ,-'' . `.::\\
    \.:;':. ` :: .:
    (;' ;;; .::' :|
    \,:;; \ `::.\.\\
    `);' '::' `:
    \. ` `' .: _,'
    `.: .. -. ' :. :/ _.-' _.-
    >;._.:._.;,-=_(.-' __ `._
    ,;' _..-(((('' .,-'' `-._
    _,'<.-'' _..``'.'`-'`. `
    _.-((((_..--'' \ \ `.`.
    -' _.``' \ `
    --------------------------------------------------
    """


    binary_hunt_ascii="""
    ____ _ _ _ _ _ _ _ _
    | __ )(_)_ __ __ _ _ __ _ _ / \ _ __ __ _| |_ _ ___(_)___ | | | |_ _ _ __ | |_(_)_ __ __ _
    | _ \| | '_ \ / _` | '__| | | | / _ \ | '_ \ / _` | | | | / __| / __| | |_| | | | | '_ \| __| | '_ \ / _` |
    | |_) | | | | | (_| | | | |_| | / ___ \| | | | (_| | | |_| \__ \ \__ \ | _ | |_| | | | | |_| | | | | (_| |
    |____/|_|_| |_|\__,_|_| \__, | /_/ \_\_| |_|\__,_|_|\__, |___/_|___/ |_| |_|\__,_|_| |_|\__|_|_| |_|\__, |
    |___/ |___/ |___/
    """

    def credential_file():
    credentials_response_locations = [
    "/etc/carbonblack/credentials.response",
    f"{os.path.dirname(__file__)}/.carbonblack/credentials.response",
    f"{os.getcwd()}/.carbonblack/credentials.response"
    ]

    for p in credentials_response_locations:
    location = ""
    if exists(path=p):
    location = p
    else:
    pass

    return location


    def config_reader(file_location):
    config = configparser.ConfigParser()
    config.sections()
    config.read(file_location)

    return config

    def get_binaries(base_url, auth_token, query="*", rows="100000"):
    binary_url = f"{base_url}/api/v1/binary?q={query}&rows={rows}"

    headers = {
    "X-Auth-Token":f"{auth_token}"
    }

    binaries = requests.get(binary_url, headers=headers)

    if binaries.status_code == 200:
    return binaries
    else:
    sys.exit(err_Col + f"[!][!] We encountered a problem with the query to Carbon Black. The status code returned was {binaries.status_code}")
    return None

    def parse_results(results, base_url, subdomain):

    results_dict = []
    #for binary in results:
    # print(binary)
    for binary in results.json()["results"]:
    #for detection in detection_dict[subdomain].values():
    try:
    deets = dict()

    dig_sig = ''
    if "digsig_publisher" in binary:
    dig_sig = binary['digsig_publisher']

    observed_filename_list = ''
    for observed in binary['observed_filename']:
    observed_filename_list += observed.replace("\\\\", "\\") + "\n"

    deets = {
    "shortname":subdomain,
    "md5":binary["md5"],
    "signature_status":binary["signed"],
    "company_name":binary["company_name"],
    "observed_filename":observed_filename_list,
    "count_observed_filename":int(len(binary["observed_filename"])),
    "original_filename":binary["original_filename"],
    "internal_name":binary["internal_name"],
    "file_desc":binary["file_desc"],
    "server_added_timestamp":binary["server_added_timestamp"],
    "digsig_publisher":dig_sig,
    "os_type":binary["os_type"],
    "host_count":binary["host_count"],
    "is_executable_image":binary["is_executable_image"],
    "url":f'{base_url}/#/binary/{binary["md5"]}'
    }

    results_dict.append(deets)
    except Exception as e:
    sys.exit(err_Col + f"[!] We encountered an issue trying to parse out your results. This is the error encountered {e}." + no_col)

    return results_dict

    def write_csv(output, results):

    header = ['shortname', 'md5', 'signature_status', 'company_name', 'observed_filename', 'count_observed_filename', 'original_filename', 'internal_name', 'file_desc', 'server_added_timestamp', 'digsig_publisher', 'os_type', 'host_count', 'is_executable_image', 'url']

    with open(output, 'w', encoding='UTF8', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=header)
    writer.writeheader()
    writer.writerows(results)


    def main():
    parser = argparse.ArgumentParser(description="Script to hunt through binaries of a single Carbon Black Response customer.")

    parser.add_argument("--subdomain", "--d", type=str, required=False, help="Subdomain you would like to hunt through")
    parser.add_argument("--config", "--c", type=str, required=False, help="location of .ini file with the credentials required.")
    parser.add_argument("--output", "--o", required=False, help="Where do you want to store the CSV file of results?")
    parser.add_argument("--query", "--q", required=False, help="Specify a specific query you'd like to search. By default it'll return everything.")
    parser.add_argument("--rows", "--r", required=False, help="Specify the number of rows you'd like outputted. By default it'll return up to 100,000.")
    parser.add_argument("--no-color", action='store_true')

    args = parser.parse_args()

    if args.no_color == True:
    err_Col = no_col
    success_Col = no_col
    warn_Col = no_col

    else:
    err_Col = '\033[91m'
    success_Col = '\033[92m'
    warn_Col = '\033[93m'

    #display ascii art
    print(err_Col + birb)
    print(success_Col + binary_hunt_ascii + no_col)

    #get credential file
    if args.config:
    cred_file = args.config
    elif credential_file() != "":
    cred_file = credential_file()
    else:
    cred_file = input(warn_Col + "[!] We couldn't find a credential .ini file in any standard location. Please enter a valid configuration file path:" + no_col)

    #checks to see if a subdomain was provided
    if args.subdomain:
    subdomain = args.subdomain
    else:
    subdomain = input(warn_Col + "[!] No subdomain provided. Please enter a subdomain that is included in your credential file: " + no_col)

    #check if output location was given
    if args.output:
    output = args.output
    else:
    output = os.getcwd() + "/binary_analysis.csv"
    print(warn_Col + "[*] No output file provided. The output file will be in the current working directory and named binary_analysis.csv" + no_col)



    #check the config file to make sure it has the appropriate subdomain and format
    config = config_reader(cred_file)

    try:
    base_url = config[subdomain]['url']
    auth_token = config[subdomain]['token']
    except Exception as e:
    sys.exit(err_Col + "[!][!] The subodmain you provided is not in the credentials.response file. Please add it.")

    if args.query and args.rows:
    binaries = get_binaries(base_url, auth_token, query=args.query, rows=args.rows)
    elif args.query:
    binaries = get_binaries(base_url, auth_token, query=args.query)
    elif args.rows:
    binaries = get_binaries(base_url, auth_token, rows=args.rows)
    else:
    print(success_Col + "[*] Getting list of binaries from Carbon Black")
    binaries = get_binaries(base_url, auth_token)

    print(success_Col + f"[*] There were a total of {int(len(binaries.json()['results']))} results returned")
    print(success_Col + f"[*] Parsing the results to {output}")
    if binaries:
    parsed_results = parse_results(binaries, base_url, subdomain)
    write_csv(output, parsed_results)
    else:
    sys.exit(err_Col + "We've encountered a problem.")


    if __name__ == "__main__":
    sys.exit(main())