"""Print .NET loader (and optionally JIT/interop) ETW events.

Two captures are defined:

* ``RundownDotNetETW`` subscribes to the 'DotNet Runtime Rundown' provider and
  runs until a DCStartComplete event is seen.
* ``RuntimeDotNetETW`` subscribes to the 'DotNet Runtime' provider and runs
  until the user presses return.

Each event is either printed in full (``--verbose``) or summarised as a
comma-separated line that starts with the originating process ID.
"""

import argparse
import sys
import threading
import time

import etw
import etw.evntrace


def _fields(event_data, *keys):
    """Return the values of ``keys`` from ``event_data``, in the given order."""
    return [event_data[key] for key in keys]


def _module_is_high_risk(event_data):
    """Return True when a module event matches one of the high-risk heuristics.

    The heuristics, applied to ``ModuleILPath`` (case-insensitively) and
    ``ModuleFlags``:

    * the IL path ends in neither ``.exe`` nor ``.dll`` and the flags do not
      contain "Dynamic" - treated as a byte-stream loaded assembly;
    * the IL path mentions ``system.management.automation`` - interesting in
      non powershell.exe processes as it shows dynamic use of PowerShell.
    """
    il_path = event_data["ModuleILPath"].lower()
    byte_stream_loaded = (
        not il_path.endswith((".exe", ".dll"))
        and "Dynamic" not in event_data["ModuleFlags"]
    )
    return byte_stream_loaded or "system.management.automation" in il_path


def _emit(event, summary, verbose, should_print):
    """Print ``event`` (verbose) or its ``summary`` fields, then flush stdout.

    Nothing is printed when ``should_print`` is false; stdout is flushed
    either way.
    """
    if should_print:
        if verbose:
            print(event)
        else:
            print(", ".join(map(str, summary)))
    sys.stdout.flush()


class RundownDotNetETW(etw.ETW):
    """Capture of the 'DotNet Runtime Rundown' provider.

    ``should_stop_capture`` is set to True once a DCStartComplete_V1 event
    (ID 145) has been received, so a caller can poll it to know when to stop.
    """

    def __init__(self, verbose, high_risk_only):
        """Configure the provider and register ``print_event`` as callback.

        Args:
            verbose: print the full event instead of a one-line summary, and
                also print event types that have no dedicated summary.
            high_risk_only: only print events matching a high-risk heuristic.
        """
        # set options
        self.verbose = verbose
        self.high_risk_only = high_risk_only
        self.should_stop_capture = False

        # define capture provider info
        providers = [
            etw.ProviderInfo(
                'DotNet Runtime Rundown',
                etw.GUID("{A669021C-C450-4609-A035-5AF59AF4DF18}"),
                level=etw.evntrace.TRACE_LEVEL_INFORMATION,
                any_keywords=[
                    "LoaderRundownKeyword",
                    "StartRundownKeyword",
                ]
            ),
        ]

        super().__init__(providers=providers, event_callback=self.print_event)

    def start(self):
        """Run the pre-capture setup hook, then start the capture."""
        self.do_capture_setup()
        super().start()

    def stop(self):
        """Stop the capture, then run the post-capture teardown hook."""
        super().stop()
        self.do_capture_teardown()

    def do_capture_setup(self):
        """Hook for pre-capture setup; currently does nothing."""
        pass

    def do_capture_teardown(self):
        """Hook for post-capture teardown; currently does nothing."""
        pass

    def print_event(self, event):
        """Event callback: summarise and print a rundown event.

        Args:
            event: indexable pair where ``event[0]`` is the event ID and
                ``event[1]`` is a mapping of event fields that includes
                ``["EventHeader"]["ProcessId"]``.
        """
        event_id = event[0]
        data = event[1]
        summary = [str(data["EventHeader"]["ProcessId"])]
        should_print = not self.high_risk_only

        if event_id == 155:
            # AssemblyDCStart_V1
            summary.append("AssemblyDCStart_V1")
            summary.extend(_fields(
                data,
                "AssemblyID", "AssemblyFlags", "FullyQualifiedAssemblyName",
            ))
        elif event_id == 153:
            # ModuleDCStart_V2
            summary.append("ModuleDCStart_V2")
            summary.extend(_fields(
                data,
                "AssemblyID", "ModuleFlags", "ModuleILPath",
                "ModuleNativePath", "ManagedPdbBuildPath",
            ))
            if _module_is_high_risk(data):
                should_print = True
        elif event_id == 151:
            # DomainModuleDCStart_V1
            summary.append("DomainModuleDCStart_V1")
            summary.extend(_fields(
                data,
                "AssemblyID", "ModuleFlags", "ModuleILPath",
                "ModuleNativePath",
            ))
            if _module_is_high_risk(data):
                should_print = True
        else:
            if event_id == 145:
                # DCStartComplete_V1: the rundown is finished
                self.should_stop_capture = True
            # Events without a dedicated summary are only shown in verbose mode
            if not self.verbose:
                return

        _emit(event, summary, self.verbose, should_print)


class RuntimeDotNetETW(etw.ETW):
    """Capture of the 'DotNet Runtime' provider.

    Loader events are always requested; JIT and interop events are added
    when method tracing is enabled.
    """

    # Method names treated as high risk when JIT-compiled, inlined or used
    # as a managed interop method.
    high_risk_method_names = [
        "VirtualAlloc",
        "VirtualAllocEx",
        "CreateThread",
        "CreateRemoteThread",
        "WriteProcessMemory",
        "FromBase64String",
        "DownloadFile",
        "RunPS",
        "SetThreadContext",
        "MiniDumpWriteDump",
        "LoadLibrary",
        "GetProcAddress",
        "WaitForSingleObject"
    ]

    # Method namespaces treated as high risk when JIT-compiled or inlined.
    high_risk_namespaces = [
        "System.IO.MemoryStream"
    ]

    def __init__(self, verbose, high_risk_only, method_tracing):
        """Configure the provider and register ``print_event`` as callback.

        Args:
            verbose: print the full event instead of a one-line summary, and
                also print event types that have no dedicated summary.
            high_risk_only: only print events matching a high-risk heuristic.
            method_tracing: additionally request the JIT, JIT tracing and
                interop keywords and raise the trace level to verbose.
        """
        # set options
        self.verbose = verbose
        self.high_risk_only = high_risk_only
        self.method_tracing = method_tracing

        keywords = [
            "LoaderKeyword"
        ]
        if self.method_tracing:
            keywords.extend(
                [
                    "JitKeyword",
                    "JitTracingKeyword",
                    "InteropKeyword"
                ]
            )
        print(keywords)

        if self.method_tracing:
            level = etw.evntrace.TRACE_LEVEL_VERBOSE
        else:
            level = etw.evntrace.TRACE_LEVEL_INFORMATION

        # define capture provider info
        providers = [
            etw.ProviderInfo(
                'DotNet Runtime',
                etw.GUID("{E13C0D23-CCBC-4E12-931B-D9CC2EEE27E4}"),
                level=level,
                any_keywords=keywords
            ),
        ]

        super().__init__(providers=providers, event_callback=self.print_event)

    def start(self):
        """Run the pre-capture setup hook, then start the capture."""
        self.do_capture_setup()
        super().start()

    def stop(self):
        """Stop the capture, then run the post-capture teardown hook."""
        super().stop()
        self.do_capture_teardown()

    def do_capture_setup(self):
        """Hook for pre-capture setup; currently does nothing."""
        pass

    def do_capture_teardown(self):
        """Hook for post-capture teardown; currently does nothing."""
        pass

    def print_event(self, event):
        """Event callback: summarise and print a runtime event.

        Args:
            event: indexable pair where ``event[0]`` is the event ID and
                ``event[1]`` is a mapping of event fields that includes
                ``["EventHeader"]["ProcessId"]``.
        """
        event_id = event[0]
        data = event[1]
        summary = [str(data["EventHeader"]["ProcessId"])]
        should_print = not self.high_risk_only

        if event_id == 154:
            # AssemblyLoad_V1
            summary.append("AssemblyLoad_V1")
            summary.extend(_fields(
                data,
                "AssemblyID", "AssemblyFlags", "FullyQualifiedAssemblyName",
            ))
        elif event_id == 152:
            # ModuleLoad_V2
            summary.append("ModuleLoad_V2")
            summary.extend(_fields(
                data,
                "AssemblyID", "ModuleFlags", "ModuleILPath",
                "ModuleNativePath", "ManagedPdbBuildPath",
            ))
            if _module_is_high_risk(data):
                should_print = True
        elif event_id == 151:
            # DomainModuleLoad_V1
            summary.append("DomainModuleLoad_V1")
            summary.extend(_fields(
                data,
                "AssemblyID", "ModuleFlags", "ModuleILPath",
                "ModuleNativePath",
            ))
            if _module_is_high_risk(data):
                should_print = True
        elif event_id == 145:
            # MethodJittingStarted
            summary.append("MethodJittingStarted")
            summary.extend(_fields(
                data, "ModuleID", "MethodNamespace", "MethodName",
            ))
            if data["MethodName"] in self.high_risk_method_names \
                    or data["MethodNamespace"] in self.high_risk_namespaces:
                should_print = True
        elif event_id == 185 or event_id == 186:
            # MethodJitInliningSucceeded and MethodJitInliningFailed
            if event_id == 185:
                summary.append("MethodJitInliningSucceeded")
            else:
                summary.append("MethodJitInliningFailed")
            summary.extend(_fields(
                data,
                "MethodBeingCompiledNamespace", "MethodBeingCompiledName",
                "InlinerNamespace", "InlinerName",
                "InlineeNamespace", "InlineeName",
            ))
            if data["InlineeName"] in self.high_risk_method_names \
                    or data["InlineeNamespace"] in self.high_risk_namespaces:
                should_print = True
        elif event_id == 88:
            # ILStubGenerated
            summary.append("ILStubGenerated")
            summary.extend(_fields(
                data,
                "StubFlags", "ManagedInteropMethodNamespace",
                "ManagedInteropMethodName",
            ))
            if data["ManagedInteropMethodName"] in self.high_risk_method_names:
                should_print = True
        elif not self.verbose:
            # Events without a dedicated summary are only shown in verbose mode
            return

        _emit(event, summary, self.verbose, should_print)


def rundown_capture(args):
    """Run the rundown capture until it reports that it should stop.

    Args:
        args: parsed command-line namespace; ``verbose`` and
            ``high_risk_only`` are read.
    """
    capture = RundownDotNetETW(
        verbose=args.verbose,
        high_risk_only=args.high_risk_only,
    )
    capture.start()
    try:
        # poll until the event callback flags the end of the rundown
        while not capture.should_stop_capture:
            time.sleep(0.1)
    finally:
        # always stop, even if the wait is interrupted by an exception
        capture.stop()


def runtime_capture(args):
    """Run the runtime capture until the user presses return.

    Args:
        args: parsed command-line namespace; ``verbose``, ``high_risk_only``
            and ``enable_method_tracing`` are read.
    """
    capture = RuntimeDotNetETW(
        verbose=args.verbose,
        high_risk_only=args.high_risk_only,
        method_tracing=args.enable_method_tracing
    )
    capture.start()
    try:
        # wait for return
        input()
    finally:
        # input() can raise (EOFError on closed stdin, KeyboardInterrupt);
        # make sure the capture is stopped and torn down regardless
        capture.stop()


# NOTE: the script body intentionally stays at module level so that running
# (or importing) this file behaves exactly as before.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--verbose",
    help="Displays full details for all event types, as opposed to limited details for specific events",
    action="store_true"
)
parser.add_argument(
    "--high-risk-only",
    help="Only show high risk events (byte stream loaded assemblies, high risk method calls etc)",
    action="store_true"
)
parser.add_argument(
    "--disable-rundown-provider",
    help="Do not capture current system state - only show events post-startup",
    action="store_true"
)
parser.add_argument(
    "--enable-method-tracing",
    help="Enables relevant JIT and Interop events for providing method call visibility",
    action="store_true"
)
args = parser.parse_args()

# The rundown capture runs in its own thread alongside the runtime capture
if not args.disable_rundown_provider:
    threading.Thread(target=rundown_capture, args=(args,)).start()

runtime_capture(args)