Skip to content

Instantly share code, notes, and snippets.

@LyleScott
Last active May 14, 2022 05:51
Show Gist options
  • Select an option

  • Save LyleScott/cd79d3fae54781bb4167ffcb1bc2252d to your computer and use it in GitHub Desktop.

Select an option

Save LyleScott/cd79d3fae54781bb4167ffcb1bc2252d to your computer and use it in GitHub Desktop.

Revisions

  1. LyleScott revised this gist May 14, 2022. 1 changed file with 7 additions and 1 deletion.
    8 changes: 7 additions & 1 deletion gcs_expiring_link.py
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,13 @@
    """
    Example for how to return a https link to a GCS file that expires in {expiration}
    seconds from now.
    Receivers of the link do not need a Google account or any special access.
    Dependencies:
    pip install google-auth
    """

    import argparse
    import binascii
    import collections
  2. LyleScott revised this gist May 14, 2022. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion gcs_expiring_link.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,7 @@
    """
    pip install google-auth
    """

    import argparse
    import binascii
    import collections
    @@ -11,7 +15,7 @@


    def generate_signed_url(
    service_account_file, bucket_name: str, object_name: str, expiration: int = 604800
    service_account_file: str, bucket_name: str, object_name: str, expiration: int = 600
    ) -> str:
    if expiration > 604800:
    raise ValueError("Expiration can't be longer than 604800 seconds (7 days).")
  3. LyleScott created this gist May 14, 2022.
    119 changes: 119 additions & 0 deletions gcs_expiring_link.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,119 @@
    import argparse
    import binascii
    import collections
    import datetime
    import hashlib
    import sys
    from datetime import datetime, timezone
    from urllib.parse import quote

    from google.oauth2 import service_account


    def generate_signed_url(
    service_account_file, bucket_name: str, object_name: str, expiration: int = 604800
    ) -> str:
    if expiration > 604800:
    raise ValueError("Expiration can't be longer than 604800 seconds (7 days).")

    now = datetime.now(tz=timezone.utc)
    request_timestamp = now.strftime("%Y%m%dT%H%M%SZ")
    datestamp = now.strftime("%Y%m%d")

    google_credentials = service_account.Credentials.from_service_account_file(
    service_account_file
    )
    client_email = google_credentials.service_account_email
    credential_scope = f"{datestamp}/auto/storage/goog4_request"
    credential = f"{client_email}/{credential_scope}"

    host = f"{bucket_name}.storage.googleapis.com"
    headers = {"host": host}

    canonical_headers = ""
    ordered_headers = collections.OrderedDict(sorted(headers.items()))
    for k, v in ordered_headers.items():
    lower_k = str(k).lower()
    strip_v = str(v).lower()
    canonical_headers += f"{lower_k}:{strip_v}\n"

    signed_headers = ""
    for k in ordered_headers.keys():
    signed_headers += f"{str(k).lower()};"
    signed_headers = signed_headers[:-1] # remove trailing ';'

    query_parameters = {
    "X-Goog-Algorithm": "GOOG4-RSA-SHA256",
    "X-Goog-Credential": credential,
    "X-Goog-Date": request_timestamp,
    "X-Goog-Expires": expiration,
    "X-Goog-SignedHeaders": signed_headers,
    }

    canonical_query_string = ""
    ordered_query_parameters = collections.OrderedDict(sorted(query_parameters.items()))
    for k, v in ordered_query_parameters.items():
    encoded_k = quote(str(k), safe="")
    encoded_v = quote(str(v), safe="")
    canonical_query_string += f"{encoded_k}={encoded_v}&"
    canonical_query_string = canonical_query_string[:-1] # remove trailing '&'

    escaped_object_name = quote(object_name, safe=b"/~")
    canonical_uri = f"/{escaped_object_name}"
    canonical_request = "\n".join(
    [
    "GET",
    canonical_uri,
    canonical_query_string,
    canonical_headers,
    signed_headers,
    "UNSIGNED-PAYLOAD",
    ]
    )

    canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest()

    string_to_sign = "\n".join(
    [
    "GOOG4-RSA-SHA256",
    request_timestamp,
    credential_scope,
    canonical_request_hash,
    ]
    )

    # signer.sign() signs using RSA-SHA256 with PKCS1v15 padding
    signature = binascii.hexlify(
    google_credentials.signer.sign(string_to_sign)
    ).decode()

    signed_url = (
    f"https://{host}{canonical_uri}"
    f"?{canonical_query_string}"
    f"&x-goog-signature={signature}"
    )

    return signed_url


    if __name__ == "__main__":
    parser = argparse.ArgumentParser(
    description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
    "service_account_file", help="Path to your Google service account keyfile."
    )
    parser.add_argument("bucket_name", help="Your Cloud Storage bucket name.")
    parser.add_argument("object_name", help="Your Cloud Storage object name.")
    parser.add_argument("expiration", type=int, help="Expiration time.")

    args = parser.parse_args()

    signed_url = generate_signed_url(
    service_account_file=args.service_account_file,
    bucket_name=args.bucket_name,
    object_name=args.object_name,
    expiration=int(args.expiration),
    )

    print(signed_url)