Skip to content

Instantly share code, notes, and snippets.

@xybytes
Created February 27, 2025 21:14
Show Gist options
  • Save xybytes/333ebed21e9e99804eca36d13b9ff8cb to your computer and use it in GitHub Desktop.
Save xybytes/333ebed21e9e99804eca36d13b9ff8cb to your computer and use it in GitHub Desktop.

Revisions

  1. xybytes created this gist Feb 27, 2025.
    117 changes: 117 additions & 0 deletions oauth_authorization_code_flow.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,117 @@
    import argparse
    import http.server
    import webbrowser
    import urllib.parse
    import requests
    import threading
    import time

    # Configure Argument Parser
    parser = argparse.ArgumentParser(
    description="Retrieve an OAuth 2.0 access token using Authorization Code Flow and call Microsoft APIs."
    )

    parser.add_argument("--tenant-id", required=True, help="Azure AD Tenant ID")
    parser.add_argument("--client-id", required=True, help="Azure AD Application (Client) ID")
    parser.add_argument("--client-secret", required=True, help="Azure AD Application Client Secret")
    parser.add_argument("--scope", choices=["graph", "azure"], required=True,
    help="Choose between 'graph' for Microsoft Graph API or 'azure' for Azure Service Management API.")

    args = parser.parse_args()

    # Configuration
    authority = f"https://login.microsoftonline.com/{args.tenant_id}/oauth2/v2.0"
    redirect_uri = "http://localhost:8000"

    # Determine Scope and API Endpoint Automatically ===
    if args.scope == "graph":
    scope = "https://graph.microsoft.com/User.Read"
    api_endpoint = "https://graph.microsoft.com/v1.0/me" # Get user info
    elif args.scope == "azure":
    scope = "https://management.azure.com/user_impersonation"
    api_endpoint = "https://management.azure.com/subscriptions?api-version=2020-01-01" # List subscriptions

    auth_code = None

    # Open Browser for User Login
    auth_url = (
    f"{authority}/authorize?"
    f"client_id={args.client_id}"
    f"&response_type=code"
    f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
    f"&scope={urllib.parse.quote(scope)}"
    f"&response_mode=query"
    )

    print("Open the following URL in your browser and sign in:\n")
    print(auth_url)

    # Automatically open browser
    webbrowser.open(auth_url)


    # Start Local HTTP Server to Capture Authorization Code
    class OAuthHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
    global auth_code
    parsed_path = urllib.parse.urlparse(self.path)
    params = urllib.parse.parse_qs(parsed_path.query)

    if "code" in params:
    auth_code = params["code"][0]
    self.send_response(200)
    self.end_headers()
    self.wfile.write(b"Authorization successful! You can close this tab.")

    print(f"Authorization Code Received: {auth_code}\n")
    threading.Thread(target=self.server.shutdown).start()

    # Start HTTP server in a separate thread
    server = http.server.HTTPServer(("localhost", 8000), OAuthHandler)
    print("Waiting for authorization code on port 8000...\n")

    # Run the server in a thread so it doesn't block execution
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.start()

    # Wait until the auth_code is received
    while auth_code is None:
    time.sleep(1)

    server_thread.join()

    # Exchange Authorization Code for Access Token
    token_response = requests.post(
    f"{authority}/token",
    data={
    "client_id": args.client_id,
    "client_secret": args.client_secret,
    "grant_type": "authorization_code",
    "scope": scope,
    "code": auth_code,
    "redirect_uri": redirect_uri,
    },
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    )

    token_json = token_response.json()
    access_token = token_json.get("access_token")

    if not access_token:
    print(f"Error retrieving access token: {token_json}")
    exit(1)

    print("Access Token Retrieved Successfully!\n")
    print(access_token)

    # Use Access Token to Call API Based on Scope ===
    response = requests.get(
    api_endpoint,
    headers={"Authorization": f"Bearer {access_token}", "Accept": "application/json"},
    )

    if response.status_code == 200:
    print("API Response:")
    print(response.json())
    else:
    print(f"API Error: {response.text}")