#!/usr/bin/env python3
"""
Invalidates the AMP cache for a given URL following the guidelines at
https://developers.google.com/amp/cache/update-cache

=============
Prerequisites
=============

1. Create your private and public key.
2. Update the private key filename defined below under
   PRIVATE_KEY_FILENAME if needed (path relative to this script).
3. Publish the public key to your website at
   /.well-known/amphtml/apikey.pub
4. Install the requests and OpenSSL (pyOpenSSL) libraries for Python 3.

=======
Running
=======

To run this script::

    ./update_amp_cache.py
"""
import base64
import os
import time

from OpenSSL import crypto
import requests
from requests.packages.urllib3 import exceptions as requests_exceptions

CACHE_DOMAIN = 'cdn.ampproject.org'
CURRENT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
PRIVATE_KEY_FILENAME = 'private-key-amp-cache.pem'
PRIVATE_KEY_PATH = os.path.abspath(os.path.join(
    CURRENT_DIRECTORY, PRIVATE_KEY_FILENAME))

# Upper bound (seconds) for the flush request so the script cannot hang
# forever on an unresponsive connection.
REQUEST_TIMEOUT_SECONDS = 30


def _strip_scheme(url):
    """Return ``url`` without surrounding whitespace or a leading scheme.

    Only an actual ``https://`` / ``http://`` *prefix* is removed.
    ``str.strip('https://')`` must not be used for this: it removes any of
    the characters ``h t p s : /`` from both ends of the string and thereby
    corrupts inputs such as ``site.com/path``.
    """
    url = url.strip()
    lowered = url.lower()
    for scheme in ('https://', 'http://'):
        if lowered.startswith(scheme):
            return url[len(scheme):]
    return url


def _cache_subdomain(domain):
    """Return the AMP cache subdomain label for the publisher ``domain``.

    Per the AMP cache URL format every ``-`` is doubled first and every
    ``.`` is then replaced by ``-`` (``my-site.com`` -> ``my--site-com``).
    NOTE(review): IDN/punycode conversion and the hashed fallback for
    over-long labels described in the AMP documentation are not handled.
    """
    return domain.replace('-', '--').replace('.', '-')


class CacheUpdate:
    """Builds, signs and sends an AMP ``update-cache`` flush request.

    ``website_url`` is the document URL without scheme, e.g.
    ``www.mysite.com/my-article/``. The request path is always built with
    the ``/c/s/`` prefix.
    """

    # Populated by run() / calculate_signature().
    _signature = None
    _timestamp = None
    _update_cache_url = None

    def __init__(self, website_url):
        self.website_url = website_url
        # Only used in the success message printed by send_request().
        self._cdn_url = 'https://{}/c/s/{}'.format(
            CACHE_DOMAIN, self.website_url)
        # send_request() uses verify=False, which would otherwise emit an
        # InsecureRequestWarning on every call.
        requests.packages.urllib3.disable_warnings(
            requests_exceptions.InsecureRequestWarning)

    def run(self):
        """Build the timestamped request path, sign it and send it."""
        self._timestamp = int(time.time())
        # Query parameter names (amp_action, amp_ts) follow the AMP
        # update-cache documentation.
        self._update_cache_url = (
            "/update-cache/c/s/{}?amp_action=flush&amp_ts={}".format(
                self.website_url, self._timestamp))
        self.calculate_signature()
        self.send_request()

    def calculate_signature(self):
        """Sign the request path with the private key (SHA-256).

        Stores the URL-safe base64 encoded signature in ``_signature``.
        ``run()`` must have set ``_update_cache_url`` beforehand.

        Raises:
            FileNotFoundError: if the key at PRIVATE_KEY_PATH is missing.
        """
        try:
            with open(PRIVATE_KEY_PATH, "r") as key_file:
                key = key_file.read()
        except FileNotFoundError as err:
            raise FileNotFoundError(
                'Private Key Not Found: {}'.format(PRIVATE_KEY_PATH)
            ) from err

        pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
        # The data to sign must be bytes on Python 3.
        sign = crypto.sign(
            pkey, self._update_cache_url.encode("utf-8"), "sha256")
        self._signature = base64.urlsafe_b64encode(sign).decode("utf-8")

    def send_request(self):
        """Send the signed flush request and print the outcome."""
        domain = self.website_url.partition('/')[0]
        amp_base_url = '{}.{}'.format(_cache_subdomain(domain), CACHE_DOMAIN)
        url = "https://{}{}&amp_url_signature={}".format(
            amp_base_url, self._update_cache_url, self._signature)

        # NOTE(review): verify=False disables TLS certificate validation.
        # Kept as in the original behaviour; confirm whether it is still
        # required and prefer the default (verify=True) if it is not.
        r = requests.get(
            url, verify=False, timeout=REQUEST_TIMEOUT_SECONDS)

        if r.status_code == 200:
            print("Successfully updated cached AMP content for "
                  "https://{} at {}".format(self.website_url, self._cdn_url))
        else:
            print("Cache update failed. Response code: {}".format(
                r.status_code))
            print(r.content)


def main():
    """Prompt for a URL and flush its AMP cache entry."""
    url_input = input("Website url of which AMP cache should be updated "
                      "(e.g. 'www.mysite.com/my-article/'): ")
    validated_url_input = _strip_scheme(url_input)
    if not validated_url_input:
        raise SystemExit("No website url given.")

    cache_update = CacheUpdate(validated_url_input)
    try:
        cache_update.run()
    except FileNotFoundError as err:
        # Missing private key: exit with the message, not a traceback.
        raise SystemExit(str(err))


if __name__ == "__main__":
    main()