Skip to content

Instantly share code, notes, and snippets.

@joonathan
Forked from amertkara/aws_utils.py
Last active August 29, 2015 14:19
Show Gist options
  • Select an option

  • Save joonathan/2deb43ee6fba2db653af to your computer and use it in GitHub Desktop.

Select an option

Save joonathan/2deb43ee6fba2db653af to your computer and use it in GitHub Desktop.

Revisions

  1. @amertkara amertkara revised this gist Oct 3, 2014. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions aws_utils.py
    Original file line number Diff line number Diff line change
    @@ -26,6 +26,7 @@ def canonical_message_builder(content, format):
    try:
    m += field + "\n" + content[field] + "\n"
    except KeyError:
    # Build with what you have
    pass

    return str(m)
  2. @amertkara amertkara revised this gist Oct 3, 2014. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions aws_utils.py
    Original file line number Diff line number Diff line change
    @@ -16,9 +16,9 @@ def canonical_message_builder(content, format):
    Args:
    content (dict): Parsed body of the response
    format (list): List of the fields that needs to go into the message
    format (list): List of the fields that need to go into the message
    Returns (str):
    canonical message ready to be verified
    canonical message
    """
    m = ""

  3. @amertkara amertkara revised this gist Oct 3, 2014. 1 changed file with 3 additions and 2 deletions.
    5 changes: 3 additions & 2 deletions aws_utils.py
    Original file line number Diff line number Diff line change
    @@ -3,6 +3,7 @@
    import urllib2
    from M2Crypto import X509
    from base64 import b64decode
    from M2Crypto.Err import M2CryptoError

    SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation"
    SNS_MESSAGE_TYPE_NOTIFICATION = "Notification"
    @@ -41,7 +42,7 @@ def verify_sns_notification(request):
    True if he message passes the verification, False otherwise
    Raises:
    ValueError: If the body of the response couldn't be parsed
    Exception: If an error raises during the verification process, straight from OpenSSL
    M2CryptoError: If an error raises during the verification process
    URLError: If the SigningCertURL couldn't be opened
    """
    cert = None
    @@ -85,4 +86,4 @@ def verify_sns_notification(request):
    elif verification_result == 0:
    return False
    else:
    raise Exception("Some error occured while verifying the signature.")
    raise M2CryptoError("Some error occured while verifying the signature.")
  4. @amertkara amertkara revised this gist Oct 3, 2014. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions aws_utils.py
    Original file line number Diff line number Diff line change
    @@ -42,6 +42,7 @@ def verify_sns_notification(request):
    Raises:
    ValueError: If the body of the response couldn't be parsed
    Exception: If an error raises during the verification process, straight from OpenSSL
    URLError: If the SigningCertURL couldn't be opened
    """
    cert = None
    pubkey = None
  5. @amertkara amertkara revised this gist Oct 3, 2014. 1 changed file with 4 additions and 1 deletion.
    5 changes: 4 additions & 1 deletion aws_utils.py
    Original file line number Diff line number Diff line change
    @@ -22,7 +22,10 @@ def canonical_message_builder(content, format):
    m = ""

    for field in sorted(format):
    m += field + "\n" + content[field] + "\n"
    try:
    m += field + "\n" + content[field] + "\n"
    except KeyError:
    pass

    return str(m)

  6. @amertkara amertkara created this gist Oct 3, 2014.
    84 changes: 84 additions & 0 deletions aws_utils.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,84 @@
    # -*- coding: utf-8 -*-
    import json
    import urllib2
    from M2Crypto import X509
    from base64 import b64decode

    SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation"
    SNS_MESSAGE_TYPE_NOTIFICATION = "Notification"
    SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation"

    def canonical_message_builder(content, format):
    """ Builds the canonical message to be verified.
    Sorts the fields as a requirement from AWS
    Args:
    content (dict): Parsed body of the response
    format (list): List of the fields that needs to go into the message
    Returns (str):
    canonical message ready to be verified
    """
    m = ""

    for field in sorted(format):
    m += field + "\n" + content[field] + "\n"

    return str(m)


    def verify_sns_notification(request):
    """ Takes a notification request from Amazon push service SNS and verifies the origin of the notification.
    Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc
    Args:
    request (HTTPRequest): The request object that is passed to the view function
    Returns (bool):
    True if he message passes the verification, False otherwise
    Raises:
    ValueError: If the body of the response couldn't be parsed
    Exception: If an error raises during the verification process, straight from OpenSSL
    """
    cert = None
    pubkey = None
    canonical_message = None
    canonical_sub_unsub_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    canonical_notification_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

    content = json.loads(request.body)
    decoded_signature = b64decode(content["Signature"])

    # Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8
    if request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_SUB_NOTIFICATION or \
    request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION:

    canonical_message = canonical_message_builder(content, canonical_sub_unsub_format)

    elif request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_NOTIFICATION:

    canonical_message = canonical_message_builder(content, canonical_notification_format)

    else:
    raise ValueError("Message Type (%s) is not recognized" % request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None))

    # Load the certificate and extract the public key
    cert = X509.load_cert_string(str(urllib2.urlopen(content["SigningCertURL"]).read()))
    pubkey = cert.get_pubkey()
    pubkey.reset_context(md='sha1')
    pubkey.verify_init()

    # Feed the canonical message to sign it with the public key from the certificate
    pubkey.verify_update(canonical_message)

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function.
    # http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1
    # if some other error occurred."
    verification_result = pubkey.verify_final(decoded_signature)

    if verification_result == 1:
    return True
    elif verification_result == 0:
    return False
    else:
    raise Exception("Some error occured while verifying the signature.")