-
-
Save byt3bl33d3r/065fbf2c11dbffc76da8aa0d704a9541 to your computer and use it in GitHub Desktop.
Pythonic tcpdump: copy, paste, and enjoy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
'''
It has been tested with both py2 and py3.
Beware of ancient Linux kernel versions, which may not support SOCK_NONBLOCK
or the memory-mapped ring buffer.

The BPF filter listed below is the compiled form of "not port 22".
If you want to change it, do something like

    # tcpdump -dd "udp and port 53" | tr "{" "(" | tr "}" ")"
    ( 0x28, 0, 0, 0x0000000c ),
    ( 0x15, 0, 6, 0x000086dd ),
    ( 0x30, 0, 0, 0x00000014 ),
    ...
    ( 0x6, 0, 0, 0x00000000 ),

then paste the output into the script and pass it as the filter argument
of the Sniffer constructor.

Some parameters can be tweaked. Default values are:
 * ringbuffer frame size: 4096 (a page)
 * ringbuffer frame number: 4096 (ring buffer is 16 MB big)
 * poll timeout value: 500ms
'''
import os | |
import socket | |
import ctypes | |
import fcntl | |
from struct import pack, unpack | |
import sys | |
import mmap | |
import select | |
class Const(object):
    """Namespace for the numeric constants this script hands to the kernel.

    The class is never instantiated; members are read as ``Const.NAME``.
    NOTE(review): the values are presumably copied from the Linux headers;
    confirm against the target kernel before changing any of them.
    """

    # Ethernet protocol numbers (ETH_P_ALL is the one the socket is opened with).
    ETH_P_ALL = 0x0003
    ETH_P_IP = 0x0800

    # Interface flag and ioctl request numbers; not referenced by the
    # code visible in this file.
    IFF_PROMISC = 0x100
    SIOCGIFFLAGS = 0x8913
    SIOCSIFFLAGS = 0x8914

    # Socket creation flag, option level and option names.
    SOCK_NONBLOCK = 0x800
    SO_ATTACH_FILTER = 26
    SOL_PACKET = 263
    PACKET_RX_RING = 5

    # Packet direction / addressing classes.
    PACKET_HOST = 0       # To us
    PACKET_BROADCAST = 1  # To all
    PACKET_MULTICAST = 2  # To group
    PACKET_OTHERHOST = 3  # To someone else
    PACKET_OUTGOING = 4   # Outgoing
    PACKET_USER = 6       # To userspace
    PACKET_KERNEL = 7     # To kernel

    # Size in bytes of one ring-buffer frame.
    PAGESIZE = 4096

    # Frame ownership values compared against / stored into tp_status.
    TP_STATUS_KERNEL = 0
    TP_STATUS_USER = 1
class tp_packet_req(ctypes.Structure):
    """Ring-buffer geometry request passed to the PACKET_RX_RING socket option.

    All four members are C ``unsigned int``: block size and count, then
    frame size and count.
    """

    # Built with zip() in a single expression so that no helper name ends up
    # as an extra class attribute.
    _fields_ = list(zip(
        ('tp_block_size', 'tp_block_nr', 'tp_frame_size', 'tp_frame_nr'),
        (ctypes.c_uint,) * 4,
    ))
class tpacket_hdr(ctypes.Structure):
    """Header found at the start of every frame in the mapped ring buffer."""

    _fields_ = [
        # Ownership/status word; tested against Const.TP_STATUS_USER.
        ('tp_status', ctypes.c_ulong),
        # NOTE(review): presumably the packet's original length -- unused here.
        ('tp_len', ctypes.c_uint),
        # Number of packet bytes actually stored in the frame.
        ('tp_snaplen', ctypes.c_uint),
        # Offset from the frame start to the packet data.
        ('tp_mac', ctypes.c_ushort),
        # NOTE(review): presumably the network-layer offset -- unused here.
        ('tp_net', ctypes.c_ushort),
        # Capture timestamp, seconds and microseconds.
        ('tp_sec', ctypes.c_uint),
        ('tp_usec', ctypes.c_uint),
    ]
def bpf_pack(x):
    """Serialise one BPF instruction into its native binary layout.

    ``x`` is an indexable of at least four integers; only the first four
    are used. They are packed as an unsigned short, two unsigned bytes and
    an unsigned int (struct format ``'HBBI'``, native byte order/alignment).
    """
    opcode, jump_true, jump_false, operand = x[0], x[1], x[2], x[3]
    return pack('HBBI', opcode, jump_true, jump_false, operand)
# Compiled BPF program for the tcpdump expression "not port 22".
# Each tuple holds the four integers that bpf_pack() serialises, in order.
# NOTE(review): the section annotations below follow the usual reading of
# `tcpdump -d` output; confirm with `tcpdump -d "not port 22"` before editing.
NotPort22 = [
    # Load the EtherType and branch on IPv6.
    (0x28, 0, 0, 0x0000000c),
    (0x15, 0, 8, 0x000086dd),
    # IPv6: protocol check, then source/destination port compare.
    (0x30, 0, 0, 0x00000014),
    (0x15, 2, 0, 0x00000084),
    (0x15, 1, 0, 0x00000006),
    (0x15, 0, 17, 0x00000011),
    (0x28, 0, 0, 0x00000036),
    (0x15, 14, 0, 0x00000016),
    (0x28, 0, 0, 0x00000038),
    (0x15, 12, 13, 0x00000016),
    # IPv4: protocol check, fragment check, then port compare.
    (0x15, 0, 12, 0x00000800),
    (0x30, 0, 0, 0x00000017),
    (0x15, 2, 0, 0x00000084),
    (0x15, 1, 0, 0x00000006),
    (0x15, 0, 8, 0x00000011),
    (0x28, 0, 0, 0x00000014),
    (0x45, 6, 0, 0x00001fff),
    (0xb1, 0, 0, 0x0000000e),
    (0x48, 0, 0, 0x0000000e),
    (0x15, 2, 0, 0x00000016),
    (0x48, 0, 0, 0x00000010),
    (0x15, 0, 1, 0x00000016),
    # Verdicts: drop (0 bytes) or accept (up to 0x40000 bytes).
    (0x6, 0, 0, 0x00000000),
    (0x6, 0, 0, 0x00040000),
]
class Sniffer(object):
    """Packet capture over a raw PF_PACKET socket with a memory-mapped RX ring.

    The constructor opens a non-blocking raw socket, attaches a BPF filter,
    asks for a receive ring of ``nr_frames`` frames of ``Const.PAGESIZE``
    bytes each and maps that ring into the process.

    Attributes:
        sock: the raw socket (register it with ``select.poll`` to wait).
        ringbuffer: the ``mmap`` object covering the whole ring.
        nr_frames: number of frames in the ring.
        offset: index of the next frame ``recv_packets`` will inspect.

    Instances can be used as context managers; ``close()`` releases the
    mapping and the socket.
    """

    def __init__(self, nr_frames, filter=NotPort22):
        """Open the socket, attach ``filter`` and map the ring buffer.

        Args:
            nr_frames: number of ring frames; must be a positive power of
                two because the ring index wraps with a bit mask.
            filter: iterable of BPF instructions, each accepted by
                ``bpf_pack``.

        Raises:
            ValueError: if ``nr_frames`` is not a positive power of two.
            OSError / socket.error: if the socket, a socket option or the
                mapping cannot be set up; the socket is closed first.
        """
        # An explicit check (not an assert) survives `python -O`, and also
        # rejects 0, which a bare `n & (n - 1) == 0` test would let through.
        if nr_frames <= 0 or nr_frames & (nr_frames - 1):
            raise ValueError(
                'nr_frames must be a positive power of two, got %r' % (nr_frames,))

        # Materialise once so any iterable works and len() is reliable.
        instructions = list(filter)

        sock = socket.socket(socket.PF_PACKET,
                             socket.SOCK_RAW | Const.SOCK_NONBLOCK,
                             socket.htons(Const.ETH_P_ALL))
        try:
            # Attach the BPF filter: instruction count followed by the
            # address of the packed program. 'P' packs the address as a
            # native pointer-sized integer.
            program = ctypes.create_string_buffer(
                b''.join(bpf_pack(ins) for ins in instructions))
            fprog = pack('HP', len(instructions), ctypes.addressof(program))
            sock.setsockopt(socket.SOL_SOCKET, Const.SO_ATTACH_FILTER, fprog)

            # Request the ring: a single block holding every frame.
            req = tp_packet_req()
            req.tp_block_size = nr_frames * Const.PAGESIZE
            req.tp_block_nr = 1
            req.tp_frame_size = Const.PAGESIZE
            req.tp_frame_nr = nr_frames
            sock.setsockopt(Const.SOL_PACKET, Const.PACKET_RX_RING, req)

            # Map the whole ring read/write; writes hand frames back.
            ring = mmap.mmap(sock.fileno(),
                             req.tp_frame_size * req.tp_frame_nr,
                             mmap.MAP_SHARED,
                             mmap.PROT_READ | mmap.PROT_WRITE)
        except Exception:
            # Do not leak the descriptor when setup fails half-way.
            sock.close()
            raise

        self.nr_frames = nr_frames
        self.sock = sock
        self.ringbuffer = ring
        self.offset = 0

    def close(self):
        """Unmap the ring buffer and close the socket."""
        try:
            self.ringbuffer.close()
        finally:
            self.sock.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def recv_packets(self):
        """Yield every packet currently available in the ring.

        Yields:
            ``((tp_sec, tp_usec), data)`` tuples, where ``data`` is a copy
            of the captured bytes.

        The generator stops at the first frame whose status does not have
        ``Const.TP_STATUS_USER`` set. Each frame is copied, handed back and
        the ring index advanced *before* the packet is yielded, so a
        consumer that stops iterating early never sees the same packet
        again on the next call.
        """
        while True:
            frame_start = self.offset * Const.PAGESIZE
            hdr = tpacket_hdr.from_buffer(self.ringbuffer, frame_start)
            if not hdr.tp_status & Const.TP_STATUS_USER:
                break

            timestamp = (hdr.tp_sec, hdr.tp_usec)
            data_start = frame_start + hdr.tp_mac
            data = self.ringbuffer[data_start:data_start + hdr.tp_snaplen]

            # The slice above is a copy, so the frame can be released now.
            hdr.tp_status = Const.TP_STATUS_KERNEL
            # Drop the ctypes view so a suspended generator does not keep
            # a reference into the mapping.
            del hdr

            # nr_frames is a power of two, so masking equals a modulo.
            self.offset = (self.offset + 1) & (self.nr_frames - 1)

            yield (timestamp, data)
def main(argv=None):
    """Capture packets and append them to a pcap file until interrupted.

    Args:
        argv: argument vector; defaults to ``sys.argv``. ``argv[1]`` is the
            path of the pcap file to write.

    Returns:
        Process exit status: 0 after Ctrl-C, 2 when the output path is
        missing.

    Raises:
        RuntimeError: if poll() reports an unexpected descriptor or event.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        prog = argv[0] if argv else 'sniffer'
        sys.stderr.write('usage: %s <output.pcap>\n' % prog)
        return 2

    n_packets = 0
    with open(argv[1], 'wb') as f:
        # libpcap file format, tcpdump 2.4
        f.write(pack('!IHHIIII', 0xa1b2c3d4, 2, 4, 0, 0, 65536, 1))
        s = Sniffer(nr_frames=4096)
        poller = select.poll()
        poller.register(s.sock, select.POLLIN)
        try:
            while True:
                for (fd, evt) in poller.poll(500):
                    # Explicit checks instead of asserts, so they still
                    # run under `python -O`.
                    if fd != s.sock.fileno() or evt != select.POLLIN:
                        raise RuntimeError(
                            'unexpected poll event 0x%x on fd %d' % (evt, fd))
                    for ((tv_sec, tv_usec), pkt) in s.recv_packets():
                        # pcap record header: timestamp, then captured and
                        # original length (both the captured size here).
                        f.write(pack('!IIII', tv_sec, tv_usec, len(pkt), len(pkt)))
                        f.write(pkt)
                        n_packets += 1
                # Once per poll round: persist data and refresh the counter.
                f.flush()
                sys.stdout.write('\r captured %06d packets' % n_packets)
                sys.stdout.flush()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop; finish the progress line.
            sys.stdout.write('\n')
            sys.stdout.flush()
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment