Last active
February 12, 2024 03:04
-
-
Save s3rgeym/64fe47c31cd2eac5f87c5f8be78f8ae3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Find domains on ip address using a ssl certificate""" | |
import argparse | |
import ipaddress | |
import logging | |
import os | |
import socket | |
import ssl | |
import sys | |
import tempfile | |
import typing | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from functools import partial | |
from itertools import chain | |
try: | |
from collections import Sequence | |
except ImportError: | |
from typing import Sequence | |
# ANSI SGR escape sequences used to colorize terminal output.
# Foreground colors, in SGR code order (31-37).
RED = "\x1b[31m"  # SGR 31
GREEN = "\x1b[32m"  # SGR 32
YELLOW = "\x1b[33m"  # SGR 33
BLUE = "\x1b[34m"  # SGR 34
MAGENTA = "\x1b[35m"  # SGR 35
CYAN = "\x1b[36m"  # SGR 36
GREY = "\x1b[37m"  # SGR 37
# Empty SGR parameter list: switches all attributes back to the default.
RESET = "\x1b[m"
BANNER = r""" | |
_ _____ _ _ | |
(_) / __ \ | | (_) | |
_ _ __ `' / /' __| | ___ _ __ ___ __ _ _ _ __ ___ | |
| | '_ \ / / / _` |/ _ \| '_ ` _ \ / _` | | '_ \/ __| | |
| | |_) ./ /__| (_| | (_) | | | | | | (_| | | | | \__ \ | |
|_| .__/\_____/\__,_|\___/|_| |_| |_|\__,_|_|_| |_|___/ | |
| | | |
|_| | |
""" | |
def print_err(*args: typing.Any, **kwargs: typing.Any) -> None:
    """Print to standard error; accepts the same arguments as ``print``.

    ``sys.stderr`` is looked up on every call instead of being captured
    once when the module is imported, so the output follows any later
    replacement of the stream (for example ``contextlib.redirect_stderr``).
    An explicit ``file=`` keyword still takes precedence.
    """
    kwargs.setdefault("file", sys.stderr)
    print(*args, **kwargs)
class ColorHandler(logging.StreamHandler):
    """Stream handler that wraps each formatted record in an ANSI color.

    The color is chosen by the record's level number; the line is always
    rendered with the fixed ``"%(levelname)-8s : %(message)s"`` layout.
    """

    # Level number -> escape sequence that switches the color on.
    LOG_COLORS = {
        logging.DEBUG: CYAN,
        logging.INFO: GREEN,
        logging.WARNING: RED,
        logging.ERROR: RED,
        logging.CRITICAL: RED,
    }

    _fmt = logging.Formatter("%(levelname)-8s : %(message)s")

    def format(self, record: logging.LogRecord) -> str:
        """Return the formatted record, colored when its level is known.

        Records whose level has no entry in ``LOG_COLORS`` (custom or
        intermediate levels) are returned uncolored rather than raising
        ``KeyError`` from inside the handler.
        """
        message = self._fmt.format(record)
        color = self.LOG_COLORS.get(record.levelno)
        if color is None:
            return message
        return f"{color}{message}{RESET}"
class NameSpace(argparse.Namespace):
    """Typed view of the options filled in by the command-line parser.

    The attribute names mirror the argparse destinations so that editors
    and type checkers can resolve ``args.<name>``; the annotations have no
    runtime effect.
    """

    addr: list[str]  # -a/--addr (dest derived from the long option)
    input: typing.TextIO  # -i/--input
    output: typing.TextIO  # -o/--output
    workers_num: int  # -w/--workers (explicit dest)
    timeout: float  # -t/--timeout
    banner: bool  # --banner / --no-banner
    verbosity: int  # -v, counted
def _parse_args(
    argv: Sequence[str] | None,
) -> tuple[argparse.ArgumentParser, NameSpace]:
    """Build the command-line parser and parse *argv* with it.

    Args:
        argv: Argument strings to parse; ``None`` makes argparse fall back
            to ``sys.argv[1:]``.

    Returns:
        A ``(parser, namespace)`` pair: the configured parser and a
        ``NameSpace`` instance populated with the parsed options.
    """
    # One (flags, add_argument keywords) entry per option, in help order.
    option_table = (
        (
            ("-a", "--addr"),
            {"nargs": "+", "help": "ip address or cidr", "default": []},
        ),
        (
            ("-i", "--input"),
            {
                "type": argparse.FileType(),
                "default": "-",
                "help": "input file containing list of ip addresses each on a new line",
            },
        ),
        (
            ("-o", "--output"),
            {
                "type": argparse.FileType("w"),
                "default": "-",
                "help": "output file",
            },
        ),
        (
            ("-w", "--workers"),
            {
                "dest": "workers_num",
                "type": int,
                "default": 50,
                "help": "number of workers",
            },
        ),
        (
            ("-t", "--timeout"),
            {"type": float, "default": 1.0, "help": "timeout in seconds"},
        ),
        (
            ("--banner",),
            {
                "action": argparse.BooleanOptionalAction,
                "default": True,
                "help": "show banner",
            },
        ),
        (
            ("-v", "--verbosity"),
            {"action": "count", "default": 0, "help": "be more verbose"},
        ),
    )

    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    parsed = cli.parse_args(argv, NameSpace())
    return cli, parsed
def find_common_name(ip: str, timeout: float) -> str | None:
    """Return the subject ``commonName`` of the certificate served by *ip*.

    Args:
        ip: Address of the host to query.
        timeout: Timeout in seconds, passed through to ``get_cert_info``.

    Returns:
        The ``commonName`` value, or ``None`` when no certificate
        information was obtained or the subject carries no such attribute.
    """
    cert_dict = get_cert_info(ip, timeout)
    if "subject" not in cert_dict:
        return None
    # The subject is a sequence of RDNs, each itself a sequence of
    # (name, value) pairs. Flatten every pair, not just the first one of
    # each RDN, so a commonName inside a multi-valued RDN is not missed.
    subject = {
        name: value for rdn in cert_dict["subject"] for name, value in rdn
    }
    logging.debug(subject)
    return subject.get("commonName")
def get_cert_info(ip: str, timeout: float) -> dict: | |
try: | |
contents = ssl.get_server_certificate((ip, 443), timeout=timeout) | |
except (socket.timeout, socket.error): | |
return {} | |
with tempfile.NamedTemporaryFile("w+", delete=False) as tmp: | |
tmp.write(contents) | |
try: | |
return ssl._ssl._test_decode_cert(tmp.name) | |
finally: | |
os.unlink(tmp.name) | |
def _iter_hosts(
    addresses: typing.Iterable[str],
) -> typing.Iterator[ipaddress.IPv4Address | ipaddress.IPv6Address]:
    """Yield every IP address covered by each address or CIDR string."""
    for raw in addresses:
        try:
            # strict=False accepts a host address with a prefix length
            # (e.g. "192.168.1.5/24") instead of rejecting the host bits.
            network = ipaddress.ip_network(raw, strict=False)
        except ValueError as ex:
            # One malformed entry must not abort the whole scan.
            logging.error("skipping invalid address %r: %s", raw, ex)
            continue
        yield from network


def main(argv: Sequence[str] | None = None) -> int | None:
    """Run the command-line tool.

    Collects addresses from ``--addr`` and, unless the input stream is a
    terminal, from ``--input``; looks up the certificate common name of
    every address in a thread pool and writes ``"<ip>\\t<name>"`` lines to
    the output stream as results arrive.

    Args:
        argv: Argument strings; ``None`` lets argparse use ``sys.argv[1:]``.

    Returns:
        ``None``, which ``sys.exit`` treats as a successful exit status.
    """
    _, args = _parse_args(argv)

    if args.banner:
        print_err(BANNER)

    # Each -v lowers the threshold by one level from ERROR, stopping at DEBUG.
    log_level = max(
        logging.DEBUG, logging.ERROR - args.verbosity * logging.DEBUG
    )
    logging.basicConfig(level=log_level, handlers=[ColorHandler()])

    addresses = args.addr.copy()
    # Read the input stream only when it is not an interactive terminal.
    if not args.input.isatty():
        addresses.extend(filter(None, map(str.strip, args.input)))

    with ThreadPoolExecutor(args.workers_num) as pool:
        # Map each future back to the address it was submitted for.
        futs = {
            pool.submit(find_common_name, str(ip), args.timeout): ip
            for ip in _iter_hosts(addresses)
        }
        logging.debug("futures: %d", len(futs))
        for fut in as_completed(futs):
            try:
                if domain := fut.result():
                    ip = futs[fut]
                    args.output.write(f"{ip}\t{domain}\n")
                    args.output.flush()
            except Exception as ex:
                # Exception, not BaseException: Ctrl-C and SystemExit must
                # still be able to stop the program.
                logging.warning(ex)

    logging.info("Finished!")
    return None
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment