Created
January 14, 2025 08:42
-
-
Save kyhwana/62fa3424746bf3c40348333b05e46508 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
import argparse | |
import datetime | |
import sys | |
import time | |
import threading | |
import traceback | |
import socketserver | |
import struct | |
try: | |
from dnslib import * | |
except ImportError: | |
print("Missing dependency dnslib: <https://pypi.python.org/pypi/dnslib>. Please install it with `pip`.") | |
sys.exit(2) | |
class DomainName(str): | |
def __getattr__(self, item): | |
return DomainName(item + '.' + self) | |
D = DomainName('cname.fyi.') | |
IP = '127.0.0.1' | |
TTL = 60 | |
MYCNAME = 'kyhwana.org' | |
soa_record = SOA( | |
mname=D.ns1, # primary name server | |
rname=D.andrei, # email of the domain administrator | |
times=( | |
201307231, # serial number | |
60 * 60 * 1, # refresh | |
60 * 60 * 3, # retry | |
60 * 60 * 24, # expire | |
60 * 60 * 1, # minimum | |
) | |
) | |
ns_records = [NS(D.ns1), NS(D.ns2)] | |
records = { | |
D: [CNAME(MYCNAME), soa_record] + ns_records, | |
D.ns1: [CNAME(MYCNAME)], # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3) | |
D.ns2: [CNAME(MYCNAME)], | |
# D.mail: [A(CNAME)], | |
} | |
def dns_response(data): | |
request = DNSRecord.parse(data) | |
print(request) | |
#QTYPE = 5 for cname | |
#reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q) | |
#reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("cname.fyi",rdata=CNAME("kyhwana.org"),rtype=5)) | |
# reply.add_answer(RR( | |
qname = request.q.qname | |
qn = str(qname) | |
qtype = request.q.qtype | |
print("qname/qtype", qname, qtype) | |
if qname == "foo.cname.fyi.": | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("foo.cname.fyi",rdata=A("127.0.0.1"),rtype=1)) | |
elif qtype == 15 and qname == "cname.fyi.": | |
print("in MX for cname.fyi") | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("cname.fyi",rdata=MX("mail.cname.fyi",666),rtype=15)) | |
reply.add_answer(RR("cname.fyi",rdata=MX("mail.protonmail.ch",10),rtype=15)) | |
reply.add_answer(RR("cname.fyi",rdata=MX("mailsec.protonmail.ch",20),rtype=15)) | |
elif qtype == 2: | |
print(" in NS") | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR(qname,rdata=NS("dnscrimes1.cname.fyi"),rtype=2)) | |
elif qname == "dnscrimes1.cname.fyi." and qtype == 1: | |
print(" in A for dnscrimes1.cname.fyi") | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("dnscrimes1.cname.fyi",rdata=A("170.64.197.215"),rtype=1)) | |
elif qname == "dnscrimes1.cname.fyi." and qtype == 28: | |
print(" in AAAAA for dnscrimes1.cname.fyi") | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("dnscrimes1.cname.fyi",rdata=AAAA("2400:6180:10:200::1991:b000"),rtype=28)) | |
elif qtype == 16: | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("cname.fyi",rdata=TXT("protonmail-verification=0157c4126294dd3b42de23cf9ce4a25fc880e97e"),rtype=16)) | |
elif qtype == 1 and qname == "cf.cname.fyi.": | |
print(" in A cf.cname.fyi") | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("cf.cname.fyi",rdata=A("1.1.1.1"),rtype=1)) | |
# elif qtype == 65 and qname == "cf.cname.fyi": | |
# print(" in A for dnscrimes1.cname.fyi") | |
# reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("dnscrimes1.cname.fyi",rdata=HTTPS("1","1.1.1.1",""),rtype=65)) | |
elif qname == "mail.cname.fyi.": | |
print("in mail.cname.fyi") | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR("mail.cname.fyi",rdata=CNAME("foo.cname.fyi"),rtype=5)) | |
else: | |
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q, a=RR(qname,rdata=CNAME("kyhwana.org"),rtype=5)) | |
print("---- Reply:\n", reply) | |
return reply.pack() | |
class BaseRequestHandler(socketserver.BaseRequestHandler): | |
def get_data(self): | |
raise NotImplementedError | |
def send_data(self, data): | |
raise NotImplementedError | |
def handle(self): | |
now = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f') | |
print("\n\n%s request %s (%s %s):" % (self.__class__.__name__[:3], now, self.client_address[0], | |
self.client_address[1])) | |
try: | |
data = self.get_data() | |
print(len(data), data) # repr(data).replace('\\x', '')[1:-1] | |
self.send_data(dns_response(data)) | |
except Exception: | |
traceback.print_exc(file=sys.stderr) | |
class TCPRequestHandler(BaseRequestHandler): | |
def get_data(self): | |
data = self.request.recv(8192).strip() | |
sz = struct.unpack('>H', data[:2])[0] | |
if sz < len(data) - 2: | |
raise Exception("Wrong size of TCP packet") | |
elif sz > len(data) - 2: | |
raise Exception("Too big TCP packet") | |
return data[2:] | |
def send_data(self, data): | |
sz = struct.pack('>H', len(data)) | |
return self.request.sendall(sz + data) | |
class UDPRequestHandler(BaseRequestHandler): | |
def get_data(self): | |
return self.request[0].strip() | |
def send_data(self, data): | |
return self.request[1].sendto(data, self.client_address) | |
def main(): | |
parser = argparse.ArgumentParser(description='Start a DNS implemented in Python.') | |
parser = argparse.ArgumentParser(description='Start a DNS implemented in Python. Usually DNSs use UDP on port 53.') | |
parser.add_argument('--port', default=5053, type=int, help='The port to listen on.') | |
parser.add_argument('--tcp', action='store_true', help='Listen to TCP connections.') | |
parser.add_argument('--udp', action='store_true', help='Listen to UDP datagrams.') | |
args = parser.parse_args() | |
if not (args.udp or args.tcp): parser.error("Please select at least one of --udp or --tcp.") | |
print("Starting nameserver...") | |
servers = [] | |
if args.udp: servers.append(socketserver.ThreadingUDPServer(('', args.port), UDPRequestHandler)) | |
if args.tcp: servers.append(socketserver.ThreadingTCPServer(('', args.port), TCPRequestHandler)) | |
for s in servers: | |
thread = threading.Thread(target=s.serve_forever) # that thread will start one more thread for each request | |
thread.daemon = True # exit the server thread when the main thread terminates | |
thread.start() | |
print("%s server loop running in thread: %s" % (s.RequestHandlerClass.__name__[:3], thread.name)) | |
try: | |
while 1: | |
time.sleep(1) | |
sys.stderr.flush() | |
sys.stdout.flush() | |
except KeyboardInterrupt: | |
pass | |
finally: | |
for s in servers: | |
s.shutdown() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment