Created
February 13, 2025 08:57
-
-
Save alonsoir/830b62ff6e78a6c10d8fdc48bb900673 to your computer and use it in GitHub Desktop.
A scapy based sample about how to use in a multi thread way an sniffer and a packets sending tool.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import argparse | |
import queue | |
from scapy.all import * | |
from scapy.layers.dns import * | |
from scapy.layers.inet import ICMP | |
''' | |
Mejoras aplicadas con respecto a fuzzing_dns_multi_thread: | |
1) Se elimina time.sleep(1) dentro del bucle de envío de ICMP → send() ya maneja su propio control de flujo. | |
2) Se usa sniff(store=False) en sniff_packets() para evitar uso innecesario de memoria. | |
3) Se eliminan las funciones lambda x:x.summary() por prn=packet_handler para mejorar legibilidad. | |
4) Se añade queue.Queue para sincronizar los hilos y garantizar que el sniffer se inicie antes del envío de paquetes. | |
5) Se optimiza el uso de threading.Thread eliminando la lista innecesaria threads. | |
(attendance-system-py3.10) ┌<▸> ~/g/a/scapy | |
└➤ sudo poetry run python multi_thread_optimized.py 1.1.1.1 --packets 10 --timeout 10 | |
Target: 1.1.1.1, Packets: 10, Timeout: 10 | |
.......... | |
Sent 10 packets. | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 192.168.1.112 > 1.1.1.1 echo-request 0 | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
Ether / IP / ICMP 1.1.1.1 > 192.168.1.112 echo-reply 0 / Padding | |
(attendance-system-py3.10) ┌<▸> ~/g/a/scapy | |
└➤ | |
''' | |
def test_dns_valid(): | |
target = "8.8.8.8" | |
pkt = IP(dst=target) / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="google.com")) | |
response = sr1(pkt, timeout=2, verbose=False) | |
if response: | |
response.show() | |
return response | |
def send_icmp_packages(target, num_packets, start_event): | |
""" Envía paquetes ICMP después de que el sniffer haya comenzado. """ | |
start_event.wait() # Espera a que el sniffer inicie | |
icmp = IP(dst=target) / ICMP() | |
send(icmp, count=num_packets, verbose=True) | |
def packet_handler(pkt): | |
""" Maneja los paquetes capturados mostrando solo el resumen. """ | |
print(pkt.summary()) | |
def sniff_packets(timeout, start_event): | |
""" Inicia el sniffer y avisa a los otros hilos que ha comenzado. """ | |
start_event.set() # Notifica que el sniffer está activo | |
sniff(filter="icmp", prn=packet_handler, timeout=timeout, store=False) | |
def main(target, num_packets, timeout): | |
start_event = threading.Event() | |
sniffer_thread = threading.Thread(target=sniff_packets, args=(timeout, start_event), daemon=True) | |
icmp_thread = threading.Thread(target=send_icmp_packages, args=(target, num_packets, start_event), daemon=True) | |
sniffer_thread.start() | |
icmp_thread.start() | |
sniffer_thread.join() | |
icmp_thread.join() | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="ICMP MultiThread sniffer") | |
parser.add_argument('target', type=str, help="Target IP") | |
parser.add_argument('--packets', type=int, default=10, help="Number of packets to send") | |
parser.add_argument('--timeout', type=int, default=10, help="Timeout for the sniffer") | |
args = parser.parse_args() | |
print(f"Target: {args.target}, Packets: {args.packets}, Timeout: {args.timeout}") | |
main(args.target, args.packets, args.timeout) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment