Created
October 13, 2025 15:36
-
-
Save duyhenryer/5a04bd837c8e2167d9197e7678416ac7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re, sys, os
import logging
from time import sleep
import datetime
import boto3
from dateutil.relativedelta import relativedelta
# from googleapiclient import discovery
# from oauth2client.client import GoogleCredentials
from pprint import pprint
## Descriptions:
#
# - Use tag: {'Name': 'tag-key', 'Value': 'fabackup'} for running this backup script
# - The retention format (five '-'-separated counts):
#   [RetentionHourly]-[RetentionDaily]-[RetentionWeekly]-[RetentionMonthly]-[RetentionYearly]
#
# - Ex: 2-3-2-1-0: keep 2 hourly, 3 daily, 2 weekly, 1 monthly and don't use yearly
# Shared log-line format used by setup_logger() for every handler.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# One timestamp for the whole run, so backup names and retention math agree.
today = datetime.datetime.now()
def setup_logger(name, log_file=None, level=logging.INFO, stdout=False):
    """Create (or fetch) a named logger that writes to stdout and, optionally,
    to a log file.

    name     -- logger name passed to logging.getLogger()
    log_file -- path of a file to also log to; skipped when None.
                (The original unconditionally built logging.FileHandler(log_file),
                which raises when the default None was used.)
    level    -- logging level, default logging.INFO
    stdout   -- kept for backward compatibility; a stdout handler is always
                attached, matching the original behavior
    """
    fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(fmt)
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(stream_handler)
    if log_file is not None:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(fmt)
        logger.addHandler(file_handler)
    return logger
# Jenkins passes the build number as argv[1]; it prefixes the log file name so
# each run's log is distinguishable. (Running without an argument raises
# IndexError -- the script is designed to run from a Jenkins job.)
jenkins_build_number = sys.argv[1]
log_path = '/var/log/'
aws_log_name = jenkins_build_number+'__'+today.strftime('aws_%Y-%m-%d.log')
# gcp_log_name = jenkins_build_number+'__'+today.strftime('gcp_%Y-%m-%d.log')
logger = setup_logger('aws', log_path + aws_log_name)
# gcplogger = setup_logger('gcp', log_path + gcp_log_name)
# Timestamp formats embedded in backup names and delete_after tags, per cloud.
time_format = {
    'aws': '%Y-%m-%d_%H-%M-%S',
    'gcp': '%Y-%m-%d-%H-%M-%S' # GCP only allow this character: 0-9 a-z and -
}
# Retention per backup_type tag value:
# [RetentionHourly]-[RetentionDaily]-[RetentionWeekly]-[RetentionMonthly]-[RetentionYearly]
retention_cnf = {
    'db': '5-5-0-0-0',
    'normal': '0-5-0-0-0'
}
# Region
aws_region="us-east-1"
# AWS resource/clients, all bound to the region above.
ec2 = boto3.resource('ec2',region_name=aws_region)
client = boto3.client('ec2', region_name=aws_region)
client_autoscaling = boto3.client('autoscaling',region_name=aws_region)
# # GCP setup (disabled). NOTE(review): gcplogger, service and gcp_projectId
# # are referenced by the still-active GCP code paths below; re-enable these
# # lines before calling any function with cloud='gcp'.
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = './gcp-key.json'
# credentials = GoogleCredentials.get_application_default()
# service = discovery.build('compute', 'v1', credentials=credentials)
# gcp_projectId = 'knorex-rtb'
def main():
    """Entry point: prune orphan snapshots during the 05:00 run, then
    perform the AWS backup pass and expire old AWS backups."""
    # Orphan-snapshot cleanup is expensive, so it only runs once a day,
    # during the invocation whose hour is 5.
    if today.hour == 5:
        delete_orphan_snapshots()
    # The GCP pass (cloud_backup/expired_backup with cloud='gcp') is
    # currently disabled.
    cloud_backup(cloud='aws')
    expired_backup(cloud='aws')
    logger.info('AWS DONE!')
def cloud_backup(cloud=None):
    """Back up every running instance for the given cloud provider.

    Only 'aws' is implemented (the GCP implementation is commented out in
    this file); any other value raises. Instances that belong to an
    auto-scaling group are backed up via the newest member of that group
    only.
    """
    if cloud != 'aws':
        raise Exception('Unknown cloud provider')
    logger.info('STARTING BACKUP ALL INSTANCES ...')
    # Instance-state-code 16 == 'running'; only running instances qualify.
    running = ec2.instances.filter(
        Filters=[{'Name': 'instance-state-code', 'Values': ['16']}])
    asg_members, newest_per_asg = get_asg_instances()
    # Stand-alone instances, plus one representative (the newest) per ASG.
    targets = [inst for inst in running if inst not in asg_members]
    targets.extend(newest_per_asg)
    for inst in targets:
        try:
            backup_instances(inst, cloud=cloud)
        except Exception as e:
            logger.error('Something was wrong: {}'.format(e))
def parse_description(description):
    """Extract the source AMI id embedded in a backup description string.

    Raises AttributeError (via .group on None) when the marker text
    'SourceAmi ... for' is absent, same as the original.
    """
    match = re.search(r'SourceAmi (.*?) for', description)
    return match.group(1)
def get_zone_list():
    # Return the list of GCP compute zone dicts for the project, or '' when
    # the API response carries no 'items' key.
    # NOTE(review): `service` and `gcp_projectId` are defined only in the
    # commented-out GCP setup near the top of the file, so calling this
    # as-is raises NameError until GCP support is re-enabled.
    request = service.zones().list(project=gcp_projectId).execute()
    if 'items' in request:
        return request['items']
    return ''
def backup_instances(instance, cloud=None):
    """Create an AMI backup of one EC2 instance and tag the AMI and its
    snapshots with a delete_after timestamp consumed by expired_backup().

    instance -- a boto3 ec2.Instance resource
    cloud    -- only 'aws' is implemented; anything else raises
                (the GCP branch was commented out in the original source)

    Behavior:
      - instances whose backup_type tag is 'none'/'None' are skipped;
      - retention comes from retention_cnf keyed by the backup_type tag
        (NOTE(review): an instance with NO backup_type tag yields
        retention_cnf.get(None) -> None and is skipped with an error log;
        confirm whether such instances should default to 'normal');
      - AMI creation is re-attempted up to 5 times when the image lands in
        the 'failed' state.
    """
    if cloud == 'aws':
        # Flatten the instance tags into a plain dict for lookups.
        instance_tags = dict(map(lambda x: (x['Key'], x['Value']), instance.tags or []))
        instance_name = instance_tags.get('Name', '[unnamed]')
        instance_type = instance_tags.get('backup_type')
        if instance_type == 'none' or instance_type == 'None':
            logger.info('Skip backup for instance {} ({}). Backup type is none'.format(instance.id, instance_name))
            return
        backup_cfg_str = retention_cnf.get(instance_type)
        backup_cfg = parse_config(instance, instance_name, backup_cfg_str)
        if backup_cfg == '':
            logger.error('Skip backup for this instance: {} because error in syntax. Key is: {}'.format(instance_name, backup_cfg_str))
            return
        backup_label, retention = calc_retention(backup_cfg)
        if backup_label is None:
            logger.info('Skip backup of instance {} ({}); fabackup is {}'
                        .format(instance.id, instance_name, backup_cfg_str))
            return
        date_stamp = today.strftime(time_format.get(cloud))
        delete_date_fmt = (today + retention).strftime(time_format.get(cloud))
        logger.info('Work on instance {} ({}); Create {} backups to be deleted on {}'
                    .format(instance.id, instance_name, backup_label, delete_date_fmt))
        # Create the AMI without rebooting the instance.
        ami_id = client.create_image(
            InstanceId=instance.id,
            Name=instance_name + '(' + instance.instance_id + ')' + '__' + date_stamp,
            Description="Created by fabackup__" + backup_label + "__ at: " + date_stamp,
            NoReboot=True, DryRun=False)['ImageId']
        logger.info('Sleep 20s for creating AMI')
        sleep(20)
        # Tag AMI
        if ami_id:
            # Verify the AMI did not enter the 'failed' state before tagging;
            # re-create it up to 5 times.
            n = 0
            while True:
                if n >= 5:
                    logger.error('Cannot create AMI of this instance: {}'.format(instance.instance_id))
                    return
                try:
                    image_state = ec2.Image(ami_id).state
                except Exception:
                    logger.error('Cannot get AMI state, skip!')
                    return
                if image_state != 'failed':
                    break
                logger.error('Creating AMI failed, try again now')
                ami_id = client.create_image(
                    InstanceId=instance.id,
                    Name=instance_name + '(' + instance.instance_id + ')' + '__' + date_stamp,
                    Description="Created by fabackup__" + backup_label + "__ at: " + date_stamp,
                    NoReboot=True, DryRun=False)['ImageId']
                sleep(20)
                n += 1
            logger.info('Create tags for AMI {}'.format(ami_id))
            # Copy the instance's own tags onto the AMI, prefixed 'ec2-'.
            # (Fixed: the original assigned a py2-style map object here and
            # then called .extend() on it, which raises AttributeError on
            # Python 3.)
            if instance.tags is not None:
                instance_tags = [{'Key': 'ec2-' + t['Key'], 'Value': t['Value']} for t in instance.tags]
            else:
                instance_tags = []
            tags = {
                'Name': instance_name + '__' + backup_label + '__' + date_stamp,
                'InstanceID': instance.instance_id,
                'delete_after': delete_date_fmt
            }
            # Add the backup bookkeeping tags.
            ami_tags = [{'Key': k, 'Value': v} for k, v in tags.items()]
            instance_tags.extend(ami_tags)
            client.create_tags(Resources=[ami_id], Tags=instance_tags)
            # Tag the AMI's snapshots. A snapshot belongs to exactly one AMI,
            # so matching on the AMI id in the description is safe.
            count = 0
            while True:
                if count >= 10:
                    logger.error('Cannot get snapshots of AMI ({})'.format(ami_id))
                    break
                snapshots = client.describe_snapshots(Filters=[{'Name': 'description', 'Values': ['*' + ami_id + '*']}])['Snapshots']
                for snapshot in snapshots:
                    logger.info('Create tags for snapshot {}'.format(snapshot['SnapshotId']))
                    tags = {
                        'Name': instance_name + '(' + instance.instance_id + ')' + '__' + backup_label + '__' + date_stamp,
                        'VolumeId': snapshot['VolumeId'],
                        'AMI': ami_id,
                        'delete_after': delete_date_fmt
                    }
                    tag_list = [{'Key': k, 'Value': v} for k, v in tags.items()]
                    client.create_tags(Resources=[snapshot['SnapshotId']], Tags=tag_list)
                if len(snapshots) >= 1:
                    break
                logger.info('Wait more 60 for creating snapshots')
                sleep(60)
                count += 1
        else:
            logger.info('Cannot create AMI for instance {} ({})'.format(instance_name, instance.instance_id))
    # NOTE: the 'gcp' branch (snapshot each attached disk with delete_after
    # labels) was fully commented out in the original file; restore it from
    # history when GCP support returns.
    else:
        raise Exception('Unknown cloud provider')
def parse_config(instance, instance_name, config):
    """Parse a retention string like '5-5-0-0-0' into a list of ints.

    Returns '' (the caller's error sentinel) when the string is malformed,
    missing, or contains a negative count. `instance`/`instance_name` are
    accepted for interface compatibility but not used here.
    """
    try:
        counts = [int(part) for part in config.split('-')]
    except Exception:
        return ''
    if min(counts) < 0:
        return ''
    return counts
# Returns two lists:
#   1. every instance that belongs to any auto-scaling group
#   2. one representative per ASG: the most recently launched on-demand
#      instance when any exists, otherwise the most recently launched spot
#      instance (on-demand instances are preferred over spot).
def get_asg_instances():
    """Collect ASG membership info used to avoid backing up every clone."""
    groups = client_autoscaling.describe_auto_scaling_groups()['AutoScalingGroups']
    every_member = []
    newest_per_asg = []
    for group in groups:
        on_demand = []
        spot = []
        for member in group['Instances']:
            inst = ec2.Instance(member['InstanceId'])
            every_member.append(inst)
            # A non-null spot request id marks a spot instance.
            if inst.spot_instance_request_id is not None:
                spot.append(inst)
            else:
                on_demand.append(inst)
        pool = on_demand if on_demand else spot
        if pool:
            newest_per_asg.append(max(pool, key=lambda i: i.launch_time))
    return every_member, newest_per_asg
def expired_backup(cloud=None):
    """Delete backups whose delete_after tag/label timestamp is in the past.

    AWS: deregisters expired AMIs and deletes expired snapshots, keeping
    anything still referenced by a launch configuration.
    GCP: deletes expired disk snapshots.
    NOTE(review): the GCP branch references gcplogger/service/gcp_projectId,
    which are only defined in the commented-out GCP setup at the top of the
    file, so it raises NameError until that setup is restored.
    """
    if cloud == 'aws':
        logger.info('STARTING DELETE OLD BACKUP ...')
        LCs = client_autoscaling.describe_launch_configurations()['LaunchConfigurations']
        snapshots = ec2.snapshots.filter(OwnerIds=['self'],
            Filters=[{'Name': 'tag-key', 'Values': ['delete_after']}, {'Name': 'status', 'Values': ['completed']}])
        ami_images = ec2.images.filter(Owners=['self'],
            Filters=[{'Name': 'tag-key', 'Values': ['delete_after']}, {'Name': 'state', 'Values': ['available']}])
        skip_ami_lst = []
        for ami in ami_images:
            # Never expire an AMI that a launch configuration still uses.
            skip_ami = False
            for lc in LCs:
                if lc['ImageId'] == ami.id:
                    skip_ami = True
                    skip_ami_lst.append(ami.id)
                    break
            if skip_ami:
                continue
            # Deregister the AMI when its delete_after timestamp has passed.
            ami_tag = dict(map(lambda x: (x['Key'], x['Value']), ami.tags or []))
            delete_on = ami_tag.get('delete_after')
            delete_timestamp = validate_timestamp(delete_on, time_format.get(cloud))
            if delete_timestamp == '':
                logger.error('The time syntax of snapshot: {} was wrong, skip now'.format(ami.id))
                continue
            if today >= delete_timestamp:
                logger.info('Remove AMI {}'.format(ami.id))
                try:
                    ami.deregister(DryRun=False)
                except Exception:
                    logger.error('Error deregister AMI: {}'.format(ami.id))
            else:
                logger.info('The AMI: {} will be kept'.format(ami.id))
        for snapshot in snapshots:
            snapshot_tag = dict(map(lambda x: (x['Key'], x['Value']), snapshot.tags or []))
            delete_on = snapshot_tag.get('delete_after')
            ami_id = snapshot_tag.get('AMI')
            if ami_id in skip_ami_lst:
                logger.info('The snapshot: {} will be kept because it belong to an ami which in lauch configuration'
                            .format(snapshot.id))
                continue
            delete_timestamp = validate_timestamp(delete_on, time_format.get(cloud))
            if delete_timestamp == '':
                logger.error('The time syntax of snapshot: {} was wrong, skip now'.format(snapshot.id))
                continue
            if today >= delete_timestamp:
                logger.info('Remove snapshot {} (of volume {})'.format(snapshot.id, snapshot.volume_id))
                try:
                    snapshot.delete(DryRun=False)
                except Exception:
                    logger.error('Error deleting snapshot: {}'.format(snapshot.id))
            else:
                logger.info('The snapshot: {} will be kept'.format(snapshot.id))
    elif cloud == 'gcp':
        # GCP labels use the timestamp format from time_format['gcp'].
        gcplogger.info('STARTING DELETE OLD BACKUP ...')
        request = service.snapshots().list(project=gcp_projectId,
            filter="(labels.delete_after eq '.*')(status eq 'READY')")
        while request is not None:
            response = request.execute()
            if 'items' in response:
                for snapshot in response['items']:
                    delete_on = snapshot['labels'].get('delete_after')
                    delete_timestamp = validate_timestamp(delete_on, time_format.get(cloud))
                    if delete_timestamp == '':
                        # Fixed: the original logged to the AWS logger here.
                        gcplogger.error('The time syntax of snapshot: {} was wrong, skip now'.format(snapshot['name']))
                        continue
                    if today >= delete_timestamp:
                        gcplogger.info('Remove snapshot {}'.format(snapshot['name']))
                        try:
                            service.snapshots().delete(project=gcp_projectId, snapshot=snapshot['name']).execute()
                        except Exception as e:
                            # Fixed: the original referenced a variable that is
                            # unbound when the delete call itself raised.
                            gcplogger.error('Error delete snapshot: {}'.format(e))
                    else:
                        gcplogger.info('The snapshot: {} will be kept'.format(snapshot['name']))
            request = service.snapshots().list_next(previous_request=request, previous_response=response)
    else:
        raise Exception('Unknown cloud provider')
def validate_timestamp(time_str, format):
    """Parse time_str with the given strptime format.

    Returns a datetime on success, or '' (the sentinel the callers compare
    against) when time_str is missing or malformed. The original used a
    bare `except:`, which also swallowed KeyboardInterrupt/SystemExit;
    narrowed to the two exceptions strptime actually raises here
    (ValueError for bad syntax, TypeError for None/non-string input).
    """
    try:
        return datetime.datetime.strptime(time_str, format)
    except (ValueError, TypeError):
        return ''
def calc_retention(backup_configuration, now=None):
    """Decide which backup tier is due at `now` and how long to keep it.

    backup_configuration -- [hourly, daily, weekly, monthly, yearly] counts
    now -- datetime used for the schedule decision; defaults to the
           module-level `today`. (Parameter added, backward compatible,
           so tests and callers can inject a fixed time.)

    Returns (label, relativedelta) for the first tier whose schedule
    matches and whose count is > 0, or (None, None) when nothing is due.
    Schedule: yearly on Jan 1 at 00h, monthly on day 1 at 00h, weekly on
    Sunday at 00h, daily at 00h, hourly at any other hour.
    """
    if now is None:
        now = today
    r_hourly, r_daily, r_weekly, r_monthly, r_yearly = backup_configuration
    if now.day == 1:
        if now.month == 1 and r_yearly > 0 and now.hour == 0:
            return 'yearly', relativedelta(years=r_yearly)
        if r_monthly > 0 and now.hour == 0:
            return 'monthly', relativedelta(months=r_monthly)
    # weekday() == 6 is Sunday.
    if now.weekday() == 6 and r_weekly > 0 and now.hour == 0:
        return 'weekly', relativedelta(weeks=r_weekly)
    if now.hour == 0 and r_daily > 0:
        return 'daily', relativedelta(days=r_daily)
    if r_hourly > 0:
        return 'hourly', relativedelta(hours=r_hourly)
    return None, None
def delete_orphan_snapshots():
    """Delete completed snapshots that no available AMI references.

    WARNING(review): this removes ANY unreferenced snapshot owned by the
    account, not only fabackup-created ones -- manual snapshots included.
    Confirm that is intended before changing its schedule.
    """
    logger.info('STARTING DELETE ORPHAN SNAPSHOTS')
    logger.info('Get all snapshots in system')
    completed = ec2.snapshots.filter(OwnerIds=['self'],
        Filters=[{'Name': 'status', 'Values': ['completed']}])
    all_snapshots = [snap.id for snap in completed]
    logger.info('There are {} snapshots in system'.format(len(all_snapshots)))
    logger.info('Get all running snapshots')
    # Snapshot ids still referenced by an available AMI's block devices.
    # A set makes the membership test below O(1) instead of the original
    # O(n) list scan per snapshot (O(n^2) overall).
    running_snapshots = set()
    ami_images = ec2.images.filter(Owners=['self'],
        Filters=[{'Name': 'state', 'Values': ['available']}])
    for image in ami_images:
        for device in image.block_device_mappings:
            if 'Ebs' in device:
                running_snapshots.add(device['Ebs']['SnapshotId'])
    logger.info('There are {} running snapshots'.format(len(running_snapshots)))
    dead_snapshots = [s for s in all_snapshots if s not in running_snapshots]
    logger.info('Dead snapshots are: {}'.format(dead_snapshots))
    for snapshotid in dead_snapshots:
        logger.info('Deleting snapshot {}'.format(snapshotid))
        ec2.Snapshot(snapshotid).delete(DryRun=False)
def isAMI_done(aws_client, image_id, max_checks=45, wait_seconds=20):
    """Poll describe_images until the AMI reaches the 'available' state.

    aws_client   -- an EC2 client exposing describe_images()
    image_id     -- the AMI id to poll
    max_checks   -- new (backward-compatible) cap on polls; the original
                    looped forever if the AMI never became available
    wait_seconds -- delay between polls (the original also slept 20s
                    BEFORE the first check; now it checks first)

    Returns True once available, False after max_checks polls or on any
    API error. Fixed from the original: `except Exception, e` is
    Python-2-only syntax and was a SyntaxError under Python 3.
    """
    try:
        for _ in range(max_checks):
            image = aws_client.describe_images(ImageIds=[image_id])
            if image['Images'][0]['State'] == 'available':
                return True
            logger.info('Wait {}s for creating AMI '.format(wait_seconds))
            sleep(wait_seconds)
    except Exception as e:
        logger.error('Something error: {}'.format(e))
    return False
# Run the backup pass only when executed as a script (not on import).
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment