Skip to content

Instantly share code, notes, and snippets.

@aytvill
Forked from edemnati/class - gcp_cloud_util.py
Created August 19, 2023 18:37
Show Gist options
  • Select an option

  • Save aytvill/eb54f978cbe86c397b4ff6b95b58e670 to your computer and use it in GitHub Desktop.

Select an option

Save aytvill/eb54f978cbe86c397b4ff6b95b58e670 to your computer and use it in GitHub Desktop.

Revisions

  1. @edemnati edemnati revised this gist Oct 1, 2019. 1 changed file with 0 additions and 7 deletions.
    7 changes: 0 additions & 7 deletions class - gcp_cloud_util.py
    Original file line number Diff line number Diff line change
    @@ -40,9 +40,6 @@ def get_projectid(self):
    def connect(self,bucketid):
    #Open connection
    try:
    #if self.sa_path:
    # self.pubsub_client = self.pubsub_v1.PublisherClient.from_service_account_file(self.sa_path)
    #else
    storage_client = self.storage.Client(project=self.projectid)
    self.bucketid=bucketid
    self.bucket = storage_client.get_bucket(self.bucketid)
    @@ -82,10 +79,6 @@ def get_projectid(self):
    def connect(self,topic):
    #Open connection
    try:
    #if self.sa_path:
    # self.pubsub_client = self.pubsub_v1.PublisherClient.from_service_account_file(self.sa_path)
    # self.topic = self.pubsub_client.topic_path(self.projectid, topic) #topic = 'projects/' + PROJECTID + '/topics/' + topic
    #else
    self.pubsub_client = self.pubsub_v1.PublisherClient()
    self.topic = self.pubsub_client.topic_path(self.projectid, topic) #topic = 'projects/' + PROJECTID + '/topics/' + topic
    print('Connected to Cloud pubsub: Project/topic: {}/{}'.format(self.projectid,self.topic))
  2. @edemnati edemnati created this gist Oct 1, 2019.
    157 changes: 157 additions & 0 deletions class - gcp_cloud_util.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,157 @@
    import os

    class gcp_util:
    def get_projectid(self,projectid=None):
    if projectid is None:
    env_var=['GCP_PROJECT','GOOGLE_CLOUD_PROJECT']
    for v in env_var:
    try:
    my_projectid=os.environ.get(v)
    print('OK')
    break
    except:
    pass
    try:
    my_projectid
    except:
    print("Error: Failed to get projectid from environment variables")
    raise
    else:
    my_projectid=projectid

    return my_projectid

    class gcp_storage:
    def __init__(self,projectid=None,sa_path=None):
    from google.cloud import storage
    self.storage=storage

    util=gcp_util()
    self.projectid=util.get_projectid(projectid)

    if sa_path:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS']=sa_path
    self.sa_path=sa_path

    def get_projectid(self):
    return self.projectid


    def connect(self,bucketid):
    #Open connection
    try:
    #if self.sa_path:
    # self.pubsub_client = self.pubsub_v1.PublisherClient.from_service_account_file(self.sa_path)
    #else
    storage_client = self.storage.Client(project=self.projectid)
    self.bucketid=bucketid
    self.bucket = storage_client.get_bucket(self.bucketid)
    print('Connected to Cloud storage: Project/Bucket: {}/{}'.format(self.projectid,self.bucketid))
    message="Connected to Cloud storage"
    except Exception as ex:
    print("Error: Cloud storage connection failed to Project/Bucket: {}/{}".format(self.projectid,bucketid))
    raise
    message="Eror connecting to Cloud storage"

    return message

    def upload_from_string(self,filename,dataset):
    blob = self.bucket.blob( filename )
    blob.upload_from_string( dataset)
    return blob.public_url

    def download_as_string(self,filename):
    blob = self.bucket.blob( filename )
    return blob.download_as_string()

    class gcp_pubsub:
    def __init__(self,projectid=None,sa_path=None):
    from google.cloud import pubsub_v1
    self.pubsub_v1=pubsub_v1

    util=gcp_util()
    self.projectid=util.get_projectid(projectid)

    if sa_path:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS']=sa_path
    self.sa_path=sa_path

    def get_projectid(self):
    return self.projectid

    def connect(self,topic):
    #Open connection
    try:
    #if self.sa_path:
    # self.pubsub_client = self.pubsub_v1.PublisherClient.from_service_account_file(self.sa_path)
    # self.topic = self.pubsub_client.topic_path(self.projectid, topic) #topic = 'projects/' + PROJECTID + '/topics/' + topic
    #else
    self.pubsub_client = self.pubsub_v1.PublisherClient()
    self.topic = self.pubsub_client.topic_path(self.projectid, topic) #topic = 'projects/' + PROJECTID + '/topics/' + topic
    print('Connected to Cloud pubsub: Project/topic: {}/{}'.format(self.projectid,self.topic))
    message="Connected to Cloud PubSub"
    except :
    print("Error: Cloud pubsub connection failed to Project/topic: {}/{}".format(self.projectid,topic))
    raise
    message="Eror connecting to Cloud PubSub"

    return message

    def publish_message(self,data,**kwargs):
    params={}
    if len(kwargs)>0:
    for key, value in kwargs.items():
    params[key]=value
    response = self.pubsub_client.publish(topic=self.topic,data=data,**params)

    return response

    class gcp_firestore:
    def __init__(self,projectid=None,sa_path=None):
    import firebase_admin
    from firebase_admin import credentials
    from firebase_admin import firestore
    #from google.cloud import firestore
    self.firestore=firestore
    self.credentials=credentials
    self.firebase_admin=firebase_admin

    util=gcp_util()
    self.projectid=util.get_projectid(projectid)

    if sa_path:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS']=sa_path
    self.sa_path=sa_path
    else:
    self.sa_path=None

    def get_projectid(self):
    return self.projectid

    def connect(self):
    #Connect to firestore
    my_firestore=''
    try:
    # Use the application default credentials

    if (not len(self.firebase_admin._apps)):
    if self.sa_path:
    # Use a service account
    cred = credentials.Certificate(self.sa_path)
    else:
    cred = self.credentials.ApplicationDefault()

    self.firebase_admin.initialize_app(cred, {
    'projectId': self.projectid,
    })

    my_firestore = self.firestore.client() #project=self.projectid
    message="Connected to Cloud firestore"
    except:
    print("Error: firestore connection failed to Project: {}".format(self.projectid))
    raise
    message="Eror connecting to Cloud firestore"

    return message,my_firestore