Skip to content

Instantly share code, notes, and snippets.

@dust-life
Forked from aconite33/bloodhoundce_import.py
Last active February 22, 2024 12:45
Show Gist options
  • Save dust-life/3bd83f1ceb8c65af15a4a668d6d2cd08 to your computer and use it in GitHub Desktop.
Save dust-life/3bd83f1ceb8c65af15a4a668d6d2cd08 to your computer and use it in GitHub Desktop.
Import large files into BloodHound Community Edition (CE)
import requests
import json
import time
import argparse
import getpass
import os
import sys
import glob
# Default API endpoint of a locally running BloodHound CE instance.
_DEFAULT_BASE_URL = 'http://localhost:8080'
# Pause between finishing the upload request and closing the upload job.
_UPLOAD_SETTLE_SECONDS = 10


def _post_or_exit(session, url, action, **kwargs):
    """POST to ``url`` through ``session`` and return the response.

    Args:
        session: The ``requests.Session`` used to send the request.
        url: Absolute URL to POST to.
        action: Short human-readable description of the step, used in the
            error message (e.g. ``"log in"``).
        **kwargs: Passed straight through to ``session.post``.

    Raises:
        SystemExit: If the request cannot be sent or the server answers with
            a non-success status code.
    """
    try:
        response = session.post(url, **kwargs)
    except requests.RequestException as err:
        sys.exit(f"Failed to {action}: {err}")
    if not response.ok:
        # Include a short slice of the body; API errors are usually explained there.
        sys.exit(f"Failed to {action}: HTTP {response.status_code} {response.text[:200]}")
    return response


def _data_field(response, field, action):
    """Return ``response.json()['data'][field]``.

    Raises:
        SystemExit: If the body is not JSON or lacks the expected structure.
    """
    try:
        return response.json()['data'][field]
    except (ValueError, KeyError, TypeError):
        sys.exit(f"Failed to {action}: unexpected response from server.")


def _upload_file(session, base_url, json_file):
    """Upload one JSON file as its own upload job (start, upload, end)."""
    print(f"Uploading file {json_file}...")
    print("Requesting upload job...")
    response = _post_or_exit(session, f'{base_url}/api/v2/file-upload/start', "start upload job")
    upload_id = _data_field(response, 'id', "start upload job")
    print("Created upload job: {0}".format(upload_id))

    print("Uploading file...")
    # Passing the open file object makes requests stream it from disk instead
    # of holding the whole (potentially very large) file in memory.
    with open(json_file, mode='rb') as f:
        _post_or_exit(
            session,
            f'{base_url}/api/v2/file-upload/{upload_id}',
            "upload file",
            # The payload is JSON, so declare it as such rather than as form data.
            headers={'Content-Type': 'application/json'},
            data=f,
        )
    print("Upload complete.")

    print("Allowing 10 seconds to assert file upload complete...")
    time.sleep(_UPLOAD_SETTLE_SECONDS)

    print("Ending job...")
    _post_or_exit(session, f'{base_url}/api/v2/file-upload/{upload_id}/end', "end upload job")
    print(f"Completed uploading file {json_file}. Check BloodHound UI for processing information.")


def main():
    """Log in to BloodHound CE and upload every ``*.json`` file in a directory.

    Command-line arguments:
        -d/--directory  Directory containing the JSON files (required).
        -u/--user       Username to log in with (required); the password is
                        prompted for interactively.
        --url           Base URL of the server (defaults to localhost:8080).

    Each file is uploaded in its own upload job. The function exits the
    process with an error message (via ``sys.exit``) if the directory is
    invalid or empty, or if any API call fails.
    """
    parser = argparse.ArgumentParser(description="Command line upload of large files into BloodHound CE edition")
    parser.add_argument('-d', '--directory', type=str, help="Directory of JSON files you want to upload.", required=True)
    parser.add_argument('-u', '--user', type=str, help="Username to login with.", required=True)
    parser.add_argument('--url', type=str, default=_DEFAULT_BASE_URL,
                        help="Base URL of the BloodHound CE server (default: %(default)s).")
    args = parser.parse_args()

    if not os.path.isdir(args.directory):
        sys.exit("Directory path doesn't exist or is not a directory.")

    # Sorted so the upload order is deterministic across runs.
    json_files = sorted(glob.glob(os.path.join(args.directory, '*.json')))
    if not json_files:
        # Fail before prompting for credentials when there is nothing to do.
        sys.exit(f"No JSON files found in {args.directory}.")

    base_url = args.url.rstrip('/')
    password = getpass.getpass(f"Enter password for {args.user}:")

    # One session reuses the TCP connection and carries the auth header.
    with requests.Session() as session:
        session.headers.update({'Accept': 'application/json, text/plain, */*'})

        print("Logging in...")
        login_payload = {
            "login_method": "secret",
            "secret": password,
            "username": args.user,
        }
        response = _post_or_exit(session, f'{base_url}/api/v2/login', "log in", json=login_payload)
        session_token = _data_field(response, 'session_token', "log in")
        session.headers.update({'Authorization': 'Bearer {0}'.format(session_token)})
        print("Successful login.")

        for json_file in json_files:
            _upload_file(session, base_url, json_file)
# Script entry point: run the uploader only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment