Last active
November 1, 2019 18:00
-
-
Save leyyce/27cf5989dfc67750f9c0414dca565e01 to your computer and use it in GitHub Desktop.
Simple and leightweight multi-threaded port scanner written in python that can work with port and ip address ranges. At the moment this script only supports full-connection tcp scans. For usage help see >python pyscan.py -h
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
class ProgressPrinter: | |
def __init__(self, units, unit_type='', max_length=50, pre='', post='', fill='=', head='>', empty=' '): | |
if units > max_length: | |
self.steps = max_length / units | |
self.max_length = max_length | |
else: | |
self.steps = 1 | |
self.max_length = units | |
self.units = units | |
self.unit_type = unit_type | |
self.pre = pre | |
self.post = post | |
self.fill = fill | |
self.head = head | |
self.empty = empty | |
def print_progress(self, current_unit=0, pre=''): | |
if self.head != '' and current_unit != 0: | |
progress = self.fill * (round(current_unit * self.steps) - 1) + self.head | |
else: | |
progress = self.fill * round(current_unit * self.steps) | |
empty = self.empty * (self.max_length - len(progress)) | |
if current_unit == 0: | |
if self.pre != '': | |
sys.stdout.write('\n' + self.pre + '\n') | |
if pre != '': | |
sys.stdout.write(pre + ' ' * self.max_length + '\n') | |
sys.stdout.write('[' + progress + empty + | |
'] - Finished {}/{} {}'.format(current_unit, self.units, self.unit_type)) | |
else: | |
if pre != '': | |
sys.stdout.write('\r' + pre + ' ' * self.max_length + '\n') | |
sys.stdout.write('[' + progress + empty + | |
'] - Finished {}/{} {}'.format(current_unit, self.units, self.unit_type)) | |
else: | |
sys.stdout.write('\r[' + progress + empty + | |
'] - Finished {}/{} {}'.format(current_unit, self.units, self.unit_type)) | |
if current_unit == self.units: | |
sys.stdout.write('\n') | |
if self.post != '': | |
sys.stdout.write(self.post + '\n') | |
sys.stdout.flush() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import optparse | |
from socket import * | |
from threading import Lock, Thread | |
from queue import Queue | |
import datetime | |
from progress_printer import ProgressPrinter | |
scan_lock = Lock() | |
print_lock = Lock() | |
finished_scans = 0 | |
num_worker_threads = 10 | |
num_port_threads = 50 | |
def conn_scan(host, port, results): | |
conn_sock = socket(AF_INET, SOCK_STREAM) | |
conn_sock.settimeout(4) | |
try: | |
conn_sock.connect((host, port)) | |
conn_sock.send(b'HTTP_Request - 404\r\n') | |
response = conn_sock.recv(100) | |
scan_lock.acquire() | |
results['resolved'][host]['open'].append({'port': port, 'response': response}) | |
scan_lock.release() | |
except error as e: | |
scan_lock.acquire() | |
results['resolved'][host]['closed'].append({'port': port, 'error': e}) | |
scan_lock.release() | |
finally: | |
conn_sock.close() | |
def port_scan(host, ports, results, progress_printer): | |
global finished_scans | |
port_jobs = Queue() | |
try: | |
tgt_ip = gethostbyname(host) | |
except error as e: | |
scan_lock.acquire() | |
results['unresolved'].append({'host': host, 'error': e}) | |
print_lock.acquire() | |
finished_scans += 1 | |
pre = "[*] Scan for host {} failed!".format(host) | |
progress_printer.print_progress(finished_scans, pre=pre) | |
scan_lock.release() | |
print_lock.release() | |
return False | |
try: | |
tgt_name = gethostbyaddr(tgt_ip) | |
scan_lock.acquire() | |
results['resolved'][host] = {'hostname': tgt_name[0], 'ip': tgt_ip, 'initial': host, 'open': [], 'closed': []} | |
except error as _: | |
scan_lock.acquire() | |
results['resolved'][host] = {'hostname': host, 'ip': tgt_ip, 'initial': host, 'open': [], 'closed': []} | |
finally: | |
scan_lock.release() | |
def worker(): | |
while True: | |
port = port_jobs.get() | |
conn_scan(host, port, results) | |
port_jobs.task_done() | |
for i in range(num_port_threads): | |
t = Thread(target=worker) | |
t.daemon = True | |
t.start() | |
for port in ports: | |
port_jobs.put(port) | |
port_jobs.join() | |
print_lock.acquire() | |
finished_scans += 1 | |
pre = "[*] Scan for host {} finished!".format(host) | |
progress_printer.print_progress(finished_scans, pre=pre) | |
print_lock.release() | |
return True | |
def validate_ip_or_range(s): | |
valid = True | |
a = s.split('.') | |
if len(a) != 4: | |
return False | |
for x in a: | |
if x.isdigit() and not 0 < int(x) <= 255: | |
valid = False | |
elif "-" in x: | |
r = [int(r) for r in x.split("-")] | |
if not ((r[0] < r[1]) and (0 < r[0] <= 255) and (0 < r[1] <= 255)): | |
valid = False | |
return valid | |
def parse_host_args(host_args): | |
host_args = host_args.split(",") | |
tgt_hosts = [] | |
for host_arg in host_args: | |
if validate_ip_or_range(host_arg): | |
if "-" in host_arg: | |
addr_parts = host_arg.split(".") | |
parts = [] | |
for addr_part in addr_parts: | |
if "-" in addr_part: | |
r = [int(part) for part in addr_part.split("-")] | |
parts.append([str(part) for part in range(r[0], r[1] + 1)]) | |
else: | |
parts.append([addr_part]) | |
for p0 in parts[0]: | |
for p1 in parts[1]: | |
for p2 in parts[2]: | |
for p3 in parts[3]: | |
tgt_hosts.append(p0 + "." + p1 + "." + p2 + "." + p3) | |
else: | |
tgt_hosts.append(host_arg) | |
else: | |
tgt_hosts.append(host_arg) | |
return tgt_hosts | |
def parse_port_args(port_args): | |
port_args = port_args.split(",") | |
tgt_ports = [] | |
for port_arg in port_args: | |
port_arg = [int(port) for port in port_arg.split('-')] | |
if len(port_arg) == 1: | |
tgt_ports.append(int(port_arg[0])) | |
else: | |
for port in range(port_arg[0], port_arg[1] + 1): | |
tgt_ports.append(port) | |
return tgt_ports | |
def print_results(results, only_open): | |
print('\n----- SHOWING SCAN RESULTS -----') | |
resolved = results['resolved'] | |
unresolved = results['unresolved'] | |
for host in unresolved: | |
print("[-] Couldn't resolve host {} - {}".format(host['host'], host['error'])) | |
for host, result in resolved.items(): | |
print("[+] Scan results for {} [{} -> {}]".format(result['ip'], result['initial'], result['hostname'])) | |
for port in result['open']: | |
print(" [+] {}/tcp open - {}".format(port['port'], port['response'])) | |
if not only_open: | |
for port in result['closed']: | |
print(" [-] {}/tcp closed - {}".format(port['port'], port['error'])) | |
def save_to_file(results, only_open): | |
time = datetime.datetime.now() | |
hours, minutes, seconds, days, months, years = time.hour, time.minute, time.second, time.day, time.month, time.year | |
name = "{}-{}-{}_{}-{}-{}_PortScan".format(years, months, days, hours, minutes, seconds) | |
resolved = results['resolved'] | |
unresolved = results['unresolved'] | |
f = open(name + '.txt', 'w+') | |
f.write("{} - Scanned {} hosts: \r\n".format(str(time), len(resolved.keys()) + len(unresolved))) | |
for host in unresolved: | |
f.write("[-] Couldn't resolve host {} - {}\r\n".format(host['host'], host['error'])) | |
for host, result in resolved.items(): | |
f.write("[+] Scan results for {} [{} -> {}]\r\n".format(result['ip'], result['initial'], result['hostname'])) | |
for port in result['open']: | |
f.write(" [+] {}/tcp open - {}\r\n".format(port['port'], port['response'])) | |
if not only_open: | |
for port in result['closed']: | |
f.write(" [-] {}/tcp closed - {}\r\n".format(port['port'], port['error'])) | |
def main(): | |
parser = optparse.OptionParser('pyscan.py -H <target_host(s)> -p <target_port(s)> [-o -f]') | |
parser.add_option('-H', '--Hosts', dest='tgtHosts', type="string", | |
help='specify target host(s) as a comma separated ' | |
'list of hostname(s) or IP addresses or ip ' | |
'address ranges ' | |
'[eg. google.com,127.0.0.1,192.168.192.1-124]') | |
parser.add_option('-p', '--ports', dest='tgtPorts', type="string", | |
help='specify target port(s) separated trough commas ' | |
'and/or port ranges [e.g. 80,100-110,90]') | |
parser.add_option('-o', '--open-only', dest='openOnly', action="store_true", default=False, | |
help='Only show open ports in console and file output') | |
parser.add_option('-f', '--to-file', dest='toFile', action="store_true", default=False, | |
help='Saves a copy of the scan results to a file on disk.') | |
(options, args) = parser.parse_args() | |
host_args = options.tgtHosts | |
port_args = options.tgtPorts | |
open_only = options.openOnly | |
to_file = options.toFile | |
results = {'resolved': {}, 'unresolved': []} | |
scan_jobs = Queue() | |
if host_args is None or port_args is None: | |
print("usage {}".format(parser.usage)) | |
exit(0) | |
else: | |
target_hosts = parse_host_args(host_args) | |
target_ports = parse_port_args(port_args) | |
pre = "Running scan for {} host(s) with {} port(s) per host. This can take some time...\n" \ | |
"Concurrent scans are ENABLED: Scanning with a max of {} host and {} port threads!\n"\ | |
.format(len(target_hosts), len(target_ports), num_worker_threads, num_port_threads) | |
pp = ProgressPrinter(len(target_hosts), unit_type='scans', pre=pre) | |
pp.print_progress() | |
def worker(): | |
while True: | |
host = scan_jobs.get() | |
port_scan(host, target_ports, results, pp) | |
scan_jobs.task_done() | |
for i in range(num_worker_threads): | |
t = Thread(target=worker) | |
t.daemon = True | |
t.start() | |
for host in target_hosts: | |
scan_jobs.put(host) | |
scan_jobs.join() | |
print_results(results, open_only) | |
if to_file: | |
save_to_file(results, open_only) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment