from google.oauth2 import service_account from google.cloud import resourcemanager_v3 import collections import logging import time import re from google.cloud import recommender logger = logging.getLogger(__name__) logging.basicConfig(level = logging.INFO) logger.setLevel(logging.INFO) SCOPE = ["https://www.googleapis.com/auth/cloud-platform"] credentials = service_account.Credentials.from_service_account_file("secret.json", scopes = SCOPE) resource_mgr = resourcemanager_v3.ProjectsClient(credentials=credentials) rec_client = recommender.RecommenderClient(credentials=credentials) final_recommendations = {"recommendations" : []} def get_projects(): '''Get the list of projects using resource manager - List projects that service accounts have access to.''' projects = [] for project in resource_mgr.search_projects(): projects.append(project.project_id) return projects def recom_data(project_id): '''Get the active recommendation using the recommender client''' recomms = [] try: data = rec_client.list_recommendations(parent = f"projects/{project_id}/locations/global/recommenders/google.iam.policy.Recommender") return data except Exception as e: return logger.info(e) def verify_srv_acc(email , project_id): '''verify if the service account is User managed one here ''' success = re.search(('@' + project_id), email) return success def extract_details(data , project_id): '''Extracting the required details from recommender response for its fine usage over updating role bindings''' def_acc = set() update_data = [] for recommendation in data: for op_groups in recommendation.content.operation_groups: for op in op_groups.operations: try: srv_email = op.path_filters["/iamPolicy/bindings/*/members/*"] except: srv_email = op.value if not verify_srv_acc(srv_email ,project_id): def_acc.add(srv_email) action = op.action role = op.path_filters["/iamPolicy/bindings/*/role"] final_recommendations["recommendations"].append({"action" : action , "role" : role , "service_account" : srv_email}) update_data.append((recommendation.name , recommendation.etag)) return final_recommendations , update_data # print("The Default service accounts are %s" % ",".join(def_acc)) if __name__ == "__main__": proj_ids = get_projects() for id in proj_ids: rec_data = recom_data(id) # logger.info(f"recommendations for the project {id} is {rec_data}") rec , _ = extract_details(rec_data , id) recomm = rec.get("recommendations") for r in recomm: srv_acc = r.get("service_account") role = r.get("role") action = r.get("action") print(f"Service account - {srv_acc} - {action} the role {role}")