Skip to content

Instantly share code, notes, and snippets.

@xybytes
Created February 27, 2025 21:14
Show Gist options
  • Save xybytes/333ebed21e9e99804eca36d13b9ff8cb to your computer and use it in GitHub Desktop.
Save xybytes/333ebed21e9e99804eca36d13b9ff8cb to your computer and use it in GitHub Desktop.
Retrieve an OAuth 2.0 access token using Authorization Code Flow and call Microsoft APIs
import argparse
import http.server
import webbrowser
import urllib.parse
import requests
import threading
import time
# Configure Argument Parser
parser = argparse.ArgumentParser(
description="Retrieve an OAuth 2.0 access token using Authorization Code Flow and call Microsoft APIs."
)
parser.add_argument("--tenant-id", required=True, help="Azure AD Tenant ID")
parser.add_argument("--client-id", required=True, help="Azure AD Application (Client) ID")
parser.add_argument("--client-secret", required=True, help="Azure AD Application Client Secret")
parser.add_argument("--scope", choices=["graph", "azure"], required=True,
help="Choose between 'graph' for Microsoft Graph API or 'azure' for Azure Service Management API.")
args = parser.parse_args()
# Configuration
authority = f"https://login.microsoftonline.com/{args.tenant_id}/oauth2/v2.0"
redirect_uri = "http://localhost:8000"
# Determine Scope and API Endpoint Automatically ===
if args.scope == "graph":
scope = "https://graph.microsoft.com/User.Read"
api_endpoint = "https://graph.microsoft.com/v1.0/me" # Get user info
elif args.scope == "azure":
scope = "https://management.azure.com/user_impersonation"
api_endpoint = "https://management.azure.com/subscriptions?api-version=2020-01-01" # List subscriptions
auth_code = None
# Open Browser for User Login
auth_url = (
f"{authority}/authorize?"
f"client_id={args.client_id}"
f"&response_type=code"
f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
f"&scope={urllib.parse.quote(scope)}"
f"&response_mode=query"
)
print("Open the following URL in your browser and sign in:\n")
print(auth_url)
# Automatically open browser
webbrowser.open(auth_url)
# Start Local HTTP Server to Capture Authorization Code
class OAuthHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
global auth_code
parsed_path = urllib.parse.urlparse(self.path)
params = urllib.parse.parse_qs(parsed_path.query)
if "code" in params:
auth_code = params["code"][0]
self.send_response(200)
self.end_headers()
self.wfile.write(b"Authorization successful! You can close this tab.")
print(f"Authorization Code Received: {auth_code}\n")
threading.Thread(target=self.server.shutdown).start()
# Start HTTP server in a separate thread
server = http.server.HTTPServer(("localhost", 8000), OAuthHandler)
print("Waiting for authorization code on port 8000...\n")
# Run the server in a thread so it doesn't block execution
server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()
# Wait until the auth_code is received
while auth_code is None:
time.sleep(1)
server_thread.join()
# Exchange Authorization Code for Access Token
token_response = requests.post(
f"{authority}/token",
data={
"client_id": args.client_id,
"client_secret": args.client_secret,
"grant_type": "authorization_code",
"scope": scope,
"code": auth_code,
"redirect_uri": redirect_uri,
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
token_json = token_response.json()
access_token = token_json.get("access_token")
if not access_token:
print(f"Error retrieving access token: {token_json}")
exit(1)
print("Access Token Retrieved Successfully!\n")
print(access_token)
# Use Access Token to Call API Based on Scope ===
response = requests.get(
api_endpoint,
headers={"Authorization": f"Bearer {access_token}", "Accept": "application/json"},
)
if response.status_code == 200:
print("API Response:")
print(response.json())
else:
print(f"API Error: {response.text}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment