|
import argparse |
|
import http.server |
|
import webbrowser |
|
import urllib.parse |
|
import requests |
|
import threading |
|
import time |
|
|
|
# Configure Argument Parser |
|
parser = argparse.ArgumentParser( |
|
description="Retrieve an OAuth 2.0 access token using Authorization Code Flow and call Microsoft APIs." |
|
) |
|
|
|
parser.add_argument("--tenant-id", required=True, help="Azure AD Tenant ID") |
|
parser.add_argument("--client-id", required=True, help="Azure AD Application (Client) ID") |
|
parser.add_argument("--client-secret", required=True, help="Azure AD Application Client Secret") |
|
parser.add_argument("--scope", choices=["graph", "azure"], required=True, |
|
help="Choose between 'graph' for Microsoft Graph API or 'azure' for Azure Service Management API.") |
|
|
|
args = parser.parse_args() |
|
|
|
# Configuration |
|
authority = f"https://login.microsoftonline.com/{args.tenant_id}/oauth2/v2.0" |
|
redirect_uri = "http://localhost:8000" |
|
|
|
# Determine Scope and API Endpoint Automatically === |
|
if args.scope == "graph": |
|
scope = "https://graph.microsoft.com/User.Read" |
|
api_endpoint = "https://graph.microsoft.com/v1.0/me" # Get user info |
|
elif args.scope == "azure": |
|
scope = "https://management.azure.com/user_impersonation" |
|
api_endpoint = "https://management.azure.com/subscriptions?api-version=2020-01-01" # List subscriptions |
|
|
|
auth_code = None |
|
|
|
# Open Browser for User Login |
|
auth_url = ( |
|
f"{authority}/authorize?" |
|
f"client_id={args.client_id}" |
|
f"&response_type=code" |
|
f"&redirect_uri={urllib.parse.quote(redirect_uri)}" |
|
f"&scope={urllib.parse.quote(scope)}" |
|
f"&response_mode=query" |
|
) |
|
|
|
print("Open the following URL in your browser and sign in:\n") |
|
print(auth_url) |
|
|
|
# Automatically open browser |
|
webbrowser.open(auth_url) |
|
|
|
|
|
# Start Local HTTP Server to Capture Authorization Code |
|
class OAuthHandler(http.server.BaseHTTPRequestHandler): |
|
def do_GET(self): |
|
global auth_code |
|
parsed_path = urllib.parse.urlparse(self.path) |
|
params = urllib.parse.parse_qs(parsed_path.query) |
|
|
|
if "code" in params: |
|
auth_code = params["code"][0] |
|
self.send_response(200) |
|
self.end_headers() |
|
self.wfile.write(b"Authorization successful! You can close this tab.") |
|
|
|
print(f"Authorization Code Received: {auth_code}\n") |
|
threading.Thread(target=self.server.shutdown).start() |
|
|
|
# Start HTTP server in a separate thread |
|
server = http.server.HTTPServer(("localhost", 8000), OAuthHandler) |
|
print("Waiting for authorization code on port 8000...\n") |
|
|
|
# Run the server in a thread so it doesn't block execution |
|
server_thread = threading.Thread(target=server.serve_forever) |
|
server_thread.start() |
|
|
|
# Wait until the auth_code is received |
|
while auth_code is None: |
|
time.sleep(1) |
|
|
|
server_thread.join() |
|
|
|
# Exchange Authorization Code for Access Token |
|
token_response = requests.post( |
|
f"{authority}/token", |
|
data={ |
|
"client_id": args.client_id, |
|
"client_secret": args.client_secret, |
|
"grant_type": "authorization_code", |
|
"scope": scope, |
|
"code": auth_code, |
|
"redirect_uri": redirect_uri, |
|
}, |
|
headers={"Content-Type": "application/x-www-form-urlencoded"}, |
|
) |
|
|
|
token_json = token_response.json() |
|
access_token = token_json.get("access_token") |
|
|
|
if not access_token: |
|
print(f"Error retrieving access token: {token_json}") |
|
exit(1) |
|
|
|
print("Access Token Retrieved Successfully!\n") |
|
print(access_token) |
|
|
|
# Use Access Token to Call API Based on Scope === |
|
response = requests.get( |
|
api_endpoint, |
|
headers={"Authorization": f"Bearer {access_token}", "Accept": "application/json"}, |
|
) |
|
|
|
if response.status_code == 200: |
|
print("API Response:") |
|
print(response.json()) |
|
else: |
|
print(f"API Error: {response.text}") |