# TripIt trip extractor using selenium
# Source: GitHub gist worldwise001/10af68753ddccc771bc848c8710d52ec (created January 6, 2025 06:40)
# NOTE: GitHub flagged this file as containing hidden or bidirectional Unicode
# text; review it in an editor that reveals hidden Unicode characters.
from selenium import webdriver | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.support.wait import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.common.exceptions import NoSuchElementException | |
from urllib.parse import urlparse | |
import getpass | |
import json | |
import pprint | |
import time | |
def process_browser_logs_for_network_events(logs):
    """Yield the network-related events found in browser log entries.

    Each item of ``logs`` is a mapping whose ``"message"`` value is a JSON
    string; the decoded document's own ``"message"`` member is the event.
    An event is yielded when its ``"method"`` contains ``"Network.response"``,
    ``"Network.request"`` or ``"Network.webSocket"``.
    """
    markers = ("Network.response", "Network.request", "Network.webSocket")
    for record in logs:
        event = json.loads(record["message"])["message"]
        method = event["method"]
        if any(marker in method for marker in markers):
            yield event
def setup_webdriver():
    """Build and return the Chrome WebDriver used by the scraper.

    Chrome is started with extensions disabled, in the new headless mode and
    in incognito mode. The ``goog:loggingPrefs`` capability turns on the
    ``performance`` log at level ``ALL``.
    """
    chrome_options = Options()
    for flag in ("--disable-extensions", "--headless=new", "--incognito"):
        chrome_options.add_argument(flag)
    chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
    return webdriver.Chrome(options=chrome_options)
def _wait_for_trip_list(driver, timeout=10):
    """Block until a trip-list item element is present in the page."""
    WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located(
            (By.XPATH, "//div[@data-cy='trip-list-item-display-address']")
        )
    )


def _click_through_pages(driver, tab_name):
    """Click "Go to next page" until that button can no longer be found."""
    while True:
        try:
            next_button = driver.find_element(
                By.XPATH, "//button[@aria-label='Go to next page']"
            )
        except NoSuchElementException:
            print(f'No further trips on {tab_name} page')
            return
        next_button.click()
        print('click')
        _wait_for_trip_list(driver)
        # Short pause after each page before looking for the button again.
        time.sleep(1)


def navigate_tripit(driver, email, password):
    """Log in to TripIt, page through both trip tabs and return network events.

    Args:
        driver: Selenium WebDriver, expected to have the ``performance`` log
            enabled (it is read with ``driver.get_log("performance")``).
        email: Value typed into the ``email_address`` field.
        password: Value typed into the ``password`` field.

    Returns:
        A list of the Network request/response/webSocket events extracted
        from the driver's performance log after all pages were visited.

    Raises:
        Whatever Selenium raises when an expected element is missing or a
        wait times out; nothing is caught here except the "no next page"
        lookup inside the pagination helper.
    """
    driver.get('https://www.tripit.com/account/login')
    # Wait for the consent element to exist; it is deliberately not clicked.
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, 'truste-consent-required'))
    )
    time.sleep(3)

    form = driver.find_element(By.ID, 'authenticate')
    driver.find_element(By.ID, 'email_address').send_keys(email)
    driver.find_element(By.ID, 'password').send_keys(password)
    form.submit()

    _wait_for_trip_list(driver)
    print('Logged in!')
    time.sleep(1)
    _click_through_pages(driver, 'Current Trips')

    driver.find_element(By.ID, 'trips-list-tab-past').click()
    _wait_for_trip_list(driver)
    time.sleep(1)
    _click_through_pages(driver, 'Past Trips')

    logs = driver.get_log("performance")
    return list(process_browser_logs_for_network_events(logs))
def extract_req_data(driver, data):
    """Condense raw network events per request and attach response bodies.

    Args:
        driver: Object exposing ``execute_cdp_cmd(name, params)``; it is
            called with ``'Network.getResponseBody'`` for every
            ``Network.responseReceived`` event.
        data: Mapping of request id to a dict whose ``'data'`` member is the
            list of raw events for that request.

    Returns:
        A new mapping with the same request ids. Each value is a copy of the
        input dict whose ``'data'`` list holds one condensed entry per raw
        event: always ``'method'``, plus ``'headers'``/``'request'``,
        ``'headers'`` only (``*ExtraInfo`` events) or
        ``'headers'``/``'response'``/``'body'`` depending on the event type.

    The input mapping is left untouched; previously the caller's ``'data'``
    lists were overwritten because the per-request dict was shared.
    """
    reqs = {}
    for request_id, info in data.items():
        entries = []
        for req in info['data']:
            method = req['method']
            entry = {'method': method}
            if method == 'Network.requestWillBeSent':
                entry['headers'] = req['params']['request']['headers']
                entry['request'] = req['params']['request']
            if 'ExtraInfo' in method:
                entry['headers'] = req['params']['headers']
            if method == 'Network.responseReceived':
                entry['headers'] = req['params']['response']['headers']
                entry['response'] = req['params']['response']
                entry['body'] = driver.execute_cdp_cmd(
                    'Network.getResponseBody', {'requestId': request_id}
                )
            entries.append(entry)
        # Shallow copy so the caller's dict keeps its raw event list.
        reqs[request_id] = {**info, 'data': entries}
    return reqs
def dump_json_bodies(reqs):
    """Write each captured response body to ``<request id>.json``.

    Args:
        reqs: Mapping of request id to a dict whose ``'data'`` list holds
            condensed entries; only ``Network.responseReceived`` entries are
            written, using the text at ``entry['body']['body']``.

    Files are created in the current working directory and written as UTF-8
    so that non-ASCII text does not depend on the platform's default
    encoding.
    """
    for req_id, req in reqs.items():
        for entry in req['data']:
            if entry['method'] != 'Network.responseReceived':
                continue
            with open(req_id + '.json', 'w', encoding='utf-8') as fp:
                fp.write(entry['body']['body'])
def extract_trip_data(reqs):
    """Collect trip summaries from captured response bodies.

    Args:
        reqs: Mapping of request id to a dict whose ``'data'`` list holds
            condensed entries. For every ``Network.responseReceived`` entry
            the JSON text at ``entry['body']['body']`` is decoded and its
            ``'Trip'`` member is read (a single dict or a list of dicts).

    Returns:
        A list of dicts with keys ``uuid``, ``start_date``, ``end_date``,
        ``location`` and ``name``, sorted by ``start_date``.

    Raises:
        KeyError: If a trip lacks one of the fields copied into the summary.
        json.JSONDecodeError: If a response body is not valid JSON.

    Each raw trip is also pretty-printed to stdout.
    """
    trips = []
    for req in reqs.values():
        for entry in req['data']:
            if entry['method'] != 'Network.responseReceived':
                continue
            body = json.loads(entry['body']['body'])
            # A response without a 'Trip' member (presumably an empty
            # listing - TODO confirm against the API) contributes no trips
            # instead of raising KeyError.
            btrips = body.get('Trip') or []
            if isinstance(btrips, dict):
                btrips = [btrips]
            for btrip in btrips:
                pprint.pprint(btrip)
                trips.append({
                    'uuid': btrip['uuid'],
                    'start_date': btrip['start_date'],
                    'end_date': btrip['end_date'],
                    'location': btrip['PrimaryLocationAddress'],
                    'name': btrip['display_name'],
                })
    return sorted(trips, key=lambda trip: trip['start_date'])
def reformat_data(data, allowed_domains=None, allowed_paths=None):
    """Group network events by request id and optionally filter by URL.

    Args:
        data: Iterable of event dicts, each with ``'method'`` and
            ``params['requestId']``.
        allowed_domains: Optional collection of host names; when non-empty,
            only requests whose URL ``netloc`` is in it are kept.
        allowed_paths: Optional collection of strings; when non-empty, only
            requests whose URL path contains at least one of them are kept.

    Returns:
        Mapping of request id to ``{'url': ..., 'data': [...]}`` where
        ``'data'`` lists the request's events in arrival order and ``'url'``
        is ``{'full', 'netloc', 'path'}`` taken from the first
        ``Network.requestWillBeSent`` event, or ``None`` when no such event
        was seen for that id.

    When any filter is active, requests without a known URL are dropped
    (they cannot match) rather than raising ``TypeError``.
    """
    allowed_domains = allowed_domains or []
    allowed_paths = allowed_paths or []

    reqs = {}
    for entry in data:
        request_id = entry['params']['requestId']
        req = reqs.setdefault(request_id, {'url': None, 'data': []})
        req['data'].append(entry)
        # Only the first requestWillBeSent for an id sets the URL.
        if entry['method'] == 'Network.requestWillBeSent' and not req['url']:
            url = entry['params']['request']['url']
            parsed = urlparse(url)
            req['url'] = {'full': url, 'netloc': parsed.netloc, 'path': parsed.path}

    if not allowed_domains and not allowed_paths:
        return reqs

    filtered = {}
    for request_id, req in reqs.items():
        url = req['url']
        if url is None:
            continue
        if allowed_domains and url['netloc'] not in allowed_domains:
            continue
        if allowed_paths and not any(path in url['path'] for path in allowed_paths):
            continue
        filtered[request_id] = req
    return filtered
if __name__ == '__main__':
    # Script entry point: log in, capture the trip-list API responses and
    # write the extracted trips to trips.json.
    #
    # The driver is created before the try block so that the finally clause
    # only runs when `driver` is bound; otherwise a failure in
    # setup_webdriver() would be masked by a NameError from driver.quit().
    driver = setup_webdriver()
    try:
        # NOTE(review): the e-mail literal looks like a redacted placeholder;
        # replace it with the real account address before running.
        data = navigate_tripit(driver, '[email protected]', getpass.getpass())
        reqs = reformat_data(data, ['www.tripit.com'], ['/api/v2/list/trip'])
        reqs = extract_req_data(driver, reqs)
        trips = extract_trip_data(reqs)
        with open('trips.json', 'w') as fp:
            json.dump(trips, fp)
    finally:
        # Always shut the browser down, even when scraping fails.
        driver.quit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment