Created
January 4, 2022 19:51
-
-
Save 0xpizza/66b316a9946fbc120b673b66345192b4 to your computer and use it in GitHub Desktop.
Snoop and craft ICMP packets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import dataclasses | |
import enum | |
import socket | |
import struct | |
import sys | |
# True when the current process has the privileges needed to open raw sockets.
if sys.platform == 'win32':
    import ctypes
    # IsUserAnAdmin() returns non-zero when the process token is elevated.
    RUNNING_AS_ADMIN = ctypes.windll.shell32.IsUserAnAdmin() != 0
else:
    import os
    # Privileges follow the *effective* UID (setuid binaries, dropped
    # privileges), so check geteuid() rather than the real UID.
    RUNNING_AS_ADMIN = os.geteuid() == 0
class IcmpType(enum.IntEnum):
    """Symbolic names for the ICMP message type numbers this module knows."""

    # Echo ("ping") exchange
    ECHO_REPLY = 0
    ECHO_REQUEST = 8

    # Error / routing messages
    DEST_UNREACHABLE = 3
    REDIRECT_MSG = 5

    # Timestamp exchange
    TIMESTAMP_REQUEST = 13
    TIMESTAMP_REPLY = 14

    # Extended echo exchange
    ECHO_REQUESTX = 42
    ECHO_REPLYX = 43
@dataclasses.dataclass
class Icmp4Packet:
    """An ICMPv4 message: type, code, checksum, 4-byte parameter and payload.

    Fields:
        type: ICMP message type (must be <= 255). Converted to an
            ``IcmpType`` member when the number is a known type; unknown
            numbers are kept as plain ints.
        code: ICMP message code.
        checksum: 16-bit checksum as an int, or as 2 network-order bytes.
            The default ``-1`` means "compute it from the other fields".
        param: the 4 bytes following the checksum. Shorter values are
            left-padded with NUL bytes when packing and checksumming.
        data: payload bytes. A ``str`` is encoded as latin1.

    Raises:
        ValueError: if ``type`` exceeds 255, ``checksum`` exceeds 0xffff, or
            a bytes ``checksum`` is not exactly 2 bytes long.
    """
    type: IcmpType = IcmpType.ECHO_REQUEST
    code: int = 0
    checksum: int = -1
    param: bytes = b'\0\0\0\0'
    data: bytes = b''

    def __post_init__(self):
        if self.type > 255:
            raise ValueError('ICMP type must be <= 255')
        # convert to enum, if available
        if not isinstance(self.type, IcmpType):
            try:
                self.type = IcmpType(self.type)
            except ValueError:
                pass  # unknown type number: keep the raw int

        # Normalise the payload *before* any checksum is computed;
        # compute_checksum() concatenates bytes and would fail on a str.
        if isinstance(self.data, str):
            self.data = self.data.encode('latin1')

        # Normalise a bytes checksum to an int before any numeric comparison.
        if isinstance(self.checksum, (bytes, bytearray)):
            if len(self.checksum) != 2:
                raise ValueError('Checksum must be 2 bytes')
            (self.checksum,) = struct.unpack('!H', self.checksum)

        if self.checksum == -1:
            self.checksum = self.compute_checksum()
        if self.checksum > 0xffff:
            raise ValueError(f'Checksum (0x{self.checksum:x}) must be <= 0xffff')

    def _padded_param(self):
        """Return ``param`` left-padded with NUL bytes to 4 bytes."""
        # bytes.zfill() pads with ASCII '0' (0x30), not NUL, so use rjust().
        return self.param.rjust(4, b'\0')

    def compute_checksum(self):
        """Return the 16-bit one's-complement checksum of this packet.

        The sum covers type, code, the padded parameter and the payload;
        the checksum field itself is left out (i.e. counted as zero).
        """
        message = bytes((self.type, self.code)) + self._padded_param() + self.data
        if len(message) % 2 == 1:
            message += b'\0'  # pad to a whole number of 16-bit words
        total = sum(struct.unpack(f'!{len(message) // 2}H', message))
        # Fold the carries back into the low 16 bits.
        while total > 0xffff:
            total = (total & 0xffff) + (total >> 16)
        return ~total & 0xffff

    def tobytes(self):
        """Packs ICMP packet ready to be sent"""
        return (
            struct.pack('!BBH', self.type, self.code, self.checksum) +
            self._padded_param() + self.data
        )

    @property
    def valid_checksum(self):
        """True when the stored checksum matches the computed one."""
        return self.checksum == self.compute_checksum()

    @classmethod
    def frombytes(cls, data):
        """Parse a raw ICMP message (without the IP header) into a packet.

        Raises:
            ValueError: if ``data`` is shorter than the 8-byte ICMP header.
        """
        if len(data) < 8:
            raise ValueError(
                f'ICMP packet must be at least 8 bytes, got {len(data)}'
            )
        return cls(*struct.unpack(f'!BBH4s{len(data) - 8}s', data))
