import re, sys, os
import logging
from time import sleep
import datetime
import boto3
from dateutil.relativedelta import relativedelta
# from googleapiclient import discovery
# from oauth2client.client import GoogleCredentials
from pprint import pprint
## Description:
#
# - Use tag: {'Name': 'tag-key', 'Value': 'fabackup'} for running this backup script
# - The retention format:
#   [RetentionHourly]-[RetentionDaily]-[RetentionWeekly]-[RetentionMonthly]-[RetentionYearly]
#
# - Ex: 2-3-2-1-0: keep 2 hourly, 3 daily, 2 weekly and 1 monthly backup, and don't use yearly
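#
# A worked example of how calc_retention (below) reads such a string; the
# clock times are hypothetical, not from any real run:
#   '2-3-2-1-0' -> [2, 3, 2, 1, 0]
#   run at 14:00 on a weekday -> 'hourly' backup, deleted 2 hours later
#   run at 00:00 on a weekday -> 'daily' backup, deleted 3 days later
#   run at 00:00 on a Sunday  -> 'weekly' backup, deleted 2 weeks later
#   run at 00:00 on the 1st   -> 'monthly' backup, deleted 1 month later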
# create a logging format
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
today = datetime.datetime.now()
def setup_logger(name, log_file=None, level=logging.INFO):
    """Return a logger that writes to stdout and, if log_file is given, to a file."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    streamhandler = logging.StreamHandler(sys.stdout)
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)
    if log_file:
        filehandler = logging.FileHandler(log_file)
        filehandler.setFormatter(formatter)
        logger.addHandler(filehandler)
    return logger
jenkins_build_number = sys.argv[1]
log_path = '/var/log/'
aws_log_name = jenkins_build_number+'__'+today.strftime('aws_%Y-%m-%d.log')
# gcp_log_name = jenkins_build_number+'__'+today.strftime('gcp_%Y-%m-%d.log')
logger = setup_logger('aws', log_path + aws_log_name)
# gcplogger = setup_logger('gcp', log_path + gcp_log_name)
# Cloud time format
time_format = {
    'aws': '%Y-%m-%d_%H-%M-%S',
    'gcp': '%Y-%m-%d-%H-%M-%S'  # GCP only allows these characters: 0-9, a-z and '-'
}
retention_cnf = {
    'db': '5-5-0-0-0',  # [RetentionHourly]-[RetentionDaily]-[RetentionWeekly]-[RetentionMonthly]-[RetentionYearly]
    'normal': '0-5-0-0-0'
}
# Region
aws_region = "us-east-1"
# AWS
ec2 = boto3.resource('ec2', region_name=aws_region)
client = boto3.client('ec2', region_name=aws_region)
client_autoscaling = boto3.client('autoscaling', region_name=aws_region)
# # GCP
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = './gcp-key.json'
# credentials = GoogleCredentials.get_application_default()
# service = discovery.build('compute', 'v1', credentials=credentials)
# gcp_projectId = 'knorex-rtb'
def main():
    # Delete orphan snapshots once per day, during the 05:00 run
    if today.hour == 5:
        delete_orphan_snapshots()
    # gcp
    #cloud_backup(cloud='gcp')
    #expired_backup(cloud='gcp')
    #gcplogger.info('GCP DONE!')
    # aws
    cloud_backup(cloud='aws')
    expired_backup(cloud='aws')
    logger.info('AWS DONE!')
def cloud_backup(cloud=None):
    if cloud == 'aws':
        logger.info('STARTING BACKUP ALL INSTANCES ...')
        # Only pick instances in the running state (code 16)
        aws_instances = ec2.instances.filter(Filters=[{'Name': 'instance-state-code', 'Values': ['16']}])
        all_asg_instances, latest_asg_instance = get_asg_instances()
        # Back up standalone instances plus the latest instance of each ASG
        filled_instances = [instance for instance in aws_instances if instance not in all_asg_instances]
        filled_instances.extend(latest_asg_instance)
        for instance in filled_instances:
            try:
                backup_instances(instance, cloud=cloud)
            except Exception as e:
                logger.error('Something went wrong: {}'.format(e))
    # elif cloud == 'gcp':
    #     gcplogger.info('STARTING BACKUP ALL INSTANCES ...')
    #     # Get all running instances, then get their list of disks
    #     #print os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
    #     zones = get_zone_list()
    #     gcp_instances = []
    #     for zone in zones:
    #         if "europe-north1" not in zone['name']:
    #             request = service.instances().list(project=gcp_projectId, zone=zone['name'], filter="status eq 'RUNNING'")
    #             while request is not None:
    #                 response = request.execute()
    #                 if 'items' in response:
    #                     for i in response['items']:
    #                         gcp_instances.append(i)
    #                 request = service.instances().list_next(previous_request=request, previous_response=response)
    #     for instance in gcp_instances:
    #         try:
    #             backup_instances(instance, cloud=cloud)
    #         except Exception as e:
    #             gcplogger.error('Something went wrong: {}'.format(e))
    else:
        raise Exception('Unknown cloud provider')
def parse_description(description):
    # Extract the source AMI id from a description like 'SourceAmi ami-123 for ...'
    match = re.search('SourceAmi (.*?) for', description)
    return match.group(1) if match else ''
def get_zone_list():
    # GCP only: list the project's compute zones (used by the commented-out GCP path)
    request = service.zones().list(project=gcp_projectId).execute()
    if 'items' in request:
        return request['items']
    return []
def backup_instances(instance, cloud=None):
    if cloud == 'aws':
        instance_tags = dict(map(lambda x: (x['Key'], x['Value']), instance.tags or []))
        instance_name = instance_tags.get('Name', '[unnamed]')
        instance_type = instance_tags.get('backup_type')
        if instance_type in ('none', 'None'):
            logger.info('Skip backup for instance {} ({}). Backup type is none'.format(instance.id, instance_name))
            return
        backup_cfg_str = retention_cnf.get(instance_type)
        backup_cfg = parse_config(instance, instance_name, backup_cfg_str)
        if backup_cfg == '':
            logger.error('Skip backup for instance {} because of a syntax error. Key is: {}'.format(instance_name, backup_cfg_str))
            return
        backup_label, retention = calc_retention(backup_cfg)
        if backup_label is None:
            logger.info('Skip backup of instance {} ({}); fabackup is {}'
                        .format(instance.id, instance_name, backup_cfg_str))
            return
        date_stamp = today.strftime(time_format.get(cloud))
        delete_date_fmt = (today + retention).strftime(time_format.get(cloud))
        logger.info('Work on instance {} ({}); Create {} backups to be deleted on {}'
                    .format(instance.id, instance_name, backup_label, delete_date_fmt))
        # Create AMI
        ami_id = client.create_image(
            InstanceId=instance.id,
            Name=instance_name + '(' + instance.instance_id + ')' + '__' + date_stamp,
            Description="Created by fabackup__" + backup_label + "__ at: " + date_stamp,
            NoReboot=True, DryRun=False)['ImageId']
        logger.info('Sleep 20s for creating AMI')
        sleep(20)
        # Tag AMI
        if ami_id:
            # Check that the AMI was created successfully before proceeding further
            n = 0
            while True:
                if n >= 5:
                    logger.error('Cannot create AMI of this instance: {}'.format(instance.instance_id))
                    return
                try:
                    image_state = ec2.Image(ami_id).state
                except Exception:
                    logger.error('Cannot get AMI state, skip!')
                    return
                if image_state != 'failed':
                    break
                logger.error('Creating AMI failed, trying again now')
                ami_id = client.create_image(
                    InstanceId=instance.id,
                    Name=instance_name + '(' + instance.instance_id + ')' + '__' + date_stamp,
                    Description="Created by fabackup__" + backup_label + "__ at: " + date_stamp,
                    NoReboot=True, DryRun=False)['ImageId']
                sleep(20)
                n += 1
            logger.info('Create tags for AMI {}'.format(ami_id))
            # Copy the instance's tags onto the AMI, prefixed with 'ec2-'
            if instance.tags is not None:
                instance_tags = list(map(lambda x: {'Key': 'ec2-' + x['Key'], 'Value': x['Value']}, instance.tags))
            else:
                instance_tags = []
            tags = {
                'Name': instance_name + '__' + backup_label + '__' + date_stamp,
                'InstanceID': instance.instance_id,
                'delete_after': delete_date_fmt
            }
            # Add the AMI's own tags
            ami_tags = list(map(lambda kv: {'Key': kv[0], 'Value': kv[1]}, list(tags.items())))
            instance_tags.extend(ami_tags)
            client.create_tags(Resources=[ami_id], Tags=instance_tags)
            # Tag snapshots
            # One snapshot belongs to exactly one AMI image!
            count = 0
            while True:
                if count >= 10:
                    logger.error('Cannot get snapshots of AMI ({})'.format(ami_id))
                    break
                snapshots = client.describe_snapshots(Filters=[{'Name': 'description', 'Values': ['*' + ami_id + '*']}])['Snapshots']
                for snapshot in snapshots:
                    logger.info('Create tags for snapshot {}'.format(snapshot['SnapshotId']))
                    tags = {
                        'Name': instance_name + '(' + instance.instance_id + ')' + '__' + backup_label + '__' + date_stamp,
                        'VolumeId': snapshot['VolumeId'],
                        'AMI': ami_id,
                        'delete_after': delete_date_fmt
                    }
                    tag_list = list(map(lambda kv: {'Key': kv[0], 'Value': kv[1]}, list(tags.items())))
                    client.create_tags(Resources=[snapshot['SnapshotId']], Tags=tag_list)
                if len(snapshots) >= 1:
                    break
                logger.info('Wait 60s more for snapshots to be created')
                sleep(60)
                count += 1
        else:
            logger.error('Cannot create AMI for instance {} ({})'.format(instance_name, instance.instance_id))
    # elif cloud == 'gcp':
    #     # Back up every disk attached to a RUNNING instance
    #     # backup_cfg_str = instance['labels'].get('fabackup')
    #     instance_label = instance.get('labels', '')
    #     instance_type = instance_label.get('backup_type', 'normal') if (instance_label != '') else 'normal'
    #     instance_zone = instance['zone'].split('/')[-1]
    #     backup_cfg_str = retention_cnf.get(instance_type)
    #     description = instance.get('description', '')
    #     instance_name = instance['name']  # This name is unique
    #     instance_id = instance['id']
    #     backup_cfg = parse_config(instance, instance_name, backup_cfg_str)
    #     if backup_cfg == '':
    #         gcplogger.error('Skip backup for instance {} because of a syntax error. Key is: {}'.format(instance_name, backup_cfg_str))
    #         return
    #     backup_label, retention = calc_retention(backup_cfg)
    #     if backup_label is None:
    #         gcplogger.info('Skip backup of instance {} ({}); fabackup is {}'
    #                        .format(instance_id, instance_name, backup_cfg_str))
    #         return
    #     date_stamp = today.strftime(time_format.get(cloud))
    #     delete_date_fmt = (today + retention).strftime(time_format.get(cloud))
    #     gcplogger.info('Work on instance {} ({}); Create {} backups to be deleted on {}'
    #                    .format(instance_id, instance_name, backup_label, delete_date_fmt))
    #     gcplogger.info('This instance has {} disk(s)'.format(len(instance['disks'])))
    #     # Create a snapshot for each attached disk
    #     num_disk = 0
    #     for disk in instance['disks']:
    #         num_disk += 1
    #         snap_name = 'id-' + instance_id + '-' + str(num_disk) + '-' + backup_label + '-' + date_stamp
    #         request_body = {
    #             'name': snap_name,
    #             'description': "Created by fabackup__" + backup_label + "__ at: " + date_stamp,
    #             'labels': {'delete_after': delete_date_fmt, 'instance_name': instance_name, 'disk_name': disk['source'].split('/')[-1]}
    #         }
    #         try:
    #             gcplogger.info('Create snapshot for disk: {} ({})'.format(disk['source'].split('/')[-1], snap_name))
    #             snapshot = service.disks().createSnapshot(project=gcp_projectId, zone=instance_zone, disk=disk['source'].split('/')[-1], body=request_body).execute()
    #         except Exception as e:
    #             gcplogger.error('Error creating snapshot: {}'.format(e))
    else:
        raise Exception('Unknown cloud provider')
def parse_config(instance, instance_name, config):
    try:
        backup_configuration = list(map(int, config.split('-')))
        if any(i < 0 for i in backup_configuration):
            return ''
        return backup_configuration
    except Exception:
        return ''
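# Illustrative behaviour (assumed inputs, a sketch rather than a test):
#   parse_config(None, 'web-1', '2-3-2-1-0')  -> [2, 3, 2, 1, 0]
#   parse_config(None, 'web-1', '2-3-x-1-0')  -> ''  (non-numeric field)
#   parse_config(None, 'web-1', None)         -> ''  (no retention string found)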
# This function returns two lists:
# - 1. A list containing all instances that belong to any ASG
# - 2. A list containing the latest instance per ASG
# NOTE:
# - Prioritizes on-demand instances: chooses the latest on-demand instance
# - If all instances of an ASG are spot instances, chooses the latest one
def get_asg_instances():
    asgs = client_autoscaling.describe_auto_scaling_groups()['AutoScalingGroups']
    all_asg_instances = []
    latest_asg_instance = []
    for asg in asgs:
        ondemand_instances_per_asg = []
        spot_instances_per_asg = []
        for i in asg['Instances']:
            instance = ec2.Instance(i['InstanceId'])
            all_asg_instances.append(instance)
            if instance.spot_instance_request_id is not None:
                spot_instances_per_asg.append(instance)
            else:
                ondemand_instances_per_asg.append(instance)
        if len(ondemand_instances_per_asg) > 0:
            latest_asg_instance.append(sorted(ondemand_instances_per_asg, key=lambda x: x.launch_time, reverse=True)[0])
        elif len(spot_instances_per_asg) > 0:
            latest_asg_instance.append(sorted(spot_instances_per_asg, key=lambda x: x.launch_time, reverse=True)[0])
    return all_asg_instances, latest_asg_instance
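# Usage sketch (this mirrors what cloud_backup does above): back up every
# standalone instance plus only the newest member of each ASG, so one AMI per
# group suffices. 'running_instances' is a hypothetical name for illustration:
#   all_asg, latest = get_asg_instances()
#   to_backup = [i for i in running_instances if i not in all_asg] + latest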
def expired_backup(cloud=None):
    if cloud == 'aws':
        logger.info('STARTING DELETE OLD BACKUP ...')
        LCs = client_autoscaling.describe_launch_configurations()['LaunchConfigurations']
        snapshots = ec2.snapshots.filter(OwnerIds=['self'],
                                         Filters=[{'Name': 'tag-key', 'Values': ['delete_after']}, {'Name': 'status', 'Values': ['completed']}])
        ami_images = ec2.images.filter(Owners=['self'],
                                       Filters=[{'Name': 'tag-key', 'Values': ['delete_after']}, {'Name': 'state', 'Values': ['available']}])
        skip_ami_lst = []
        for ami in ami_images:
            # Check whether this AMI belongs to any Launch Configuration
            skip_ami = False
            for lc in LCs:
                if lc['ImageId'] == ami.id:
                    skip_ami = True
                    skip_ami_lst.append(ami.id)
                    break
            if skip_ami:
                continue
            # Deregister the AMI once its delete_after timestamp has passed
            ami_tag = dict(map(lambda x: (x['Key'], x['Value']), ami.tags or []))
            delete_on = ami_tag.get('delete_after')
            delete_timestamp = validate_timestamp(delete_on, time_format.get(cloud))
            if delete_timestamp == '':
                logger.error('The time syntax of AMI {} was wrong, skipping now'.format(ami.id))
                continue
            if today >= delete_timestamp:
                logger.info('Remove AMI {}'.format(ami.id))
                try:
                    ami.deregister(DryRun=False)
                except Exception as e:
                    logger.error('Error deregistering AMI {}: {}'.format(ami.id, e))
            else:
                logger.info('The AMI {} will be kept'.format(ami.id))
        for snapshot in snapshots:
            snapshot_tag = dict(map(lambda x: (x['Key'], x['Value']), snapshot.tags or []))
            delete_on = snapshot_tag.get('delete_after')
            ami_id = snapshot_tag.get('AMI')
            if ami_id in skip_ami_lst:
                logger.info('The snapshot {} will be kept because it belongs to an AMI used by a launch configuration'
                            .format(snapshot.id))
                continue
            delete_timestamp = validate_timestamp(delete_on, time_format.get(cloud))
            if delete_timestamp == '':
                logger.error('The time syntax of snapshot {} was wrong, skipping now'.format(snapshot.id))
                continue
            if today >= delete_timestamp:
                logger.info('Remove snapshot {} (of volume {})'.format(snapshot.id, snapshot.volume_id))
                try:
                    snapshot.delete(DryRun=False)
                except Exception as e:
                    logger.error('Error deleting snapshot {}: {}'.format(snapshot.id, e))
            else:
                logger.info('The snapshot {} will be kept'.format(snapshot.id))
    elif cloud == 'gcp':
        # GCP uses this timestamp format: %Y-%m-%d-%H-%M-%S
        gcplogger.info('STARTING DELETE OLD BACKUP ...')
        request = service.snapshots().list(project=gcp_projectId, filter="(labels.delete_after eq '.*')(status eq 'READY')")
        while request is not None:
            response = request.execute()
            #pprint(request)
            if 'items' in response:
                for snapshot in response['items']:
                    delete_on = snapshot['labels'].get('delete_after')
                    delete_timestamp = validate_timestamp(delete_on, time_format.get(cloud))
                    if delete_timestamp == '':
                        gcplogger.error('The time syntax of snapshot {} was wrong, skipping now'.format(snapshot['name']))
                        continue
                    if today >= delete_timestamp:
                        gcplogger.info('Remove snapshot {}'.format(snapshot['name']))
                        try:
                            service.snapshots().delete(project=gcp_projectId, snapshot=snapshot['name']).execute()
                        except Exception as e:
                            gcplogger.error('Error deleting snapshot {}: {}'.format(snapshot['name'], e))
                    else:
                        gcplogger.info('The snapshot {} will be kept'.format(snapshot['name']))
            request = service.snapshots().list_next(previous_request=request, previous_response=response)
    else:
        raise Exception('Unknown cloud provider')
def validate_timestamp(time_str, fmt):
    try:
        return datetime.datetime.strptime(time_str, fmt)
    except (TypeError, ValueError):
        return ''
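# Example with the AWS time format (illustrative values):
#   validate_timestamp('2025-10-13_15-36-00', '%Y-%m-%d_%H-%M-%S')
#       -> datetime.datetime(2025, 10, 13, 15, 36)
#   validate_timestamp(None, '%Y-%m-%d_%H-%M-%S') -> ''  (tag missing)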
def calc_retention(backup_configuration):
    r_hourly, r_daily, r_weekly, r_monthly, r_yearly = backup_configuration
    if today.day == 1:
        if today.month == 1 and r_yearly > 0 and today.hour == 0:
            return 'yearly', relativedelta(years=r_yearly)
        if r_monthly > 0 and today.hour == 0:
            return 'monthly', relativedelta(months=r_monthly)
    if today.weekday() == 6 and r_weekly > 0 and today.hour == 0:
        return 'weekly', relativedelta(weeks=r_weekly)
    if today.hour == 0 and r_daily > 0:
        return 'daily', relativedelta(days=r_daily)
    if r_hourly > 0:
        return 'hourly', relativedelta(hours=r_hourly)
    return None, None
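# Worked example for backup_configuration [2, 3, 2, 1, 0] (hypothetical clock times):
#   Jan 1 at 00:00       -> ('monthly', +1 month)  (yearly is skipped since r_yearly is 0)
#   a Sunday at 00:00    -> ('weekly', +2 weeks)   (unless it is also the 1st)
#   other days at 00:00  -> ('daily', +3 days)
#   any day at 13:00     -> ('hourly', +2 hours)
# It returns (None, None) when no retention field matches the current time.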
def delete_orphan_snapshots():
    logger.info('STARTING DELETE ORPHAN SNAPSHOTS')
    # Get all snapshots in the system
    all_snapshots = []
    logger.info('Get all snapshots in the system')
    snapshots = ec2.snapshots.filter(OwnerIds=['self'], Filters=[{'Name': 'status', 'Values': ['completed']}])
    for snapshot in snapshots:
        all_snapshots.append(snapshot.id)
    logger.info('There are {} snapshots in the system'.format(len(all_snapshots)))
    logger.info('Get all in-use snapshots')
    running_snapshots = []
    # A snapshot is in use if an available AMI references it in a block device mapping
    ami_images = ec2.images.filter(Owners=['self'],
                                   Filters=[{'Name': 'state', 'Values': ['available']}])
    for image in ami_images:
        for device in image.block_device_mappings:
            if 'Ebs' in device:
                running_snapshots.append(device['Ebs']['SnapshotId'])
    logger.info('There are {} in-use snapshots'.format(len(running_snapshots)))
    # Orphan snapshots are those not referenced by any AMI
    dead_snapshots = [s for s in all_snapshots if s not in running_snapshots]
    logger.info('Orphan snapshots are: {}'.format(dead_snapshots))
    for snapshotid in dead_snapshots:
        logger.info('Deleting snapshot {}'.format(snapshotid))
        try:
            ec2.Snapshot(snapshotid).delete(DryRun=False)
        except Exception as e:
            logger.error('Error deleting snapshot {}: {}'.format(snapshotid, e))
def isAMI_done(aws_client, image_id):
    # Poll until the AMI reaches the 'available' state (currently unused helper)
    try:
        while True:
            logger.info('Wait 20s for creating AMI')
            sleep(20)
            image = aws_client.describe_images(ImageIds=[image_id])
            if image['Images'][0]['State'] == 'available':
                return True
    except Exception as e:
        logger.error('Something went wrong: {}'.format(e))
        return False
if __name__ == '__main__':
    main()