To which external domains does a given Dutch political website connect?
# Copyright 2019 - https://www.twitter.com/LoranKloeze
# License: MIT
#
# This script was used for collecting the data for this Dutch article:
# https://www.lorankloeze.nl/2019/05/06/vreemde-communicatie-websites-politieke-partijen/
#
# Parse HAR files and print a table containing websites of Dutch political parties versus foreign requests.
# This script answers the following question: 'To which external domains does a given website connect?'
#
# All HAR files are expected to contain the information provided by Google Chrome using the following workflow:
# 1) Open an empty tab 2) Open DevTools 3) Select the 'Network' panel 4) Open a website and wait for the requests
# to finish 5) Right-click on one of the requests and select 'Save all as HAR with content'
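#
# For reference, a minimal sketch of the HAR structure this script reads (field
# names follow the HAR 1.2 format; real Chrome exports contain many more fields
# and the hosts below are placeholders):
#
# {
#   "log": {
#     "pages":   [{"title": "https://www.example.nl/", "startedDateTime": "..."}],
#     "entries": [{"startedDateTime": "...",
#                  "request": {"url": "https://cdn.example.com/script.js"}}]
#   }
# }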
import json
import os
from urllib.parse import urlparse

import dateutil.parser
from colorama import init as colorama_init, Fore
from tabulate import tabulate

# Initialize the colorama library for colorized terminal output
colorama_init()
# Global options
options = {
    'subdomain_is_foreign': False,
    'sites_of_parties': {
        '50pluspartij.nl': '50PLUS',
        'd66.nl': 'D66',
        'forumvoordemocratie.nl': 'FVD',
        'groenlinks.nl': 'GL',
        'pvv.nl': 'PVV',
        'bewegingdenk.nl': 'DENK',
        'cda.nl': 'CDA',
        'christenunie.nl': 'CU',
        'partijvoordedieren.nl': 'PVDD',
        'pvda.nl': 'PVDA',
        'sgp.nl': 'SGP',
        'sp.nl': 'SP',
        'vvd.nl': 'VVD',
    }
}
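
# Note on 'subdomain_is_foreign' (an illustration of the classification logic in
# parse_har() below, with a hypothetical host): with the default False, a request
# to e.g. cdn.pvda.nl while visiting pvda.nl counts as domestic; set it to True
# to count every host other than the exact initiating domain, including its own
# subdomains, as foreign.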
# Entry point of the script
def main():
    # Print an informational header followed by an attribution table
    start_msg = Fore.RED + '-- Overzicht websites politieke partijen en hun communicatie met externe servers -- \n\n'
    start_msg += Fore.GREEN
    print(start_msg)
    rows = [
        ['Verzameld door', '@LoranKloeze'],
        ['Browser', 'Google Chrome 74.0.3729.131 64-bits op Windows 10 / 1809 / 17763.437'],
        ['Browserconfiguratie', 'Incognito | uitgeschakelde cache'],
        ['Methode dataverzameling',
         'Export in HAR formaat van de requests vanaf de eerste gebruikersrequest tot 30 seconden erna.'],
        ['Timeframe dataverzameling', '01-05-2019 - 21:00 tot 22:45'],
        ['Overig', 'De data is verzameld vóór het eventuele akkoord geven op cookieplaatsing'],
    ]
    print(tabulate(rows, headers=['Verantwoording', '']))
    print('\n')

    # Process the HAR files
    websites = parse_har_files('./hars/')

    # Analyze the processed HAR files and print the results
    print(tabulate_foreign_hosts_vs_sites(websites))
# Print a table containing names of political parties versus the foreign hosts their websites connect to
def tabulate_foreign_hosts_vs_sites(websites):
    # Initialize the message this function returns with an explanation of the upcoming table
    msg = Fore.WHITE + 'Onderstaande tabel laat zien met welke externe servers de websites van politieke partijen contact leggen.\n'
    msg += 'Elk getal geeft weer hoe vaak de website contact heeft gelegd met de externe server in de eerste 30 \n'
    msg += 'seconden na het inladen van de website.\n\n'
    msg += Fore.WHITE

    # Get a list of parties sorted by number of foreign requests
    parties = get_parties_by_nr_of_foreign_requests(websites)
    cols = parties.copy()
    cols.insert(0, 'Externe server/partij')

    # Build the totals row once per party (not once per host/party combination,
    # which would append duplicate totals)
    totals_row = ['Totaal']
    for party in parties:
        for site in websites:
            if site['party'] == party:
                totals_row.append(site['total_foreign_requests'])

    # Create a table of foreign hosts versus political parties and append it to the initialized string above
    rows = []
    for host in get_foreign_hosts(websites):
        row = [host]
        for party in parties:
            val = '-'
            for site in websites:
                if site['party'] == party:
                    host_used = next((item for item in site['foreign_hosts'] if item['host'] == host), None)
                    if host_used is not None:
                        val = host_used['nr_of_requests']
            row.append(val)
        rows.append(row)
    rows = sorted(rows, key=lambda k: k[0])
    rows.append([])
    rows.append(totals_row)
    msg += tabulate(rows, cols) + '\n'
    return msg
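
# The returned table has roughly this shape (the host names and counts below are
# placeholders, not collected data; the real output depends on the HAR files):
#
# Externe server/partij      PVV    VVD    ...
# -----------------------  -----  -----  -----
# ajax.googleapis.com          -      3    ...
# fonts.gstatic.com            2      -    ...
#
# Totaal                       2      3    ...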
# Parse a HAR file and create a dictionary of requests, time info and some statistics
def parse_har(location):
    # Initialize the site info dictionary with the HAR file name and location
    site_info = {'file_name': location}

    # Read the HAR file in JSON format
    with open(location, encoding='UTF-8') as f:
        data_json = json.load(f)

    # Normalize the hostname to domain.tld format by dropping any subdomains like www
    # (note: keeping the last two labels works for the .nl domains above, but would
    # misfire on multi-label suffixes such as .co.uk)
    initiating_host_parts = urlparse(data_json['log']['pages'][0]['title']).hostname.split('.')
    initiating_host = '.'.join(initiating_host_parts[-2:])
    site_info['hostname'] = initiating_host

    # Add the request time frame to the site info dictionary
    first_request_at = dateutil.parser.parse(data_json['log']['pages'][0]['startedDateTime'])
    last_request_at = dateutil.parser.parse(data_json['log']['entries'][-1]['startedDateTime'])
    site_info['first_request_at'] = first_request_at
    site_info['last_request_at'] = last_request_at
    # Collect the domestic and foreign hostnames from the requests
    domestic_hostnames = {}
    foreign_hostnames = {}
    total_domestic_requests = 0
    total_foreign_requests = 0
    for entry in data_json['log']['entries']:
        hostname = urlparse(entry['request']['url']).hostname
        if options['subdomain_is_foreign']:
            is_foreign = hostname != initiating_host
        else:
            # Treat the domain itself and its subdomains as domestic; the '.' prefix
            # prevents a bare suffix match on unrelated domains ending in the same string
            is_foreign = hostname != initiating_host and not hostname.endswith('.' + initiating_host)
        if is_foreign:
            total_foreign_requests += 1
            foreign_hostnames[hostname] = foreign_hostnames.get(hostname, 0) + 1
        else:
            total_domestic_requests += 1
            domestic_hostnames[hostname] = domestic_hostnames.get(hostname, 0) + 1
    # Add the collected domestic hosts to the site info dictionary
    site_info['domestic_hosts'] = []
    for host in domestic_hostnames:
        site_info['domestic_hosts'].append({
            'host': host,
            'nr_of_requests': domestic_hostnames[host]
        })

    # Add the collected foreign hosts to the site info dictionary
    site_info['foreign_hosts'] = []
    for host in foreign_hostnames:
        site_info['foreign_hosts'].append({
            'host': host,
            'nr_of_requests': foreign_hostnames[host]
        })

    # Add a few statistics to the site info dictionary
    site_info['total_requests'] = total_domestic_requests + total_foreign_requests
    site_info['total_domestic_requests'] = total_domestic_requests
    site_info['total_foreign_requests'] = total_foreign_requests

    # Determine the name of the political party based on the hostname (None if unknown)
    site_info['party'] = options['sites_of_parties'].get(initiating_host)

    # Return the site info dictionary
    return site_info
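
# For orientation, the dictionary returned by parse_har() has this shape (derived
# from the code above; the concrete values shown are placeholders):
#
# {'file_name': './hars/pvda.nl.har',
#  'hostname': 'pvda.nl',
#  'first_request_at': datetime(...), 'last_request_at': datetime(...),
#  'domestic_hosts': [{'host': 'www.pvda.nl', 'nr_of_requests': 12}, ...],
#  'foreign_hosts':  [{'host': 'fonts.gstatic.com', 'nr_of_requests': 2}, ...],
#  'total_requests': 14, 'total_domestic_requests': 12, 'total_foreign_requests': 2,
#  'party': 'PVDA'}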
# Return a deduplicated, sorted list of all foreign hosts found in the HAR files
def get_foreign_hosts(websites):
    hosts = []
    for site in websites:
        for fh in site['foreign_hosts']:
            if fh['host'] not in hosts:
                hosts.append(fh['host'])
    hosts.sort()
    return hosts

# Return a list of parties sorted by the number of foreign requests their websites make
def get_parties_by_nr_of_foreign_requests(websites):
    sites = sorted(websites, key=lambda s: s['total_foreign_requests'], reverse=True)
    return [site['party'] for site in sites]
# Iterate over the HAR files in a directory and return a list of the parsed output
def parse_har_files(directory='./'):
    websites = []
    for file in os.listdir(os.fsencode(directory)):
        filename = os.fsdecode(file)
        if filename.endswith('.har'):
            websites.append(parse_har(os.path.join(directory, filename)))
    return websites

# Is this script running as the entry script?
if __name__ == '__main__':
    main()
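
# Usage sketch (assuming this gist is saved locally, e.g. as har_analysis.py, with
# the Chrome HAR exports placed in a ./hars/ directory next to it):
#
#   pip install python-dateutil tabulate colorama
#   python har_analysis.py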