A simple Python script to check a website for broken links.
from bs4 import BeautifulSoup, SoupStrainer
import re
import Queue
import threading
import requests
import urlparse
import time
import copy
import signal
import sys
# Maximum number of threads to send network requests
# (Reduce this number to reduce the number of concurrent network requests)
MAX_THREADS = 5
# How many seconds we wait for a response from the server before timing out
REQUEST_TIMEOUT = 10
# How many seconds to sleep before making a network request
# (Set this value only if requests appear to be timing out frequently)
REQUEST_DELAY = 2
# Certain <link> tags will fail on a GET request; don't include these in our report
SKIP_REL_LINKS = ['pingback', 'dns-prefetch']
# https://stackoverflow.com/questions/9626535/get-domain-name-from-url
def extract_domain(url):
    spltAr = url.split("://")
    i = (0, 1)[len(spltAr) > 1]
    return spltAr[i].split("?")[0].split('/')[0].split(':')[0].lower()
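# For example, extract_domain("https://www.example.com/page?q=1") returns "www.example.com".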
class LinkResult():
    def __init__(self, link, parent, result, error=False):
        self.link = link
        self.parent = parent
        self.result = result
        self.error = error
        self.domain = extract_domain(link)
    def __str__(self):
        return "{},{},{},{}".format(self.parent,
                                    self.link,
                                    self.result,
                                    'Fail' if self.error else 'Pass')
    def __iter__(self):
        """ Defined to allow accessing the class members as a sequence. """
        fields = [self.link, self.parent, self.result, self.error, self.domain]
        for field in fields:
            yield field
    def __lt__(self, other):
        """ Defined to allow sorting, we consider errors to be lt non errors,
            domains that match the parent to be lt those that don't,
            with ties broken by result text. """
        if self.error and not other.error:
            return True
        elif not self.error and other.error:
            return False
        elif (self.domain == extract_domain(self.parent) and
              other.domain != extract_domain(other.parent)):
            return True
        elif (self.domain != extract_domain(self.parent) and
              other.domain == extract_domain(other.parent)):
            return False
        else:
            return self.result < other.result
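# With the __lt__ ordering above, sorting a list of LinkResult objects puts failing
# links first, then links whose domain matches their parent page, with remaining ties
# broken by the result text.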
class LinkChecker():
    def __init__(self):
        self.sitemap_parsed = False
        self.domain = ""
        self.check_link_cache = {}
        self.processed = set()
        self.link_results = []  # note: appending to a python list is thread safe
        self.result_data = {}
        self.q = None
        self.threads = []
        self.running = False
    def update_cache(self, result):
        updated_cache = self.check_link_cache.get(result.link, []) + [result]
        self.check_link_cache[result.link] = updated_cache
    def handle_request_exception(self, e, link, parent):
        result = None
        # Check the most specific exception types first (SSLError is a
        # ConnectionError, which in turn is a RequestException)
        if isinstance(e, requests.exceptions.SSLError):
            result = LinkResult(link, parent, "SSL Error", True)
        elif isinstance(e, requests.exceptions.ConnectionError):
            result = LinkResult(link, parent, "Connection Error", True)
        elif isinstance(e, requests.exceptions.RequestException):
            # Covers Timeout, HTTPError and any other requests error
            result = LinkResult(link, parent, str(e), True)
        else:
            print (link, parent, e)
        if result:
            self.link_results.append(result)
            self.update_cache(result)
    def check_link(self, link, parent):
        # Check if we have link results already in the cache
        if link in self.check_link_cache:
            add_result = True
            for cached_result in self.check_link_cache[link]:
                if cached_result.parent == parent:
                    add_result = False
            # Create a new result only if the link appeared on a new page
            if add_result:
                result = copy.copy(self.check_link_cache[link][0])
                result.parent = parent
                self.link_results.append(result)
            return
        print "Checking link {} --> {}".format(parent, link)
        time.sleep(REQUEST_DELAY)
        try:
            resp = requests.get(link, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            result = LinkResult(link, parent, "{} {}".format(resp.status_code, resp.reason))
            self.link_results.append(result)
            self.update_cache(result)
        except Exception as e:
            self.handle_request_exception(e, link, parent)
    def pattern_adjust(self, a):
        try:
            if re.match('^#', a):
                return None
            r = urlparse.urlsplit(a)
            if r.scheme == '' and r.netloc != '':
                # Protocol-relative link ("//host/path"): assume https
                d = urlparse.urlunsplit(r)
                m = re.search(r'(?<=//)\S+', d)
                return "https://" + m.group(0)
            elif r.scheme == '' and r.netloc == '':
                # Site-relative link ("/path"): prefix it with the target domain (https assumed)
                return "https://" + self.domain + a
            else:
                return a
        except Exception as e:
            print (a, e)
            return a
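    # A few examples of the normalisation above (scheme-less links are assumed to be https):
    #   "#section"                -> None (in-page anchors are skipped)
    #   "//cdn.example.com/a.js"  -> "https://cdn.example.com/a.js"
    #   "/about"                  -> "https://" + self.domain + "/about"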
    def extract_links(self, address, parent):
        # Don't process a given page more than once
        if address in self.processed:
            return
        try:
            time.sleep(REQUEST_DELAY)
            response = requests.get(address, timeout=REQUEST_TIMEOUT)
            # Only extract links from html pages
            if 'text/html' not in response.headers.get('Content-Type', ''):
                result = LinkResult(address, parent,
                                    "{} {}".format(response.status_code, response.reason))
                self.link_results.append(result)
                self.update_cache(result)
                self.processed.add(address)
                return
        except Exception as e:
            self.handle_request_exception(e, address, parent)
            return
        tags = {'a': 'href', 'img': 'src', 'script': 'src', 'link': 'href'}
        for tag, field in iter(tags.items()):
            if not self.running:
                return
            try:
                for tag_elem in BeautifulSoup(response.text, "html.parser", parse_only=SoupStrainer(tag)):
                    if tag_elem.has_attr(field):
                        link = self.pattern_adjust(tag_elem[field])
                        if link:
                            # Check if the link has already been processed
                            if link not in self.processed and link != address:
                                # Don't check edge case links (they will fail as false positives)
                                if (tag == 'link' and tag_elem.has_attr('rel') and
                                        tag_elem['rel'][0] in SKIP_REL_LINKS):
                                    continue
                                self.check_link(link, address)
                                # Only queue pages on the target domain for crawling, and only
                                # if a sitemap was not parsed and the link came from an <a> tag
                                if (extract_domain(link) == self.domain and not self.sitemap_parsed
                                        and tag == 'a'):
                                    self.q.put((link, address))
            except Exception as e:
                print (e, address)
        self.processed.add(address)
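    # Worker loop: each thread repeatedly pulls a (page, parent) pair off the queue,
    # extracts and checks that page's links, and keeps going until self.running is cleared.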
    def link_thread(self):
        while self.running:
            try:
                address, parent = self.q.get(True, 2)
                self.extract_links(address, parent)
                self.q.task_done()
            except Queue.Empty:
                continue
    def restart(self, url):
        self.sitemap_parsed = False
        self.domain = extract_domain(url)
        self.check_link_cache = {}
        self.processed = set()
        self.link_results = []
        self.result_data = {}
        self.q = Queue.Queue()
        self.running = True
        self.threads = []
    def check_sitemap(self, sitemap):
        if self.running:
            raise RuntimeError("Error: Already running link checker!")
        self.restart(sitemap)
        try:
            resp = requests.get(sitemap)
        except:
            raise RuntimeError("Error: Invalid sitemap url provided!")
        if resp.status_code != requests.codes.ok:
            raise RuntimeError("Error: Could not open sitemap!")
        print "Parsing sitemap: {}".format(sitemap)
        if sitemap.split('.')[-1] == "xml":
            xml = resp.text
            urls = BeautifulSoup(xml, 'html.parser', parse_only=SoupStrainer('url'))
            if not urls:
                raise RuntimeError("Error: No urls found in sitemap!")
            for url in urls:
                link = url.findNext("loc").text
                self.q.put((link, ""))
            self.sitemap_parsed = True
        else:
            self.extract_links(sitemap, "")
            self.sitemap_parsed = True
        print "Spawning {} worker threads...".format(MAX_THREADS)
        for x in range(MAX_THREADS):
            t = threading.Thread(target=self.link_thread)
            self.threads.append(t)
            t.start()
        # Rather than using self.q.join() the following allows us to
        # handle the SIGINT signal (ctrl^c)
        while self.running and self.q.unfinished_tasks:
            time.sleep(1)
        self.running = False
        print "Cleaning up outstanding worker threads..."
        for thread in self.threads:
            thread.join()
        return self.link_results
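    # crawl_page mirrors check_sitemap, but seeds the queue with a single page and
    # discovers further pages by following same-domain <a> links from it.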
    def crawl_page(self, page):
        if self.running:
            raise RuntimeError("Error: Already running link checker!")
        self.restart(page)
        self.q.put((page, ""))
        print "Spawning {} worker threads...".format(MAX_THREADS)
        for x in range(MAX_THREADS):
            t = threading.Thread(target=self.link_thread)
            self.threads.append(t)
            t.start()
        while self.running and self.q.unfinished_tasks:
            time.sleep(1)
        self.running = False
        print "Cleaning up outstanding worker threads..."
        for thread in self.threads:
            thread.join()
        return self.link_results
    def get_result_data(self):
        if self.result_data:
            return self.result_data
        self.result_data['domain_pages'] = set([result.parent for result in self.link_results
                                                if result.domain == self.domain])
        self.result_data['links'] = [result.link for result in self.link_results]
        self.result_data['unique_broken_links'] = set(result.link for result in self.link_results
                                                      if result.error)
        self.result_data['broken_link_domain_pages'] = set([result.parent for result in self.link_results
                                                            if result.error and
                                                            (not result.parent or
                                                             extract_domain(result.parent) == self.domain)])
        return self.result_data
    def get_csv_report(self):
        plain_report = "Page, Link, Result, Pass/Fail\n"
        self.link_results.sort()
        plain_report += "\n".join([str(result) for result in self.link_results])
        return plain_report
    def get_html_report(self):
        result_data = self.get_result_data()
        html = "<head><style>td{padding:10px;text-align:left;border: 1px solid #ddd;max-width:300px;"\
               "word-wrap:break-word;}tr:nth-of-type(2n){background-color:#f2f2f2;}"\
               "thead{background-color:#00aeef;color:white;cursor:pointer;font-weight:bold}"\
               "table{border-collapse:collapse;}body{padding-left:25px;}</style></head>"
        html += "<body><h2>Link Checker Results</h2>"
        # Add result_data to report
        html += "<h4>Domain ({}) Pages Checked: {}<br>".format(self.domain, len(result_data['domain_pages']))
        html += "Links Checked: {}<br>".format(len(result_data['links']))
        broken_link_color = 'color:' + ('#FF6347;' if len(result_data['unique_broken_links']) > 0 else '#00FF7F;')
        html += "Unique Broken Links Found: <span style='{}'>{}</span><br>".format(
            broken_link_color, len(result_data['unique_broken_links']))
        html += "Domain ({}) Pages Containing Broken Links: <span style='{}'>{}</span></h4>".format(
            self.domain, broken_link_color, len(result_data['broken_link_domain_pages']))
        # Add full result table to report
        html += "<table><thead><tr><td>Page</td><td>Link"\
                "</td><td>Result</td><td>Pass/Fail</td></tr></thead><tbody>"
        self.link_results.sort()  # results with errors will be at the front of the list
        for link, parent, result, error, domain in self.link_results:
            error_style = 'background-color:' + ('#FF6347;' if error else '#00FF7F;')
            error_text = 'Fail' if error else 'Pass'
            html += "<tr><td><a href={0}>{0}</a></td><td><a href={1}>{1}</a></td>"\
                    "<td>{2}</td><td style='{3}'>{4}</td></tr>".format(
                        parent, link, result, error_style, error_text)
        html += "</tbody></table>"
        return html
class SignalHandler:
    def __init__(self, link_checker):
        self.link_checker = link_checker
    def __call__(self, signum, frame):
        print "Cleaning up outstanding worker threads..."
        self.link_checker.running = False
        for thread in self.link_checker.threads:
            thread.join()
        sys.exit(0)
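# Standalone helper for previewing the report formats with canned LinkResult data;
# it is not called from the main entry point below.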
def generate_example_reports():
    link_checker = LinkChecker()
    link_checker.domain = "example.com"
    result = LinkResult("example.com/test1", "example.com/1", "200 OK")
    link_checker.link_results.append(result)
    result = LinkResult("example.com/test2", "example.com/2", "410 BAD", True)
    link_checker.link_results.append(result)
    result = LinkResult("example.com/test3", "example.com/1", "200 OK")
    link_checker.link_results.append(result)
    result = LinkResult("example.com/test4", "example.com/test1", "200 OK")
    link_checker.link_results.append(result)
    f = open("report.html", "w")
    f.write(link_checker.get_html_report())
    f.close()
    f = open("report.csv", "w")
    f.write(link_checker.get_csv_report())
    f.close()
    sys.exit(0)
if __name__ == "__main__":
    link_checker = LinkChecker()
    signal_handler = SignalHandler(link_checker)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    sitemap = raw_input("Please enter the sitemap address: ")
    try:
        link_results = link_checker.check_sitemap(sitemap)
    except RuntimeError as e:
        link_results = []
        print e
    if link_results:
        domain = link_checker.domain
        if domain.startswith('www.'):
            domain = domain[4:]
        domain = domain.split('.')[0]
        default_filename = "link_results_{}".format(domain)
        filename = raw_input("Input result filename (default {}): ".format(default_filename))
        if not filename:
            filename = default_filename
        # Generate html report
        f = open(filename + ".html", "w")
        f.write(link_checker.get_html_report())
        f.close()
        # Generate csv report
        f = open(filename + ".csv", "w")
        f.write(link_checker.get_csv_report())
        f.close()
        result_data = link_checker.get_result_data()
        print "=" * 26
        print "|| Link Checker Results ||"
        print "=" * 26
        print "Domain ({}) Pages Checked: {}".format(link_checker.domain, len(result_data['domain_pages']))
        print "Links Checked: {}".format(len(result_data['links']))
        print "Unique Broken Links Found: {}".format(len(result_data['unique_broken_links']))
        print "Domain ({}) Pages Containing Broken Links: {}".format(
            link_checker.domain, len(result_data['broken_link_domain_pages']))
    else:
        print "Could not check links."