class Icmp4Server:
    """Loosely based on the asyncio.DatagramProtocol class.

    Listens promiscuously on one raw socket, sends on another, and hands
    every ICMP packet addressed to ``bind_address`` to ``packet_received``.
    Subclass and override ``packet_received`` to handle packets.
    """

    # Maximum number of bytes requested per sock_recv() call.
    MAX_RECV = 65535 - 8
    # An IPv4 header without options is 5 32-bit words long.
    _MIN_IPV4_HEADER_LEN = 20

    @staticmethod
    def _decode_ipv4_header(data) -> "tuple[int, tuple[str, str], bytes] | None":
        """Extract protocol, addresses and payload from a raw IP packet.

        Returns a tuple ``(IP protocol, (source ip, destination ip), payload)``
        for IPv4 packets, or ``None`` for IPv6 packets (not supported).

        Raises:
            ValueError: if the packet is empty, has an unknown IP version,
                or its IPv4 header is malformed or truncated.
        """
        if not data:
            raise ValueError('Empty IP packet')
        ip_version = data[0] >> 4
        if ip_version == 6:
            return None
        if ip_version != 4:
            raise ValueError(f'Invalid IP version: {ip_version}')

        header_len = (data[0] & 0xf) * 4  # ihl is expressed as 32-bit words
        if header_len < Icmp4Server._MIN_IPV4_HEADER_LEN:
            raise ValueError(f'Invalid IPv4 header length: {header_len}')
        if len(data) < header_len:
            raise ValueError('Truncated IPv4 header')

        header, payload = data[:header_len], data[header_len:]
        ipproto = header[9]
        srcip = '.'.join(map(str, header[12:16]))
        dstip = '.'.join(map(str, header[16:20]))
        return (ipproto, (srcip, dstip), payload)

    def __init__(self, bind_address=None):
        """Initialize the ICMP server to accept ICMP Echo traffic.

        Must be called while an asyncio event loop is running.

        Args:
            bind_address: local IP address to listen on. Defaults to the
                address the local host name resolves to.

        Raises:
            OSError: if the raw sockets cannot be created or configured.
        """
        self.loop = asyncio.get_running_loop()
        self._egress_queue = asyncio.Queue()
        self._ingress_queue = asyncio.Queue()
        # Strong references to the background tasks; the event loop itself
        # only keeps weak references to tasks.
        self._tasks = set()
        if bind_address is None:
            # 0.0.0.0 doesn't cut it; must bind to actual ip address.
            bind_address = socket.gethostbyname(socket.gethostname())

        sock_listener = None
        try:
            # windows only allows us to promiscuously listen on IPPROTO_IP
            # therefore, we must send and listen promiscuously on different sockets.
            sock_listener = socket.socket(type=socket.SOCK_RAW, proto=socket.IPPROTO_IP)
            sock_sender = socket.socket(type=socket.SOCK_RAW, proto=socket.IPPROTO_ICMP)
        except OSError as e:
            if sock_listener is not None:
                sock_listener.close()  # don't leak the first socket
            raise OSError(
                'Creating raw sockets requires administrative privileges.'
            ) from e

        try:
            # set listener to promiscuous mode
            sock_listener.bind((bind_address, 0))
            sock_listener.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, True)
            sock_listener.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
            # loop.sock_recv() requires a non-blocking socket.
            sock_listener.setblocking(False)
        except Exception:
            # Release both sockets if any configuration step fails.
            sock_listener.close()
            sock_sender.close()
            raise

        self.bind_address = bind_address
        self.sock_listener = sock_listener
        self.sock_sender = sock_sender

    def close(self):
        """Cancel the background tasks and close both raw sockets."""
        for task in list(self._tasks):
            task.cancel()
        self._tasks.clear()
        self.sock_listener.close()
        self.sock_sender.close()

    async def start_snooper(self):
        """Snoop all packets on the interface, extracting ICMP packets.

        Yes, all packets. Use with care.

        Runs until cancelled. Packets that cannot be parsed are skipped.
        """
        proto_icmp = socket.getprotobyname('icmp')
        self.loop = asyncio.get_running_loop()
        for worker in (self._packet_sender(), self._packet_receiver()):
            task = self.loop.create_task(worker)
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

        while True:
            packet = await self.loop.sock_recv(self.sock_listener, self.MAX_RECV)
            try:
                hostdata = self._decode_ipv4_header(packet)
            except ValueError:
                continue
            if hostdata is None:  # ipv6 packet detected
                continue
            ipproto, (srcip, dstip), payload = hostdata
            if ipproto != proto_icmp or dstip != self.bind_address:
                continue
            try:
                icmp_packet = Icmp4Packet.frombytes(payload)
            except (ValueError, struct.error):
                # Malformed / truncated ICMP message: skip it instead of
                # letting one bad packet terminate the snooper.
                continue
            self._ingress_queue.put_nowait(
                self.loop.run_in_executor(
                    None, self.packet_received, icmp_packet, srcip
                )
            )

    async def _packet_receiver(self):
        """Await queued ``packet_received`` calls one at a time, in order."""
        while True:
            pending = await self._ingress_queue.get()
            try:
                await pending
            except Exception as exc:
                # Report the failure but keep draining the queue.
                self.loop.call_exception_handler({
                    'message': 'Unhandled exception in packet_received',
                    'exception': exc,
                })

    async def _packet_sender(self):
        """Send queued ``(host, data)`` pairs on the sender socket."""
        while True:
            host, data = await self._egress_queue.get()
            try:
                await self.loop.run_in_executor(
                    None, self.sock_sender.sendto, data, host
                )
            except OSError as exc:
                # One failed send must not stop later packets going out.
                self.loop.call_exception_handler({
                    'message': f'Failed to send ICMP packet to {host!r}',
                    'exception': exc,
                })

    def send_to(self, host: str, data):
        """Queue an ICMP packet for sending to ``host``.

        Args:
            host: destination IP address or host name.
            data: an ``Icmp4Packet``, or raw bytes to wrap in a default
                ``Icmp4Packet`` as its payload.

        Raises:
            TypeError: if ``data`` is neither bytes nor an ``Icmp4Packet``.
        """
        host = (host, 0)  # icmp has no ports, but sendto requires a port.
        if isinstance(data, bytes):
            data = Icmp4Packet(data=data)
        if not isinstance(data, Icmp4Packet):
            raise TypeError(f'Sent data must be {bytes!r} or {Icmp4Packet!r} not {data!r}')
        self._egress_queue.put_nowait((host, data.tobytes()))

    def packet_received(self, icmp_packet: "Icmp4Packet", srcip: str) -> None:
        """
        Subclass me!
        Called when an ICMP packet arrives on the network.
        WARNING: other processes may be aware of this packet too,
        and any echo reply processes may have responded to it by
        the time this code starts running.
        """
class MyIcmpServer(Icmp4Server):
    """Demo server that prints each received ICMP payload."""

    def packet_received(self, packet, src):
        """Print the source address followed by the payload in hex."""
        payload_hex = packet.data.hex()
        print(src, payload_hex)
async def amain():
    """Create the demo server and run its snooper loop."""
    await MyIcmpServer().start_snooper()
def main():
    """Script entry point: check privileges, then run the async main.

    Exits with status 1 when not running with elevated privileges.
    A keyboard interrupt ends the program quietly.
    """
    if RUNNING_AS_ADMIN:
        try:
            asyncio.run(amain(), debug=True)
        except KeyboardInterrupt:
            pass
        return

    print('Please run this script with elevated privileges.')
    raise SystemExit(1)
# Run the snooper only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment