|  | import json | 
        
          |  | import os | 
        
          |  | from certvalidator import CertificateValidator, ValidationContext, errors | 
        
          |  | import boto3 | 
        
          |  | from asn1crypto import pem | 
        
          |  |  | 
        
          |  | ''' | 
        
          |  | Let's load our truststore from s3. Doing this outside of handler function so that this will be loaded only on coldstart. | 
        
          |  | If the truststore contents change, you need to update the lambda env var 'TRUSTSTORE_FILE_VERSIONID' | 
        
          |  | with the new file's versionId. Also update the same value in 'API Gateway > Custom domain names > Domain details > Truststore version' and wait until the Status becomes Available. | 
        
          |  | If API Gateway finds a problem with the truststore, such as an incomplete certificate chain, it will display a warning. The warning details tell you which cert has the problem and what the problem is. You need to fix the truststore chain until this warning goes away. | 
        
          |  | ''' | 
        
          |  |  | 
        
          |  | s3_client = boto3.client('s3') | 
        
          |  |  | 
        
          |  | bucket = os.environ.get('TRUSTSTORE_BUCKET') | 
        
          |  | key = os.environ.get('TRUSTSTORE_FILENAME') | 
        
          |  | versionId = os.environ.get('TRUSTSTORE_FILE_VERSIONID') | 
        
          |  |  | 
        
          |  | download_path = '/tmp/{}'.format(key) | 
        
          |  | s3_client.download_file(bucket, key, download_path, ExtraArgs={'VersionId': versionId}) | 
        
          |  |  | 
        
          |  | trust_roots = [] | 
        
          |  | with open(download_path, 'rb') as f: | 
        
          |  | for _, _, der_bytes in pem.unarmor(f.read(), multiple=True): | 
        
          |  | trust_roots.append(der_bytes) | 
        
          |  |  | 
        
          |  | def lambda_handler(event, context): | 
        
          |  | print("Received event: " + json.dumps(event, indent=2)) | 
        
          |  | ''' Get the client cert from lambda event ''' | 
        
          |  | cert = event["requestContext"]["authentication"]["clientCert"]["clientCertPem"].encode() | 
        
          |  |  | 
        
          |  | ''' | 
        
          |  | hard-fail mode, any error in checking revocation is considered a failure. However, if there is no known source of revocation information, it is not considered a failure. | 
        
          |  | This allows us to keep using self signed certs too. | 
        
          |  | ''' | 
        
          |  | context = ValidationContext(allow_fetching=True, revocation_mode="hard-fail", trust_roots=trust_roots) | 
        
          |  |  | 
        
          |  | try: | 
        
          |  | validator = CertificateValidator(cert, validation_context=context) | 
        
          |  | validator.validate_usage( | 
        
          |  | set(['digital_signature', 'key_encipherment']), | 
        
          |  | set(['server_auth', 'client_auth']), | 
        
          |  | True | 
        
          |  | ) | 
        
          |  | except Exception as inst: | 
        
          |  | print(inst) | 
        
          |  | print("The certificate could not be validated") | 
        
          |  | return { | 
        
          |  | "isAuthorized": "false", | 
        
          |  | "context": { | 
        
          |  | "exception": str(inst.args) | 
        
          |  | } | 
        
          |  | } | 
        
          |  | else: | 
        
          |  | print("The certificate is ok") | 
        
          |  | return { | 
        
          |  | "isAuthorized": "true", | 
        
          |  | "context": { | 
        
          |  | "exception": None | 
        
          |  | } | 
        
          |  | } |