-
-
Save suryadina/3d1079c1e78b51f35e44d728b4a20ffd to your computer and use it in GitHub Desktop.
Simple DNS server (UDP and TCP) in Python using dnslib.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # coding=utf-8 | |
| import socket | |
| from dnslib import * | |
class DomainName(str):
    """A domain name string whose attribute access builds subdomains.

    ``DomainName('example.com').ns1`` evaluates to
    ``DomainName('ns1.example.com')``, and the result can be chained
    (``D.a.b`` -> ``'b.a.<D>'``).
    """

    def __getattr__(self, item):
        """Return the subdomain ``item + '.' + self`` as a ``DomainName``.

        Python only calls ``__getattr__`` after normal lookup has failed, so
        real ``str`` methods and attributes are unaffected.

        Raises:
            AttributeError: for dunder names (``__x__``). Those are probes
                for optional special methods (e.g.
                ``hasattr(name, '__html__')``), not DNS labels; answering
                them with a string would make the object appear to implement
                every protocol.
        """
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return DomainName(item + '.' + self)
# Zone apex; every other name below is derived from it through
# DomainName attribute access (D.ns1, D.mail, ...).
D = DomainName('example.com')
# Address handed out in every A record of this zone.
IP = '127.0.0.1'
# TTL attached to each resource record placed in a reply.
TTL = 5 * 60

soa_record = SOA(
    mname=D.ns1,  # primary name server
    rname=D.andrei,  # email of the domain administrator
    times=(
        201307231,  # serial number
        1 * 60 * 60,  # refresh
        3 * 60 * 60,  # retry
        24 * 60 * 60,  # expire
        1 * 60 * 60,  # minimum
    ),
)

ns_records = list(map(NS, (D.ns1, D.ns2)))

# Maps each owner name to the rdata objects served for it.
records = {
    # AAAA is built from 16 zero bytes.
    D: [A(IP), AAAA((0,) * 16), MX(D.mail), soa_record] + ns_records,
}
# MX and NS records must never point to a CNAME alias
# (RFC 2181 section 10.3), so these hosts get plain A records.
records.update((host, [A(IP)]) for host in (D.ns1, D.ns2, D.mail))
records[D.andrei] = [CNAME(D)]

# TODO: DNSKEY, RRSIG
def _normalize_name(name):
    """Return *name* as a lower-case string without a trailing root dot."""
    return str(name).rstrip('.').lower()


def dns_response(data):
    """Build the packed reply for one raw DNS request.

    Args:
        data: the request exactly as received from the socket; it is handed
            to ``DNSRecord.parse``.

    Returns:
        The result of ``reply.pack()``. The reply echoes the request id and
        question. When the queried name is ``D`` or a name under it, the
        matching entries of ``records`` are added as answers and the zone's
        NS and SOA records are added via ``reply.add_ns``.

    Any exception raised while parsing or packing propagates to the caller.
    """
    request = DNSRecord.parse(data)
    print(request)

    reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q)

    qname = request.q.qname
    qtype = request.q.qtype
    qt = QTYPE[qtype]

    # Compare names in one canonical form: domain names are case-insensitive,
    # and the textual form may or may not carry the trailing root dot.
    # NOTE(review): whether str(qname) ends with '.' depends on the dnslib
    # release in use -- normalizing both sides works either way.
    qn = _normalize_name(qname)
    zone = _normalize_name(D)

    if qn == zone or qn.endswith('.' + zone):
        for name, rrs in records.items():
            if _normalize_name(name) != qn:
                continue
            for rdata in rrs:
                # The rdata class name doubles as the record type mnemonic.
                rqt = type(rdata).__name__
                # NOTE(review): the wildcard qtype is presumably spelled '*'
                # or 'ANY' depending on the dnslib release -- accept both.
                if qt in ('*', 'ANY', rqt):
                    # Attribute lookup mirrors QTYPE.NS / QTYPE.SOA below.
                    reply.add_answer(RR(rname=qname, rtype=getattr(QTYPE, rqt),
                                        rclass=1, ttl=TTL, rdata=rdata))

        # NOTE(review): the source's indentation was lost; the authority
        # records are assumed to belong inside the zone check -- confirm.
        for rdata in ns_records:
            reply.add_ns(RR(rname=D, rtype=QTYPE.NS, rclass=1, ttl=TTL, rdata=rdata))
        reply.add_ns(RR(rname=D, rtype=QTYPE.SOA, rclass=1, ttl=TTL, rdata=soa_record))

    print("---- Reply")
    print(reply)
    return reply.pack()
if __name__ == '__main__':
    # Script entry point: answer DNS queries over UDP on port 5053 of every
    # local IPv4 interface until interrupted with Ctrl-C.
    import traceback

    print("Starting nameserver...")

    udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        udps.bind(('', 5053))
        while True:
            print("\nWaiting for query...\n")
            data, addr = udps.recvfrom(8192)
            try:
                print("---- Request (%s %s):" % tuple(addr))
                udps.sendto(dns_response(data), addr)
            except Exception:
                # One bad request must not stop the server: report it and
                # keep serving. ``Exception`` (not a bare ``except``) lets
                # KeyboardInterrupt/SystemExit reach the outer handler.
                traceback.print_exc()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server.
        pass
    finally:
        # Release the socket however the loop ends, including bind failures.
        udps.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment