Created
October 27, 2025 15:06
-
-
Save williballenthin/ff8f6cac32ae4cd70239d543fdb09eac to your computer and use it in GitHub Desktop.
triage decompile_function for IDA MCP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import sys | |
| if sys.version_info < (3, 11): | |
| raise RuntimeError("Python 3.11 or higher is required for the MCP plugin") | |
| import json | |
| import struct | |
| import threading | |
| import http.server | |
| from urllib.parse import urlparse | |
| from typing import ( | |
| Any, | |
| Callable, | |
| get_type_hints, | |
| TypedDict, | |
| Optional, | |
| Annotated, | |
| TypeVar, | |
| Generic, | |
| NotRequired, | |
| overload, | |
| Literal, | |
| ) | |
class JSONRPCError(Exception):
    """Error that maps directly onto a JSON-RPC 2.0 error object.

    Attributes:
        code: Numeric JSON-RPC error code (e.g. -32601 for an unknown method).
        message: Short human-readable description of the failure.
        data: Optional extra payload; the request handler copies it into
            ``error.data`` when it is not None.
    """

    def __init__(self, code: int, message: str, data: Any = None):
        # Forward the message to Exception so str(e), e.args and tracebacks
        # are informative instead of empty.
        super().__init__(message)
        self.code = code
        self.message = message
        self.data = data
| class RPCRegistry: | |
| def __init__(self): | |
| self.methods: dict[str, Callable] = {} | |
| self.unsafe: set[str] = set() | |
| def register(self, func: Callable) -> Callable: | |
| self.methods[func.__name__] = func | |
| return func | |
| def mark_unsafe(self, func: Callable) -> Callable: | |
| self.unsafe.add(func.__name__) | |
| return func | |
| def dispatch(self, method: str, params: Any) -> Any: | |
| if method not in self.methods: | |
| raise JSONRPCError(-32601, f"Method '{method}' not found") | |
| func = self.methods[method] | |
| hints = get_type_hints(func) | |
| # Remove return annotation if present | |
| hints.pop("return", None) | |
| if isinstance(params, list): | |
| if len(params) != len(hints): | |
| raise JSONRPCError(-32602, f"Invalid params: expected {len(hints)} arguments, got {len(params)}") | |
| # Validate and convert parameters | |
| converted_params = [] | |
| for value, (param_name, expected_type) in zip(params, hints.items()): | |
| try: | |
| if not isinstance(value, expected_type): | |
| value = expected_type(value) | |
| converted_params.append(value) | |
| except (ValueError, TypeError): | |
| raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {expected_type.__name__}") | |
| return func(*converted_params) | |
| elif isinstance(params, dict): | |
| if set(params.keys()) != set(hints.keys()): | |
| raise JSONRPCError(-32602, f"Invalid params: expected {list(hints.keys())}") | |
| # Validate and convert parameters | |
| converted_params = {} | |
| for param_name, expected_type in hints.items(): | |
| value = params.get(param_name) | |
| try: | |
| if not isinstance(value, expected_type): | |
| value = expected_type(value) | |
| converted_params[param_name] = value | |
| except (ValueError, TypeError): | |
| raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {expected_type.__name__}") | |
| return func(**converted_params) | |
| else: | |
| raise JSONRPCError(-32600, "Invalid Request: params must be array or object") | |
# Module-wide registry that the decorators below populate and that the HTTP
# request handler dispatches against.
rpc_registry = RPCRegistry()


def jsonrpc(func: Callable) -> Callable:
    """Decorator to register a function as a JSON-RPC method"""
    # Only reads the module-level registry, so no `global` statement is needed.
    registered = rpc_registry.register(func)
    return registered
def unsafe(func: Callable) -> Callable:
    """Decorator to mark a function as unsafe in the module-level registry."""
    marked = rpc_registry.mark_unsafe(func)
    return marked
class JSONRPCRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler that serves JSON-RPC 2.0 requests POSTed to ``/mcp``.

    Every reply is sent with HTTP status 200; failures are reported inside
    the JSON-RPC ``error`` member.
    """

    def _send_body(self, response_body: bytes) -> None:
        """Write an already-encoded JSON document as the HTTP response."""
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(response_body)))
        self.end_headers()
        self.wfile.write(response_body)

    def send_jsonrpc_error(self, code: int, message: str, id: Any = None):
        """Send a JSON-RPC error response; ``id`` is included only when not None."""
        response = {
            "jsonrpc": "2.0",
            "error": {
                "code": code,
                "message": message
            }
        }
        if id is not None:
            response["id"] = id
        self._send_body(json.dumps(response).encode("utf-8"))

    def do_POST(self):
        """Parse one JSON-RPC request, dispatch it and write the response."""
        parsed_path = urlparse(self.path)
        if parsed_path.path != "/mcp":
            self.send_jsonrpc_error(-32098, "Invalid endpoint", None)
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            # A non-numeric header used to escape as an unhandled exception.
            self.send_jsonrpc_error(-32600, "Invalid Request: malformed Content-Length header", None)
            return
        # A negative length would make rfile.read() wait for end-of-stream.
        if content_length <= 0:
            self.send_jsonrpc_error(-32700, "Parse error: missing request body", None)
            return

        request_body = self.rfile.read(content_length)
        try:
            request = json.loads(request_body)
        except ValueError:
            # Covers json.JSONDecodeError and the UnicodeDecodeError raised
            # for bodies that are not valid UTF-8/16/32.
            self.send_jsonrpc_error(-32700, "Parse error: invalid JSON", None)
            return

        # Must be checked before any request.get(): a JSON array or scalar
        # has no .get() and previously crashed the handler.
        if not isinstance(request, dict):
            self.send_jsonrpc_error(-32600, "Invalid Request", None)
            return

        # Prepare the response
        response: dict[str, Any] = {
            "jsonrpc": "2.0"
        }
        request_id = request.get("id")
        if request_id is not None:
            response["id"] = request_id

        try:
            # Basic JSON-RPC validation
            if request.get("jsonrpc") != "2.0":
                raise JSONRPCError(-32600, "Invalid JSON-RPC version")
            if "method" not in request:
                raise JSONRPCError(-32600, "Method not specified")

            # Dispatch the method
            result = rpc_registry.dispatch(request["method"], request.get("params", []))
            response["result"] = result
        except JSONRPCError as e:
            response["error"] = {
                "code": e.code,
                "message": e.message
            }
            if e.data is not None:
                response["error"]["data"] = e.data
        except IDAError as e:
            response["error"] = {
                "code": -32000,
                "message": e.message,
            }
        except Exception:
            traceback.print_exc()
            response["error"] = {
                "code": -32603,
                "message": "Internal error (please report a bug)",
                "data": traceback.format_exc(),
            }

        try:
            response_body = json.dumps(response).encode("utf-8")
        except Exception:
            # The method returned something json cannot serialize. Reply with
            # a well-formed error that still carries the envelope and the id.
            traceback.print_exc()
            fallback: dict[str, Any] = {
                "jsonrpc": "2.0",
                "error": {
                    "code": -32603,
                    "message": "Internal error (please report a bug)",
                    "data": traceback.format_exc(),
                }
            }
            if request_id is not None:
                fallback["id"] = request_id
            response_body = json.dumps(fallback).encode("utf-8")

        self._send_body(response_body)

    def log_message(self, format, *args):
        # Suppress logging
        pass
class MCPHTTPServer(http.server.HTTPServer):
    """HTTPServer variant with address reuse switched off."""

    # NOTE(review): presumably disabled so that a second instance binding the
    # same port fails instead of silently sharing it - confirm intent.
    allow_reuse_address = False
class Server:
    """Owns the background thread that runs the JSON-RPC HTTP server."""

    HOST = "localhost"
    PORT = 13337

    def __init__(self):
        self.server = None
        self.server_thread = None
        self.running = False

    def start(self):
        """Start the server on a daemon thread; does nothing if already running."""
        if self.running:
            print("[MCP] Server is already running")
            return

        self.server_thread = threading.Thread(target=self._run_server, daemon=True)
        self.running = True
        self.server_thread.start()

    def stop(self):
        """Shut the server down and wait for its thread; no-op when not running."""
        if not self.running:
            return

        self.running = False
        if self.server:
            self.server.shutdown()
            self.server.server_close()
        if self.server_thread:
            self.server_thread.join()
            self.server = None
        print("[MCP] Server stopped")

    def _run_server(self):
        """Thread target: bind the socket and serve until shutdown() is called."""
        # Local import: errno is not part of the file-level imports.
        import errno

        try:
            # Create server in the thread to handle binding
            self.server = MCPHTTPServer((Server.HOST, Server.PORT), JSONRPCRequestHandler)
            print(f"[MCP] Server started at http://{Server.HOST}:{Server.PORT}")
            self.server.serve_forever()
        except OSError as e:
            # errno.EADDRINUSE is platform specific (98 on Linux, 48 on macOS);
            # 10048 is the Winsock WSAEADDRINUSE code.
            if e.errno in (errno.EADDRINUSE, 10048):
                print(f"[MCP] Error: Port {Server.PORT} is already in use")
            else:
                print(f"[MCP] Server error: {e}")
        except Exception as e:
            print(f"[MCP] Server error: {e}")
        finally:
            # Single place that clears the flag, whatever the exit path was.
            self.running = False
| # A module that helps with writing thread-safe IDA code. | |
| # Based on: | |
| # https://web.archive.org/web/20160305190440/http://www.williballenthin.com/blog/2015/09/04/idapython-synchronization-decorator/ | |
| import logging | |
| import queue | |
| import traceback | |
| import functools | |
| from enum import IntEnum, IntFlag | |
| import ida_hexrays | |
| import ida_kernwin | |
| import ida_funcs | |
| import ida_gdl | |
| import ida_lines | |
| import ida_idaapi | |
| import idc | |
| import idaapi | |
| import idautils | |
| import ida_nalt | |
| import ida_bytes | |
| import ida_typeinf | |
| import ida_xref | |
| import ida_entry | |
| import idautils | |
| import ida_idd | |
| import ida_dbg | |
| import ida_name | |
| import ida_ida | |
| import ida_frame | |
# Kernel version as two integers. The unpacking assumes a "major.minor"
# string; any other number of dot-separated parts raises ValueError.
ida_major, ida_minor = (int(part) for part in idaapi.get_kernel_version().split("."))
class IDAError(Exception):
    """Expected, user-facing failure raised by the IDA helper functions.

    The request handler reports it with code -32000 and the text below.
    """

    def __init__(self, message: str):
        super().__init__(message)

    @property
    def message(self) -> str:
        """The text passed to the constructor (first entry of ``args``)."""
        return self.args[0]
class IDASyncError(Exception):
    """Raised by sync_wrapper for an invalid safety mode or a nested call."""
| # Important note: Always make sure the return value from your function f is a | |
| # copy of the data you have gotten from IDA, and not the original data. | |
| # | |
| # Example: | |
| # -------- | |
| # | |
| # Do this: | |
| # | |
| # @idaread | |
| # def ts_Functions(): | |
| # return list(idautils.Functions()) | |
| # | |
| # Don't do this: | |
| # | |
| # @idaread | |
| # def ts_Functions(): | |
| # return idautils.Functions() | |
| # | |
logger = logging.getLogger(__name__)


# Enum for safety modes. Higher means safer:
class IDASafety(IntEnum):
    """Request modes for execute_sync, mirroring the ida_kernwin MFF_* flags."""

    SAFE_NONE = ida_kernwin.MFF_FAST
    SAFE_READ = ida_kernwin.MFF_READ
    SAFE_WRITE = ida_kernwin.MFF_WRITE


# Names of the functions currently running inside sync_wrapper; consulted
# there to detect a synchronized call nested inside another one.
call_stack = queue.LifoQueue()
def sync_wrapper(ff, safety_mode: IDASafety):
    """
    Call a function ff with a specific IDA safety_mode.

    ff is handed to idaapi.execute_sync and its outcome is passed back
    through a queue: a return value is returned to the caller, an exception
    raised by ff is re-raised in the caller.

    Raises IDASyncError for a safety mode other than SAFE_READ/SAFE_WRITE, or
    when another synchronized function is still on the call stack.
    """
    #logger.debug('sync_wrapper: {}, {}'.format(ff.__name__, safety_mode))

    if safety_mode not in [IDASafety.SAFE_READ, IDASafety.SAFE_WRITE]:
        error_str = 'Invalid safety mode {} over function {}'\
                .format(safety_mode, ff.__name__)
        logger.error(error_str)
        raise IDASyncError(error_str)

    # No safety level is set up:
    res_container = queue.Queue()

    def runned():
        # Every exit path must put exactly one item into res_container;
        # otherwise the res_container.get() below blocks forever.
        #logger.debug('Inside runned')

        # Make sure that we are not already inside a sync_wrapper:
        if not call_stack.empty():
            # Peek at the outer name and push it back: the outer call's
            # `finally` still has to pop its own entry.
            last_func_name = call_stack.get()
            call_stack.put(last_func_name)
            error_str = ('Call stack is not empty while calling the '
                'function {} from {}').format(ff.__name__, last_func_name)
            #logger.error(error_str)
            # Deliver the error through the queue instead of raising inside
            # the callback, where it would never reach the caller.
            res_container.put(IDASyncError(error_str))
            return

        call_stack.put((ff.__name__))
        try:
            res_container.put(ff())
        except Exception as x:
            res_container.put(x)
        finally:
            call_stack.get()
            #logger.debug('Finished runned')

    idaapi.execute_sync(runned, safety_mode)
    res = res_container.get()
    if isinstance(res, Exception):
        raise res
    return res
def idawrite(f):
    """
    decorator for marking a function as modifying the IDB.
    schedules a request to be made in the main IDA loop to avoid IDB corruption.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        bound_call = functools.partial(f, *args, **kwargs)
        # partial objects have no __name__, but sync_wrapper reads it when
        # building its messages.
        bound_call.__name__ = f.__name__ # type: ignore
        return sync_wrapper(bound_call, idaapi.MFF_WRITE)
    return wrapper
def idaread(f):
    """
    decorator for marking a function as reading from the IDB.

    schedules a request to be made in the main IDA loop to avoid
    inconsistent results.

    MFF_READ constant via: http://www.openrce.org/forums/posts/1827
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        bound_call = functools.partial(f, *args, **kwargs)
        # partial objects have no __name__, but sync_wrapper reads it when
        # building its messages.
        bound_call.__name__ = f.__name__ # type: ignore
        return sync_wrapper(bound_call, idaapi.MFF_READ)
    return wrapper
def is_window_active():
    """Returns whether IDA is currently active"""
    try:
        from PyQt5.QtWidgets import QApplication
    except ImportError:
        # Without PyQt5 there is no way to ask; report "not active".
        return False

    app = QApplication.instance()
    if app is None:
        return False

    # Active as soon as any top-level widget is the active window.
    return any(widget.isActiveWindow() for widget in app.topLevelWidgets())
class Metadata(TypedDict):
    """Description of the current IDB's input file, as built by get_metadata."""

    path: str      # input file path
    module: str    # root file name
    base: str      # image base, hex string
    size: str      # image size, hex string
    md5: str       # hex digest, "" when unavailable
    sha256: str    # hex digest, "" when unavailable
    crc32: str     # hex string
    filesize: str  # hex string
def get_image_size() -> int:
    """Return the image size: from the PE header if present, else omax_ea - omin_ea."""
    try:
        # https://www.hex-rays.com/products/ida/support/sdkdoc/structidainfo.html
        info = idaapi.get_inf_structure() # type: ignore
        omin_ea = info.omin_ea
        omax_ea = info.omax_ea
    except AttributeError:
        # Fall back to the ida_ida accessors when get_inf_structure is missing.
        import ida_ida
        omin_ea = ida_ida.inf_get_omin_ea()
        omax_ea = ida_ida.inf_get_omax_ea()

    # Try to extract it from the PE header
    pe_header = idautils.peutils_t().header()
    if pe_header and pe_header[:4] == b"PE\0\0":
        # NOTE(review): offset 0x50 looks like SizeOfImage in the optional
        # header - confirm against the PE specification.
        (size_from_header,) = struct.unpack("<I", pe_header[0x50:0x54])
        return size_from_header

    # Bad heuristic for image size (bad if the relocations are the last section)
    return omax_ea - omin_ea
@jsonrpc
@idaread
def get_metadata() -> Metadata:
    """Get metadata about the current IDB"""

    # Fat Mach-O binaries can return a None hash:
    # https://github.com/mrexodia/ida-pro-mcp/issues/26
    def hex_digest(retrieve):
        # Best effort: a missing digest becomes "". `except Exception` (not a
        # bare except) so KeyboardInterrupt/SystemExit are not swallowed.
        try:
            return retrieve().hex()
        except Exception:
            return ""

    return Metadata(path=idaapi.get_input_file_path(),
                    module=idaapi.get_root_filename(),
                    base=hex(idaapi.get_imagebase()),
                    size=hex(get_image_size()),
                    md5=hex_digest(ida_nalt.retrieve_input_file_md5),
                    sha256=hex_digest(ida_nalt.retrieve_input_file_sha256),
                    crc32=hex(ida_nalt.retrieve_input_file_crc32()),
                    filesize=hex(ida_nalt.retrieve_input_file_size()))
def get_prototype(fn: ida_funcs.func_t) -> Optional[str]:
    """Return the function's prototype as a string, or None if unavailable.

    Uses ``fn.get_prototype()`` when the object offers it. On AttributeError
    it falls back to ``idc.get_type`` and then to ``ida_nalt.get_tinfo``.
    Any other exception is printed and reported as None.
    """
    try:
        prototype: ida_typeinf.tinfo_t = fn.get_prototype()
        return None if prototype is None else str(prototype)
    except AttributeError:
        try:
            return idc.get_type(fn.start_ea)
        except:
            tif = ida_typeinf.tinfo_t()
            found = ida_nalt.get_tinfo(tif, fn.start_ea)
            return str(tif) if found else None
    except Exception as e:
        print(f"Error getting function prototype: {e}")
        return None
class Function(TypedDict):
    """Summary of one function, as produced by get_function."""

    address: str  # hex string of the address that was looked up
    name: str     # function name
    size: str     # hex string of end_ea - start_ea
| def parse_address(address: str | int) -> int: | |
| if isinstance(address, int): | |
| return address | |
| try: | |
| return int(address, 0) | |
| except ValueError: | |
| for ch in address: | |
| if ch not in "0123456789abcdefABCDEF": | |
| raise IDAError(f"Failed to parse address: {address}") | |
| raise IDAError(f"Failed to parse address (missing 0x prefix): {address}") | |
@overload
def get_function(address: int, *, raise_error: Literal[True]) -> Function: ...

@overload
def get_function(address: int) -> Function: ...

@overload
def get_function(address: int, *, raise_error: Literal[False]) -> Optional[Function]: ...

def get_function(address, *, raise_error=True):
    """Describe the function containing ``address``.

    Returns a Function dict; its ``address`` is the queried address, not
    necessarily the function start. If no function is found, raises IDAError
    when ``raise_error`` is true and returns None otherwise.
    """
    fn = idaapi.get_func(address)
    if fn is not None:
        try:
            name = fn.get_name()
        except AttributeError:
            # func_t without get_name(): ask ida_funcs instead.
            name = ida_funcs.get_func_name(fn.start_ea)
        return Function(address=hex(address), name=name, size=hex(fn.end_ea - fn.start_ea))

    if raise_error:
        raise IDAError(f"No function found at address {hex(address)}")
    return None
# Cache: demangled function name -> function address. Filled on demand by
# create_demangled_to_ea_map().
DEMANGLED_TO_EA = {}


def create_demangled_to_ea_map():
    """Populate DEMANGLED_TO_EA from every function that has a demangled name."""
    for ea in idautils.Functions():
        raw_name = idc.get_name(ea, 0)
        # Get the function name and demangle it
        # MNG_NODEFINIT inhibits everything except the main name
        # where default demangling adds the function signature
        # and decorators (if any)
        demangled = idaapi.demangle_name(raw_name, idaapi.MNG_NODEFINIT)
        if not demangled:
            continue
        DEMANGLED_TO_EA[demangled] = ea
def get_type_by_name(type_name: str) -> ida_typeinf.tinfo_t:
    """Resolve a type name to a tinfo_t.

    Built-in spellings are mapped to their BTF_* constant first. Otherwise
    the name is looked up as a named struct, typedef, enum and union (in
    that order) and finally passed to the tinfo_t constructor.

    Raises:
        IDAError: if none of the lookups yields a type.
    """
    # (BTF_* attribute name, accepted spellings). The attribute is resolved
    # only for the matching row, exactly like the branch it replaces.
    builtin_spellings = (
        # 8-bit integers
        ("BTF_INT8", ('int8', '__int8', 'int8_t', 'char', 'signed char')),
        ("BTF_UINT8", ('uint8', '__uint8', 'uint8_t', 'unsigned char', 'byte', 'BYTE')),
        # 16-bit integers
        ("BTF_INT16", ('int16', '__int16', 'int16_t', 'short', 'short int', 'signed short', 'signed short int')),
        ("BTF_UINT16", ('uint16', '__uint16', 'uint16_t', 'unsigned short', 'unsigned short int', 'word', 'WORD')),
        # 32-bit integers
        ("BTF_INT32", ('int32', '__int32', 'int32_t', 'int', 'signed int', 'long', 'long int', 'signed long', 'signed long int')),
        ("BTF_UINT32", ('uint32', '__uint32', 'uint32_t', 'unsigned int', 'unsigned long', 'unsigned long int', 'dword', 'DWORD')),
        # 64-bit integers
        ("BTF_INT64", ('int64', '__int64', 'int64_t', 'long long', 'long long int', 'signed long long', 'signed long long int')),
        ("BTF_UINT64", ('uint64', '__uint64', 'uint64_t', 'unsigned int64', 'unsigned long long', 'unsigned long long int', 'qword', 'QWORD')),
        # 128-bit integers
        ("BTF_INT128", ('int128', '__int128', 'int128_t', '__int128_t')),
        ("BTF_UINT128", ('uint128', '__uint128', 'uint128_t', '__uint128_t', 'unsigned int128')),
        # Floating point types
        ("BTF_FLOAT", ('float', )),
        ("BTF_DOUBLE", ('double', )),
        ("BTF_LDOUBLE", ('long double', 'ldouble')),
        # Boolean type
        ("BTF_BOOL", ('bool', '_Bool', 'boolean')),
        # Void type
        ("BTF_VOID", ('void', )),
    )
    for btf_attr, spellings in builtin_spellings:
        if type_name in spellings:
            return ida_typeinf.tinfo_t(getattr(ida_typeinf, btf_attr))

    # If not a standard type, try to get a named type
    tif = ida_typeinf.tinfo_t()
    for kind_attr in ("BTF_STRUCT", "BTF_TYPEDEF", "BTF_ENUM", "BTF_UNION"):
        if tif.get_named_type(None, type_name, getattr(ida_typeinf, kind_attr)):
            return tif

    parsed = ida_typeinf.tinfo_t(type_name)
    if parsed:
        return parsed

    raise IDAError(f"Unable to retrieve {type_name} type info object")
@jsonrpc
@idaread
def get_function_by_name(
    name: Annotated[str, "Name of the function to get"]
) -> Function:
    """Get a function by its name"""
    function_address = idaapi.get_name_ea(idaapi.BADADDR, name)
    if function_address != idaapi.BADADDR:
        return get_function(function_address)

    # Not a known name: retry against the demangled-name map.
    # If map has not been created yet, create it
    if len(DEMANGLED_TO_EA) == 0:
        create_demangled_to_ea_map()
    # Try to find the function in the map, else raise an error
    if name not in DEMANGLED_TO_EA:
        raise IDAError(f"No function found with name {name}")
    return get_function(DEMANGLED_TO_EA[name])
@jsonrpc
@idaread
def get_function_by_address(
    address: Annotated[str, "Address of the function to get"],
) -> Function:
    """Get a function by its address"""
    # parse_address raises IDAError for unparsable text; get_function raises
    # IDAError when no function contains the address.
    ea = parse_address(address)
    return get_function(ea)
@jsonrpc
@idaread
def get_current_address() -> str:
    """Get the address currently selected by the user"""
    # Returned as a hex string, like every other address in this API.
    screen_ea = idaapi.get_screen_ea()
    return hex(screen_ea)
@jsonrpc
@idaread
def get_current_function() -> Optional[Function]:
    """Get the function currently selected by the user"""
    # get_function is called with its default raise_error=True, so a cursor
    # outside any function raises IDAError rather than returning None.
    screen_ea = idaapi.get_screen_ea()
    return get_function(screen_ea)
class ConvertedNumber(TypedDict):
    """One number in several textual representations (see convert_number)."""

    decimal: str
    hexadecimal: str
    bytes: str            # little-endian bytes as space-separated hex pairs
    ascii: Optional[str]  # None when a byte falls outside printable ASCII
    binary: str
@jsonrpc
def convert_number(
    text: Annotated[str, "Textual representation of the number to convert"],
    size: Annotated[Optional[int], "Size of the variable in bytes"],
) -> ConvertedNumber:
    """Convert a number (decimal, hexadecimal) to different representations"""
    try:
        value = int(text, 0)
    except ValueError:
        raise IDAError(f"Invalid number: {text}")

    # Only negative numbers are encoded as two's complement. Encoding every
    # value as signed rejected positive values whose top bit is set, e.g.
    # 0x80 in one byte or 0xFFFFFFFF in four.
    is_negative = value < 0

    # Estimate the size of the number
    if not size:
        if is_negative:
            # Magnitude bits of the two's-complement form plus the sign bit.
            bit_count = (~value).bit_length() + 1
        else:
            bit_count = value.bit_length()
        # At least one byte, so that 0 is rendered as "00" instead of "".
        size = max(1, (bit_count + 7) // 8)

    # Convert the number to bytes
    try:
        raw = value.to_bytes(size, "little", signed=is_negative)
    except (OverflowError, ValueError):
        # ValueError: to_bytes refuses a negative length.
        raise IDAError(f"Number {text} is too big for {size} bytes")

    # Convert the bytes to ASCII (trailing NUL padding is ignored); None as
    # soon as one byte is not printable.
    trimmed = raw.rstrip(b"\x00")
    if all(32 <= byte <= 126 for byte in trimmed):
        ascii_text: Optional[str] = trimmed.decode("ascii")
    else:
        ascii_text = None

    return ConvertedNumber(
        decimal=str(value),
        hexadecimal=hex(value),
        bytes=raw.hex(" "),
        ascii=ascii_text,
        binary=bin(value),
    )
| T = TypeVar("T") | |
| class Page(TypedDict, Generic[T]): | |
| data: list[T] | |
| next_offset: Optional[int] | |
| def paginate(data: list[T], offset: int, count: int) -> Page[T]: | |
| if count == 0: | |
| count = len(data) | |
| next_offset = offset + count | |
| if next_offset >= len(data): | |
| next_offset = None | |
| return { | |
| "data": data[offset:offset + count], | |
| "next_offset": next_offset, | |
| } | |
def pattern_filter(data: list[T], pattern: str, key: str) -> list[T]:
    """Keep the items of ``data`` whose ``item[key]`` matches ``pattern``.

    * An empty pattern returns ``data`` itself, unfiltered.
    * ``/body/`` (at least one character between the slashes) is treated as a
      regular expression and matched with ``re.search``, ignoring case.
    * Anything else is a case-insensitive substring test.

    Raises:
        IDAError: if the text between the slashes is not a valid regex.
    """
    if not pattern:
        return data

    if len(pattern) > 2 and pattern.startswith("/") and pattern.endswith("/"):
        # Local import: re is not part of the file-level imports.
        import re

        try:
            regex = re.compile(pattern[1:-1], re.IGNORECASE)
        except re.error as exc:
            raise IDAError(f"Invalid regex filter {pattern}: {exc}") from exc
        return [item for item in data if regex.search(item[key])]

    # Lower-case the needle once instead of once per item.
    needle = pattern.lower()
    return [item for item in data if needle in item[key].lower()]
@jsonrpc
@idaread
def list_functions(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of functions to list (100 is a good default, 0 means remainder)"],
) -> Page[Function]:
    """List all functions in the database (paginated)"""
    # Every function is described first; paging happens on the full list.
    functions = list(map(get_function, idautils.Functions()))
    return paginate(functions, offset, count)
class Global(TypedDict):
    """A named address that does not belong to a function."""

    address: str  # hex string
    name: str
@jsonrpc
@idaread
def list_globals_filter(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"],
    filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). Case-insensitive contains or /regex/ syntax"],
) -> Page[Global]:
    """List matching globals in the database (paginated, filtered)"""
    matching: list[Global] = []
    for addr, name in idautils.Names():
        # Skip functions and none. The previous test (`not func or name is
        # None`) let None names through, which then crashed the name filter.
        if name is None or idaapi.get_func(addr):
            continue
        matching.append(Global(address=hex(addr), name=name))

    matching = pattern_filter(matching, filter, "name")
    return paginate(matching, offset, count)
@jsonrpc
def list_globals(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"],
) -> Page[Global]:
    """List all globals in the database (paginated)"""
    # The empty filter selects everything; synchronization with IDA happens
    # inside list_globals_filter.
    no_filter = ""
    return list_globals_filter(offset, count, no_filter)
class Import(TypedDict):
    """One imported symbol, as collected by list_imports."""

    address: str        # hex string
    imported_name: str  # symbol name, or "#<ordinal>" when it has none
    module: str         # module name, or "<unnamed>"
@jsonrpc
@idaread
def list_imports(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of imports to list (100 is a good default, 0 means remainder)"],
) -> Page[Import]:
    """ List all imported symbols with their name and module (paginated) """
    imports = []
    for module_index in range(ida_nalt.get_import_module_qty()):
        module_name = ida_nalt.get_import_module_name(module_index) or "<unnamed>"

        def collect(ea, symbol_name, ordinal):
            # Imports without a name are reported by ordinal.
            imports.append(Import(address=hex(ea),
                                  imported_name=symbol_name or f"#{ordinal}",
                                  module=module_name))
            # NOTE(review): True presumably asks IDA to keep enumerating - confirm.
            return True

        # The callback runs during this call, so it sees the module_name of
        # the current iteration.
        ida_nalt.enum_import_names(module_index, collect)

    return paginate(imports, offset, count)
class String(TypedDict):
    """One string found in the database, as collected by list_strings_filter."""

    address: str  # hex string
    length: int
    string: str
@jsonrpc
@idaread
def list_strings_filter(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"],
    filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). Case-insensitive contains or /regex/ syntax"],
) -> Page[String]:
    """List matching strings in the database (paginated, filtered)"""
    strings: list[String] = []
    for item in idautils.Strings():
        if item is None:
            continue
        try:
            text = str(item)
            if text:
                strings.append(String(address=hex(item.ea), length=item.length, string=text))
        except Exception:
            # Best effort: skip entries that cannot be read. `except
            # Exception` (not a bare except) so that KeyboardInterrupt and
            # SystemExit are not swallowed.
            continue

    strings = pattern_filter(strings, filter, "string")
    return paginate(strings, offset, count)
@jsonrpc
def list_strings(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"],
) -> Page[String]:
    """List all strings in the database (paginated)"""
    # The empty filter selects everything; synchronization with IDA happens
    # inside list_strings_filter.
    no_filter = ""
    return list_strings_filter(offset, count, no_filter)
@jsonrpc
@idaread
def list_local_types():
    """List all Local types in the database"""
    # Returns a flat list of text entries: a "Type #N: name" header per type,
    # followed by its declaration when one could be printed. Ordinals that
    # cannot be loaded are skipped. (The former failure branch built an
    # IDAError that the surrounding handler swallowed at once, so it never
    # had any effect and has been removed.)
    entries = []
    idati = ida_typeinf.get_idati()
    type_count = ida_typeinf.get_ordinal_limit(idati)
    for ordinal in range(1, type_count):
        try:
            tif = ida_typeinf.tinfo_t()
            if not tif.get_numbered_type(idati, ordinal):
                continue

            type_name = tif.get_type_name() or f"<Anonymous Type #{ordinal}>"
            entries.append(f"\nType #{ordinal}: {type_name}")

            if tif.is_udt():
                c_decl_flags = (ida_typeinf.PRTYPE_MULTI | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI | ida_typeinf.PRTYPE_DEF | ida_typeinf.PRTYPE_METHODS | ida_typeinf.PRTYPE_OFFSETS)
                c_decl_output = tif._print(None, c_decl_flags)
                if c_decl_output:
                    entries.append(f" C declaration:\n{c_decl_output}")
            else:
                simple_decl = tif._print(None, ida_typeinf.PRTYPE_1LINE | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI)
                if simple_decl:
                    entries.append(f" Simple declaration:\n{simple_decl}")
        except Exception:
            # Best effort: one unreadable type must not abort the listing.
            continue
    return entries
def decompile_checked(address: int) -> ida_hexrays.cfunc_t:
    """Decompile the function at ``address`` or raise IDAError explaining why not.

    Raises:
        IDAError: if Hex-Rays cannot be initialized, if no decompiler license
            is available, or if decompilation fails (the message carries the
            failure text and address when Hex-Rays reports them).
    """
    if not ida_hexrays.init_hexrays_plugin():
        raise IDAError("Hex-Rays decompiler is not available")

    failure = ida_hexrays.hexrays_failure_t()
    cfunc = ida_hexrays.decompile_func(address, failure, ida_hexrays.DECOMP_WARNINGS)
    if cfunc:
        return cfunc # type: ignore (this is a SWIG issue)

    if failure.code == ida_hexrays.MERR_LICENSE:
        raise IDAError("Decompiler license is not available. Use `disassemble_function` to get the assembly code instead.")

    # Assemble the message from the details Hex-Rays provided.
    parts = [f"Decompilation failed at {hex(address)}"]
    if failure.str:
        parts.append(f": {failure.str}")
    if failure.errea != idaapi.BADADDR:
        parts.append(f" (address: {hex(failure.errea)})")
    raise IDAError("".join(parts))
@jsonrpc
@idaread
def decompile_function(
    address: Annotated[str, "Address of the function to decompile"],
) -> str:
    """Decompile a function at the given address"""
    start = parse_address(address)
    cfunc = decompile_checked(start)
    if is_window_active():
        ida_hexrays.open_pseudocode(start, ida_hexrays.OPF_REUSE)

    # One output line per pseudocode line, each prefixed with a comment that
    # carries the line number and, when known, an address.
    annotated_lines = []
    for index, simple_line in enumerate(cfunc.get_pseudocode()):
        # Line 0 defaults to the function entry; other lines have no default.
        addr = cfunc.entry_ea if index == 0 else None

        item = ida_hexrays.ctree_item_t()
        if cfunc.get_line_item(simple_line.line, 0, False, None, item, None): # type: ignore (IDA SDK type hint wrong)
            description: str | None = item.dstr()
            if description:
                # Expected shape "<hex address>: <text>"; anything else
                # leaves the default address untouched.
                pieces = description.split(": ")
                if len(pieces) == 2:
                    try:
                        addr = int(pieces[0], 16)
                    except ValueError:
                        pass

        text = ida_lines.tag_remove(simple_line.line)
        if addr:
            annotated_lines.append(f"/* line: {index}, address: {hex(addr)} */ {text}")
        else:
            annotated_lines.append(f"/* line: {index} */ {text}")

    return "\n".join(annotated_lines)
| class DisassemblyLine(TypedDict): | |
| segment: NotRequired[str] | |
| address: str | |
| label: NotRequired[str] | |
| instruction: str | |
| comments: NotRequired[list[str]] | |
class Argument(TypedDict):
    """Name and type of one function argument, both as text."""

    name: str
    type: str
class StackFrameVariable(TypedDict):
    """One stack frame variable; every field is carried as text."""

    name: str
    offset: str
    size: str
    type: str
class DisassemblyFunction(TypedDict):
    """Result of disassemble_function: function header, stack frame and lines."""

    name: str
    start_ea: str  # hex string
    return_type: NotRequired[str]  # only when type information is available
    arguments: NotRequired[list[Argument]]  # only when type information is available
    stack_frame: list[StackFrameVariable]
    lines: list[DisassemblyLine]
@jsonrpc
@idaread
def disassemble_function(
    start_address: Annotated[str, "Address of the function to disassemble"],
) -> DisassemblyFunction:
    """Get assembly code for a function (API-compatible with older IDA builds)"""
    start = parse_address(start_address)
    func = idaapi.get_func(start)
    if not func:
        raise IDAError(f"No function found at address {hex(start)}")
    if is_window_active():
        ida_kernwin.jumpto(start)

    func_name: str = ida_funcs.get_func_name(func.start_ea) or "<unnamed>"

    lines: list[DisassemblyLine] = []
    for ea in idautils.FuncItems(func.start_ea):
        if ea == idaapi.BADADDR:
            continue

        seg = idaapi.getseg(ea)
        segment: str | None = idaapi.get_segm_name(seg) if seg else None

        # The function's own name on its first item is not repeated as a label.
        label: str | None = idc.get_name(ea, 0)
        is_function_header = label == func_name and ea == func.start_ea
        if not label or is_function_header:
            label = None

        # get_cmt(ea, False) first, then get_cmt(ea, True); empty ones are dropped.
        comments: list[str] = [
            text
            for text in (idaapi.get_cmt(ea, False), idaapi.get_cmt(ea, True))
            if text
        ]

        mnem: str = idc.print_insn_mnem(ea) or ""
        # Collect operands until the first o_void slot, at most 8.
        ops: list[str] = []
        for operand_index in range(8):
            if idc.get_operand_type(ea, operand_index) == idaapi.o_void:
                break
            ops.append(idc.print_operand(ea, operand_index) or "")
        operand_text = ', '.join(ops)

        line: DisassemblyLine = {
            "address": hex(ea),
            "instruction": f"{mnem} {operand_text}".rstrip()
        }
        # Optional keys are added only when they carry information.
        if segment:
            line["segment"] = segment
        if label:
            line["label"] = label
        if comments:
            line["comments"] = comments
        lines.append(line)

    # prototype and args via tinfo (safe across versions)
    rettype = None
    args: Optional[list[Argument]] = None
    tif = ida_typeinf.tinfo_t()
    if ida_nalt.get_tinfo(tif, func.start_ea) and tif.is_func():
        ftd = ida_typeinf.func_type_data_t()
        if tif.get_func_details(ftd):
            rettype = str(ftd.rettype)
            # Unnamed arguments get a positional placeholder name.
            args = [
                Argument(name=(arg.name or f"arg{position}"), type=str(arg.type))
                for position, arg in enumerate(ftd)
            ]

    out: DisassemblyFunction = {
        "name": func_name,
        "start_ea": hex(func.start_ea),
        "stack_frame": get_stack_frame_variables_internal(func.start_ea, False),
        "lines": lines,
    }
    if rettype:
        out["return_type"] = rettype
    if args is not None:
        out["arguments"] = args
    return out
class Xref(TypedDict):
    """A single cross reference as reported by the xref tools."""

    address: str  # hex address the reference comes from (``xref.frm``)
    type: str  # "code" or "data"
    function: Optional[Function]  # get_function() result for `address`, may be None
@jsonrpc
@idaread
def get_xrefs_to(
    address: Annotated[str, "Address to get cross references to"],
) -> list[Xref]:
    """Get all cross references to the given address"""
    target = parse_address(address)
    return [
        Xref(
            address=hex(ref.frm),
            type="code" if ref.iscode else "data",
            # raise_error=False: the function lookup is allowed to yield None
            function=get_function(ref.frm, raise_error=False),
        )
        for ref in idautils.XrefsTo(target)  # type: ignore (IDA SDK type hints are incorrect)
    ]
@jsonrpc
@idaread
def get_xrefs_to_field(
    struct_name: Annotated[str, "Name of the struct (type) containing the field"],
    field_name: Annotated[str, "Name of the field (member) to get xrefs to"],
) -> list[Xref]:
    """Get all cross references to a named struct field (member)"""
    # The local type library is needed to resolve the struct by name.
    local_til = ida_typeinf.get_idati()
    if not local_til:
        raise IDAError("Failed to retrieve type library.")

    # Resolve the struct itself; an unknown struct yields an empty result.
    struct_tif = ida_typeinf.tinfo_t()
    if not struct_tif.get_named_type(local_til, struct_name, ida_typeinf.BTF_STRUCT, True, False):
        print(f"Structure '{struct_name}' not found.")
        return []

    # Resolve the member index from its "struct.field" full name.
    member_index = ida_typeinf.get_udm_by_fullname(None, struct_name + '.' + field_name)  # type: ignore (IDA SDK type hints are incorrect)
    if member_index == -1:
        print(f"Field '{field_name}' not found in structure '{struct_name}'.")
        return []

    # Cross references are recorded against the member's type id.
    member_tid = struct_tif.get_udm_tid(member_index)
    if member_tid == ida_idaapi.BADADDR:
        raise IDAError(f"Unable to get tid for structure '{struct_name}' and field '{field_name}'.")

    return [
        Xref(
            address=hex(ref.frm),
            type="code" if ref.iscode else "data",
            function=get_function(ref.frm, raise_error=False),
        )
        for ref in idautils.XrefsTo(member_tid)  # type: ignore (IDA SDK type hints are incorrect)
    ]
@jsonrpc
@idaread
def get_callees(
    function_address: Annotated[str, "Address of the function to get callee functions"],
) -> list[dict[str, str]]:
    """Get all the functions called (callees) by the function at function_address

    Only direct calls are reported. Each callee appears once, in the order of
    its first call site, as ``{"address", "name", "type"}`` where ``type`` is
    "internal" when IDA knows a function at the target and "external" otherwise.

    Raises IDAError when no function contains ``function_address``.
    """
    func_start = parse_address(function_address)
    func = idaapi.get_func(func_start)
    if not func:
        raise IDAError(f"No function found containing address {function_address}")
    func_end = idc.find_func_end(func_start)

    call_itypes = (idaapi.NN_call, idaapi.NN_callfi, idaapi.NN_callni)
    # Direct call operands only - avoids reporting an indirect call's offset.
    direct_operand_types = (idaapi.o_mem, idaapi.o_near, idaapi.o_far)

    # Keyed by (address, name, type). A dict keeps insertion order, so the
    # result is deduplicated and deterministic (a set would scramble it).
    unique_callees: dict[tuple[str, str, str], dict[str, str]] = {}

    current_ea = func_start
    while current_ea < func_end:
        insn = idaapi.insn_t()
        idaapi.decode_insn(insn, current_ea)
        if insn.itype in call_itypes:
            target = idc.get_operand_value(current_ea, 0)
            target_type = idc.get_operand_type(current_ea, 0)
            if target_type in direct_operand_types:
                # get_function is not used here because the target may be an
                # external function; it is tagged internal/external instead.
                func_type = (
                    "internal" if idaapi.get_func(target) is not None else "external"
                )
                func_name = idc.get_name(target)
                if func_name is not None:
                    callee = {"address": hex(target), "name": func_name, "type": func_type}
                    unique_callees.setdefault(tuple(callee.values()), callee)
        current_ea = idc.next_head(current_ea, func_end)

    return list(unique_callees.values())
@jsonrpc
@idaread
def get_callers(
    function_address: Annotated[str, "Address of the function to get callers"],
) -> list[Function]:
    """Get all callers of the given address"""
    callers_by_address = {}
    for site in idautils.CodeRefsTo(parse_address(function_address), 0):
        # Only references that originate from a known function count.
        caller = get_function(site, raise_error=False)
        if not caller:
            continue

        # Decode the referencing instruction and keep it only if it is a call.
        insn = idaapi.insn_t()
        idaapi.decode_insn(insn, site)
        is_call = insn.itype in (idaapi.NN_call, idaapi.NN_callfi, idaapi.NN_callni)
        if is_call:
            # Keying on the function address collapses multiple call sites.
            callers_by_address[caller["address"]] = caller
    return list(callers_by_address.values())
@jsonrpc
@idaread
def get_entry_points() -> list[Function]:
    """Get all entry points in the database"""
    # index -> ordinal -> address -> function; lookups may yield None.
    lookups = (
        get_function(
            ida_entry.get_entry(ida_entry.get_entry_ordinal(index)),
            raise_error=False,
        )
        for index in range(ida_entry.get_entry_qty())
    )
    return [entry_func for entry_func in lookups if entry_func is not None]
def _set_pseudocode_comment(cfunc, ea: int, comment: str) -> bool:
    """Attach ``comment`` to the pseudocode item mapped to ``ea``.

    Returns True once a placement leaves no orphan comments, False when
    ``ea`` is not in the ea-map or every tried placement ended up orphaned.
    """
    eamap = cfunc.get_eamap()
    if ea not in eamap:
        return False
    nearest_ea = eamap[ea][0].ea

    # Start from a clean state: drop orphan comments left from earlier.
    if cfunc.has_orphan_cmts():
        cfunc.del_orphan_cmts()
        cfunc.save_user_cmts()

    # Try each item position in turn; a placement that produces an orphan
    # comment is removed again before the next one is attempted.
    location = idaapi.treeloc_t()
    location.ea = nearest_ea
    for position in range(idaapi.ITP_SEMI, idaapi.ITP_COLON):
        location.itp = position
        cfunc.set_user_cmt(location, comment)
        cfunc.save_user_cmts()
        cfunc.refresh_func_ctext()
        if not cfunc.has_orphan_cmts():
            return True
        cfunc.del_orphan_cmts()
        cfunc.save_user_cmts()
    return False


@jsonrpc
@idawrite
def set_comment(
    address: Annotated[str, "Address in the function to set the comment for"],
    comment: Annotated[str, "Comment text"],
):
    """Set a comment for a given address in the function disassembly and pseudocode"""
    ea = parse_address(address)

    # The disassembly comment is mandatory; the pseudocode one is best effort.
    if not idaapi.set_cmt(ea, comment, False):
        raise IDAError(f"Failed to set disassembly comment at {hex(ea)}")

    if not ida_hexrays.init_hexrays_plugin():
        return

    # Reference: https://cyber.wtf/2019/03/22/using-ida-python-to-analyze-trickbot/
    try:
        cfunc = decompile_checked(ea)
    except IDAError:
        # Skip decompiler comment if decompilation fails
        return

    # Special case for function entry comments
    if ea == cfunc.entry_ea:
        idc.set_func_cmt(ea, comment, True)
        cfunc.refresh_func_ctext()
        return

    if not _set_pseudocode_comment(cfunc, ea, comment):
        print(f"Failed to set decompiler comment at {hex(ea)}")
def refresh_decompiler_widget():
    """Refresh the pseudocode text of the current widget, if it has a decompiler view."""
    current = ida_kernwin.get_current_widget()
    if current is None:
        return
    view = ida_hexrays.get_widget_vdui(current)
    if view is None:
        # the current widget is not backed by a decompiler view
        return
    view.refresh_ctext()
def refresh_decompiler_ctext(function_address: int):
    """Decompile the function and refresh its pseudocode text; no-op when decompilation yields nothing."""
    failure = ida_hexrays.hexrays_failure_t()
    decompiled = ida_hexrays.decompile_func(
        function_address, failure, ida_hexrays.DECOMP_WARNINGS
    )
    if not decompiled:
        return
    decompiled.refresh_func_ctext()
@jsonrpc
@idawrite
def rename_local_variable(
    function_address: Annotated[str, "Address of the function containing the variable"],
    old_name: Annotated[str, "Current name of the variable"],
    new_name: Annotated[str, "New name for the variable (empty for a default name)"],
):
    """Rename a local variable in a function"""
    pfn = idaapi.get_func(parse_address(function_address))
    if not pfn:
        raise IDAError(f"No function found at address {function_address}")

    entry = pfn.start_ea
    renamed = ida_hexrays.rename_lvar(entry, old_name, new_name)
    if not renamed:
        raise IDAError(f"Failed to rename local variable {old_name} in function {hex(entry)}")
    # Make the new name show up in the pseudocode.
    refresh_decompiler_ctext(entry)
@jsonrpc
@idawrite
def rename_global_variable(
    old_name: Annotated[str, "Current name of the global variable"],
    new_name: Annotated[str, "New name for the global variable (empty for a default name)"],
):
    """Rename a global variable

    Raises IDAError when ``old_name`` does not resolve to an address or when
    IDA rejects the new name.
    """
    ea = idaapi.get_name_ea(idaapi.BADADDR, old_name)
    # Report an unknown name as such instead of attempting to rename BADADDR
    # (same check as get_global_variable_value_by_name).
    if ea == idaapi.BADADDR:
        raise IDAError(f"Global variable {old_name} not found")
    if not idaapi.set_name(ea, new_name):
        raise IDAError(f"Failed to rename global variable {old_name} to {new_name}")
    refresh_decompiler_ctext(ea)
@jsonrpc
@idawrite
def set_global_variable_type(
    variable_name: Annotated[str, "Name of the global variable"],
    new_type: Annotated[str, "New type for the variable"],
):
    """Set a global variable's type

    Raises IDAError when the variable name is unknown, when ``new_type``
    cannot be resolved, or when IDA refuses to apply the type.
    """
    ea = idaapi.get_name_ea(idaapi.BADADDR, variable_name)
    # Report an unknown name as such instead of applying a type at BADADDR
    # (same check as get_global_variable_value_by_name).
    if ea == idaapi.BADADDR:
        raise IDAError(f"Global variable {variable_name} not found")
    tif = get_type_by_name(new_type)
    if not tif:
        raise IDAError("Parsed declaration is not a variable type")
    if not ida_typeinf.apply_tinfo(ea, tif, ida_typeinf.PT_SIL):
        raise IDAError("Failed to apply type")
def patch_address_assemble(
    ea: int,
    assemble: str,
) -> int:
    """Assemble one instruction and patch its bytes at ``ea``.

    Returns the number of bytes written, so callers can advance to the next
    address. Raises IDAError when assembling or patching fails.
    """
    (assembled_ok, payload) = idautils.Assemble(ea, assemble)
    if not assembled_ok:
        # On failure idautils.Assemble puts its error message in the second
        # tuple slot; keep it so the caller can see why assembly failed.
        raise IDAError(f"Failed to assemble instruction: {assemble} ({payload})")
    try:
        ida_bytes.patch_bytes(ea, payload)
    except Exception as exc:
        # Not a bare except: KeyboardInterrupt/SystemExit must pass through.
        raise IDAError(f"Failed to patch bytes at address {hex(ea)}") from exc
    return len(payload)
@jsonrpc
@idawrite
def patch_address_assembles(
    address: Annotated[str, "Starting Address to apply patch"],
    instructions: Annotated[str, "Assembly instructions separated by ';'"],
) -> str:
    """Assemble and patch a sequence of instructions starting at an address

    Instructions are written back to back: each one is placed right after the
    bytes of the previous one. Empty fragments (for example from a trailing
    ';') are ignored. Raises IDAError when there is nothing to assemble or
    when any instruction fails.
    """
    ea = parse_address(address)
    # Drop blank fragments so "nop; nop;" does not try to assemble "".
    statements = [part.strip() for part in instructions.split(";")]
    statements = [statement for statement in statements if statement]
    if not statements:
        raise IDAError("No instructions to assemble")

    for statement in statements:
        try:
            ea += patch_address_assemble(ea, statement)
        except IDAError as e:
            raise IDAError(f"Failed to patch bytes at address {hex(ea)}: {e}") from e
    return f"Patched {len(statements)} instructions"
@jsonrpc
@idaread
def get_global_variable_value_by_name(variable_name: Annotated[str, "Name of the global variable"]) -> str:
    """
    Read a global variable's value (if known at compile-time)
    Prefer this function over the `data_read_*` functions.
    """
    resolved = idaapi.get_name_ea(idaapi.BADADDR, variable_name)
    if resolved != idaapi.BADADDR:
        return get_global_variable_value_internal(resolved)
    # BADADDR means the name did not resolve to any address.
    raise IDAError(f"Global variable {variable_name} not found")
@jsonrpc
@idaread
def get_global_variable_value_at_address(address: Annotated[str, "Address of the global variable"]) -> str:
    """
    Read a global variable's value by its address (if known at compile-time)
    Prefer this function over the `data_read_*` functions.
    """
    # Same reader as the by-name variant, just without the name lookup.
    return get_global_variable_value_internal(parse_address(address))
def get_global_variable_value_internal(ea: int) -> str:
    """Format the value stored at ``ea`` as a string.

    The size comes from the item's type information, or from the item size
    when the address is named but untyped. Char arrays whose type size is 0
    are returned as a quoted string, 1/2/4/8-byte items as one hex number,
    anything else as space-separated hex bytes.

    Raises IDAError when no size can be determined or the bytes cannot be read.
    """
    # Get the type information for the variable
    tif = ida_typeinf.tinfo_t()
    if not ida_nalt.get_tinfo(tif, ea):
        # No type info, maybe we can figure out its size by its name.
        # has_any_name() inspects item flags, so pass the flags of `ea`
        # rather than the address itself.
        if not ida_bytes.has_any_name(ida_bytes.get_flags(ea)):
            raise IDAError(f"Failed to get type information for variable at {ea:#x}")
        size = ida_bytes.get_item_size(ea)
        if size == 0:
            raise IDAError(f"Failed to get type information for variable at {ea:#x}")
    else:
        # Determine the size of the variable
        size = tif.get_size()

    # A char array whose type size is 0 is read as a string literal.
    if size == 0 and tif.is_array() and tif.get_array_element().is_decl_char():
        raw_string = idaapi.get_strlit_contents(ea, -1, 0)
        if raw_string is None:
            raise IDAError(f"Failed to read string literal at {ea:#x}")
        return_string = raw_string.decode("utf-8").strip()
        return f"\"{return_string}\""

    # Fixed-width integers are read with the matching accessor.
    sized_readers = {
        1: ida_bytes.get_byte,
        2: ida_bytes.get_word,
        4: ida_bytes.get_dword,
        8: ida_bytes.get_qword,
    }
    reader = sized_readers.get(size)
    if reader is not None:
        return hex(reader(ea))

    # For other sizes, return the raw bytes
    raw_bytes = ida_bytes.get_bytes(ea, size)
    if raw_bytes is None:
        raise IDAError(f"Failed to read {size} bytes at {ea:#x}")
    return ' '.join(hex(x) for x in raw_bytes)
@jsonrpc
@idawrite
def rename_function(
    function_address: Annotated[str, "Address of the function to rename"],
    new_name: Annotated[str, "New name for the function (empty for a default name)"],
):
    """Rename a function"""
    pfn = idaapi.get_func(parse_address(function_address))
    if not pfn:
        raise IDAError(f"No function found at address {function_address}")

    # The name is always set on the function's start address.
    entry = pfn.start_ea
    renamed = idaapi.set_name(entry, new_name)
    if not renamed:
        raise IDAError(f"Failed to rename function {hex(entry)} to {new_name}")
    refresh_decompiler_ctext(entry)
@jsonrpc
@idawrite
def set_function_prototype(
    function_address: Annotated[str, "Address of the function"],
    prototype: Annotated[str, "New function prototype"],
):
    """Set a function's prototype

    Raises IDAError with a distinct message for each failure: no function at
    the address, unparsable prototype, parsed declaration is not a function
    type, or IDA refusing to apply the type.
    """
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    # Only the parse is wrapped: a wider try block would catch the specific
    # IDAErrors below and re-label all of them as parse failures.
    try:
        tif = ida_typeinf.tinfo_t(prototype, None, ida_typeinf.PT_SIL)
    except Exception as exc:
        raise IDAError(f"Failed to parse prototype string: {prototype}") from exc

    if not tif.is_func():
        raise IDAError("Parsed declaration is not a function type")
    if not ida_typeinf.apply_tinfo(func.start_ea, tif, ida_typeinf.PT_SIL):
        raise IDAError("Failed to apply type")
    refresh_decompiler_ctext(func.start_ea)
class my_modifier_t(ida_hexrays.user_lvar_modifier_t):
    """Local-variable modifier that assigns a new type to one variable, matched by name."""

    def __init__(self, var_name: str, new_type: ida_typeinf.tinfo_t):
        super().__init__()
        self.var_name = var_name
        self.new_type = new_type

    def modify_lvars(self, lvinf):
        """Retype the first saved variable named ``var_name``; return whether one was found."""
        for saved in lvinf.lvvec:
            if saved.name != self.var_name:
                continue
            saved.type = self.new_type
            return True
        return False
# NOTE: This is extremely hacky, but necessary to get errors out of IDA
def parse_decls_ctypes(decls: str, hti_flags: int) -> tuple[int, list[str]]:
    """Parse C declarations and return ``(error_count, messages)``.

    On Windows the ``parse_decls`` export of the IDA library is called via
    ctypes with a custom printer callback so that its messages can be
    collected. On other platforms ``ida_typeinf.parse_decls`` is used and the
    message list is always empty.
    """
    if sys.platform != "win32":
        # NOTE: The ctypes approach below could also work on other platforms,
        # but it's not been tested and there are differences in the vararg ABIs.
        return ida_typeinf.parse_decls(None, decls, False, hti_flags), []

    import ctypes

    assert isinstance(decls, str), "decls must be a string"
    assert isinstance(hti_flags, int), "hti_flags must be an int"

    encoded_decls = decls.encode("utf-8")
    til_ptr = None  # passed as the til argument

    ida_dll = ctypes.CDLL("ida")
    native_parse_decls = ida_dll.parse_decls
    native_parse_decls.argtypes = [
        ctypes.c_void_p,
        ctypes.c_char_p,
        ctypes.c_void_p,
        ctypes.c_int,
    ]
    native_parse_decls.restype = ctypes.c_int

    messages: list[str] = []

    @ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p)
    def magic_printer(fmt: bytes, arg1: bytes):
        # Only the "exactly one %s" format shape is expanded; anything else
        # is recorded as unsupported.
        if fmt.count(b"%") != 1 or b"%s" not in fmt:
            messages.append(f"unsupported magic_printer fmt: {repr(fmt)}")
            return 0
        formatted = fmt.replace(b"%s", arg1)
        messages.append(formatted.decode("utf-8"))
        return len(formatted) + 1

    error_count = native_parse_decls(til_ptr, encoded_decls, magic_printer, hti_flags)
    return error_count, messages
@jsonrpc
@idawrite
def declare_c_type(
    c_declaration: Annotated[str, "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };"],
):
    """Create or update a local type from a C declaration"""
    parse_flags = (
        ida_typeinf.PT_SIL  # suppress warning dialogs (although it seems unnecessary here)
        | ida_typeinf.PT_EMPTY  # allow empty types (also unnecessary?)
        | ida_typeinf.PT_TYP  # print back status messages with struct tags
    )
    error_count, notes = parse_decls_ctypes(c_declaration, parse_flags)
    report = "\n".join(notes)
    if error_count <= 0:
        return f"success\n\nInfo:\n{report}"
    raise IDAError(f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{report}")
@jsonrpc
@idawrite
def set_local_variable_type(
    function_address: Annotated[str, "Address of the decompiled function containing the variable"],
    variable_name: Annotated[str, "Name of the variable"],
    new_type: Annotated[str, "New type for the variable"],
):
    """Set a local variable's type

    Raises IDAError when the type cannot be parsed, the function or the
    variable cannot be found, or the modification is rejected.
    """
    try:
        # Some versions of IDA don't support this constructor
        new_tif = ida_typeinf.tinfo_t(new_type, None, ida_typeinf.PT_SIL)
    except Exception:
        new_tif = ida_typeinf.tinfo_t()
        try:
            # parse_decl requires semicolon for the type
            parsed = ida_typeinf.parse_decl(new_tif, None, new_type + ";", ida_typeinf.PT_SIL)  # type: ignore (IDA SDK type hints are incorrect)
        except Exception as exc:
            raise IDAError(f"Failed to parse type: {new_type}") from exc
        # parse_decl signals a bad declaration through its return value, not
        # an exception; without this check an empty type would be applied.
        if parsed is None or new_tif.empty():
            raise IDAError(f"Failed to parse type: {new_type}")

    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    # Renaming the variable to its own name serves as an existence check.
    if not ida_hexrays.rename_lvar(func.start_ea, variable_name, variable_name):
        raise IDAError(f"Failed to find local variable: {variable_name}")

    modifier = my_modifier_t(variable_name, new_tif)
    if not ida_hexrays.modify_user_lvars(func.start_ea, modifier):
        raise IDAError(f"Failed to modify local variable: {variable_name}")
    refresh_decompiler_ctext(func.start_ea)
@jsonrpc
@idaread
def get_stack_frame_variables(
    function_address: Annotated[str, "Address of the disassembled function to retrieve the stack frame variables"]
) -> list[StackFrameVariable]:
    """ Retrieve the stack frame variables for a given function """
    target = parse_address(function_address)
    # raise_error=True: a missing function is reported instead of giving [].
    return get_stack_frame_variables_internal(target, True)
def get_stack_frame_variables_internal(function_address: int, raise_error: bool) -> list[StackFrameVariable]:
    """List the non-gap members of a function's stack frame.

    Returns an empty list on IDA versions before 9, when the frame type is
    unavailable, or when the function is missing and ``raise_error`` is False.
    Raises IDAError for a missing function when ``raise_error`` is True.
    """
    # TODO: IDA 8.3 does not support tif.get_type_by_tid
    if ida_major < 9:
        return []

    pfn = idaapi.get_func(function_address)
    if not pfn:
        if not raise_error:
            return []
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not (frame_tif.get_type_by_tid(pfn.frame) and frame_tif.is_udt()):
        return []

    details = ida_typeinf.udt_type_data_t()
    frame_tif.get_udt_details(details)
    return [
        StackFrameVariable(
            name=member.name,
            # member offset/size are divided by 8 before formatting
            offset=hex(member.offset // 8),
            size=hex(member.size // 8),
            type=str(member.type),
        )
        for member in details
        if not member.is_gap()
    ]
class StructureMember(TypedDict):
    """One member of a structure as listed by ``get_defined_structures``."""

    name: str  # member name
    offset: str  # byte offset inside the structure, as a hex string
    size: str  # byte size of the member, as a hex string
    type: str  # printed type of the member
class StructureDefinition(TypedDict):
    """A structure and its members as listed by ``get_defined_structures``."""

    name: str  # type name
    size: str  # total size as a hex string
    members: list[StructureMember]  # empty when member details are unavailable
@jsonrpc
@idaread
def get_defined_structures() -> list[StructureDefinition]:
    """ Returns a list of all defined structures """
    structures = []
    # Walk every numbered type and keep the user-defined (struct/union) ones.
    for ordinal in range(1, ida_typeinf.get_ordinal_limit()):
        tif = ida_typeinf.tinfo_t()
        tif.get_numbered_type(None, ordinal)
        if not tif.is_udt():
            continue

        details = ida_typeinf.udt_type_data_t()
        members = []
        if tif.get_udt_details(details):
            for field in details:
                members.append(
                    StructureMember(
                        name=field.name,
                        offset=hex(field.offset // 8),
                        size=hex(field.size // 8),
                        type=str(field.type),
                    )
                )

        structures.append(
            StructureDefinition(
                name=tif.get_type_name(),  # type: ignore (IDA SDK type hints are incorrect)
                size=hex(tif.get_size()),
                members=members,
            )
        )
    return structures
@jsonrpc
@idaread
def analyze_struct_detailed(name: Annotated[str, "Name of the structure to analyze"]) -> dict:
    """Detailed analysis of a structure with all fields"""
    tif = ida_typeinf.tinfo_t()
    if not tif.get_named_type(None, name):
        raise IDAError(f"Structure '{name}' not found!")

    report = {
        "name": name,
        "type": str(tif._print()),
        "size": tif.get_size(),
        "is_udt": tif.is_udt()
    }

    # Problems past this point are reported in-band through an "error" key.
    if not tif.is_udt():
        report["error"] = "This is not a user-defined type!"
        return report

    details = ida_typeinf.udt_type_data_t()
    if not tif.get_udt_details(details):
        report["error"] = "Failed to get structure details!"
        return report

    report["member_count"] = details.size()
    report["is_union"] = details.is_union
    report["udt_type"] = "Union" if details.is_union else "Struct"

    field_infos = []
    for index, field in enumerate(details):
        byte_offset = field.begin() // 8  # Convert bits to bytes
        # Fall back to the type's own size when the member reports none.
        if field.size > 0:
            byte_size = field.size // 8
        else:
            byte_size = field.type.get_size()
        printed_type = field.type._print()
        field_name = field.name
        nested = field.type.is_udt()

        entry = {
            "index": index,
            "offset": f"0x{byte_offset:08X}",
            "size": byte_size,
            "type": printed_type,
            "name": field_name,
            "is_nested_udt": nested
        }
        # Nested structures additionally report their own size.
        if nested:
            entry["nested_size"] = field.type.get_size()
        field_infos.append(entry)

    report["members"] = field_infos
    report["total_size"] = tif.get_size()
    return report
@jsonrpc
@idaread
def get_struct_at_address(address: Annotated[str, "Address to analyze structure at"],
                          struct_name: Annotated[str, "Name of the structure"]) -> dict:
    """Get structure field values at a specific address

    Each member is read from ``address + member offset`` and formatted by
    size; members that are neither pointers nor 1/2/4/8 bytes wide are shown
    as a preview of at most 16 bytes. A member whose read raises is reported
    as "<failed to read>" instead of failing the whole call.

    Raises IDAError when the structure or its details cannot be retrieved.
    """
    addr = parse_address(address)

    # Get structure tinfo
    tif = ida_typeinf.tinfo_t()
    if not tif.get_named_type(None, struct_name):
        raise IDAError(f"Structure '{struct_name}' not found!")

    # Get structure details
    udt_data = ida_typeinf.udt_type_data_t()
    if not tif.get_udt_details(udt_data):
        raise IDAError("Failed to get structure details!")

    result = {
        "struct_name": struct_name,
        "address": f"0x{addr:X}",
        "members": []
    }

    # Database bitness, resolved on the first pointer member and then reused.
    is_64bit: Optional[bool] = None

    for member in udt_data:
        offset = member.begin() // 8
        member_addr = addr + offset
        member_type = member.type._print()
        member_name = member.name
        member_size = member.type.get_size()

        # Try to get value based on size
        try:
            if member.type.is_ptr():
                if is_64bit is None:
                    is_64bit = ida_ida.inf_is_64bit() if ida_major >= 9 else idaapi.get_inf_structure().is_64bit()
                if is_64bit:
                    value = idaapi.get_qword(member_addr)
                    value_str = f"0x{value:016X}"
                else:
                    value = idaapi.get_dword(member_addr)
                    value_str = f"0x{value:08X}"
            elif member_size == 1:
                value = idaapi.get_byte(member_addr)
                value_str = f"0x{value:02X} ({value})"
            elif member_size == 2:
                value = idaapi.get_word(member_addr)
                value_str = f"0x{value:04X} ({value})"
            elif member_size == 4:
                value = idaapi.get_dword(member_addr)
                value_str = f"0x{value:08X} ({value})"
            elif member_size == 8:
                value = idaapi.get_qword(member_addr)
                value_str = f"0x{value:016X} ({value})"
            else:
                # For large structures, read first few bytes
                bytes_data = []
                for i in range(min(member_size, 16)):
                    try:
                        byte_val = idaapi.get_byte(member_addr + i)
                        bytes_data.append(f"{byte_val:02X}")
                    except Exception:
                        # Keep what was read so far; KeyboardInterrupt and
                        # SystemExit are deliberately not swallowed.
                        break
                value_str = f"[{' '.join(bytes_data)}{'...' if member_size > 16 else ''}]"
        except Exception:
            # Best effort per member, but do not swallow interpreter exits.
            value_str = "<failed to read>"

        result["members"].append({
            "offset": f"0x{offset:08X}",
            "type": member_type,
            "name": member_name,
            "value": value_str
        })

    return result
@jsonrpc
@idaread
def get_struct_info_simple(name: Annotated[str, "Name of the structure"]) -> dict:
    """Simple function to get basic structure information"""
    tif = ida_typeinf.tinfo_t()
    if not tif.get_named_type(None, name):
        raise IDAError(f"Structure '{name}' not found!")

    summary = {
        'name': name,
        'type': tif._print(),
        'size': tif.get_size(),
        'is_udt': tif.is_udt()
    }

    # Member information is only added for user-defined types with details.
    if not tif.is_udt():
        return summary
    details = ida_typeinf.udt_type_data_t()
    if not tif.get_udt_details(details):
        return summary

    summary['member_count'] = details.size()
    summary['is_union'] = details.is_union
    summary['members'] = [
        {
            'name': field.name,
            'type': field.type._print(),
            'offset': field.begin() // 8,
            'size': field.type.get_size()
        }
        for field in details
    ]
    return summary
@jsonrpc
@idaread
def search_structures(filter: Annotated[str, "Filter pattern to search for structures (case-insensitive)"]) -> list[dict]:
    """Search for structures by name pattern

    Returns one entry per user-defined type whose name contains ``filter``
    (case-insensitive substring match), with its size, member count, union
    flag and ordinal.
    """
    needle = filter.lower()  # loop-invariant, computed once
    results = []
    limit = ida_typeinf.get_ordinal_limit()
    for ordinal in range(1, limit):
        tif = ida_typeinf.tinfo_t()
        if not tif.get_numbered_type(None, ordinal):
            continue
        type_name: str = tif.get_type_name()  # type: ignore (IDA SDK type hints are incorrect)
        if not type_name or needle not in type_name.lower():
            continue
        if not tif.is_udt():
            continue

        # Fetch the details once and derive both fields from that one call.
        udt_data = ida_typeinf.udt_type_data_t()
        has_details = bool(tif.get_udt_details(udt_data))
        results.append({
            "name": type_name,
            "size": tif.get_size(),
            "member_count": udt_data.size() if has_details else 0,
            "is_union": udt_data.is_union if has_details else False,
            "ordinal": ordinal
        })
    return results
@jsonrpc
@idawrite
def rename_stack_frame_variable(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    old_name: Annotated[str, "Current name of the variable"],
    new_name: Annotated[str, "New name for the variable (empty for a default name)"]
):
    """ Change the name of a stack variable for an IDA function """
    pfn = idaapi.get_func(parse_address(function_address))
    if not pfn:
        raise IDAError(f"No function found at address {function_address}")

    frame = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame, pfn):
        raise IDAError("No frame returned.")

    member_index, found = frame.get_udm(old_name)  # type: ignore (IDA SDK type hints are incorrect)
    if not found:
        raise IDAError(f"{old_name} not found.")

    # Special frame members are never renamed.
    member_tid = frame.get_udm_tid(member_index)
    if ida_frame.is_special_frame_member(member_tid):
        raise IDAError(f"{old_name} is a special frame member. Will not change the name.")

    member = ida_typeinf.udm_t()
    frame.get_udm_by_tid(member, member_tid)
    byte_offset = member.offset // 8
    # Argument members are refused as well.
    if ida_frame.is_funcarg_off(pfn, byte_offset):
        raise IDAError(f"{old_name} is an argument member. Will not change the name.")

    # The rename is done by redefining the variable at the same location
    # with its existing type and the new name.
    fp_offset = ida_frame.soff_to_fpoff(pfn, byte_offset)
    if not ida_frame.define_stkvar(pfn, new_name, fp_offset, member.type):
        raise IDAError("failed to rename stack frame variable")
@jsonrpc
@idawrite
def create_stack_frame_variable(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    offset: Annotated[str, "Offset of the stack frame variable"],
    variable_name: Annotated[str, "Name of the stack variable"],
    type_name: Annotated[str, "Type of the stack variable"]
):
    """ For a given function, create a stack variable at an offset and with a specific type """
    pfn = idaapi.get_func(parse_address(function_address))
    if not pfn:
        raise IDAError(f"No function found at address {function_address}")

    # The offset string goes through the same parser as addresses.
    frame_offset = parse_address(offset)

    # The frame is fetched only to verify that the function has one.
    frame = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame, pfn):
        raise IDAError("No frame returned.")

    variable_tif = get_type_by_name(type_name)
    created = ida_frame.define_stkvar(pfn, variable_name, frame_offset, variable_tif)
    if not created:
        raise IDAError("failed to define stack frame variable")
@jsonrpc
@idawrite
def set_stack_frame_variable_type(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    variable_name: Annotated[str, "Name of the stack variable"],
    type_name: Annotated[str, "Type of the stack variable"]
):
    """ For a given disassembled function, set the type of a stack variable """
    pfn = idaapi.get_func(parse_address(function_address))
    if not pfn:
        raise IDAError(f"No function found at address {function_address}")

    frame = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame, pfn):
        raise IDAError("No frame returned.")

    member_index, found = frame.get_udm(variable_name)  # type: ignore (IDA SDK type hints are incorrect)
    if not found:
        raise IDAError(f"{variable_name} not found.")

    # Re-read the member through its tid to obtain its offset.
    member_tid = frame.get_udm_tid(member_index)
    member = ida_typeinf.udm_t()
    frame.get_udm_by_tid(member, member_tid)
    byte_offset = member.offset // 8

    replacement_tif = get_type_by_name(type_name)
    if not ida_frame.set_frame_member_type(pfn, byte_offset, replacement_tif):
        raise IDAError("failed to set stack frame variable type")
@jsonrpc
@idawrite
def delete_stack_frame_variable(
    function_address: Annotated[str, "Address of the function to set the stack frame variables"],
    variable_name: Annotated[str, "Name of the stack variable"]
):
    """ Delete the named stack variable for a given function """
    pfn = idaapi.get_func(parse_address(function_address))
    if not pfn:
        raise IDAError(f"No function found at address {function_address}")

    frame = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame, pfn):
        raise IDAError("No frame returned.")

    member_index, found = frame.get_udm(variable_name)  # type: ignore (IDA SDK type hints are incorrect)
    if not found:
        raise IDAError(f"{variable_name} not found.")

    # Special frame members are never deleted.
    member_tid = frame.get_udm_tid(member_index)
    if ida_frame.is_special_frame_member(member_tid):
        raise IDAError(f"{variable_name} is a special frame member. Will not delete.")

    member = ida_typeinf.udm_t()
    frame.get_udm_by_tid(member, member_tid)
    first_byte = member.offset // 8
    byte_count = member.size // 8
    # Argument members are refused as well.
    if ida_frame.is_funcarg_off(pfn, first_byte):
        raise IDAError(f"{variable_name} is an argument member. Will not delete.")

    # Delete exactly the byte range covered by this member.
    if not ida_frame.delete_frame_members(pfn, first_byte, first_byte + byte_count):
        raise IDAError("failed to delete stack frame variable")
@jsonrpc
@idaread
def read_memory_bytes(
    memory_address: Annotated[str, "Address of the memory value to be read"],
    size: Annotated[int, "size of memory to read"]
) -> str:
    """
    Read bytes at a given address.
    Only use this function if `get_global_variable_value_at_address` and
    `get_global_variable_value_by_name` both failed.
    """
    ea = parse_address(memory_address)
    data = ida_bytes.get_bytes(ea, size)
    # A failed read yields None; report it instead of crashing on iteration.
    if data is None:
        raise IDAError(f"Failed to read {size} bytes at {hex(ea)}")
    # "#x" gives the same text as the former "#02x": the width of 2 counted
    # the "0x" prefix and therefore never padded anything.
    return ' '.join(f'{x:#x}' for x in data)
# Thin wrapper: parses the address string and returns
# ida_bytes.get_wide_byte() for it.
@jsonrpc
@idaread
def data_read_byte(
    address: Annotated[str, "Address to get 1 byte value from"],
) -> int:
    """
    Read the 1 byte value at the specified address.
    Only use this function if `get_global_variable_at` failed.
    """
    return ida_bytes.get_wide_byte(parse_address(address))
# Thin wrapper: parses the address string and returns
# ida_bytes.get_wide_word() for it.
@jsonrpc
@idaread
def data_read_word(
    address: Annotated[str, "Address to get 2 bytes value from"],
) -> int:
    """
    Read the 2 byte value at the specified address as a WORD.
    Only use this function if `get_global_variable_at` failed.
    """
    return ida_bytes.get_wide_word(parse_address(address))
# Thin wrapper: parses the address string and returns
# ida_bytes.get_wide_dword() for it.
@jsonrpc
@idaread
def data_read_dword(
    address: Annotated[str, "Address to get 4 bytes value from"],
) -> int:
    """
    Read the 4 byte value at the specified address as a DWORD.
    Only use this function if `get_global_variable_at` failed.
    """
    return ida_bytes.get_wide_dword(parse_address(address))
# Thin wrapper: parses the address string and returns
# ida_bytes.get_qword() for it.
@jsonrpc
@idaread
def data_read_qword(
    address: Annotated[str, "Address to get 8 bytes value from"]
) -> int:
    """
    Read the 8 byte value at the specified address as a QWORD.
    Only use this function if `get_global_variable_at` failed.
    """
    return ida_bytes.get_qword(parse_address(address))
# Returns the string literal found at `address`, decoded as UTF-8.
# Never raises: any failure (bad address, no string, decode error) is reported
# in-band as a string starting with "Error:".
@jsonrpc
@idaread
def data_read_string(
    address: Annotated[str, "Address to get string from"]
) -> str:
    """
    Read the string at the specified address.
    Only use this function if `get_global_variable_at` failed.
    """
    try:
        ea = parse_address(address)
        raw = idaapi.get_strlit_contents(ea, -1, 0)
        return raw.decode("utf-8")
    except Exception as exc:
        return "Error:" + str(exc)
class RegisterValue(TypedDict):
    """One register together with its value rendered as text."""

    # Register name as reported by the debugger.
    name: str
    # Register contents already converted to a string.
    value: str
class ThreadRegisters(TypedDict):
    """Register snapshot belonging to a single debugged thread."""

    # Thread identifier the registers were read from.
    thread_id: int
    # Every register of that thread, in the order they were collected.
    registers: list[RegisterValue]
def dbg_ensure_running() -> "ida_idd.debugger_t":
    """Return the active debugger object, or raise if no debug session is live.

    Raises:
        IDAError: when ida_idd.get_dbg() yields nothing or no instruction
            pointer value is available.
    """
    debugger = ida_idd.get_dbg()
    # Short-circuit keeps the IP query from running when there is no debugger.
    if not debugger or ida_dbg.get_ip_val() is None:
        raise IDAError("Debugger not running")
    return debugger
# Collects, for every debugger thread, the name and stringified value of each
# register. Integers are rendered with hex(), bytes as space-separated hex
# pairs, anything else with str().
@jsonrpc
@idaread
@unsafe
def dbg_get_registers() -> list[ThreadRegisters]:
    """Get all registers and their values. This function is only available when debugging."""
    debugger = dbg_ensure_running()
    threads: list[ThreadRegisters] = []
    for position in range(ida_dbg.get_thread_qty()):
        thread_id = ida_dbg.getn_thread(position)
        registers = []
        for reg_index, reg_val in enumerate(ida_dbg.get_reg_vals(thread_id)):
            info = debugger.regs(reg_index)
            # NOTE: Apparently this can fail under some circumstances
            try:
                value = reg_val.pyval(info.dtype)
            except ValueError:
                value = ida_idaapi.BADADDR

            if isinstance(value, int):
                text = hex(value)
            elif isinstance(value, bytes):
                text = value.hex(" ")
            else:
                text = str(value)
            registers.append({"name": info.name, "value": text})
        threads.append({"thread_id": thread_id, "registers": registers})
    return threads
# Builds a list of {"address", "module", "symbol"} dicts for the current
# thread's stack trace. Best-effort by design: a failure while resolving one
# frame is recorded in that frame's entry, and a failure of the whole walk
# returns whatever frames were gathered so far.
@jsonrpc
@idaread
@unsafe
def dbg_get_call_stack() -> list[dict[str, str]]:
    """Get the current call stack."""
    frames = []
    try:
        thread_id = ida_dbg.get_current_thread()
        stack = ida_idd.call_stack_t()
        if not ida_dbg.collect_stack_trace(thread_id, stack):
            return []
        for entry in stack:
            record = {"address": hex(entry.callea)}
            try:
                module = ida_idd.modinfo_t()
                found = ida_dbg.get_module_info(entry.callea, module)
                record["module"] = os.path.basename(module.name) if found else "<unknown>"

                name_flags = (
                    ida_name.GNCN_NOCOLOR
                    | ida_name.GNCN_NOLABEL
                    | ida_name.GNCN_NOSEG
                    | ida_name.GNCN_PREFDBG
                )
                symbol = ida_name.get_nice_colored_name(entry.callea, name_flags)
                record["symbol"] = symbol or "<unnamed>"
            except Exception as frame_error:
                # Keep the frame but mark it so the caller can see what went wrong.
                record["module"] = "<error>"
                record["symbol"] = str(frame_error)
            frames.append(record)
    except Exception:
        # Deliberately swallowed: return the frames collected before the failure.
        pass
    return frames
class Breakpoint(TypedDict):
    """Serializable description of a single debugger breakpoint."""

    # Breakpoint address as a hex string.
    ea: str
    # Whether the breakpoint is currently enabled.
    enabled: bool
    # Condition expression text, or None when the breakpoint is unconditional.
    condition: Optional[str]
def list_breakpoints() -> list[Breakpoint]:
    """Enumerate the breakpoints reported by ida_dbg.

    Walks indices 0..get_bpt_qty()-1 and skips any index for which
    getn_bpt() fails.

    Returns:
        One Breakpoint dict per retrieved breakpoint: hex address, enabled
        state as a real bool, and the condition text (None when empty).
    """
    breakpoints: list[Breakpoint] = []
    for i in range(ida_dbg.get_bpt_qty()):
        bpt = ida_dbg.bpt_t()
        if not ida_dbg.getn_bpt(i, bpt):
            continue
        breakpoints.append(Breakpoint(
            ea=hex(bpt.ea),
            # The masked flag is an int (the raw bit value); coerce it so the
            # field matches its declared bool type and serializes as true/false.
            enabled=bool(bpt.flags & ida_dbg.BPT_ENABLED),
            condition=str(bpt.condition) if bpt.condition else None,
        ))
    return breakpoints
# RPC entry point that simply exposes list_breakpoints() to clients.
@jsonrpc
@idaread
@unsafe
def dbg_list_breakpoints():
    """List all breakpoints in the program."""
    breakpoints = list_breakpoints()
    return breakpoints
# Launches the debuggee. When no breakpoints exist yet, a software breakpoint
# is first placed on every valid entry point. Returns the instruction pointer
# as a hex string; raises IDAError when the process did not start or no
# instruction pointer is available.
@jsonrpc
@idaread
@unsafe
def dbg_start_process():
    """Start the debugger, returns the current instruction pointer"""
    if not list_breakpoints():
        for index in range(ida_entry.get_entry_qty()):
            entry_ea = ida_entry.get_entry(ida_entry.get_entry_ordinal(index))
            if entry_ea != ida_idaapi.BADADDR:
                ida_dbg.add_bpt(entry_ea, 0, idaapi.BPT_SOFT)

    started = idaapi.start_process("", "", "") == 1
    # Only query the instruction pointer after a successful start.
    ip = ida_dbg.get_ip_val() if started else None
    if ip is None:
        raise IDAError("Failed to start debugger (did the user configure the debugger manually one time?)")
    return hex(ip)
# Terminates the debugged process. Returns nothing on success; raises IDAError
# when no debugger is running or the exit request is rejected.
@jsonrpc
@idaread
@unsafe
def dbg_exit_process():
    """Exit the debugger"""
    dbg_ensure_running()
    if not idaapi.exit_process():
        raise IDAError("Failed to exit debugger")
# Resumes execution and returns the instruction pointer as a hex string.
# Raises IDAError when no debugger is running, the continue request fails, or
# no instruction pointer is available afterwards.
@jsonrpc
@idaread
@unsafe
def dbg_continue_process() -> str:
    """Continue the debugger, returns the current instruction pointer"""
    dbg_ensure_running()
    # Only query the instruction pointer after a successful continue.
    ip = ida_dbg.get_ip_val() if idaapi.continue_process() else None
    if ip is None:
        raise IDAError("Failed to continue debugger")
    return hex(ip)
# Runs the debuggee to `address` and returns the instruction pointer as a hex
# string. Raises IDAError when no debugger is running, the run-to request
# fails, or no instruction pointer is available afterwards.
@jsonrpc
@idaread
@unsafe
def dbg_run_to(
    address: Annotated[str, "Run the debugger to the specified address"],
):
    """Run the debugger to the specified address"""
    dbg_ensure_running()
    ea = parse_address(address)
    # Only query the instruction pointer after a successful run-to.
    ip = ida_dbg.get_ip_val() if idaapi.run_to(ea) else None
    if ip is None:
        raise IDAError(f"Failed to run to address {hex(ea)}")
    return hex(ip)
# Adds a software breakpoint at `address`. Returns a confirmation string when
# a new breakpoint was added, returns None when adding failed but a breakpoint
# already exists at that address, and raises IDAError otherwise.
@jsonrpc
@idaread
@unsafe
def dbg_set_breakpoint(
    address: Annotated[str, "Set a breakpoint at the specified address"],
):
    """Set a breakpoint at the specified address"""
    ea = parse_address(address)
    if idaapi.add_bpt(ea, 0, idaapi.BPT_SOFT):
        return f"Breakpoint set at {hex(ea)}"

    # add_bpt failed: tolerate the case where the breakpoint is already there.
    already_present = any(existing["ea"] == hex(ea) for existing in list_breakpoints())
    if already_present:
        return
    raise IDAError(f"Failed to set breakpoint at address {hex(ea)}")
# Single-steps into the current instruction and returns the instruction
# pointer as a hex string. Raises IDAError when no debugger is running, the
# step request fails, or no instruction pointer is available afterwards.
@jsonrpc
@idaread
@unsafe
def dbg_step_into():
    """Step into the current instruction"""
    dbg_ensure_running()
    # Only query the instruction pointer after a successful step.
    ip = ida_dbg.get_ip_val() if idaapi.step_into() else None
    if ip is None:
        raise IDAError("Failed to step into")
    return hex(ip)
# Steps over the current instruction and returns the instruction pointer as a
# hex string. Raises IDAError when no debugger is running, the step request
# fails, or no instruction pointer is available afterwards.
@jsonrpc
@idaread
@unsafe
def dbg_step_over():
    """Step over the current instruction"""
    dbg_ensure_running()
    # Only query the instruction pointer after a successful step.
    ip = ida_dbg.get_ip_val() if idaapi.step_over() else None
    if ip is None:
        raise IDAError("Failed to step over")
    return hex(ip)
# Removes the breakpoint at `address`. Returns nothing on success; raises
# IDAError when IDA reports that the deletion failed.
@jsonrpc
@idaread
@unsafe
def dbg_delete_breakpoint(
    address: Annotated[str, "del a breakpoint at the specified address"],
):
    """Delete a breakpoint at the specified address"""
    ea = parse_address(address)
    if not idaapi.del_bpt(ea):
        raise IDAError(f"Failed to delete breakpoint at address {hex(ea)}")
# Enables (enable=True) or disables (enable=False) the breakpoint at
# `address`. Returns nothing on success; raises IDAError naming the attempted
# action when IDA reports failure.
@jsonrpc
@idaread
@unsafe
def dbg_enable_breakpoint(
    address: Annotated[str, "Enable or disable a breakpoint at the specified address"],
    enable: Annotated[bool, "Enable or disable a breakpoint"],
):
    """Enable or disable a breakpoint at the specified address"""
    ea = parse_address(address)
    if idaapi.enable_bpt(ea, enable):
        return
    # The verb was previously empty for the enable case, producing the
    # ungrammatical "Failed to breakpoint at address ...".
    action = "enable" if enable else "disable"
    raise IDAError(f"Failed to {action} breakpoint at address {hex(ea)}")
class MCP(idaapi.plugin_t):
    """IDA plugin object: creates the server on load, starts it when the
    plugin is run, and stops it when the plugin is terminated."""

    flags = idaapi.PLUGIN_KEEP
    comment = "MCP Plugin"
    help = "MCP"
    wanted_name = "MCP"
    wanted_hotkey = "Ctrl-Alt-M"

    def init(self):
        """Create the server (without starting it) and print a usage hint."""
        self.server = Server()
        # Show the shortcut with '+' separators, using "Option" on macOS.
        shortcut = MCP.wanted_hotkey.replace("-", "+")
        on_mac = sys.platform == "darwin"
        if on_mac:
            shortcut = shortcut.replace("Alt", "Option")
        print(f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({shortcut}) to start the server")
        return idaapi.PLUGIN_KEEP

    def run(self, arg):
        """Start the server; `arg` is ignored."""
        self.server.start()

    def term(self):
        """Stop the server."""
        self.server.stop()
def PLUGIN_ENTRY():
    """Return a new MCP plugin instance."""
    plugin = MCP()
    return plugin
| import idautils | |
| import ida_funcs | |
| import ida_kernwin | |
# Triage loop: for every function, print the raw Hex-Rays result and then the
# plugin's own decompile_function() output so the two can be compared.
# Only the plugin call is guarded; a failure in the raw call stops the loop.
for func_ea in idautils.Functions():
    print(ida_hexrays.decompile_func(func_ea))
    try:
        print(decompile_function(str(func_ea)))
    except Exception as err:
        print("exception", err)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment