|
|
@@ -0,0 +1,219 @@ |
|
|
import argparse |
|
|
import os |
|
|
import configparser |
|
|
import csv |
|
|
import sys |
|
|
from os.path import exists |
|
|
import requests |
|
|
|
|
|
#Console Output coloring. Makes knowing if you have any errors/ warnings easier to identify |
|
|
err_Col = '\033[91m' |
|
|
success_Col = '\033[92m' |
|
|
warn_Col = '\033[93m' |
|
|
no_col = '\033[0m' |
|
|
|
|
|
|
|
|
birb=""" |
|
|
_.----._ |
|
|
,'.::.--..:._ |
|
|
/::/_,-<o)::;_`-._ |
|
|
::::::::`-';'`,--`-` |
|
|
;::;'|::::,',' |
|
|
,'::/ ;:::/, :. |
|
|
/,':/ /::;' \ ':\\ |
|
|
:'.:: ,-'' . `.::\\ |
|
|
\.:;':. ` :: .: |
|
|
(;' ;;; .::' :| |
|
|
\,:;; \ `::.\.\\ |
|
|
`);' '::' `: |
|
|
\. ` `' .: _,' |
|
|
`.: .. -. ' :. :/ _.-' _.- |
|
|
>;._.:._.;,-=_(.-' __ `._ |
|
|
,;' _..-(((('' .,-'' `-._ |
|
|
_,'<.-'' _..``'.'`-'`. ` |
|
|
_.-((((_..--'' \ \ `.`. |
|
|
-' _.``' \ ` |
|
|
-------------------------------------------------- |
|
|
""" |
|
|
|
|
|
|
|
|
binary_hunt_ascii=""" |
|
|
____ _ _ _ _ _ _ _ _ |
|
|
| __ )(_)_ __ __ _ _ __ _ _ / \ _ __ __ _| |_ _ ___(_)___ | | | |_ _ _ __ | |_(_)_ __ __ _ |
|
|
| _ \| | '_ \ / _` | '__| | | | / _ \ | '_ \ / _` | | | | / __| / __| | |_| | | | | '_ \| __| | '_ \ / _` | |
|
|
| |_) | | | | | (_| | | | |_| | / ___ \| | | | (_| | | |_| \__ \ \__ \ | _ | |_| | | | | |_| | | | | (_| | |
|
|
|____/|_|_| |_|\__,_|_| \__, | /_/ \_\_| |_|\__,_|_|\__, |___/_|___/ |_| |_|\__,_|_| |_|\__|_|_| |_|\__, | |
|
|
|___/ |___/ |___/ |
|
|
""" |
|
|
|
|
|
def credential_file(): |
|
|
credentials_response_locations = [ |
|
|
"/etc/carbonblack/credentials.response", |
|
|
f"{os.path.dirname(__file__)}/.carbonblack/credentials.response", |
|
|
f"{os.getcwd()}/.carbonblack/credentials.response" |
|
|
] |
|
|
|
|
|
for p in credentials_response_locations: |
|
|
location = "" |
|
|
if exists(path=p): |
|
|
location = p |
|
|
else: |
|
|
pass |
|
|
|
|
|
return location |
|
|
|
|
|
|
|
|
def config_reader(file_location): |
|
|
config = configparser.ConfigParser() |
|
|
config.sections() |
|
|
config.read(file_location) |
|
|
|
|
|
return config |
|
|
|
|
|
def get_binaries(base_url, auth_token, query="*", rows="100000"): |
|
|
binary_url = f"{base_url}/api/v1/binary?q={query}&rows={rows}" |
|
|
|
|
|
headers = { |
|
|
"X-Auth-Token":f"{auth_token}" |
|
|
} |
|
|
|
|
|
binaries = requests.get(binary_url, headers=headers) |
|
|
|
|
|
if binaries.status_code == 200: |
|
|
return binaries |
|
|
else: |
|
|
sys.exit(err_Col + f"[!][!] We encountered a problem with the query to Carbon Black. The status code returned was {binaries.status_code}") |
|
|
return None |
|
|
|
|
|
def parse_results(results, base_url, subdomain): |
|
|
|
|
|
results_dict = [] |
|
|
#for binary in results: |
|
|
# print(binary) |
|
|
for binary in results.json()["results"]: |
|
|
#for detection in detection_dict[subdomain].values(): |
|
|
try: |
|
|
deets = dict() |
|
|
|
|
|
dig_sig = '' |
|
|
if "digsig_publisher" in binary: |
|
|
dig_sig = binary['digsig_publisher'] |
|
|
|
|
|
observed_filename_list = '' |
|
|
for observed in binary['observed_filename']: |
|
|
observed_filename_list += observed.replace("\\\\", "\\") + "\n" |
|
|
|
|
|
deets = { |
|
|
"shortname":subdomain, |
|
|
"md5":binary["md5"], |
|
|
"signature_status":binary["signed"], |
|
|
"company_name":binary["company_name"], |
|
|
"observed_filename":observed_filename_list, |
|
|
"count_observed_filename":int(len(binary["observed_filename"])), |
|
|
"original_filename":binary["original_filename"], |
|
|
"internal_name":binary["internal_name"], |
|
|
"file_desc":binary["file_desc"], |
|
|
"server_added_timestamp":binary["server_added_timestamp"], |
|
|
"digsig_publisher":dig_sig, |
|
|
"os_type":binary["os_type"], |
|
|
"host_count":binary["host_count"], |
|
|
"is_executable_image":binary["is_executable_image"], |
|
|
"url":f'{base_url}/#/binary/{binary["md5"]}' |
|
|
} |
|
|
|
|
|
results_dict.append(deets) |
|
|
except Exception as e: |
|
|
sys.exit(err_Col + f"[!] We encountered an issue trying to parse out your results. This is the error encountered {e}." + no_col) |
|
|
|
|
|
return results_dict |
|
|
|
|
|
def write_csv(output, results): |
|
|
|
|
|
header = ['shortname', 'md5', 'signature_status', 'company_name', 'observed_filename', 'count_observed_filename', 'original_filename', 'internal_name', 'file_desc', 'server_added_timestamp', 'digsig_publisher', 'os_type', 'host_count', 'is_executable_image', 'url'] |
|
|
|
|
|
with open(output, 'w', encoding='UTF8', newline='') as f: |
|
|
writer = csv.DictWriter(f, fieldnames=header) |
|
|
writer.writeheader() |
|
|
writer.writerows(results) |
|
|
|
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser(description="Script to hunt through binaries of a single Carbon Black Response customer.") |
|
|
|
|
|
parser.add_argument("--subdomain", "--d", type=str, required=False, help="Subdomain you would like to hunt through") |
|
|
parser.add_argument("--config", "--c", type=str, required=False, help="location of .ini file with the credentials required.") |
|
|
parser.add_argument("--output", "--o", required=False, help="Where do you want to store the CSV file of results?") |
|
|
parser.add_argument("--query", "--q", required=False, help="Specify a specific query you'd like to search. By default it'll return everything.") |
|
|
parser.add_argument("--rows", "--r", required=False, help="Specify the number of rows you'd like outputted. By default it'll return up to 100,000.") |
|
|
parser.add_argument("--no-color", action='store_true') |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
if args.no_color == True: |
|
|
err_Col = no_col |
|
|
success_Col = no_col |
|
|
warn_Col = no_col |
|
|
|
|
|
else: |
|
|
err_Col = '\033[91m' |
|
|
success_Col = '\033[92m' |
|
|
warn_Col = '\033[93m' |
|
|
|
|
|
#display ascii art |
|
|
print(err_Col + birb) |
|
|
print(success_Col + binary_hunt_ascii + no_col) |
|
|
|
|
|
#get credential file |
|
|
if args.config: |
|
|
cred_file = args.config |
|
|
elif credential_file() != "": |
|
|
cred_file = credential_file() |
|
|
else: |
|
|
cred_file = input(warn_Col + "[!] We couldn't find a credential .ini file in any standard location. Please enter a valid configuration file path:" + no_col) |
|
|
|
|
|
#checks to see if a subdomain was provided |
|
|
if args.subdomain: |
|
|
subdomain = args.subdomain |
|
|
else: |
|
|
subdomain = input(warn_Col + "[!] No subdomain provided. Please enter a subdomain that is included in your credential file: " + no_col) |
|
|
|
|
|
#check if output location was given |
|
|
if args.output: |
|
|
output = args.output |
|
|
else: |
|
|
output = os.getcwd() + "/binary_analysis.csv" |
|
|
print(warn_Col + "[*] No output file provided. The output file will be in the current working directory and named binary_analysis.csv" + no_col) |
|
|
|
|
|
|
|
|
|
|
|
#check the config file to make sure it has the appropriate subdomain and format |
|
|
config = config_reader(cred_file) |
|
|
|
|
|
try: |
|
|
base_url = config[subdomain]['url'] |
|
|
auth_token = config[subdomain]['token'] |
|
|
except Exception as e: |
|
|
sys.exit(err_Col + "[!][!] The subodmain you provided is not in the credentials.response file. Please add it.") |
|
|
|
|
|
if args.query and args.rows: |
|
|
binaries = get_binaries(base_url, auth_token, query=args.query, rows=args.rows) |
|
|
elif args.query: |
|
|
binaries = get_binaries(base_url, auth_token, query=args.query) |
|
|
elif args.rows: |
|
|
binaries = get_binaries(base_url, auth_token, rows=args.rows) |
|
|
else: |
|
|
print(success_Col + "[*] Getting list of binaries from Carbon Black") |
|
|
binaries = get_binaries(base_url, auth_token) |
|
|
|
|
|
print(success_Col + f"[*] There were a total of {int(len(binaries.json()['results']))} results returned") |
|
|
print(success_Col + f"[*] Parsing the results to {output}") |
|
|
if binaries: |
|
|
parsed_results = parse_results(binaries, base_url, subdomain) |
|
|
write_csv(output, parsed_results) |
|
|
else: |
|
|
sys.exit(err_Col + "We've encountered a problem.") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
sys.exit(main()) |
|
|
|