Skip to content

Instantly share code, notes, and snippets.

@bogdangoie
Forked from Takaitra/update_amp_cache.py
Last active July 4, 2019 16:24
Show Gist options
  • Save bogdangoie/9b363f0502fc06fd04aa7f36730e92ab to your computer and use it in GitHub Desktop.
Save bogdangoie/9b363f0502fc06fd04aa7f36730e92ab to your computer and use it in GitHub Desktop.
Update AMP Content
#!/usr/bin/env python3
import base64
import os
import time
from OpenSSL import crypto
import requests
from requests.packages.urllib3 import exceptions as requests_exceptions
CACHE_DOMAIN = 'cdn.ampproject.org'
CURRENT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
PRIVATE_KEY_FILENAME = 'private-key-amp-cache.pem'
PRIVATE_KEY_PATH = os.path.abspath(os.path.join(
CURRENT_DIRECTORY, PRIVATE_KEY_FILENAME))
"""
Invalidates the AMP cache for a given URL following the guidelines at
https://developers.google.com/amp/cache/update-cache
=============
Prerequisites
=============
1. Create your private and public key.
2. Update the private key variable filename defined above under PRIVATE_KEY_FILENAME if needed (relative path)
3. Publish the public key to your website at /.well-known/amphtml/apikey.pub
4. install the requests and OpenSSL libraries for Python 3
=======
Running
=======
To run this script::
./update_amp_cache.py
"""
class CacheUpdate(object):
_signature = None
_timestamp = None
_update_cache_url = None
def __init__(self, website_url):
self.website_url = website_url
self._cdn_url = 'https://{}/c/s/{}'.format(
CACHE_DOMAIN, self.website_url)
requests.packages.urllib3.disable_warnings(
requests_exceptions.InsecureRequestWarning)
def run(self):
self._timestamp = int(time.time())
self._update_cache_url = (
"/update-cache/c/s/{}?amp_action=flush&amp_ts={}".format(
self.website_url, self._timestamp))
self.calculate_signature()
self.send_request()
def calculate_signature(self):
if not os.path.exists(PRIVATE_KEY_PATH):
print 'Private Key Not Found'
key_file = open(PRIVATE_KEY_PATH, "r")
key = key_file.read()
key_file.close()
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
sign = crypto.sign(pkey, self._update_cache_url, "sha256")
self._signature = base64.urlsafe_b64encode(sign).decode("utf-8")
def send_request(self):
domain = self.website_url.partition('/')[0]
prefix = domain.replace('.', '-')
amp_base_url = '{}.{}'.format(prefix, CACHE_DOMAIN)
url = "https://{}{}&amp_url_signature={}".format(
amp_base_url, self._update_cache_url, self._signature)
r = requests.get(url, verify=False)
if r.status_code == 200:
print("Successfully updated cached AMP content for "
"https://{} at {}".format(self.website_url, self._cdn_url))
else:
print("Cache update failed. Response code: {}".format(
r.status_code))
print(r.content)
if __name__=="__main__":
url_input = input("Website url of which AMP cache should be updated "
"(e.g. 'www.mysite.com/my-article/'): ")
validated_url_input = url_input.strip('https://')
cacheUpdate = CacheUpdate(validated_url_input)
cacheUpdate.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment