Last active
January 21, 2021 20:15
-
-
Save dutchLuck/923ada0f621b43ce18e06c5388ccb2d4 to your computer and use it in GitHub Desktop.
Python code to test non privileged access to raw sockets with use of (IP4) ICMP Echo.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python | |
# | |
# R A W S C K T P I N G . P Y | |
# | |
# Code to test python non-priveleged access to raw socket ICMP Echo (Ping). | |
# | |
# | |
# Usage (assumes python version 2): | |
# python rawScktPing [-d][-D][-h][-v] [targetComputer ..[targetComputerN]] | |
# where; - | |
# -d forces SOCK_DGRAM instead of SOCK_RAW socket use | |
# -D prints Debug information | |
# -h prints usage information | |
# -v prints more verbose output | |
# targetComputer is the machine to ping | |
# | |
# | |
# This code started as someone elses snippet and was probably | |
# authored by "lilydjwg". | |
# | |
# I don't appear to have noted down were I obtained the original | |
# python raw sockets code snippet from and then after I left it for | |
# a while I couldn't remember were it was from. The search to find | |
# the source of the original code wasn't as easy as I imagined. | |
# It appears likely that it came from; - | |
# | |
# https://github.com/lilydjwg/winterpy/blob/master/pylib/icmplib.py | |
# | |
# This URL was found from a reference in a stackoverflow discussion | |
# about not requiring system privileges to access raw sockets to | |
# send icmp packets. The stackoverflow discussion is on; - | |
# | |
# https://stackoverflow.com/questions/1189389/python-non-privileged-icmp | |
# | |
# So thanks to lilydjwg. | |
# | |
# | |
# This code snippet doesn't need priveleges to run on Windows 10 using raw | |
# sockets in either a Cygwin64 or a Win32 python 2.7 environment. | |
# Raw socket (i.e. SOCK_RAW) use does require priveleges provided by | |
# sudo on MacOS X 10.11.6 ( which has python 2.7.10 ) | |
# and does require priveleges through sudo on Ubuntu 14.4 linux. | |
# | |
# This code snippet doesn't need priveleges to run on MacOS X if SOCK_DGRAM | |
# is used instead of SOCK_RAW (invoked by the -d option - see Usage). This | |
# appears to be how apple's ping command works, using DGRAM for non- | |
# privileged access (see following URL); - | |
# https://apple.stackexchange.com/questions/312857/how-does-macos-allow-standard-users-to-ping | |
# | |
# This code snippet doesn't need priveleges to run on Ubuntu 14.4 | |
# if two conditions are met - The -d option is used (SOCK_DGRAM | |
# is used instead of SOCK_RAW); and the Kernel is preset to accept | |
# this using the following command; - | |
# | |
# sudo sysctl net.ipv4.ping_group_range="1234 1234" | |
# | |
# where: you substitute your group number for 1234. | |
# | |
import socket as _socket | |
import time | |
import struct | |
import array # required in calcChecksum() | |
import os # getpid() | |
import sys # exit() | |
import getopt # getopt() | |
''' | |
Utilities for ICMP socket. | |
For the socket usage: https://lkml.org/lkml/2011/5/10/389 | |
For the packet structure: https://bitbucket.org/delroth/python-ping | |
''' | |
# RFC 792 (ICMP) Message types | |
ICMP_ECHO_REPLY = 0 | |
ICMP_DESTINATION_UNREACHABLE = 3 | |
ICMP_SOURCE_QUENCH = 4 | |
ICMP_REDIRECT = 5 | |
ICMP_ECHO_REQUEST = 8 | |
ICMP_TIME_EXCEEDED = 11 | |
ICMP_PARAMETER_PROBLEM = 12 | |
ICMP_TIMESTAMP_REQUEST = 13 | |
ICMP_TIMESTAMP_REPLY = 14 | |
ICMP_INFORMATION_REQUEST = 15 | |
ICMP_INFORMATION_REPLY = 16 | |
_d_size = struct.calcsize('d') | |
# Calculate 16 bit check sum for a data string | |
# (mostly borrowed from scapy's utils.py) | |
def calcChecksum(dataString): | |
if len(dataString) % 2 == 1: # test for odd number of bytes in the string | |
dataString += "\0" # add extra zero to make even number of bytes | |
s = sum(array.array("H", dataString)) | |
s = (s >> 16) + (s & 0xffff) | |
s += s >> 16 | |
s = ~s | |
if struct.pack("=H",1) != struct.pack("!H",1): # handle endianess of architecture | |
s = (((s >> 8) & 0xff) | s << 8 ) # swap checksum bytes if little endian | |
return s & 0xffff | |
# Construct the ICMP header and add it to the ICMP body data | |
def pack_packet(seq, payload): | |
# Header is type (8), code (8), checksum (16), id (16), sequence (16) | |
pid = os.getpid() & 0xffff | |
headerSansCheckSum = struct.pack('!BBHHH', ICMP_ECHO_REQUEST, 0, 0, pid, seq) | |
chckSum = calcChecksum(headerSansCheckSum + payload) | |
header = struct.pack('!BBHHH', ICMP_ECHO_REQUEST, 0, chckSum, pid, seq) | |
return header + payload | |
# Get best time accuracy for given system | |
def getClockTime(): | |
if sys.platform == 'win32': | |
wallClockTime = time.clock() | |
else: | |
wallClockTime = time.time() # best on most platforms is time.time | |
return wallClockTime | |
# Construct an ICMP Echo Request | |
def pack_packet_with_time(seq, packetsize=56): | |
padding = (packetsize - _d_size) * b'Q' | |
timeinfo = struct.pack('!d', getClockTime()) | |
return pack_packet(seq, timeinfo + padding) | |
# Print a Hex dump of a string of data | |
def printDataStringInHex(dataString): | |
length = len(dataString) | |
for cnt in xrange(0, length, 1): | |
if (cnt % 16) == 0: | |
print '\n%04u: %02x' % ( cnt, ord(dataString[ cnt ])), | |
else: | |
print '%02x' % ord(dataString[ cnt ]), | |
def printIP4_Header(header): | |
print 'IP ver .. ', (header["ver"] >> 4) | |
print 'IP hdr len', ((header["ver"] & 0xf) * 4) | |
print 'dscp ... 0x%02x ' % header["dscp"] | |
print 'totl len %u' % header["totl_len"] | |
print 'id ..... 0x%04x ' % header["pkt_id"] | |
print 'frag ... 0x%04x ' % header["frag"] | |
print 'ttl ....', header["ttl"] | |
print 'proto .. 0x%02x ' % header["prot"] | |
print 'csum ... 0x%04x ' % header["csum"] | |
print 'src IP . %03u.%03u.%03u.%03u' % (header["s1"], header["s2"], header["s3"], header["s4"]) | |
print 'dst IP . %03u.%03u.%03u.%03u' % (header["d1"], header["d2"], header["d3"], header["d4"]) | |
return | |
# Unpack the header of an IPv4 packet | |
def parseIP4_packetHeader(data, options): | |
if sys.platform == 'darwin': # Undo MacOS cooking some IP4 header fields | |
ver, dscp, totalLen, b1, b2, fragLen = struct.unpack('=BBHBBH', data[:8]) | |
totalLen = totalLen + ((ver & 0xf) * 4) | |
tmpData = struct.pack('!BBHBBH', ver, dscp, totalLen, b1, b2, fragLen) | |
data = tmpData + data[8:] | |
# Check to see if the local interface is being used; i.e. src == dest | |
srcAddr, destAddr = struct.unpack('!LL', data[12:20]) | |
# If local interface then don't check the checksum of the packet | |
if srcAddr == destAddr: | |
chckSum = 0 | |
else: | |
chckSum = calcChecksum(data) | |
# Unpack the IPv4 Header | |
ver, dscp, totl_len, pkt_id, frag, ttl, prot, csum, s1, s2, s3, s4, d1, d2, d3, d4 = struct.unpack('!BBHHHBBHBBBBBBBB', data[:20]) | |
ipv4Hdr = { "ver" : ver, "dscp" : dscp, "totl_len" : totl_len, "pkt_id" : pkt_id, "frag" : frag, | |
"ttl" : ttl, "prot" : prot, "csum" : csum, | |
"s1" : s1, "s2" : s2, "s3" : s3, "s4" : s4, | |
"d1" : d1, "d2" : d2, "d3" : d3, "d4" : d4 } | |
if chckSum != 0: | |
print '\n?? The packet check sum calculates to 0x%04x not zero' % chckSum | |
print '\nPacket', | |
printDataStringInHex(data) | |
printIP4_Header(ipv4Hdr) | |
elif options["debug"]: | |
print '\nThe IPv4 packet received is; -' | |
printIP4_Header(ipv4Hdr) | |
printDataStringInHex(data) | |
return csum, data[20:] | |
def printICMP_Header(header): | |
print 'ICMP type ... 0x%02x ' % header["ICMP_Type"] | |
print 'ICMP code ... 0x%02x ' % header["code"] | |
print 'ICMP checksum 0x%04x ' % header["checksum"] | |
print 'ICMP id ..... 0x%04x ' % header["id"] | |
print 'ICMP sequence 0x%04x ' % header["sequence"] | |
return | |
def parseICMP_data(data): | |
chckSum = calcChecksum(data) | |
type, code, checksum, id, sequence = struct.unpack('!BBHHH', data[:8]) | |
ICMP_Header = {"ICMP_Type":type,"code":code,"checksum":checksum,"id":id,"sequence":sequence} | |
if chckSum != 0: | |
print '\n?? The ICMP check sum test failed (it calculates to 0x%04x, not 0)' % chckSum | |
return ICMP_Header, data[8:] | |
def parsePacketWithTimeStamp(data, optns): | |
if optns["dgram"]: # Process all SOCK_DGRAM socket returned packets | |
if sys.platform == 'linux2': # linux SOCK_DGRAM socket returns do not have IP4 header | |
hdr, payload = parseICMP_data(data) | |
else: # non-linux SOCK_DGRAM socket returns have an IP4 header | |
_, icmpData = parseIP4_packetHeader(data, optns) | |
hdr, payload = parseICMP_data(icmpData) | |
else: # Process all SOCK_RAW socket returned information | |
_, icmpData = parseIP4_packetHeader(data, optns) | |
hdr, payload = parseICMP_data(icmpData) | |
if hdr["ICMP_Type"] == ICMP_ECHO_REPLY: | |
if optns["debug"]: | |
print '\nICMP Echo Reply is; -' | |
printICMP_Header(hdr) | |
printDataStringInHex(payload) | |
timeStamp = struct.unpack('!d', payload[:_d_size])[0] | |
else: | |
timeStamp = 0 | |
if optns["verbose"]: | |
print '\n?? ICMP (type %d) packet returned is not an ICMP Echo Reply' % hdr["icmpType"] | |
return timeStamp | |
# Use a Raw Socket (SOCK_RAW) by default, but use SOCK_DGRAM if -d option was specified | |
def selectSocket(dgram): | |
if dgram: | |
return _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM, _socket.IPPROTO_ICMP) | |
else: | |
return _socket.socket(_socket.AF_INET, _socket.SOCK_RAW, _socket.IPPROTO_ICMP) | |
def ping(address, optns): | |
if optns["verbose"]: | |
print '\nAttempting to Ping "%s"' % address, | |
try: | |
addr = _socket.gethostbyname(address) | |
if optns["verbose"]: | |
if address != str(addr): | |
print '(', addr, ')', | |
s = selectSocket(optns["dgram"]) | |
s.settimeout(2) | |
# Build an ICMP Echo Request Packet | |
pingPacket = pack_packet_with_time(1) | |
if optns["debug"]: | |
print '\nICMP Echo Request is; -' | |
ICMP_Hdr, ICMP_Payload = parseICMP_data(pingPacket) | |
printICMP_Header(ICMP_Hdr) | |
printDataStringInHex(ICMP_Payload) | |
# Send the ICMP Echo Request | |
s.sendto(pingPacket, (addr, 0)) | |
# Loop until we get an ICMP Echo Reply Packet or time out | |
while True: | |
packet, peer = s.recvfrom(2048) | |
recvTime = getClockTime() | |
if peer[0] != addr: # Ignore the packet if it is not from the target machine | |
if optns["verbose"]: | |
print 'Received a packet from %s but not from the target machine' % peer[0] | |
else: | |
ICMP_EchoRequestTimeStamp = parsePacketWithTimeStamp(packet, optns) | |
if ICMP_EchoRequestTimeStamp != 0: | |
break | |
return recvTime - ICMP_EchoRequestTimeStamp | |
except _socket.error, msg: | |
print '\n?? An error occurred:', msg, '\n' | |
return 9.999999 | |
# Obtain the local machines IPv4 address | |
def getLocalIP(): | |
s = _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) | |
try: | |
s.connect(('10.255.255.255', 1)) | |
localIP = s.getsockname()[0] | |
except: | |
localIP = '127.0.0.1' # Susbstitute the loopback address | |
finally: | |
s.close | |
return localIP | |
def usage(): | |
print 'Usage:\n%s [-dDhv] [targetMachine ..[targetMachineN]]' % sys.argv[0] | |
print ' where; -\n -d or --dgram selects SOCK_DGRAM socket instead of SOCK_RAW socket' | |
print ' -D or --debug prints out Debug information' | |
print ' -h or --help outputs this usage message' | |
print ' -v or --verbose prints verbose output' | |
print ' targetMachine is either the name or IP address of the computer to ping' | |
def processCommandLine(): | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "dDhv", ["dgram","debug","help","verbose"]) | |
except getopt.GetoptError as err: | |
print str(err) | |
usage() | |
sys.exit() | |
optns = {"dgram" : False, "debug" : False, "help" : False, "verbose" : False} | |
for o, a in opts: | |
if o in ("-d", "--dgram"): | |
optns["dgram"] = True | |
elif o in ("-D", "--debug"): | |
optns["debug"] = True | |
elif o in ("-h", "--help"): | |
optns["help"] = True | |
elif o in ("-v", "--verbose"): | |
optns["verbose"] = True | |
if optns["debug"]: | |
optns["verbose"] = True # Debug implies verbose output | |
return optns, args | |
def printPingTime( trgtAddr, opts, startTime ): | |
travelTime = ping(trgtAddr, opts) # Ping the specified computer | |
if travelTime <= (getClockTime() - startTime): # If Ping fails then the travel time is deliberatly set large | |
print 'Ping round trip time to "%s" was: %9.3f mS.' % (trgtAddr,travelTime * 1000) | |
def main(): | |
startTime = getClockTime() | |
opts, args = processCommandLine() | |
if opts["verbose"]: | |
print '\nRaw Socket IPv4 ICMP Ping Test Program' | |
if opts["debug"]: | |
print '\n"rawScktPing.py" Python script running on system type "%s"' % sys.platform | |
if len(args) < 1: | |
print '\n?? Please specify the computer to ping?\n' | |
usage() | |
localInterface = getLocalIP() | |
print '\nDefaulting to ping the local interface (%s)' % localInterface | |
printPingTime(localInterface, opts, startTime) # If there is no target specified then use local Interface IP | |
else: | |
if opts["help"]: | |
usage() | |
# Step through ping targets specified on the command line | |
for trgtAddr in args: | |
printPingTime(trgtAddr, opts, startTime) | |
if opts["debug"]: | |
print '\nrawScktPing.py execution time was: %9.3f mS.\n' % ((getClockTime() - startTime) * 1000) | |
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment