Skip to content

Instantly share code, notes, and snippets.

@twobob
Last active April 23, 2026 07:15
Show Gist options
  • Select an option

  • Save twobob/57f873f4f6c25e6626878fe56ab6e56b to your computer and use it in GitHub Desktop.

Select an option

Save twobob/57f873f4f6c25e6626878fe56ab6e56b to your computer and use it in GitHub Desktop.
Spot which processes are doing the underlying network operations.
import socket
import struct
import os
import time
import threading
import subprocess
import sys
import ctypes
import argparse
# The script opens a raw socket in receive-all mode and toggles an event log
# via wevtutil, so it refuses to start without elevation. Passing a string to
# sys.exit() prints it to stderr and still exits with status 1.
if not ctypes.windll.shell32.IsUserAnAdmin():
    sys.exit("This script must be run from an elevated (Administrator) prompt.")

# Local port number -> owning PID; rebound wholesale by update_mappings().
port_to_process = {}
# PID -> display name; rebound wholesale by update_mappings().
pid_to_name = {}
# DNS query records appended by etw_monitor(); each is a dict with the keys
# 'time', 'domain' and 'caller_pid'.
recent_dns_requests = []
# Held whenever recent_dns_requests is appended to, pruned or read.
dns_lock = threading.Lock()
def etw_monitor():
    """Stream DNS-Client query events into ``recent_dns_requests``.

    Starts a PowerShell child process that enables the
    ``Microsoft-Windows-DNS-Client/Operational`` log, polls it every 250 ms
    for event ID 3008 and writes one ``<pid>|<query name>`` line per event.
    Every well-formed line is appended, under ``dns_lock``, as a dict with the
    keys ``'time'`` (``time.time()`` at receipt), ``'domain'`` and
    ``'caller_pid'``.

    The call blocks until the child's stdout reaches EOF, so it is intended to
    run on a background thread.
    """
    # The poll restarts from the newest event's own timestamp. Depending on how
    # the StartTime boundary is evaluated, that event can be returned again on
    # the next poll, so a RecordId high-water mark makes sure each event is
    # reported only once.
    ps_script = """
$LogName = 'Microsoft-Windows-DNS-Client/Operational'
wevtutil sl $LogName /e:true
$lastTime = Get-Date
$lastRecordId = -1
try {
while ($true) {
$events = Get-WinEvent -FilterHashtable @{LogName=$LogName; ID=3008; StartTime=$lastTime} -ErrorAction SilentlyContinue | Where-Object { $_.RecordId -gt $lastRecordId } | Sort-Object RecordId
if ($events) {
foreach ($event in $events) {
$xml = [xml]$event.ToXml()
$q = $xml.Event.EventData.Data | Where-Object { $_.Name -eq 'QueryName' } | Select-Object -ExpandProperty '#text'
[Console]::WriteLine("$($event.ProcessId)|$q")
$lastTime = $event.TimeCreated
$lastRecordId = $event.RecordId
}
}
Start-Sleep -Milliseconds 250
}
} finally {
wevtutil sl $LogName /e:false
}
"""
    command = ['powershell', '-NoProfile', '-Command', '-']
    # The context manager closes the pipes and reaps the child once its output
    # ends, instead of leaving the Popen handle dangling.
    with subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          text=True, bufsize=1) as process:
        # The script is fed over stdin; closing stdin marks the end of input.
        process.stdin.write(ps_script)
        process.stdin.flush()
        process.stdin.close()
        for line in iter(process.stdout.readline, ''):
            pid_text, separator, domain = line.strip().partition('|')
            if not separator or not pid_text.isdigit():
                continue  # not a "<pid>|<domain>" record
            with dns_lock:
                recent_dns_requests.append({
                    'time': time.time(),
                    'domain': domain,
                    'caller_pid': int(pid_text)
                })
def _parse_tasklist(task_output):
    """Build a PID -> display-name dict from ``tasklist /svc /fo csv /nh`` text.

    Rows are parsed as real CSV because the services column is a single quoted
    field that itself contains commas; stripping the quotes and splitting on
    ',' would keep only the first hosted service. ``svchost.exe`` rows that
    list services are shown as ``svchost [<services>]``; every other row uses
    the image name. A non-numeric PID column maps to PID 0.
    """
    import csv  # local import: csv is not among the file's top-level imports

    names = {}
    for row in csv.reader(task_output.splitlines()):
        if len(row) < 3:
            continue  # blank or malformed line
        exe_name, pid_text, services = row[0], row[1], row[2]
        pid = int(pid_text) if pid_text.isdigit() else 0
        if exe_name.lower() == 'svchost.exe' and services.lower() != 'n/a':
            names[pid] = f"svchost [{services}]"
        else:
            names[pid] = exe_name
    return names


def _parse_netstat(net_output):
    """Build a local-port -> PID dict from ``netstat -ano`` text.

    TCP rows carry five columns (proto, local, foreign, state, PID) while UDP
    rows have no state column and therefore only four, so the minimum column
    count depends on the protocol. The result is keyed by port number alone;
    when a TCP and a UDP socket share a port number the TCP owner wins.
    """
    tcp_ports = {}
    udp_ports = {}
    for line in net_output.splitlines():
        fields = line.split()
        if not fields or fields[0] not in ('TCP', 'UDP'):
            continue
        is_tcp = fields[0] == 'TCP'
        if len(fields) < (5 if is_tcp else 4):
            continue
        local_addr, pid_text = fields[1], fields[-1]
        if ':' not in local_addr or not pid_text.isdigit():
            continue
        # rsplit keeps bracketed IPv6 addresses such as "[::]:445" intact.
        port_text = local_addr.rsplit(':', 1)[1]
        if port_text.isdigit():
            (tcp_ports if is_tcp else udp_ports)[int(port_text)] = int(pid_text)
    return {**udp_ports, **tcp_ports}


def update_mappings():
    """Refresh ``pid_to_name`` and ``port_to_process`` every two seconds, forever.

    Each pass runs ``tasklist`` and ``netstat`` and rebinds the two module
    globals to freshly built dicts. Never returns; intended to run on a
    background thread.
    """
    global port_to_process, pid_to_name
    while True:
        try:
            # errors='replace' keeps one undecodable process name from
            # aborting the whole refresh.
            task_output = subprocess.check_output(
                ['tasklist', '/svc', '/fo', 'csv', '/nh'], text=True, errors='replace')
            pid_to_name = _parse_tasklist(task_output)
            net_output = subprocess.check_output(
                ['netstat', '-ano'], text=True, errors='replace')
            port_to_process = _parse_netstat(net_output)
        except Exception:
            # Deliberate best-effort: a failed refresh keeps the previous
            # mappings and is retried on the next pass rather than ending
            # the loop.
            pass
        time.sleep(2)
# Create both background workers as daemon threads (so they never keep the
# interpreter alive), start them, then pause for one second before the rest
# of the module runs.
t_mappings, t_etw = (
    threading.Thread(target=worker, daemon=True)
    for worker in (update_mappings, etw_monitor)
)
t_mappings.start()
t_etw.start()
time.sleep(1)
def _local_port(packet, host):
    """Return the port on *host*'s side of a raw IPv4 TCP/UDP *packet*, else ``None``.

    The source port is returned when the packet's source address equals
    *host*, the destination port when its destination address does. ``None``
    is returned for truncated packets, a header-length field below 20 bytes,
    protocols other than TCP (6) / UDP (17), or packets not involving *host*.
    """
    packet_len = len(packet)
    if packet_len < 20:
        return None
    iph = struct.unpack_from('!BBHHHBBH4s4s', packet)
    ihl = (iph[0] & 0xF) * 4  # header length is the low nibble, in 32-bit words
    if ihl < 20:
        return None
    # Minimum transport header that must be present: TCP 20 bytes, UDP 8 bytes.
    min_transport = {6: 20, 17: 8}.get(iph[6])
    if min_transport is None or packet_len < ihl + min_transport:
        return None
    # Both TCP and UDP start with source port then destination port.
    src_port, dst_port = struct.unpack_from('!HH', packet, ihl)
    if socket.inet_ntoa(iph[8]) == host:
        return src_port
    if socket.inet_ntoa(iph[9]) == host:
        return dst_port
    return None


def _is_blacklisted(target_filters, *texts):
    """Return True if any lowercase filter is a substring of any of *texts* (case-insensitive)."""
    return any(f in text.lower() for f in target_filters for text in texts)


def _row_label(base_name, immediate_requests, target_filters, names):
    """Return the text for one traffic row, or ``None`` when the row is hidden.

    Rows whose *base_name* matches the blacklist are hidden. Rows containing
    'Dnscache' are special: with recent DNS requests they list up to two
    distinct ``caller -> domain`` pairs that survive the blacklist (and are
    shown whenever at least one survives, hidden when none do); without
    recent requests they are tagged ``[Background / Broadcast]``.
    *names* maps PID -> process name for resolving callers.
    """
    show_row = not _is_blacklisted(target_filters, base_name)
    final_name = base_name
    if 'Dnscache' in base_name:
        if immediate_requests:
            displays = []
            for req in immediate_requests:
                caller_name = names.get(req['caller_pid'], f"PID:{req['caller_pid']}")
                # Drop queries whose caller or domain matches the blacklist.
                if _is_blacklisted(target_filters, caller_name, req['domain']):
                    continue
                displays.append(f"{caller_name} -> {req['domain']}")
            unique_displays = list(dict.fromkeys(displays))  # de-dupe, keep order
            if unique_displays:
                domains_str = " | ".join(unique_displays[:2])
                if len(unique_displays) > 2:
                    domains_str += " | ..."
                final_name = f"{base_name} [{domains_str}]"
                show_row = True
            else:
                show_row = False
        elif show_row:
            final_name = f"{base_name} [Background / Broadcast]"
    return final_name if show_row else None


def _print_report(host, traffic, target_filters, now):
    """Clear the screen and print the per-PID traffic table and the DNS history.

    Also prunes ``recent_dns_requests`` to the last 120 seconds. *traffic*
    maps PID -> bytes accumulated since the previous report and is not
    modified here.
    """
    os.system('cls' if os.name == 'nt' else 'clear')
    header_msg = f"Monitoring host: {host}. Press Ctrl+C to stop."
    if target_filters:
        header_msg += f" (Blocking: {', '.join(target_filters)})"
    print(header_msg + "\n")
    print(f"{'PID':<8} {'Process / Service Name':<120} {'Bytes/sec':<15}")
    print("-" * 145)

    names = pid_to_name  # one snapshot per report; the updater rebinds the global
    with dns_lock:
        recent_dns_requests[:] = [req for req in recent_dns_requests if now - req['time'] <= 120.0]
        history = list(recent_dns_requests)  # copy so printing happens outside the lock
    immediate_requests = [req for req in history if now - req['time'] <= 2.0]

    for pid, byte_count in sorted(traffic.items(), key=lambda item: item[1], reverse=True):
        label = _row_label(names.get(pid, 'Unknown'), immediate_requests, target_filters, names)
        if label is not None:
            print(f"{pid:<8} {label:<120} {byte_count:<15}")

    print("\n" + "=" * 145)
    print("UNIQUE DNS CALLS (LAST 2 MINUTES)")
    print("=" * 145)
    if not history:
        print("No DNS queries detected in the last 120 seconds.")
        return
    seen_combos = set()
    displayed_count = 0
    for req in reversed(history):  # newest first
        combo = (req['caller_pid'], req['domain'])
        if combo in seen_combos:
            continue
        seen_combos.add(combo)
        caller_name = names.get(req['caller_pid'], f"PID:{req['caller_pid']}")
        if _is_blacklisted(target_filters, caller_name, req['domain']):
            continue
        time_str = time.strftime('%H:%M:%S', time.localtime(req['time']))
        print(f"[{time_str}] {req['caller_pid']:<8} {caller_name:<40} -> {req['domain']}")
        displayed_count += 1
    if displayed_count == 0 and target_filters:
        print("No DNS queries remaining after applying filters.")


def main():
    """Sniff this host's IPv4 traffic and print a per-process report about once a second.

    Command line: ``-f/--filter TEXT`` (repeatable) hides processes or domains
    whose name contains TEXT, case-insensitively.

    Runs until Ctrl+C. On any exit path, receive-all mode is switched off, the
    socket is closed and the DNS-Client operational log is disabled again.
    """
    parser = argparse.ArgumentParser(description="Temporal Network Connection Logger")
    parser.add_argument('-f', '--filter', action='append', dest='filters', help='Filter OUT (hide) processes or domains containing this string')
    args = parser.parse_args()
    # Blacklist strings
    target_filters = [f.lower() for f in args.filters] if args.filters else []

    HOST = socket.gethostbyname(socket.gethostname())
    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
    receive_all_on = False
    traffic = {}
    try:
        sniffer.bind((HOST, 0))
        sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
        receive_all_on = True
        last_print = time.time()
        while True:
            packet, _ = sniffer.recvfrom(65535)
            local_port = _local_port(packet, HOST)
            # Single .get(): the updater thread may rebind port_to_process
            # between a membership test and a subscript.
            pid = port_to_process.get(local_port) if local_port else None
            if pid is not None:
                traffic[pid] = traffic.get(pid, 0) + len(packet)
            now = time.time()
            if now - last_print >= 1.0:
                _print_report(HOST, traffic, target_filters, now)
                traffic.clear()
                last_print = now
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop
    finally:
        try:
            if receive_all_on:
                sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
        finally:
            sniffer.close()
            subprocess.run(['wevtutil', 'sl', 'Microsoft-Windows-DNS-Client/Operational', '/e:false'])
# Run the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()
@twobob
Copy link
Copy Markdown
Author

twobob commented Apr 22, 2026

Run as administrator; the script exits immediately otherwise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment