Skip to content

Instantly share code, notes, and snippets.

@jimmyislive
Last active February 21, 2019 21:19
Show Gist options
  • Select an option

  • Save jimmyislive/c5d12acc128dab0df8b9 to your computer and use it in GitHub Desktop.

Select an option

Save jimmyislive/c5d12acc128dab0df8b9 to your computer and use it in GitHub Desktop.

Revisions

  1. jimmyislive revised this gist Oct 23, 2015. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions aws_kms.py
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,7 @@
    # Define your AWS_KMS_ARN, KMS_REGION, KMS_AWS_ACCESS_KEY_ID, KMS_AWS_SECRET_ACCESS_KEY someplace
    import base64
    import boto3
    from Crypto.Cipher import AES

    class AwsKms(object):

  2. jimmyislive created this gist Oct 23, 2015.
    44 changes: 44 additions & 0 deletions aws_kms.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,44 @@
    # Define your AWS_KMS_ARN, KMS_REGION, KMS_AWS_ACCESS_KEY_ID, KMS_AWS_SECRET_ACCESS_KEY someplace
    import base64
    import boto3

    class AwsKms(object):

    def __init__(self):
    self.key_id = AWS_KMS_ARN
    self.client = boto3.client('kms',
    region_name=KMS_REGION,
    aws_access_key_id=KMS_AWS_ACCESS_KEY_ID,
    aws_secret_access_key=KMS_AWS_SECRET_ACCESS_KEY)

    def generate_data_key(self, key_spec='AES_256'):
    """returns plaintext and encrypted key.
    Store the encrypted key / Use the plaintext key and promptly discard
    """
    response = self.client.generate_data_key(KeyId=self.key_id, KeySpec=key_spec)
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
    return base64.b64encode(response['CiphertextBlob'])
    # if you cannot generate the symmetric key itself, something is wrong with your
    # credentials...bail out
    raise Exception('Error while generating data key: {0}'.format(response))

    def get_plaintext_symmetric_key(self, cipherkey):
    response = self.client.decrypt(CiphertextBlob=base64.b64decode(cipherkey))
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
    return response['Plaintext']

    def encrypt(self, plaintext, cipherkey):
    symmetric_key = self.get_plaintext_symmetric_key(cipherkey)
    if not symmetric_key:
    # either log something or raise an exception here...
    return
    return base64.b64encode(AES.new(symmetric_key, AES.MODE_CFB).encrypt(plaintext))

    def decrypt(self, ciphertext, cipherkey):
    symmetric_key = self.get_plaintext_symmetric_key(cipherkey)
    if not symmetric_key:
    # either log something or raise an exception here...
    return
    return AES.new(symmetric_key, AES.MODE_CFB).decrypt(base64.b64decode(ciphertext))