Skip to content

Instantly share code, notes, and snippets.

@kyhwana
Created January 14, 2025 08:42
Show Gist options
  • Save kyhwana/62fa3424746bf3c40348333b05e46508 to your computer and use it in GitHub Desktop.
Save kyhwana/62fa3424746bf3c40348333b05e46508 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# coding=utf-8
import argparse
import datetime
import sys
import time
import threading
import traceback
import socketserver
import struct
try:
from dnslib import *
except ImportError:
print("Missing dependency dnslib: <https://pypi.python.org/pypi/dnslib>. Please install it with `pip`.")
sys.exit(2)
class DomainName(str):
    """A ``str`` subclass that builds child domain names via attribute access.

    ``DomainName('cname.fyi.').ns1`` evaluates to
    ``DomainName('ns1.cname.fyi.')``, and the result can be chained
    (``D.a.b`` -> ``'b.a.cname.fyi.'``).
    """

    def __getattr__(self, item):
        """Return ``item + '.' + self`` wrapped in a new ``DomainName``.

        Python only calls this hook for attributes that normal lookup did not
        find, so real ``str`` methods are unaffected.

        Raises:
            AttributeError: for dunder names (``__x__``).  Those are protocol
                probes (copy/pickle, ``hasattr`` checks, ...) rather than DNS
                labels; answering them with a string would make the object
                appear to implement protocols it does not.
        """
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return DomainName(item + '.' + self)
# Zone apex.  Child names are derived through attribute access on D,
# e.g. D.ns1 -> 'ns1.cname.fyi.'.
D = DomainName('cname.fyi.')
IP = '127.0.0.1'
TTL = 60
# Target that the CNAME entries in the table below alias to.
MYCNAME = 'kyhwana.org'

# Start-of-authority record for the zone.
soa_record = SOA(
    mname=D.ns1,  # primary name server
    rname=D.andrei,  # email of the domain administrator
    times=(
        201307231,  # serial number
        1 * 60 * 60,  # refresh
        3 * 60 * 60,  # retry
        24 * 60 * 60,  # expire
        1 * 60 * 60,  # minimum
    ),
)

ns_records = [NS(D.ns1), NS(D.ns2)]

# Static record table keyed by owner name.
records = {
    D: [CNAME(MYCNAME), soa_record, *ns_records],
    # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3)
    D.ns1: [CNAME(MYCNAME)],
    D.ns2: [CNAME(MYCNAME)],
    # D.mail: [A(CNAME)],
}
def dns_response(data):
    """Build the packed reply for one wire-format DNS query.

    The query is parsed with ``DNSRecord.parse``, matched against a fixed,
    ordered list of rules, and answered with a reply whose header has
    ``qr=1, aa=1, ra=1`` and echoes the request id and question.

    Args:
        data: raw bytes of the DNS query message.

    Returns:
        The reply message packed to bytes (``DNSRecord.pack()``).

    Any exception raised while parsing or packing propagates to the caller.
    """
    request = DNSRecord.parse(data)
    print(request)

    qname = request.q.qname
    qtype = request.q.qtype
    print("qname/qtype", qname, qtype)

    # Rules are evaluated top to bottom and the first match wins.  Several of
    # them test only the name or only the type, so their order is significant.
    if qname == "foo.cname.fyi.":
        answers = [RR("foo.cname.fyi", rdata=A("127.0.0.1"), rtype=QTYPE.A)]
    elif qtype == QTYPE.MX and qname == "cname.fyi.":
        print("in MX for cname.fyi")
        answers = [
            RR("cname.fyi", rdata=MX("mail.cname.fyi", 666), rtype=QTYPE.MX),
            RR("cname.fyi", rdata=MX("mail.protonmail.ch", 10), rtype=QTYPE.MX),
            RR("cname.fyi", rdata=MX("mailsec.protonmail.ch", 20), rtype=QTYPE.MX),
        ]
    elif qtype == QTYPE.NS:
        # Every NS query, whatever the name, is pointed at dnscrimes1.
        print(" in NS")
        answers = [RR(qname, rdata=NS("dnscrimes1.cname.fyi"), rtype=QTYPE.NS)]
    elif qname == "dnscrimes1.cname.fyi." and qtype == QTYPE.A:
        print(" in A for dnscrimes1.cname.fyi")
        answers = [RR("dnscrimes1.cname.fyi", rdata=A("170.64.197.215"), rtype=QTYPE.A)]
    elif qname == "dnscrimes1.cname.fyi." and qtype == QTYPE.AAAA:
        print(" in AAAAA for dnscrimes1.cname.fyi")
        answers = [RR("dnscrimes1.cname.fyi", rdata=AAAA("2400:6180:10:200::1991:b000"), rtype=QTYPE.AAAA)]
    elif qtype == QTYPE.TXT:
        # Every TXT query is answered with a record owned by "cname.fyi",
        # regardless of the name that was asked for.
        answers = [RR("cname.fyi", rdata=TXT("protonmail-verification=0157c4126294dd3b42de23cf9ce4a25fc880e97e"), rtype=QTYPE.TXT)]
    elif qtype == QTYPE.A and qname == "cf.cname.fyi.":
        print(" in A cf.cname.fyi")
        answers = [RR("cf.cname.fyi", rdata=A("1.1.1.1"), rtype=QTYPE.A)]
    # TODO: an HTTPS (type 65) answer for cf.cname.fyi was sketched here but
    # never enabled.
    elif qname == "mail.cname.fyi.":
        print("in mail.cname.fyi")
        answers = [RR("mail.cname.fyi", rdata=CNAME("foo.cname.fyi"), rtype=QTYPE.CNAME)]
    else:
        # Catch-all: anything not matched above is aliased to kyhwana.org.
        answers = [RR(qname, rdata=CNAME("kyhwana.org"), rtype=QTYPE.CNAME)]

    # The header and question are the same for every branch, so build the
    # reply once and attach whichever answers were selected.
    reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q)
    reply.add_answer(*answers)

    print("---- Reply:\n", reply)
    return reply.pack()
class BaseRequestHandler(socketserver.BaseRequestHandler):
    """Transport-independent DNS request handler.

    ``handle`` reads one query through ``get_data``, passes it to
    ``dns_response`` and writes the result through ``send_data``.  Subclasses
    supply the two transport hooks.
    """

    def get_data(self):
        """Return the raw bytes of the incoming DNS message (subclass hook)."""
        raise NotImplementedError

    def send_data(self, data):
        """Send the packed reply ``data`` back to the client (subclass hook)."""
        raise NotImplementedError

    def handle(self):
        """Log the request, answer it, and print a traceback on any failure.

        Exceptions are caught here and written to stderr rather than
        propagated.
        """
        # datetime.utcnow() is deprecated since Python 3.12.  An aware UTC
        # timestamp renders identically because the format has no %z.
        now = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
        print("\n\n%s request %s (%s %s):" % (self.__class__.__name__[:3], now, self.client_address[0],
                                               self.client_address[1]))
        try:
            data = self.get_data()
            print(len(data), data)  # repr(data).replace('\\x', '')[1:-1]
            self.send_data(dns_response(data))
        except Exception:
            traceback.print_exc(file=sys.stderr)
class TCPRequestHandler(BaseRequestHandler):
    """DNS-over-TCP transport: each message is prefixed by its length as a
    2-byte big-endian unsigned integer."""

    def _recv_exactly(self, count):
        """Read exactly ``count`` bytes from the connection.

        Raises:
            EOFError: if the peer closes the connection before ``count``
                bytes have arrived.
        """
        chunks = []
        remaining = count
        while remaining:
            chunk = self.request.recv(remaining)
            if not chunk:
                raise EOFError("TCP connection closed with %d byte(s) still expected" % remaining)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def get_data(self):
        """Return the payload of one length-prefixed DNS message.

        The payload is binary and is returned untouched.  Stripping
        "whitespace" from it would delete real message bytes that happen to
        equal 0x20, 0x09, 0x0a, etc.  The message is read in as many
        ``recv`` calls as needed, because TCP may deliver it in pieces.
        """
        (size,) = struct.unpack('>H', self._recv_exactly(2))
        return self._recv_exactly(size)

    def send_data(self, data):
        """Send ``data`` preceded by its 2-byte big-endian length."""
        sz = struct.pack('>H', len(data))
        return self.request.sendall(sz + data)
class UDPRequestHandler(BaseRequestHandler):
    """DNS-over-UDP transport: ``self.request`` is a ``(datagram, socket)``
    pair, as indexed by the two methods below."""

    def get_data(self):
        """Return the received datagram unchanged.

        The datagram is a binary DNS message.  Stripping "whitespace" from it
        would delete real bytes, for example a message ID whose first byte is
        0x20 or 0x0a, and make the query unparseable.
        """
        return self.request[0]

    def send_data(self, data):
        """Send ``data`` to the client address and return ``sendto``'s result."""
        return self.request[1].sendto(data, self.client_address)
def main(argv=None):
    """Parse the command line and run the UDP and/or TCP nameserver.

    Args:
        argv: list of argument strings to parse; ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Each selected server runs ``serve_forever`` in its own daemon thread while
    the main thread idles, flushing stdout/stderr once a second, until it is
    interrupted with Ctrl-C.  The servers are then shut down and their
    listening sockets closed.

    Exits through ``parser.error`` if neither ``--udp`` nor ``--tcp`` is
    given.
    """
    parser = argparse.ArgumentParser(description='Start a DNS implemented in Python. Usually DNSs use UDP on port 53.')
    parser.add_argument('--port', default=5053, type=int, help='The port to listen on.')
    parser.add_argument('--tcp', action='store_true', help='Listen to TCP connections.')
    parser.add_argument('--udp', action='store_true', help='Listen to UDP datagrams.')
    args = parser.parse_args(argv)
    if not (args.udp or args.tcp):
        parser.error("Please select at least one of --udp or --tcp.")

    print("Starting nameserver...")

    servers = []
    if args.udp:
        servers.append(socketserver.ThreadingUDPServer(('', args.port), UDPRequestHandler))
    if args.tcp:
        servers.append(socketserver.ThreadingTCPServer(('', args.port), TCPRequestHandler))

    for s in servers:
        # That thread will start one more thread for each request.
        thread = threading.Thread(target=s.serve_forever)
        thread.daemon = True  # exit the server thread when the main thread terminates
        thread.start()
        print("%s server loop running in thread: %s" % (s.RequestHandlerClass.__name__[:3], thread.name))

    try:
        while True:
            time.sleep(1)
            sys.stderr.flush()
            sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    finally:
        for s in servers:
            s.shutdown()
            # shutdown() only stops the serve_forever loop; server_close()
            # releases the listening socket.
            s.server_close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment