Skip to content

Instantly share code, notes, and snippets.

@NothingCtrl
Last active October 19, 2023 17:11
Show Gist options
  • Save NothingCtrl/f9d5aec4cc41d2e4aeaa1f076e39988a to your computer and use it in GitHub Desktop.
Save NothingCtrl/f9d5aec4cc41d2e4aeaa1f076e39988a to your computer and use it in GitHub Desktop.
Set proxy ON / OFF for selected domains in Cloudflare DNS
# Python 3.6+
# Required library: requests
import json
import requests
import os
import sys
from sys import exit
# Directory used to locate the config files: the folder of the executable when
# ``sys.frozen`` is set (frozen build), otherwise the folder of this source file.
base_dir = os.path.dirname(
    sys.executable if getattr(sys, 'frozen', False) else os.path.realpath(__file__)
)

# DNS records of the zone. Starts empty; the script entry point rebinds it to
# the result of get_dns_records(), and set_proxy() looks records up in it.
RECORD_DB = []
def get_dns_records(config: dict):
    """Fetch the DNS records of the configured Cloudflare zone.

    Args:
        config: Application settings. The keys ``cloudflare_url``,
            ``cloudflare_zone_id``, ``cloudflare_auth`` and
            ``cloudflare_token`` are read here.

    Returns:
        ``(True, records)`` where ``records`` is the list taken from the
        ``result`` field of every page of the API response, or
        ``(False, error_text)`` when a request fails, answers with a
        non-200 status, or returns a body that is not JSON.
    """
    url = "{}/zones/{}/dns_records".format(config.get('cloudflare_url'), config.get('cloudflare_zone_id'))
    headers = {
        "X-Auth-Email": config.get('cloudflare_auth'),
        "X-Auth-Key": config.get('cloudflare_token'),
        "Content-Type": "application/json"
    }
    per_page = 100
    page = 1
    records = []
    while True:
        try:
            # A timeout keeps the script from hanging forever on a dead connection.
            response = requests.get(
                url,
                headers=headers,
                params={"page": page, "per_page": per_page},
                timeout=30,
            )
        except requests.RequestException as err:
            return False, str(err)
        if response.status_code != 200:
            return False, response.text
        try:
            payload = response.json()
        except ValueError:
            # Status 200 but the body is not JSON: report it instead of crashing.
            return False, response.text
        batch = payload.get('result') or []
        records.extend(batch)
        # Stop on a short (last) page, or when the API says this was the last page.
        total_pages = (payload.get('result_info') or {}).get('total_pages')
        if len(batch) < per_page or (total_pages is not None and page >= total_pages):
            return True, records
        page += 1
def set_proxy(dns_name: str, proxied: bool, config: dict):
    """Turn the Cloudflare proxy on or off for one DNS name.

    The record is looked up by name in the module-level ``RECORD_DB`` list;
    the first record of type ``A`` or ``CNAME`` with that name is updated.

    Args:
        dns_name: Record name to update; compared for equality with each
            record's ``name`` field.
        proxied: ``True`` to enable the proxy, ``False`` to disable it.
        config: Application settings. The keys ``cloudflare_url``,
            ``cloudflare_zone_id``, ``cloudflare_auth`` and
            ``cloudflare_token`` are read here.

    Returns:
        ``(ok, text)``: ``ok`` is ``True`` only when the API answered with
        status 200 and a truthy ``success`` field; ``text`` is the response
        body or an error message.
    """
    target = None
    name_seen = False
    for record in RECORD_DB:
        if record['name'] != dns_name:
            continue
        name_seen = True
        # A name may also own records of other types (TXT, MX, ...), so keep
        # scanning instead of failing on the first same-name record.
        if record['type'] in ('A', 'CNAME'):
            target = record
            break
    if target is None or not target['id']:
        if name_seen and target is None:
            return False, f"Domain name: [{dns_name}] is must type A or CNAME!"
        return False, f"Domain name: [{dns_name}] is not found!"

    data = {
        "name": dns_name,
        "proxied": proxied,
        "type": target['type'],
        "content": target['content'],
    }
    headers = {
        "X-Auth-Email": config.get('cloudflare_auth'),
        "X-Auth-Key": config.get('cloudflare_token'),
        "Content-Type": "application/json"
    }
    url = "{}/zones/{}/dns_records/{}".format(
        config.get('cloudflare_url'), config.get('cloudflare_zone_id'), target['id'])
    # NOTE(review): PUT sends a full replacement payload; fields not included
    # here (e.g. ttl) are presumably reset by the API -- confirm, or consider PATCH.
    try:
        # A timeout keeps the script from hanging forever on a dead connection.
        response = requests.put(url, data=json.dumps(data), headers=headers, timeout=30)
    except requests.RequestException as err:
        return False, f"Update failed, request error: {err}"
    if response.status_code != 200:
        return False, f"Update failed, response status code: {response.status_code}, text: {response.text}"
    try:
        content = json.loads(response.text)
    except ValueError:
        return False, f"Update failed, response status code: {response.status_code}, text: {response.text}"
    return bool(content.get('success')), response.text
if __name__ == "__main__":
    # Usage: <script> <mode> [--debug] [--domain <name-of-domain>]
    #   mode      'true' / '1' / 'yes' / 'on' turn the proxy ON, anything else OFF
    #   --debug   list what would be changed without calling set_proxy()
    #   --domain  update only this name instead of the config's 'dns_config_list'
    cli_args = sys.argv[1:]

    # The mode is mandatory and must come first. An option flag in its place
    # would otherwise be read as "OFF" and silently disable the proxy.
    if not cli_args or cli_args[0].startswith('--'):
        print("Required a param value for proxy mode, example: 'yes' "
              "to active proxy or 'no' to de-active proxy")
        exit(1)

    active_proxy = cli_args[0].lower() in ('true', '1', 'yes', 'on')
    debug = '--debug' in cli_args
    dns_list = []
    if debug:
        print("- Debug: ON, nothing change will make!")
    if '--domain' in cli_args:
        value_pos = cli_args.index('--domain') + 1
        # Guard against '--domain' being the last argument or followed by another flag.
        if value_pos >= len(cli_args) or cli_args[value_pos].startswith('--'):
            print("Param '--domain' requires a value, example: '--domain <name-of-domain>'")
            exit(1)
        domain_name = cli_args[value_pos]
        dns_list = [domain_name]
        print(f"- Using domain name: [{domain_name}] from call params")
    print(f"- Set PROXY mode: {'ON' if active_proxy else 'OFF'}")

    config_file = os.path.join(base_dir, 'cloudflare_config.json')
    if not os.path.exists(config_file):
        # No config yet: write a template next to the script and stop with an
        # error status, like the other failure paths.
        print("Please create config file with name: cloudflare_config.json")
        with open(os.path.join(base_dir, 'cloudflare_config_example.json'), 'w+') as f:
            f.write(json.dumps({
                'cloudflare_auth': 'auth-email',
                'cloudflare_token': 'api-auth-token',
                'cloudflare_url': 'https://api.cloudflare.com/client/v4',
                'cloudflare_zone_id': 'your-api-zone-id',  # get Zone ID on home page
                'dns_config_list': ['aa.b.c', 'bb.b.c', 'b.c']
            }, indent=4, sort_keys=True))
        exit(1)

    with open(config_file) as f:
        config_app = json.load(f)
    if debug:
        print("[DEBUG] Config file loaded")

    # A name given on the command line takes precedence over the config list.
    if not dns_list:
        dns_list = config_app.get('dns_config_list')
    if not dns_list:
        print("Please add list of domain need config proxy to config value 'dns_config_list', or call with param: '--domain <name-of-domain>'!")
        exit(1)

    # Rebinds the module-level RECORD_DB that set_proxy() reads.
    is_ok, RECORD_DB = get_dns_records(config_app)
    if not is_ok:
        print(f"- Cannot access CloudFlare API, error: {RECORD_DB}")
        exit(1)
    if debug:
        print(f"[DEBUG] Total {len(dns_list)} dns name to update proxy")
        print(f"[DEBUG] Total {len(RECORD_DB)} records in CloudFlare DNS")

    for item in dns_list:
        if debug:
            # Debug mode only reports the intended change.
            print(f"[DEBUG] Set proxy: [{'ON' if active_proxy else 'OFF'}] for domain: [{item}]...")
            continue
        print(f"- Set proxy request: [{'ON' if active_proxy else 'OFF'}] for domain: [{item}]...")
        result, msg_text = set_proxy(item, active_proxy, config_app)
        print(f" Result: {'SUCCESS' if result else 'FAILED'}")
        if not result:
            print(f" Message: {msg_text}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment