Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save xcdr/f0bb777de7a2e40449e5b06df7ac9605 to your computer and use it in GitHub Desktop.

Select an option

Save xcdr/f0bb777de7a2e40449e5b06df7ac9605 to your computer and use it in GitHub Desktop.

Revisions

  1. @bholagabbar bholagabbar created this gist Sep 8, 2017.
    98 changes: 98 additions & 0 deletions auth0_django_middleware.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,98 @@
    from django.http import JsonResponse
    from jose import jwt


    AUTH0_DOMAIN = {YOUR_AUTH0_DOMAIN}
    API_AUDIENCE = {YOUR_API_AUDIENCE}
    ALGORITHMS = ["RS256"]

    """
    Cache the key available at https://{AUTH0_DOMAIN}/.well-known/jwks.json as a python dict
    """
    AUTH0_PUBLIC_KEY = {}


    class Auth0Middleware(object):
    def __init__(self, get_response):
    self.get_response = get_response

    def __call__(self, request):

    # GET TOKEN
    auth = request.META.get('HTTP_AUTHORIZATION')

    if not auth:
    return JsonResponse(data={"code": "authorization_header_missing",
    "description":
    "Authorization header is expected"}, status=401)

    parts = auth.split()

    if parts[0].lower() != "bearer":
    return JsonResponse(data={"code": "invalid_header",
    "description":
    "Authorization header must start with"
    "Bearer"}, status=401)
    elif len(parts) == 1:
    return JsonResponse(data={"code": "invalid_header",
    "description": "Token not found"}, status=401)
    elif len(parts) > 2:
    return JsonResponse(data={"code": "invalid_header",
    "description": "Authorization header must be"
    "Bearer token"}, status=401)

    token = parts[1]

    # VALIDATE TOKEN

    jwks = AUTH0_PUBLIC_KEY
    try:
    unverified_header = jwt.get_unverified_header(token)
    except jwt.JWTError:

    return JsonResponse(data={"code": "invalid_header",
    "description": "Invalid header. "
    "Use an RS256 signed JWT Access Token"}, status=401)

    if unverified_header["alg"] == "HS256":
    return JsonResponse(data={"code": "invalid_header",
    "description": "Invalid header. "
    "Use an RS256 signed JWT Access Token"}, status=401)

    rsa_key = {}
    for key in jwks["keys"]:
    if key["kid"] == unverified_header["kid"]:
    rsa_key = {
    "kty": key["kty"],
    "kid": key["kid"],
    "use": key["use"],
    "n": key["n"],
    "e": key["e"]
    }
    if rsa_key:
    try:
    jwt.decode(
    token,
    rsa_key,
    algorithms=ALGORITHMS,
    audience=API_AUDIENCE,
    issuer="https://" + AUTH0_DOMAIN + "/"
    )

    except jwt.ExpiredSignatureError:
    return JsonResponse(data={"code": "token_expired",
    "description": "token is expired"}, status=401)
    except jwt.JWTClaimsError:
    return JsonResponse(data={"code": "invalid_claims",
    "description": "incorrect claims,"
    " please check the audience and issuer"}, status=401)
    except Exception:
    return JsonResponse(data={"code": "invalid_header",
    "description": "Unable to parse authentication"
    " token."}, status=400)
    else:
    return JsonResponse(data={"code": "invalid_header",
    "description": "Unable to find appropriate key"}, status=401)

    response = self.get_response(request)
    return response