Skip to content

Instantly share code, notes, and snippets.

@simo97
Last active January 28, 2022 20:10
Show Gist options
  • Save simo97/cb9ddcf706d68cff8df1d0a17ef91c43 to your computer and use it in GitHub Desktop.
Save simo97/cb9ddcf706d68cff8df1d0a17ef91c43 to your computer and use it in GitHub Desktop.

Revisions

  1. simo97 revised this gist Jan 28, 2022. 1 changed file with 29 additions and 0 deletions.
    29 changes: 29 additions & 0 deletions fusion_utils.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,29 @@
    def get_or_create_user(user_id, first_name='', last_name='', middle_name='', *args, **kwargs):
    """
    Get or create a local user from fusionauth's information. Mostly used for local user federation.
    You can actually add more code here to support and make various authentication actions, for example
    to implement a work around the fact that FA don't support yet users having more than one email address.
    :param user_id:
    :param first_name:
    :param last_name:
    :param middle_name:
    :return:
    """
    from django.db.models import Q
    User = get_user_model()
    user_qs = User.objects.filter(
    Q(email=user_id)
    )
    if user_qs.exists():
    return user_qs.first()
    # if not user:
    user = User(
    email=user_id , first_name=first_name ,
    last_name=last_name , **kwargs
    )
    # todo: create the mail here
    user.save()

    return user
  2. simo97 revised this gist Jan 28, 2022. 1 changed file with 4 additions and 44 deletions.
    48 changes: 4 additions & 44 deletions auth.py
    Original file line number Diff line number Diff line change
    @@ -18,47 +18,7 @@ def authenticate(username: str, password: str, *args, **kwargs):

    response = client.login(login_data)
    _status = response.status
    if _status == 200:
    return get_or_create_user(response.success_response["user"]["email"]), None
    if _status == 203:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": response.success_response["changePasswordReason"],
    "message": "changePasswordNeeded",
    }
    if _status == 202:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "The user is not registered for the application specified by the applicationId on the request."
    }
    if _status == 212:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "Email need to be verified"
    }
    if _status == 213:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "Email need to be verified"
    }
    if _status == 242:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "2FA Required"
    }
    if 200 <= _status <= 299:
    return get_or_create_user(response.success_response["user"]["email"]), None
    if _status == 400:
    return (
    None,
    response.error_response,
    )
    if _status == 401:
    return None, str(_status)
    if _status == 404:
    return None, str(_status)
    if _status == 409:
    return None, response.error_response
    if _status == 410:
    return None, response.error_response
    if _status == 423:
    return None, None
    if _status == 500:
    raise Exception("An error occured")
    if _status == 503:
    raise Exc
    if 200 >= _status <= 299:
    return get_or_create_user(response.success_response["user"]["email"])
    if _status > 299 :
    return None
  3. simo97 created this gist Jan 28, 2022.
    64 changes: 64 additions & 0 deletions auth.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,64 @@
    from fusionauth.fusionauth_client import FusionAuthClient

    class FusionAuthBackend:

    def authenticate(username: str, password: str, *args, **kwargs):
    # fusionauth_client.
    client = FusionAuthClient(settings.FUSION_AUTH_API_KEY, settings.FUSION_AUTH_BASE_URL)
    _tenant_id = settings.FUSION_AUTH_MAIN_TENANT_ID
    if _tenant_id != "":
    client.set_tenant_id(settings.FUSION_AUTH_MAIN_TENANT_ID)

    login_data = {
    "loginId": username,
    "password": password,
    "applicationId": settings.FUSION_AUTH_APP_ID,
    "noJWT": True,
    }

    response = client.login(login_data)
    _status = response.status
    if _status == 200:
    return get_or_create_user(response.success_response["user"]["email"]), None
    if _status == 203:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": response.success_response["changePasswordReason"],
    "message": "changePasswordNeeded",
    }
    if _status == 202:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "The user is not registered for the application specified by the applicationId on the request."
    }
    if _status == 212:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "Email need to be verified"
    }
    if _status == 213:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "Email need to be verified"
    }
    if _status == 242:
    return get_or_create_user(response.success_response["user"]["email"]), {
    "detail": "2FA Required"
    }
    if 200 <= _status <= 299:
    return get_or_create_user(response.success_response["user"]["email"]), None
    if _status == 400:
    return (
    None,
    response.error_response,
    )
    if _status == 401:
    return None, str(_status)
    if _status == 404:
    return None, str(_status)
    if _status == 409:
    return None, response.error_response
    if _status == 410:
    return None, response.error_response
    if _status == 423:
    return None, None
    if _status == 500:
    raise Exception("An error occured")
    if _status == 503:
    raise Exc