Last active
August 29, 2015 14:13
-
-
Save sstelfox/f04fe9ef1f279b1526f8 to your computer and use it in GitHub Desktop.
Playing around with manual packet inspection using Ruby and pcaprub.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'ipaddr' | |
require 'pcaprub' | |
require 'strscan' | |
# Symbolic names for the EtherType values this script recognises. Values that
# are not listed here are reported by their raw integer instead.
# https://en.wikipedia.org/wiki/EtherType
ETHER_TYPE = {
  0x0800 => :ipv4,
  0x0806 => :arp,
  0x86dd => :ipv6
}.freeze
# Helpers for decoding fixed-width, big-endian (network byte order) fields out
# of raw packet bytes. The packet classes in this file `extend` this module so
# the helpers are available as class-level methods.
module DataConverters
  # Formats the first six bytes of +byte_str+ as a colon separated, lower-case
  # hexadecimal MAC address, e.g. "00:1a:2b:3c:4d:5e".
  def mac_address(byte_str)
    byte_str.unpack('C6').map { |octet| format('%02x', octet) }.join(':')
  end

  # Decodes the first byte of +byte_str+ as an unsigned 8-bit integer.
  # Returns nil when +byte_str+ is empty.
  def int8(byte_str)
    byte_str.unpack('C').first
  end

  # Decodes the first two bytes of +byte_str+ as an unsigned big-endian 16-bit
  # integer. Returns nil when fewer than two bytes are available.
  def int16(byte_str)
    byte_str.unpack('n').first
  end

  # Decodes the first four bytes of +byte_str+ as an unsigned big-endian 32-bit
  # integer. Returns nil when fewer than four bytes are available.
  def int32(byte_str)
    byte_str.unpack('N').first
  end

  # Builds an IPv4 IPAddr from the first four bytes of +byte_str+.
  def ipv4_addr(byte_str)
    IPAddr.new(int32(byte_str), Socket::AF_INET)
  end

  # Consumes exactly +bytes+ bytes from +str_scanner+ and returns them.
  #
  # Raises RangeError when fewer than +bytes+ bytes remain. The check runs
  # before anything is read so the scanner position is left untouched and the
  # error message says how much data was missing, instead of the bare
  # "index out of range" that assigning an out-of-bounds position produces.
  def get_bytes(str_scanner, bytes)
    remaining = str_scanner.rest_size
    if bytes > remaining
      raise RangeError, "truncated data: wanted #{bytes} byte(s) but only #{remaining} remain"
    end

    chunk = str_scanner.peek(bytes)
    str_scanner.pos += bytes
    chunk
  end
end
# An Ethernet II frame split into its header fields and raw payload.
class EthernetII
  extend DataConverters

  # Value found where the EtherType normally sits when a VLAN tag is present.
  VLAN_TPID = 0x8100

  # Hash of the decoded header fields (:mac_dest, :mac_src, optional
  # :vlan_tag, :ether_type) plus the undecoded :payload bytes.
  attr_reader :fields

  # Decodes a raw frame into an EthernetII instance.
  #
  # +raw_pkt_str+ is read through a binary copy (String#b), so the caller's
  # string is neither re-encoded in place nor required to be unfrozen.
  # Frames too short to hold the header raise RangeError from get_bytes.
  def self.parse(raw_pkt_str)
    pkt_scanner = StringScanner.new(raw_pkt_str.b)

    fields = {
      mac_dest: mac_address(get_bytes(pkt_scanner, 6)),
      mac_src: mac_address(get_bytes(pkt_scanner, 6))
    }

    # VLAN tag information is 4 bytes that sit between the source address and
    # the EtherType, and is only present when a tag is set. Its presence is
    # signalled by the special value 0x8100 where the EtherType normally is.
    # All four bytes (marker included) are stored as one 32-bit integer.
    if int16(pkt_scanner.peek(2)) == VLAN_TPID
      fields[:vlan_tag] = int32(get_bytes(pkt_scanner, 4))
    end

    fields[:ether_type] = int16(get_bytes(pkt_scanner, 2))
    fields[:payload] = pkt_scanner.rest

    new(fields)
  end

  def initialize(fields = {})
    @fields = fields
  end

  # Symbolic EtherType (:ipv4, :arp, :ipv6) when known, otherwise the raw
  # integer taken from the frame.
  def ether_type
    ETHER_TYPE[fields[:ether_type]] || fields[:ether_type]
  end

  # The decoded payload: an ARPPacket for ARP frames, the raw payload bytes
  # for every other EtherType. The result is memoized after the first call.
  def payload
    @payload ||=
      case ether_type
      when :arp
        ARPPacket.parse(fields[:payload])
      else
        # Unknown protocol: hand back the bytes untouched.
        fields[:payload]
      end
  end
end
# A decoded ARP packet. Addresses are kept as raw bytes in +fields+ and are
# only rendered as MAC / IPv4 values by the reader methods.
class ARPPacket
  extend DataConverters

  HARDWARE_TYPE = {
    1 => :ethernet
  }.freeze

  ARP_OPERATION = {
    1 => :request,
    2 => :reply
  }.freeze

  # Address sizes the MAC and IPv4 formatters are able to decode.
  MAC_ADDR_BYTES = 6
  IPV4_ADDR_BYTES = 4

  # Hash of the raw decoded values, keyed by field name.
  attr_reader :fields

  # Decodes the raw ARP payload into an ARPPacket.
  #
  # +raw_pkt_str+ is read through a binary copy (String#b) so the caller's
  # string is not re-encoded in place. Every field, including the two
  # single-byte length fields, is consumed through get_bytes so a truncated
  # packet fails the same way (RangeError) wherever the data runs out.
  def self.parse(raw_pkt_str)
    pkt_scanner = StringScanner.new(raw_pkt_str.b)

    fields = {
      hardware_type: int16(get_bytes(pkt_scanner, 2)),
      protocol_type: int16(get_bytes(pkt_scanner, 2)),
      hardware_len: int8(get_bytes(pkt_scanner, 1)),
      protocol_len: int8(get_bytes(pkt_scanner, 1)),
      operation: int16(get_bytes(pkt_scanner, 2))
    }

    # The address fields are variable width; their sizes come from the two
    # length fields decoded above.
    fields[:sender_hw_addr] = get_bytes(pkt_scanner, fields[:hardware_len])
    fields[:sender_proto_addr] = get_bytes(pkt_scanner, fields[:protocol_len])
    fields[:target_hw_addr] = get_bytes(pkt_scanner, fields[:hardware_len])
    fields[:target_proto_addr] = get_bytes(pkt_scanner, fields[:protocol_len])

    new(fields)
  end

  def initialize(fields = {})
    @fields = fields
  end

  # Symbolic hardware type when known, otherwise the raw integer.
  def hardware_type
    HARDWARE_TYPE[fields[:hardware_type]] || fields[:hardware_type]
  end

  # Symbolic protocol type (looked up in ETHER_TYPE) when known, otherwise
  # the raw integer.
  def protocol_type
    ETHER_TYPE[fields[:protocol_type]] || fields[:protocol_type]
  end

  # Symbolic operation (:request / :reply) when known, otherwise the raw
  # integer.
  def operation
    ARP_OPERATION[fields[:operation]] || fields[:operation]
  end

  # Hash of the decoded, human readable view of the packet.
  def output
    {
      hardware_type: hardware_type,
      protocol_type: protocol_type,
      operation: operation,
      sender_hw_addr: sender_hw_addr,
      sender_proto_addr: sender_proto_addr,
      target_hw_addr: target_hw_addr,
      target_proto_addr: target_proto_addr
    }
  end

  def sender_hw_addr
    format_hw_addr(fields[:sender_hw_addr])
  end

  def sender_proto_addr
    format_proto_addr(fields[:sender_proto_addr])
  end

  def target_hw_addr
    format_hw_addr(fields[:target_hw_addr])
  end

  def target_proto_addr
    format_proto_addr(fields[:target_proto_addr])
  end

  private

  # Renders +raw+ as a MAC address string for Ethernet hardware. Anything
  # else, including an Ethernet packet whose address is not six bytes long,
  # is returned as the raw value rather than raising while formatting.
  def format_hw_addr(raw)
    return raw unless hardware_type == :ethernet
    return raw unless raw && raw.bytesize == MAC_ADDR_BYTES

    self.class.mac_address(raw)
  end

  # Renders +raw+ as an IPAddr for IPv4. Anything else, including an IPv4
  # packet whose address is not four bytes long, is returned as the raw value.
  def format_proto_addr(raw)
    return raw unless protocol_type == :ipv4
    return raw unless raw && raw.bytesize == IPV4_ADDR_BYTES

    self.class.ipv4_addr(raw)
  end
end
# Sniff ARP traffic and print every decoded packet until interrupted.
#
# The interface to listen on may be passed as the first command line argument;
# it defaults to 'em1'.
interface = ARGV.fetch(0, 'em1')

# NOTE(review): the remaining arguments are presumably snapshot length,
# promiscuous mode and read timeout - confirm against the pcaprub docs.
capture = PCAPRUB::Pcap.open_live(interface, 65535, true, 0)
capture.setfilter('arp')

begin
  loop do
    # Drain everything currently available, then back off briefly so the loop
    # does not spin while no packets are arriving.
    while (pkt = capture.next)
      frame = EthernetII.parse(pkt)
      puts frame.payload.output.inspect if frame.ether_type == :arp
    end

    sleep 0.1
  end
rescue Interrupt
  # Ctrl-C is the expected way to stop the capture; exit without a backtrace.
ensure
  # Release the capture handle when the installed pcaprub provides #close.
  capture.close if capture.respond_to?(:close)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment