Skip to content

Instantly share code, notes, and snippets.

@vulpicastor
Last active April 26, 2017 01:32
Show Gist options
  • Save vulpicastor/30e482fc3c2acd419dfb66ac2dff4a72 to your computer and use it in GitHub Desktop.
Save vulpicastor/30e482fc3c2acd419dfb66ac2dff4a72 to your computer and use it in GitHub Desktop.
A script to check the SOA serial numbers of all nameservers of a domain and notify via Zephyr
#!/usr/bin/env python3
from __future__ import print_function
import sys
import time
import dns.resolver
import zwrite
DEFAULT_ARGS = {"classname": "test",
"instance": "dns",
"sender": "bot",
"opcode": "auto"}
TEMPLATE = "{:15} SOA Serial: @b({})"
def find_nameservers(domain):
nameservers = []
answer = dns.resolver.query(domain, "NS")
for ns in answer:
nameservers.append(ns.target)
return nameservers
def find_ips(domains):
ips = []
for d in domains:
answer = dns.resolver.query(d)
for a in answer:
ips.append(a.address)
return ips
def spawn_resolvers(nameservers):
resolvers = []
for ns in nameservers:
resolvers.append(dns.resolver.Resolver())
resolvers[-1].nameservers = [ns]
return resolvers
def serials_gt(a, b):
# TODO: actually use the algorithm specified in RFC 1982
return int(a) > int(b)
def serials_eq(a, b):
return int(a) == int(b)
def check_soa_serial(domain, resolvers, serials):
answers = []
new_serials = {}
updated = False
for r in resolvers:
soa = r.query(domain, "SOA")
n = r.nameservers[0]
s = soa[0].serial
if serials_gt(s, serials[n]):
answers.append(TEMPLATE.format(n, s) + " @b(@color(green)UPDATE)")
updated = True
elif serials_eq(s, serials[n]):
if updated:
answers.append(TEMPLATE.format(n, s) + " @b(@color(yellow)UNCHANGED)")
else:
answers.append(TEMPLATE.format(n, s) + " @b(@color(red)REGRESSION)")
updated = True
new_serials[n] = s
serials.update(new_serials)
return "\n".join(answers)
def main():
domain = sys.argv[1]
resolvers = spawn_resolvers(find_ips(find_nameservers(domain)))
serials = {}
for r in resolvers:
serials[r.nameservers[0]] = "0"
while True:
message = check_soa_serial(domain, resolvers, serials)
if message:
zwrite.zwrite(message, zsig=domain, **DEFAULT_ARGS)
time.sleep(300)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment