Last active
September 7, 2019 18:34
-
-
Save MrDrMcCoy/a1bce4dddcb49af631c3db74e244dc02 to your computer and use it in GitHub Desktop.
Check for compromised credentials from CSV export.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Author: Jeremy McCoy ([email protected]) | |
License: WTFPL | |
About: | |
This script helps you to check if any of your usernames and passwords have | |
been found in major security leaks. | |
It Will take your decrypted passwords from KeePass or simlar and send SHA1 | |
hashes to haveibeenpwned.com to check if they have been compromised. | |
Assumptions: | |
- CSV password dump from KeePass2/LastPass or similar named passdb.csv. | |
It must have a header line that includes the following columns: | |
Title or Name,Username,Password,URL | |
- Python 2.7+ | |
- Python requests library is installed | |
- OpenSSL version > 1.0 installed | |
""" | |
import argparse | |
import csv | |
import hashlib | |
import json | |
import logging | |
import re | |
import requests | |
import sys | |
import time | |
# Set up logger | |
logging.basicConfig( | |
format='%(levelname)s [%(lineno)d] %(message)s') | |
log = logging.getLogger('amipwned') | |
parser = argparse.ArgumentParser( | |
description=""" | |
This script checks usernames and passwords against the haveibeenpwned | |
database for leaks and delivers a report of any compromised accounts. | |
""") | |
parser.add_argument('--passfile', default='passwords.csv', | |
help=""" | |
CSV export (with header line) of your passwords from | |
LastPass, KeePass, Enpass, etc. Default = ./passwords.csv | |
""") | |
parser.add_argument('--report', default='report.json', | |
help=""" | |
File to write report to in JSON format. Default = ./report.json | |
""") | |
parser.add_argument('--interval', default=1.51, type=float, | |
help=""" | |
Time between requests to avoid rate limiting in seconds. Default = 1.51 | |
""") | |
parser.add_argument('--strength', action='store_true', | |
help=""" | |
Only checks for credential strength and exits. | |
""") | |
parser.add_argument('--loglevel', default='info', | |
help=""" | |
Set logging level. Default = info | |
""") | |
conf = vars(parser.parse_args()) | |
log.setLevel(logging.getLevelName(conf['loglevel'].upper())) | |
api_base = 'https://haveibeenpwned.com/api/v2/' | |
api_headers = {'User-Agent': 'amipwned.py'} | |
usernames = [] | |
passwords = {} | |
breached_usernames = {} | |
pasted_emails = {} | |
pwned_passwords = {} | |
weak_credentials = [] | |
skip_search = [ | |
'admin', | |
'administrator', | |
'anonymous', | |
'info', | |
'root', | |
'username', | |
'webmaster', | |
'' | |
] | |
total_emails = 0 | |
total_csv = 0 | |
def mrjson(stuff): | |
# This is a dict-to-json function that can safely handle non-serializable items | |
return json.dumps( | |
stuff, | |
default=lambda o: 'ERROR: Item not JSON serializable', | |
sort_keys=True, | |
indent=3) | |
def column_matcher(header, row): | |
username_column = None | |
password_column = None | |
title_column = None | |
url_column = None | |
email_column = None | |
# read header | |
for col_index, column in enumerate(header): | |
if column.lower() == 'username': | |
username_column = col_index | |
elif column.lower() == 'password': | |
password_column = col_index | |
elif column.lower() == 'url': | |
url_column = col_index | |
elif column.lower() == 'email': | |
email_column = col_index | |
elif column.lower() == 'name' or column.lower() == 'title': | |
title_column = col_index | |
# if header has most columns | |
if None not in [username_column, password_column, title_column, url_column]: | |
return username_column, password_column, title_column, url_column, email_column | |
else: | |
# otherwise, each row might be key,value. enpass is dumb like that. | |
username_column = None | |
password_column = None | |
title_column = None | |
url_column = None | |
email_column = None | |
# try to read row | |
for col_index, column in enumerate(row): | |
if column.lower() == 'username': | |
username_column = col_index + 1 | |
elif column.lower() == 'password': | |
password_column = col_index + 1 | |
elif column.lower() == 'url': | |
url_column = col_index + 1 | |
elif column.lower() == 'email': | |
email_column = col_index + 1 | |
elif column.lower() == 'name' or column.lower() == 'title': | |
title_column = col_index | |
return username_column, password_column, title_column, url_column, email_column | |
with open('lastpass.csv', 'rb') as passfile: | |
passdb = csv.reader(passfile, delimiter=',', quotechar='"') | |
for row_index, row in enumerate(passdb): | |
print(row_index, row) | |
break | |
try: | |
log.info('Reading password file...') | |
with open(conf['passfile'], 'rb') as passfile: | |
passdb = csv.reader(passfile, delimiter=',', quotechar='"') | |
for row_index, row in enumerate(passdb): | |
total_csv += 1 | |
sha = None | |
# Save header | |
if row_index == 0: | |
header = row | |
continue | |
# Extract header and detect columns from CSV | |
username_column, password_column, title_column, url_column, email_column = column_matcher(header, row) | |
if password_column: | |
password = row[password_column] | |
hasher = hashlib.sha1() | |
hasher.update(password) | |
sha = hasher.hexdigest() | |
if sha not in passwords: | |
passwords[sha] = { | |
'usernames': [], | |
'titles': [], | |
'password': password, | |
'urls': [] | |
} | |
if username_column: | |
username = row[username_column].lower() | |
if username not in usernames and username not in skip_search: | |
usernames.append(username) | |
if sha and username not in passwords[sha]['usernames']: | |
passwords[sha]['usernames'].append(username) | |
if email_column: | |
email = row[email_column].lower() | |
if email not in usernames and email not in skip_search: | |
usernames.append(email) | |
if sha and email not in passwords[sha]['usernames']: | |
passwords[sha]['usernames'].append(email) | |
if title_column: | |
title = row[title_column].lower() | |
if sha and title not in passwords[sha]['titles']: | |
passwords[sha]['titles'].append(title) | |
if url_column: | |
url = row[url_column] | |
if sha and url not in passwords[sha]['urls']: | |
passwords[sha]['urls'].append(url) | |
except Exception as e: | |
log.exception('Issue parsing CSV: ' + conf['passfile'] + '\n' + str(e)) | |
sys.exit(1) | |
def credential_strength(credential): | |
log.debug('Checking credential:\n' + mrjson(credential)) | |
credential['reasons'] = [] | |
if len(credential['password']) <= 8: | |
credential['reasons'].append('too short') | |
if not re.search(r'\d', credential['password']): | |
credential['reasons'].append('no digits') | |
if not re.search(r'[a-z]', credential['password']): | |
credential['reasons'].append('no lowercase') | |
if not re.search(r'[A-Z]', credential['password']): | |
credential['reasons'].append('no uppercase') | |
if not re.search(r'[^\w]', credential['password']): | |
credential['reasons'].append('no special characters') | |
if len(credential['usernames']) > 1 or len(credential['titles']) > 1 or len(credential['urls']) > 2: | |
credential['reasons'].append('password reuse') | |
if len(credential['reasons']) > 0: | |
weak_credentials.append(credential) | |
log.warn('Weak credential:\n' + mrjson(credential)) | |
log.info('Checking for weak passwords...') | |
for credential in passwords: | |
credential_strength(passwords[credential]) | |
if not conf['strength']: | |
log.info('Checking for breached usernames...') | |
for username in usernames: | |
if ' ' not in username: | |
log.debug('Checking username: ' + username) | |
r = requests.get(api_base + 'breachedaccount/' + username, headers=api_headers) | |
if str(r.status_code) == 429: | |
log.error('Rate-limiting error received. Please try again later.') | |
sys.exit(1) | |
elif str(r.status_code) == 200: | |
breached_usernames[username] = json.loads(r.text) | |
log.warn(str(r.status_code) + ' Account breached: ' + username) | |
else: | |
log.debug(str(r.status_code) + ' Account OK: ' + username) | |
time.sleep(conf['interval']) | |
else: | |
log.debug('Skipping username: ' + username) | |
log.info('Checking for pasted emails...') | |
for username in usernames: | |
if '@' in username: | |
total_emails += 1 | |
log.debug('Checking email: ' + username) | |
r = requests.get(api_base + 'pasteaccount/' + username, headers=api_headers) | |
if str(r.status_code) == 429: | |
log.error('Rate-limiting error received. Please try again later.') | |
sys.exit(1) | |
elif str(r.status_code) == 200: | |
pasted_emails[username] = json.loads(r.text) | |
log.warn(str(r.status_code) + ' Email in paste: ' + username) | |
else: | |
log.debug(str(r.status_code) + ' Email OK: ' + username) | |
time.sleep(conf['interval']) | |
else: | |
log.debug('Skipping non-email: ' + username) | |
log.info('Checking for pwned passwords...') | |
for sha in passwords: | |
log.debug('Checking password hash: ' + sha) | |
r = requests.get(api_base + 'pwnedpassword/' + sha, headers=api_headers) | |
if str(r.status_code) == 429: | |
log.error('Rate-limiting error received. Please try again later.') | |
sys.exit(1) | |
elif str(r.status_code) == 200: | |
pwned_passwords[sha] = passwords[sha] | |
log.warn(str(r.status_code) + ' Password pwned: ' + passwords[sha]['password']) | |
else: | |
log.debug(str(r.status_code) + ' Password OK: ' + passwords[sha]['password']) | |
time.sleep(conf['interval']) | |
log.info('Summary:\n' + mrjson({ | |
'Total Usernames': len(usernames), | |
'Breached Usernames': len(breached_usernames), | |
'Total Emails': total_emails, | |
'Pasted Emails': len(pasted_emails), | |
'Total Passwords': len(passwords), | |
'Pwned Passwords': len(pwned_passwords), | |
'Total CSV entries': total_csv, | |
'Weak credentials': len(weak_credentials) | |
})) | |
report = mrjson({ | |
'Breached Usernames': breached_usernames, | |
'Pasted Emails': pasted_emails, | |
'Pwned Passwords:': pwned_passwords, | |
'Weak credentials': weak_credentials | |
}) | |
log.debug('Findings:\n' + report) | |
try: | |
with open(conf['report'], 'w') as outfile: | |
outfile.write(report) | |
log.info('Wrote detailed report to file: ' + conf['report']) | |
except Exception, e: | |
log.exception('Could not write report to file: ' + conf['report'] + '\n' + str(e)) | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment