#!/usr/bin/env python3 """ M365 OSINT Reconnaissance Tool Based on techniques from: https://dstreefkerk.github.io/2025-07-m365-email-osint-after-lockdown/ This script performs modern M365/Azure AD reconnaissance after Microsoft's lockdown of traditional enumeration methods. It uses multiple validation techniques to discover organizational information and attempts to infer MOERA domains. """ import requests import dns.resolver import xml.etree.ElementTree as ET import re import sys import argparse from urllib.parse import quote import json from typing import Dict, List, Optional, Tuple import time class M365Recon: def __init__(self): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) def get_user_realm_info(self, domain: str) -> Dict: """ Query GetUserRealm endpoint to extract organization information """ url = f"https://login.microsoftonline.com/getuserrealm.srf?login=admin@{domain}&xml=1" try: response = self.session.get(url, timeout=10) response.raise_for_status() # Parse XML response root = ET.fromstring(response.text) result = { 'domain': domain, 'organization_name': root.find('.//FederationBrandName').text if root.find('.//FederationBrandName') is not None else None, 'namespace_type': root.find('.//NameSpaceType').text if root.find('.//NameSpaceType') is not None else None, 'is_federated': False, 'auth_url': None, 'tenant_exists': True } if result['namespace_type'] == 'Federated': result['is_federated'] = True auth_url_elem = root.find('.//AuthURL') if auth_url_elem is not None: result['auth_url'] = auth_url_elem.text return result except requests.exceptions.RequestException as e: print(f"[!] Error querying GetUserRealm for {domain}: {e}") return {'domain': domain, 'tenant_exists': False} except ET.ParseError as e: print(f"[!] Error parsing XML response for {domain}: {e}") return {'domain': domain, 'tenant_exists': False} def get_oidc_metadata(self, domain: str) -> Dict: """ Query OIDC metadata endpoint to extract tenant ID """ url = f"https://login.microsoftonline.com/{domain}/v2.0/.well-known/openid-configuration" try: response = self.session.get(url, timeout=10) response.raise_for_status() metadata = response.json() issuer = metadata.get('issuer', '') # Extract tenant ID from issuer URL tenant_id_match = re.search(r'https://login\.microsoftonline\.com/([^/]+)/v2\.0', issuer) tenant_id = tenant_id_match.group(1) if tenant_id_match else None return { 'domain': domain, 'tenant_id': tenant_id, 'issuer': issuer, 'oidc_available': True } except requests.exceptions.RequestException: return {'domain': domain, 'oidc_available': False} except json.JSONDecodeError: return {'domain': domain, 'oidc_available': False} def check_eop_smart_host(self, domain: str) -> bool: """ Check if domain has Exchange Online Protection smart host configured """ smart_host = domain.replace('.', '-') + '.mail.protection.outlook.com' try: dns.resolver.resolve(smart_host, 'A') return True except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, Exception): return False def check_m365_dns_indicators(self, domain: str) -> Dict: """ Check various DNS indicators for M365 usage """ indicators = { 'msoid': False, 'txt_verification': False, 'autodiscover': False, 'enterprise_reg': False, 'dkim_selectors': False, 'mx_records': [], 'spf_record': None } # Check MSOID try: answers = dns.resolver.resolve(f'msoid.{domain}', 'CNAME') for answer in answers: if 'clientconfig.microsoftonline' in str(answer): indicators['msoid'] = True break except: pass # Check TXT records for MS verification try: answers = dns.resolver.resolve(domain, 'TXT') for answer in answers: txt_string = str(answer).strip('"') if txt_string.startswith('MS=ms'): indicators['txt_verification'] = True elif txt_string.startswith('v=spf1'): indicators['spf_record'] = txt_string except: pass # Check Autodiscover try: answers = dns.resolver.resolve(f'autodiscover.{domain}', 'CNAME') for answer in answers: if str(answer) == 'autodiscover.outlook.com.': indicators['autodiscover'] = True break except: pass # Check Enterprise Registration try: answers = dns.resolver.resolve(f'enterpriseregistration.{domain}', 'CNAME') for answer in answers: if str(answer) == 'enterpriseregistration.windows.net.': indicators['enterprise_reg'] = True break except: pass # Check DKIM selectors for selector in ['selector1', 'selector2']: try: dns.resolver.resolve(f'{selector}._domainkey.{domain}', 'CNAME') indicators['dkim_selectors'] = True break except: continue # Check MX records try: answers = dns.resolver.resolve(domain, 'MX') for answer in answers: indicators['mx_records'].append(str(answer.exchange)) except: pass return indicators def generate_moera_candidates(self, org_name: str, domain: str) -> List[Tuple[str, str, int]]: """ Generate MOERA domain candidates based on organization name and domain Returns list of (candidate_domain, generation_method, confidence_score) """ candidates = [] if not org_name: return candidates # Normalize organization name normalized = re.sub(r'[^a-zA-Z0-9\s]', '', org_name).lower() normalized = re.sub(r'\s+', ' ', normalized).strip() # Remove common corporate suffixes suffixes_to_remove = [ 'corporation', 'corp', 'company', 'co', 'limited', 'ltd', 'llc', 'inc', 'incorporated', 'pty', 'group', 'holdings', 'international', 'global', 'worldwide', 'enterprises', 'solutions', 'services' ] words = normalized.split() filtered_words = [w for w in words if w not in suffixes_to_remove] if not filtered_words: filtered_words = words # Fallback to original if we filtered everything # Method 1: Single word (highest confidence) if len(filtered_words) == 1: candidates.append((f"{filtered_words[0]}.onmicrosoft.com", "single_word", 90)) # Method 2: Concatenated words (high confidence) if len(filtered_words) > 1: concatenated = ''.join(filtered_words) candidates.append((f"{concatenated}.onmicrosoft.com", "concatenated", 80)) # Method 3: Acronym from organization name (medium confidence) if len(filtered_words) > 1: acronym = ''.join([w[0] for w in filtered_words if w]) if len(acronym) >= 2: candidates.append((f"{acronym}.onmicrosoft.com", "acronym", 60)) # Method 4: Domain-based candidates (medium confidence) domain_base = domain.split('.')[0] candidates.append((f"{domain_base}.onmicrosoft.com", "domain_based", 70)) # Method 5: First word only (lower confidence) if len(filtered_words) > 1: candidates.append((f"{filtered_words[0]}.onmicrosoft.com", "first_word", 50)) # Remove duplicates while preserving highest confidence scores unique_candidates = {} for candidate, method, confidence in candidates: if candidate not in unique_candidates or confidence > unique_candidates[candidate][1]: unique_candidates[candidate] = (method, confidence) return [(domain, method, confidence) for domain, (method, confidence) in unique_candidates.items()] def validate_moera_domain(self, candidate: str) -> bool: """ Validate MOERA domain by checking MX records """ try: answers = dns.resolver.resolve(candidate, 'MX') for answer in answers: if 'mail.protection.outlook.com' in str(answer.exchange): return True return False except: return False def assess_m365_confidence(self, indicators: Dict, eop_smart_host: bool) -> str: """ Assess M365 usage confidence based on various indicators """ if indicators['autodiscover'] or indicators['dkim_selectors'] or eop_smart_host: return "Yes" elif indicators['enterprise_reg']: return "Likely" elif indicators['msoid'] or indicators['txt_verification']: return "Possibly" elif any('outlook.com' in mx for mx in indicators['mx_records']): return "Likely" else: return "No" def analyze_domain(self, domain: str) -> Dict: """ Perform comprehensive analysis of a domain """ print(f"\n[*] Analyzing domain: {domain}") results = { 'domain': domain, 'user_realm': {}, 'oidc_metadata': {}, 'dns_indicators': {}, 'eop_smart_host': False, 'm365_confidence': 'No', 'moera_candidates': [], 'validated_moera': None } # Get user realm information print("[*] Querying GetUserRealm...") results['user_realm'] = self.get_user_realm_info(domain) # Get OIDC metadata print("[*] Querying OIDC metadata...") results['oidc_metadata'] = self.get_oidc_metadata(domain) # Check DNS indicators print("[*] Checking DNS indicators...") results['dns_indicators'] = self.check_m365_dns_indicators(domain) # Check EOP smart host print("[*] Checking EOP smart host...") results['eop_smart_host'] = self.check_eop_smart_host(domain) # Assess M365 confidence results['m365_confidence'] = self.assess_m365_confidence( results['dns_indicators'], results['eop_smart_host'] ) # Generate and validate MOERA candidates if results['user_realm'].get('organization_name'): print("[*] Generating MOERA candidates...") candidates = self.generate_moera_candidates( results['user_realm']['organization_name'], domain ) validated_candidates = [] for candidate, method, confidence in candidates: print(f"[*] Validating MOERA candidate: {candidate}") if self.validate_moera_domain(candidate): validated_candidates.append((candidate, method, confidence, True)) if not results['validated_moera'] or confidence > results['validated_moera'][2]: results['validated_moera'] = (candidate, method, confidence, True) else: validated_candidates.append((candidate, method, confidence, False)) # Add small delay to avoid overwhelming DNS servers time.sleep(0.1) results['moera_candidates'] = validated_candidates return results def print_results(self, results: Dict): """ Print formatted results """ domain = results['domain'] print(f"\n{'='*60}") print(f"ANALYSIS RESULTS FOR: {domain}") print(f"{'='*60}") # Basic tenant information user_realm = results['user_realm'] if user_realm.get('tenant_exists'): print(f"\n[+] TENANT INFORMATION") print(f" Organization Name: {user_realm.get('organization_name', 'N/A')}") print(f" Federated: {'Yes' if user_realm.get('is_federated') else 'No'}") if user_realm.get('auth_url'): print(f" Auth URL: {user_realm['auth_url']}") else: print(f"\n[-] No Azure AD tenant found for {domain}") return # OIDC metadata oidc = results['oidc_metadata'] if oidc.get('oidc_available'): print(f"\n[+] OIDC METADATA") print(f" Tenant ID: {oidc.get('tenant_id', 'N/A')}") print(f" Issuer: {oidc.get('issuer', 'N/A')}") # M365 confidence assessment print(f"\n[+] M365 USAGE ASSESSMENT") print(f" Confidence Level: {results['m365_confidence']}") # DNS indicators indicators = results['dns_indicators'] print(f"\n[+] DNS INDICATORS") print(f" MSOID Record: {'Yes' if indicators['msoid'] else 'No'}") print(f" MS TXT Verification: {'Yes' if indicators['txt_verification'] else 'No'}") print(f" Autodiscover: {'Yes' if indicators['autodiscover'] else 'No'}") print(f" Enterprise Registration: {'Yes' if indicators['enterprise_reg'] else 'No'}") print(f" DKIM Selectors: {'Yes' if indicators['dkim_selectors'] else 'No'}") print(f" EOP Smart Host: {'Yes' if results['eop_smart_host'] else 'No'}") if indicators['mx_records']: print(f" MX Records:") for mx in indicators['mx_records']: print(f" - {mx}") if indicators['spf_record']: print(f" SPF Record: {indicators['spf_record']}") # MOERA domain candidates if results['moera_candidates']: print(f"\n[+] MOERA DOMAIN ANALYSIS") if results['validated_moera']: candidate, method, confidence, valid = results['validated_moera'] print(f" Best Match: {candidate} (Confidence: {confidence}%, Method: {method})") print(f" All Candidates:") for candidate, method, confidence, valid in results['moera_candidates']: status = "✓ VALID" if valid else "✗ Invalid" inference = " (Inferred)" if confidence < 80 else "" print(f" {candidate} - {status} ({confidence}% confidence, {method}){inference}") else: print(f"\n[-] No MOERA candidates generated (organization name required)") def main(): parser = argparse.ArgumentParser( description='M365 OSINT Reconnaissance Tool', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 m365_recon.py microsoft.com python3 m365_recon.py contoso.com --output results.json Based on techniques from: https://dstreefkerk.github.io/2025-07-m365-email-osint-after-lockdown/ """ ) parser.add_argument('domain', help='Target domain to analyze') parser.add_argument('--output', '-o', help='Output results to JSON file') parser.add_argument('--quiet', '-q', action='store_true', help='Suppress progress messages') args = parser.parse_args() # Validate domain format domain_pattern = re.compile( r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$' ) if not domain_pattern.match(args.domain): print(f"[!] Invalid domain format: {args.domain}") sys.exit(1) # Initialize reconnaissance tool recon = M365Recon() try: # Analyze the domain results = recon.analyze_domain(args.domain) # Print results if not args.quiet: recon.print_results(results) # Save to file if requested if args.output: with open(args.output, 'w') as f: json.dump(results, f, indent=2) print(f"\n[+] Results saved to: {args.output}") except KeyboardInterrupt: print(f"\n[!] Analysis interrupted by user") sys.exit(1) except Exception as e: print(f"\n[!] Error during analysis: {e}") sys.exit(1) if __name__ == "__main__": main()