Last active
April 23, 2026 07:15
-
-
Save twobob/57f873f4f6c25e6626878fe56ab6e56b to your computer and use it in GitHub Desktop.
Spot the things doing underlying network operations.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| import struct | |
| import os | |
| import time | |
| import threading | |
| import subprocess | |
| import sys | |
| import ctypes | |
| import argparse | |
| if not ctypes.windll.shell32.IsUserAnAdmin(): | |
| sys.exit(1) | |
| port_to_process = {} | |
| pid_to_name = {} | |
| recent_dns_requests = [] | |
| dns_lock = threading.Lock() | |
| def etw_monitor(): | |
| global recent_dns_requests | |
| ps_script = """ | |
| $LogName = 'Microsoft-Windows-DNS-Client/Operational' | |
| wevtutil sl $LogName /e:true | |
| $lastTime = Get-Date | |
| try { | |
| while ($true) { | |
| $events = Get-WinEvent -FilterHashtable @{LogName=$LogName; ID=3008; StartTime=$lastTime} -ErrorAction SilentlyContinue | Sort-Object TimeCreated | |
| if ($events) { | |
| foreach ($event in $events) { | |
| $xml = [xml]$event.ToXml() | |
| $q = $xml.Event.EventData.Data | Where-Object { $_.Name -eq 'QueryName' } | Select-Object -ExpandProperty '#text' | |
| [Console]::WriteLine("$($event.ProcessId)|$q") | |
| } | |
| $lastTime = $events[-1].TimeCreated | |
| } | |
| Start-Sleep -Milliseconds 250 | |
| } | |
| } finally { | |
| wevtutil sl $LogName /e:false | |
| } | |
| """ | |
| process = subprocess.Popen(['powershell', '-NoProfile', '-Command', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True, bufsize=1) | |
| process.stdin.write(ps_script) | |
| process.stdin.flush() | |
| process.stdin.close() | |
| for line in iter(process.stdout.readline, ''): | |
| line = line.strip() | |
| if '|' in line: | |
| parts = line.split('|', 1) | |
| if len(parts) == 2 and parts[0].isdigit(): | |
| with dns_lock: | |
| recent_dns_requests.append({ | |
| 'time': time.time(), | |
| 'domain': parts[1], | |
| 'caller_pid': int(parts[0]) | |
| }) | |
| def update_mappings(): | |
| global port_to_process, pid_to_name | |
| while True: | |
| try: | |
| task_output = subprocess.check_output('tasklist /svc /fo csv /nh', shell=True, text=True) | |
| new_tasks = {} | |
| for line in task_output.splitlines(): | |
| if not line.strip(): continue | |
| parts = line.replace('"', '').split(',') | |
| if len(parts) >= 3: | |
| exe_name = parts[0] | |
| pid = int(parts[1]) if parts[1].isdigit() else 0 | |
| services = parts[2] | |
| if exe_name.lower() == 'svchost.exe' and services.lower() != 'n/a': | |
| new_tasks[pid] = f"svchost [{services}]" | |
| else: | |
| new_tasks[pid] = exe_name | |
| pid_to_name = new_tasks | |
| net_output = subprocess.check_output('netstat -ano', shell=True, text=True) | |
| new_ports = {} | |
| for line in net_output.splitlines(): | |
| parts = line.split() | |
| if len(parts) >= 5 and parts[0] in ('TCP', 'UDP'): | |
| local_addr = parts[1] | |
| pid = parts[-1] | |
| if ':' in local_addr and pid.isdigit(): | |
| port_str = local_addr.rsplit(':', 1)[1] | |
| if port_str.isdigit(): | |
| new_ports[int(port_str)] = int(pid) | |
| port_to_process = new_ports | |
| except Exception: | |
| pass | |
| time.sleep(2) | |
| t_mappings = threading.Thread(target=update_mappings, daemon=True) | |
| t_mappings.start() | |
| t_etw = threading.Thread(target=etw_monitor, daemon=True) | |
| t_etw.start() | |
| time.sleep(1) | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Temporal Network Connection Logger") | |
| parser.add_argument('-f', '--filter', action='append', dest='filters', help='Filter OUT (hide) processes or domains containing this string') | |
| args = parser.parse_args() | |
| # Blacklist strings | |
| target_filters = [f.lower() for f in args.filters] if args.filters else [] | |
| HOST = socket.gethostbyname(socket.gethostname()) | |
| s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP) | |
| s.bind((HOST, 0)) | |
| s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) | |
| s.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) | |
| traffic = {} | |
| last_print = time.time() | |
| try: | |
| while True: | |
| packet, addr = s.recvfrom(65535) | |
| packet_len = len(packet) | |
| if packet_len < 20: | |
| continue | |
| ip_header = packet[0:20] | |
| iph = struct.unpack('!BBHHHBBH4s4s', ip_header) | |
| ihl = (iph[0] & 0xF) * 4 | |
| protocol = iph[6] | |
| s_addr = socket.inet_ntoa(iph[8]) | |
| d_addr = socket.inet_ntoa(iph[9]) | |
| local_port = None | |
| if protocol == 6 and packet_len >= ihl + 20: | |
| tcp_header = packet[ihl:ihl+20] | |
| tcph = struct.unpack('!HHLLBBHHH', tcp_header) | |
| if s_addr == HOST: local_port = tcph[0] | |
| elif d_addr == HOST: local_port = tcph[1] | |
| elif protocol == 17 and packet_len >= ihl + 8: | |
| udp_header = packet[ihl:ihl+8] | |
| udph = struct.unpack('!HHHH', udp_header) | |
| if s_addr == HOST: local_port = udph[0] | |
| elif d_addr == HOST: local_port = udph[1] | |
| if local_port and local_port in port_to_process: | |
| pid = port_to_process[local_port] | |
| traffic[pid] = traffic.get(pid, 0) + packet_len | |
| now = time.time() | |
| if now - last_print >= 1.0: | |
| os.system('cls' if os.name == 'nt' else 'clear') | |
| header_msg = f"Monitoring host: {HOST}. Press Ctrl+C to stop." | |
| if target_filters: | |
| header_msg += f" (Blocking: {', '.join(target_filters)})" | |
| print(header_msg + "\n") | |
| print(f"{'PID':<8} {'Process / Service Name':<120} {'Bytes/sec':<15}") | |
| print("-" * 145) | |
| with dns_lock: | |
| recent_dns_requests[:] = [req for req in recent_dns_requests if now - req['time'] <= 120.0] | |
| immediate_requests = [req for req in recent_dns_requests if now - req['time'] <= 2.0] | |
| for p, b in sorted(traffic.items(), key=lambda item: item[1], reverse=True): | |
| base_name = pid_to_name.get(p, 'Unknown') | |
| final_name = base_name | |
| show_row = True | |
| # Apply general process blacklist | |
| if target_filters and any(f in base_name.lower() for f in target_filters): | |
| show_row = False | |
| if 'Dnscache' in base_name: | |
| if immediate_requests: | |
| unique_displays = [] | |
| for req in immediate_requests: | |
| caller_name = pid_to_name.get(req['caller_pid'], f"PID:{req['caller_pid']}") | |
| # Drop specific DNS queries if the caller or domain matches the blacklist | |
| if target_filters and any(f in caller_name.lower() or f in req['domain'].lower() for f in target_filters): | |
| continue | |
| display = f"{caller_name} -> {req['domain']}" | |
| if display not in unique_displays: | |
| unique_displays.append(display) | |
| if unique_displays: | |
| domains_str = " | ".join(unique_displays[:2]) | |
| if len(unique_displays) > 2: | |
| domains_str += " | ..." | |
| final_name = f"{base_name} [{domains_str}]" | |
| show_row = True | |
| else: | |
| # If all queries within this Dnscache block were blacklisted, hide the row | |
| show_row = False | |
| else: | |
| if not show_row: | |
| pass # Already hidden by base process name check | |
| else: | |
| final_name = f"{base_name} [Background / Broadcast]" | |
| if not show_row: | |
| continue | |
| print(f"{p:<8} {final_name:<120} {b:<15}") | |
| traffic.clear() | |
| print("\n" + "=" * 145) | |
| print("UNIQUE DNS CALLS (LAST 2 MINUTES)") | |
| print("=" * 145) | |
| with dns_lock: | |
| if recent_dns_requests: | |
| seen_combos = set() | |
| displayed_count = 0 | |
| for req in reversed(recent_dns_requests): | |
| combo = (req['caller_pid'], req['domain']) | |
| if combo not in seen_combos: | |
| seen_combos.add(combo) | |
| caller_name = pid_to_name.get(req['caller_pid'], f"PID:{req['caller_pid']}") | |
| # Filter historical block with blacklist | |
| if target_filters and any(f in caller_name.lower() or f in req['domain'].lower() for f in target_filters): | |
| continue | |
| time_str = time.strftime('%H:%M:%S', time.localtime(req['time'])) | |
| print(f"[{time_str}] {req['caller_pid']:<8} {caller_name:<40} -> {req['domain']}") | |
| displayed_count += 1 | |
| if displayed_count == 0 and target_filters: | |
| print(f"No DNS queries remaining after applying filters.") | |
| else: | |
| print("No DNS queries detected in the last 120 seconds.") | |
| last_print = now | |
| except KeyboardInterrupt: | |
| s.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) | |
| s.close() | |
| subprocess.run('wevtutil sl Microsoft-Windows-DNS-Client/Operational /e:false', shell=True) | |
| if __name__ == '__main__': | |
| main() |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
run as admin