Last active
April 16, 2021 00:32
-
-
Save BitTheByte/b88e35864848ab1abeb52742e85e7cb5 to your computer and use it in GitHub Desktop.
Revisions
-
BitTheByte revised this gist
Apr 16, 2021 . 1 changed file with 17 additions and 9 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -4,13 +4,14 @@ import time import threading import os import random requests.packages.urllib3.disable_warnings() acunetix_host = "127.0.0.1" acunetix_port = 3443 acunetix_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" max_active_scans = 10 pending_targets = set() pending_file_name = "pending.scans.list" @@ -177,7 +178,7 @@ def populate_from_file(name, _list): _list.add(item) return _list threading.Thread(target=lambda: app.run(host="0.0.0.0", port=80)).start() acunetix = Acunetix(host="%s:%i" % (acunetix_host, acunetix_port), api=acunetix_token) if not os.path.isfile(pending_file_name): @@ -186,21 +187,28 @@ def populate_from_file(name, _list): while 1: try: time.sleep(5) populate_from_file(pending_file_name, pending_targets) print(f"[INFO] number of targets in qeueue = {len(pending_targets)}") n_running_scans = len(acunetix.running_scans()['scans']) n_starting_scans = len(acunetix.starting_scans()['scans']) if n_running_scans >= max_active_scans or n_starting_scans >= max_active_scans or len(pending_targets) == 0: print(f"[WARNING] max_active_scans of {max_active_scans} scan(s) is reached or no pending targets") continue temp = pending_targets.copy() try: target = pending_targets.pop() target_id, scan_id, response = acunetix.start_scan(target) print(f"[INFO] started new scan for {target} with target_id={target_id}, scan_id={scan_id}") open(pending_file_name, "w").write('\n'.join(pending_targets)) except Exception as e : print(e) random.shuffle(temp) pending_targets = temp except Exception as e: print(e) -
BitTheByte created this gist
Dec 5, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,206 @@ import json import flask import requests import time import threading import os requests.packages.urllib3.disable_warnings() acunetix_host = "127.0.0.1" acunetix_port = 3443 acunetix_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" max_active_scans = 10 pending_targets = set() pending_file_name = "pending.scans.list" API_BASE = "/api/v1/" API_SCAN = API_BASE + "scans" API_TARGET = API_BASE + "targets" target_criticality_list = { "critical": "10", "high": "20", "normal": "10", "low": "0", } target_criticality_allowed = list(target_criticality_list.keys()) scan_profiles_list = { "full_scan": "11111111-1111-1111-1111-111111111111", "high_risk_vuln": "11111111-1111-1111-1111-111111111112", "xss_vuln": "11111111-1111-1111-1111-111111111116", "sql_injection_vuln": "11111111-1111-1111-1111-111111111113", "weak_passwords": "11111111-1111-1111-1111-111111111115", "crawl_only": "11111111-1111-1111-1111-111111111117", } scan_profiles_allowed = list(scan_profiles_list.keys()) class AXException(Exception): HTTP_ERROR = "httpError" AUTH_ERROR = "authError" SERVER_RESOURCE = "serverResource" NOT_ALLOWED_CRITICYLITY_PROFILE = "Criticallity not found" NOT_ALLOWED_SCAN_PROFILE = "Scan Profile not found" JSON_PARSING_ERROR = "Decoding JSON has failed" def __init__(self, key, message): Exception.__init__(self, message) self.key = key class Acunetix(object): def __init__(self, host=None, api=None, timeout=20): self.apikey = api self.host = str( "{}{}".format("https://" if "https://" not in host else "", host) ) self.timeout = timeout self.headers = { "X-Auth": self.apikey, "content-type": "application/json", "User-Agent": "Acunetix", } self.authenticated = self.__is_connected() if not self.authenticated: raise AXException("AUTH_ERROR", "Wrong API Key !") def __json_return(self, data): try: return json.loads(data) except Exception as e: raise AXException("JSON_PARSING_ERROR", f"Json Parsing has occured: {e}") def __send_request(self, method="get", endpoint="", data=None): request_call = getattr(requests, method) url = str("{}{}".format(self.host, endpoint if endpoint else "/")) try: request = request_call( url, headers=self.headers, timeout=self.timeout, data=json.dumps(data), verify=False, ) if request.status_code == 403: raise AXException("HTTP_ERROR", f"HTTP ERROR OCCURED: {request.text}") return self.__json_return(request.text) except Exception as e: raise AXException("HTTP_ERROR", f"HTTP ERROR OCCURED: {e}") def __is_connected(self): return False if 'Unauthorized' in str(self.info()) else True def info(self): return self.__send_request(method="get", endpoint="/api/v1/info") def targets(self): return self.__send_request( method="get", endpoint=f"{API_TARGET}?pagination=200" ) def add_target(self, target="", criticality="normal"): if criticality not in target_criticality_allowed: raise AXException("NOT_ALLOWED_CRITICYLITY_PROFILE", "Criticallity not found allowed values {}".format(str(list(target_criticality_allowed)))) target_address = ( target if "http://" in target or "https://" in target else "http://{}".format(target) ) data = { "address": str(target_address), "description": "Sent from Acunetix-Python", "criticality": target_criticality_list[criticality], } return self.__send_request(method="post", endpoint=API_TARGET, data=data) def delete_target(self, target_id): try: return self.__send_request( method="delete", endpoint=f"{API_TARGET}/{target_id}" ) except: pass def delete_all_targets(self): while True: targets = self.targets() if len(targets["targets"]): for target in targets["targets"]: self.delete_target(target["target_id"]) else: break def running_scans(self): return self.__send_request(method="get", endpoint=f"{API_SCAN}?l=100&q=status:processing;") def starting_scans(self): return self.__send_request(method="get", endpoint=f"{API_SCAN}?l=100&q=status:starting;") def start_scan(self, address=None, target_id=None, scan_profile="full_scan"): if scan_profile not in scan_profiles_allowed: raise AXException("NOT_ALLOWED_SCAN_PROFILE", "Scan Profile not found allowed values {}".format(str(list(scan_profiles_allowed)))) if address and not target_id: target_id = self.add_target(target=address)["target_id"] scan_payload = { "target_id": str(target_id), "profile_id": scan_profiles_list[scan_profile], "schedule": {"disable": False, "start_date": None, "time_sensitive": False}, } return target_id, scan_profile, self.__send_request(method="post", endpoint=API_SCAN, data=scan_payload) app = flask.Flask(import_name='') @app.route("/add_target/<host>") def add_interface(host): global pending_targets pending_targets.add(host) return f"Added {host} to pending targets queue. len = {len(pending_targets)}" @app.route("/show_targets") def target_interface(): global pending_targets return '\n'.join(pending_targets) def populate_from_file(name, _list): data = [target.strip() for target in open(name, "r").read().split("\n") if target.strip()] for item in data: _list.add(item) return _list threading.Thread(target=lambda: app.run(host="0.0.0.0", port=9999)).start() acunetix = Acunetix(host="%s:%i" % (acunetix_host, acunetix_port), api=acunetix_token) if not os.path.isfile(pending_file_name): open(pending_file_name, "w").write("") while 1: try: time.sleep(10) populate_from_file(pending_file_name, pending_targets) print(f"[INFO] number of targets in qeueue = {len(pending_targets)}") n_running_scans = len(acunetix.running_scans()['scans']) n_starting_scans = len(acunetix.starting_scans()['scans']) if n_running_scans >= max_active_scans or n_starting_scans >= max_active_scans or len(pending_targets) == 0: print(f"[WARNING] max_active_scans of {max_active_scans} scan(s) is reached or no pending targets") continue target = pending_targets.pop() target_id, scan_id, response = acunetix.start_scan(target) print(f"[INFO] started new scan for {target} with target_id={target_id}, scan_id={scan_id}") open(pending_file_name, "w").write('\n'.join(pending_targets)) except Exception as e: print(e)