Created
May 6, 2020 19:04
-
-
Save ruslanchek/e1f72a0d5b035322d027844c3fb09e2c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from selenium import webdriver | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.support.ui import Select | |
import time | |
from datetime import date, timedelta | |
import numpy as np | |
import threading | |
from threading import Lock | |
def alarm(): | |
import os | |
file = "'Twin-bell-alarm-clock-sound.mp3'" | |
os.system("mpg123 " + file) | |
class Booking: | |
def find_bookdate(self, driver, centre): | |
for i in range(1000): | |
try: | |
driver.find_element_by_name('asd').clear() | |
break | |
except: | |
time.sleep(0.1) | |
for week in range(6): | |
dt = date(day=16, month=12, year=2019) + timedelta(days=week * 7) | |
date_str = dt.strftime('%d/%m/%Y') | |
time.sleep(1) | |
driver.find_element_by_name('asd').clear() | |
driver.find_element_by_name('asd').send_keys(date_str) | |
time.sleep(1) | |
driver.find_element_by_name('asd').send_keys(Keys.ENTER) | |
time.sleep(2) | |
for row in range(3, 14 + 1): | |
for col in range(1, 7 + 1): | |
try: | |
xpath = f'/html/body/form/span/table[2]/tbody/tr[{row}]/td[{col}]' | |
t = driver.find_element_by_xpath(xpath) | |
bg = t.get_attribute('background') | |
if bg is not None and bg != '/WebPhase1/images/drivercalendar/redtime.gif': | |
print(bg) | |
if not self.complete: | |
t.click() | |
time.sleep(1) | |
for i in range(100): | |
try: | |
driver.switch_to.window(driver.window_handles[1]) | |
time.sleep(0.1) | |
driver.save_screenshot(f'booking-{centre}.png') | |
driver.find_element_by_xpath('/html/body/form/center/input').click() | |
time.sleep(1) | |
driver.save_screenshot(f'booking2-{centre}.png') | |
threading.Thread(target=alarm).start() | |
self.complete = True | |
return True | |
except: | |
time.sleep(0.1) | |
driver.switch_to.window(driver.window_handles[0]) | |
except: | |
pass | |
@classmethod | |
def reopen_calendar(cls, driver, centre): | |
while True: | |
try: | |
driver.get('http://ya.ru') | |
cls.open_calendar(driver, centre) | |
return | |
except: | |
time.sleep(5) | |
@classmethod | |
def open_calendar(cls, driver, centre): | |
while True: | |
try: | |
driver.get('https://rtd.mcw.gov.cy/WebPhase1/gui/dlcalendar/PersonalDataPage.jsp?lang=gr&comingfrom=3') | |
Select(driver.find_element_by_name('h_id_type')).select_by_value('5') | |
driver.find_element_by_name('h_pid_number').clear() | |
driver.find_element_by_name('h_pid_number').send_keys('779514') | |
Select(driver.find_element_by_name('h_day_of_birth')).select_by_value('18') | |
Select(driver.find_element_by_name('h_month_of_birth')).select_by_value('02') | |
Select(driver.find_element_by_name('h_year_of_birth')).select_by_value('1993') | |
driver.find_element_by_name('h_next').click() | |
time.sleep(np.random.uniform(10, 20)) | |
Select(driver.find_element_by_name('h_centre')).select_by_value(str(centre)) | |
Select(driver.find_element_by_name('h_licence_category')).select_by_index(1) | |
driver.find_element_by_name('h_vrm').clear() | |
driver.find_element_by_name('h_vrm').send_keys('NHE533') | |
driver.find_element_by_xpath('/html/body/form/span/table[2]/tbody/tr[6]/td[1]/input').click() | |
time.sleep(10) | |
return | |
except: | |
pass | |
def _thread_function(self, centre: int): | |
start_time = time.time() | |
driver = webdriver.Firefox() | |
self.open_calendar(driver, centre) | |
while not self.complete and time.time() - start_time < 60 * 30: | |
try: | |
self.find_bookdate(driver, centre) | |
except Exception as e: | |
print(e) | |
time.sleep(10) | |
self.reopen_calendar(driver, centre) | |
self.complete = True | |
try: | |
driver.quit() | |
except: | |
pass | |
def __init__(self, centres: list): | |
self.centres = centres | |
self.threads = [] | |
self.complete = False | |
def start(self): | |
for centre in self.centres: | |
thread = threading.Thread(target=self._thread_function, args=[centre]) | |
thread.start() | |
self.threads.append(thread) | |
time.sleep(1) | |
while not self.complete: | |
time.sleep(0.1) | |
for thread in self.threads: | |
thread.join() | |
# b = Booking([1, 2, 3, 4]) | |
while True: | |
b = Booking([1,2]) | |
b.start() | |
time.sleep(80) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment