Created
September 15, 2021 11:05
-
-
Save Mizzlr/6fa9a399eb7f51d40222bb3021137de7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import calendar
import json
import os
import random
import time
import traceback

from fabulous.color import bold, green, red
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
# from selenium import webdriver
from seleniumwire import webdriver
from tqdm import tqdm
# Month abbreviations indexed by month number. Index 0 is the empty-string
# placeholder that calendar.month_abbr carries, so MONTHS.index('Jan') == 1.
MONTHS = list(calendar.month_abbr)

# Directory the browser is told to save downloads into (see make_driver) and
# where run_driver looks for archives that were already fetched.
DOWNLOAD_DIR = os.path.expanduser('~/Downloads/selenium-dataset/')

# Intended in-flight marker for the archive-report API request; see the
# request/response interceptors in make_driver.
REQUESTED = False

# Not read or written by any active code in this file.
SEEN_ALL_REPORTS_JS = False
def make_driver():
    """Create a selenium-wire Chrome driver with download prefs and interceptors.

    The driver is pointed at the Brave binary, configured to save downloads
    into ``DOWNLOAD_DIR`` without prompting, and opens DevTools for each tab.
    Two selenium-wire interceptors are attached:

    * the request interceptor logs every request URL, aborts requests whose
      URL contains one of the blocked fragments, and sets ``REQUESTED`` when
      the archive-report API is called;
    * the response interceptor clears ``REQUESTED`` when the archive-report
      API answers and logs any response whose status is not 200.

    Returns:
        The configured ``seleniumwire.webdriver.Chrome`` instance.
    """
    options = Options()
    options.binary_location = '/Applications/Brave Browser.app/Contents/MacOS/Brave Browser'
    # options.binary_location = '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'

    chrome_prefs = {
        "profile.default_content_settings.popups": 0,
        "download.prompt_for_download": False,
        'devtools.open_docked': True,
        "download.default_directory": DOWNLOAD_DIR,
    }
    options.add_experimental_option("prefs", chrome_prefs)
    options.add_argument("disable-popup-blocking")
    options.add_argument("--auto-open-devtools-for-tabs")

    # Requests whose URL contains any of these fragments are aborted.
    blocked_fragments = (
        'go-mpulse.net',
        'google-analytics.com',
        'googletagmanager.com/gtm.js',
        'akam/11/',
    )
    archive_api = 'https://www.nseindia.com/api/reports?archives='

    def request_interceptor(request):
        # Without this declaration the assignment below would only bind a
        # local name and the module-level flag would never change.
        global REQUESTED
        url = str(request.url)
        print(bold(green('GET')), request.url)
        if any(fragment in url for fragment in blocked_fragments):
            print(red('aborting ... '), request.url)
            request.abort()
        elif archive_api in url:
            REQUESTED = True

    def response_interceptor(request, response):  # A response interceptor takes two args
        global REQUESTED
        if archive_api in str(request.url):
            REQUESTED = False
        if response.status_code != 200:
            print(red(' >> FAILED'), response.status_code, request.url)

    driver = webdriver.Chrome(options=options)
    driver.request_interceptor = request_interceptor
    driver.response_interceptor = response_interceptor
    return driver
def pick_date(day, month, desired_year, driver):
    """Open the archive date picker, page to the wanted month and click a day.

    Args:
        day: Day of month; matched against the ``day`` attribute of a
            calendar ``<td>`` cell.
        month: Month abbreviation that must be present in ``MONTHS``.
        desired_year: Year as an int, compared with the last four characters
            of the calendar's period header.
        driver: Selenium WebDriver with the reports page already loaded.

    Raises:
        ValueError: if ``month`` or the first three characters of the
            calendar's period header are not in ``MONTHS``.
    """
    # Loop-invariant: resolve the target month once, before touching the page.
    desired_month_index = MONTHS.index(month)
    desired = (desired_year, desired_month_index)

    # An XPath starting with "//" is evaluated against the whole document even
    # when issued from an element, so this finds the period header globally.
    period_xpath = '//*/div[@role="period"]'

    dp_button = driver.find_element(
        By.XPATH,
        '//*[@id="cr_equity_archives"]/div/div[1]/div[2]/div/div/div[2]/span/button',
    )
    time.sleep(2)
    dp_button.click()
    cal = driver.find_element(By.XPATH, '/html/body/div[@role="calendar"]')

    while True:
        period = cal.find_element(By.XPATH, period_xpath)
        month_year = str(period.get_attribute('innerHTML')).strip()
        current_month_index = MONTHS.index(month_year[:3])
        current_year = int(month_year[-4:])
        print(f'{desired_year=}, {current_year=}, {desired_month_index=}, {current_month_index=}')

        # Tuple comparison orders by year first, then by month.
        current = (current_year, current_month_index)
        if desired > current:
            # Target is later than the displayed month: page forward.
            next_button = cal.find_element(By.CLASS_NAME, "fa-chevron-right")
            next_button.click()
            period = cal.find_element(By.XPATH, period_xpath)
            print('moved right', period.get_attribute('innerHTML'))
        elif desired < current:
            # Target is earlier than the displayed month: page backward.
            previous_button = cal.find_element(By.CLASS_NAME, "fa-chevron-left")
            previous_button.click()
            period = cal.find_element(By.XPATH, period_xpath)
            print('moved left', period.get_attribute('innerHTML'))
        else:
            # Displayed month and year match the target: click the day cell.
            day_cell = cal.find_element(By.XPATH, '//*/td[@day="' + str(day) + '"]')
            day_cell.click()
            print(' selectedday after click', cal.get_attribute('selectedday'))
            time.sleep(2)
            break
def sorted_bhav_copy_files(directory=None):
    """Return the bhavcopy file names in *directory*, newest date first.

    The date is read from fixed positions of each name: characters 2-3 are
    the day, 4-6 a month abbreviation from ``MONTHS`` and 7-10 the year.
    Names that do not fit that layout are reported and skipped instead of
    aborting the whole listing.

    Args:
        directory: Folder to list. Defaults to ``~/bhavcopy-dataset/basic/``.

    Returns:
        A list of file names ordered by descending ``YYYYMMDD`` key. If two
        names share a date, the name listed last appears once per occurrence.
    """
    if directory is None:
        directory = os.path.expanduser('~/bhavcopy-dataset/basic/')

    sort_keys = []
    file_map = {}
    for name in tqdm(os.listdir(directory), desc='Sorting Bhavcopy files'):
        day, month, year = name[2:4], name[4:7], name[7:11]
        # MONTHS[0] is '', so an empty slice must be rejected explicitly.
        if not (month and month in MONTHS and day.isdigit() and year.isdigit()):
            print(f'Skipping unrecognised file name {name!r}')
            continue
        key = f'{year}{MONTHS.index(month):02d}{day}'
        sort_keys.append(key)
        file_map[key] = name

    return [file_map[key] for key in sorted(sort_keys, reverse=True)]
def _archive_names(date):
    """Return ``(primary, duplicate)`` zip names expected in DOWNLOAD_DIR for *date*.

    ``primary`` is ``PR<dd><mm><yy>.zip``; ``duplicate`` is the same stem with
    a `` (1)`` suffix before the extension.
    """
    month_number = f'{MONTHS.index(date[4:7]):02d}'
    stem = f'PR{date[2:4]}{month_number}{date[9:11]}'
    return f'{stem}.zip', f'{stem} (1).zip'


def _is_downloaded(filename):
    """Report (and log) whether *filename* already exists in DOWNLOAD_DIR."""
    path = os.path.join(DOWNLOAD_DIR, filename)
    print('Checking for file', path)
    if os.path.isfile(path):
        print(f'File {filename} already downloaded, skipping it ...')
        return True
    return False


def _scroll_by(driver, by):
    """Scroll the page vertically by *by* pixels (negative scrolls up)."""
    driver.execute_script('window.scrollBy(0,' + str(by) + ');')


def run_driver(driver, files):
    """Download the archive for every entry of *files* not yet on disk.

    Loads the reports page, drops entries whose primary zip already exists in
    ``DOWNLOAD_DIR``, then for each remaining entry picks the date, ticks the
    report checkbox and clicks the download control.

    Exceptions raised while driving the page are printed, not propagated.
    ``KeyboardInterrupt`` is allowed through so the caller can stop the
    script. The browser is always shut down with ``driver.quit()``.

    Args:
        driver: WebDriver as returned by ``make_driver``.
        files: File names laid out as ``sorted_bhav_copy_files`` expects
            (day at [2:4], month abbreviation at [4:7], year at [7:11]).
    """
    checkbox_xpath = '//*[@id="cr_equity_archives"]/div/div[3]/div[11]/div/div/label'
    download_xpath = '//*[@id="cr_equity_archives"]/div/div[3]/div[11]/div/div/span'

    try:
        try:
            driver.get('https://www.nseindia.com/all-reports#cr_equity_archives')
            time.sleep(10)

            filtered_files = [
                date for date in files
                if not _is_downloaded(_archive_names(date)[0])
            ]
            print('ALL FILES:', len(files))
            print('FILTERED :', len(filtered_files))
            # random.shuffle(filtered_files)

            for date in tqdm(filtered_files, desc='Downloading Bhavcopy PR'):
                primary, duplicate = _archive_names(date)
                # Re-check at download time, including the " (1)" variant.
                if _is_downloaded(primary) or _is_downloaded(duplicate):
                    continue
                print(f'Downloading file {primary}, ...')

                day, month, year = int(date[2:4]), date[4:7], int(date[7:11])
                pick_date(day, month, year, driver)

                _scroll_by(driver, 500)
                time.sleep(2)
                driver.find_element(By.XPATH, checkbox_xpath).click()
                time.sleep(2)
                _scroll_by(driver, -500)
                time.sleep(2)
                download_button = driver.find_element(By.XPATH, download_xpath)
                print('clicking download button')
                download_button.click()
                # NOTE: implicitly_wait sets the element-lookup timeout for
                # later find_element calls; it does not pause here.
                driver.implicitly_wait(10)
                _scroll_by(driver, -500)

            print('Completed downloading all the files.')
            time.sleep(1000)
        except Exception:
            traceback.print_exc()
        # Same pause the original performed before closing the browser, after
        # both success and failure. Skipped when KeyboardInterrupt propagates.
        time.sleep(1000)
    finally:
        # quit() ends the whole session (all windows and the driver process);
        # close() would only close the current window.
        driver.quit()
if __name__ == "__main__":
    # Build the newest-first work list once; every relaunch reuses it.
    files = sorted_bhav_copy_files()
    # print('Sorted dates:', files[:100])
    time.sleep(10)
    # Relaunch a fresh browser after each run. run_driver skips archives that
    # are already on disk, so each pass resumes where the last one stopped.
    # The loop only ends on Ctrl-C.
    while True:
        try:
            driver = make_driver()
            run_driver(driver, files)  # files[1960 + 50:len(files) - 650])
        except KeyboardInterrupt:
            traceback.print_exc()
            raise SystemExit(1)
        except Exception:
            # Keep going, but show what failed instead of discarding it.
            print('Ignoring exception, continuing anyways ...')
            traceback.print_exc()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment