Skip to content

Instantly share code, notes, and snippets.

@jkoelker
Last active December 16, 2015 03:44

Revisions

  1. jkoelker revised this gist Dec 16, 2015. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions asn.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,4 @@
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    import netaddr
  2. jkoelker created this gist Dec 16, 2015.
    97 changes: 97 additions & 0 deletions asn.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,97 @@
    # -*- coding: utf-8 -*-

    import netaddr
    import socket
    import subprocess
    import tempfile

    ASN_QUERY = '-i origin -T route %s\r\n'


    def _whois(asn):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(10)
    s.connect(('whois.radb.net', 43))
    s.send((ASN_QUERY % asn).encode('idna'))

    response = b''
    chunk = s.recv(4096)
    while chunk:
    response = response + chunk
    chunk = s.recv(4096)

    return response


    def _fetch_routes(asn, summarize=True):
    whois_data = _whois(asn)
    routes = [netaddr.IPNetwork(l.split(b'route:')[-1].strip())
    for l in whois_data.split(b'\n') if l[:6] == b'route:']

    if summarize:
    return netaddr.IPSet(routes)

    return routes


    def _fetch_ipset(asn, summarize=True):
    try:
    ipset_data = subprocess.check_output(('ipset', 'save', '%s' % asn))

    except subprocess.CalledProcessError as e:
    if e.returncode == 1:
    args = ('ipset', 'create', '%s' % asn, 'hash:net')
    subprocess.check_output(args)

    return _fetch_ipset(asn)

    return netaddr.IPSet()

    prefix_match = 'add %s' % asn
    prefix_len = len(prefix_match)
    ipsets = [netaddr.IPNetwork(l.split()[-1])
    for l in ipset_data.split('\n')
    if l[:prefix_len] == prefix_match]

    if summarize:
    return netaddr.IPSet(ipsets)

    return ipsets


    def _update_ipset(asn, adds, deletes):
    restore_data = ['add %s %s/%s' % (asn, a.network, a.prefixlen)
    for a in adds.iter_cidrs()]
    restore_data.extend(['del %s %s/%s' % (asn, d.network, d.prefixlen)
    for d in deletes.iter_cidrs()])

    with tempfile.NamedTemporaryFile() as f:
    f.write('\n'.join(restore_data) + '\n')
    f.flush()
    args = ('ipset', 'restore', '-file', f.name)
    ret = subprocess.check_call(args)

    if ret == 0:
    return True

    return False


    def _flush_delta(asn):
    routes = _fetch_routes(asn)
    ipsets = _fetch_ipset(asn)

    adds = routes - ipsets
    deletes = ipsets - routes & ipsets

    return _update_ipset(asn, adds, deletes)


    if __name__ == '__main__':
    import sys
    rets = [_flush_delta(asn) for asn in sys.argv[1:]]

    if all(rets):
    sys.exit(0)

    sys.exit(1)