Last active
May 28, 2021 15:20
-
-
Save sidd607/1085970568d2e94c798c0fc1141d4697 to your computer and use it in GitHub Desktop.
Python Script to keep querying the Cowin website to fetch available time slots
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ---------------------------------------------------------------------------
# Search conditions and notification settings
# ---------------------------------------------------------------------------

# True  -> query by the pincodes in PINCODES_TO_SEARCH
# False -> query by the district ids in DISTRICT_CODES_TO_SEARCH
USE_PINCODES = False

# Only sessions offering one of these vaccines are reported.
VACCINE_CHOICE = ["COVISHIELD", "COVAXIN"]

# Only sessions whose `min_age_limit` is in this list are reported.
AGE_LIMIT = [18]

PINCODES_TO_SEARCH = ["110037"]
DISTRICT_CODES_TO_SEARCH = ["140", "141", "142", "143", "144", "145", "146", "147", "148", "149", "150"]

# Delay between two polling rounds, in seconds.
TIME_INTERVAL = 60

# Requires additional libraries -> https://pypi.org/project/py-notifier/
ALLOW_DESKTOP_NOTIFICATION = True

# Names of the per-dose capacity fields in a session record.
DOSE1 = "available_capacity_dose1"
DOSE2 = "available_capacity_dose2"
# Which of the two per-dose capacity fields must exceed THRESHOLD.
DOSES_TO_CHECK = DOSE1

# This is to remove smaller slots that get filled up by the time I open the website
THRESHOLD = 5

# Requires registration with Twilio to be able to send messages and required additional Libraries
SEND_MESSAGE = True
SEND_MESSAGE_NUMBERS = [""]
import json
import time
from datetime import datetime

import requests  # pip install requests
# URL templates for the calendar endpoints: {0} is the pincode / district id,
# {1} is the start date formatted as DD-MM-YYYY.
# Example request URL:
#   https://cdn-api.co-vin.in/api/v2/appointment/sessions/calendarByDistrict?district_id=147&date=27-05-2021
URL_PINCODE = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/calendarByPin?pincode={0}&date={1}"
URL_DISTRICT = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/calendarByDistrict?district_id={0}&date={1}"
def _matchingSessions(center):
    """Return details of the sessions of `center` that pass the configured filters.

    A session qualifies when both its overall capacity and the capacity for the
    dose named by DOSES_TO_CHECK exceed THRESHOLD, its vaccine is listed in
    VACCINE_CHOICE and its minimum age is listed in AGE_LIMIT.
    """
    matches = []
    for session in center['sessions']:
        # A missing capacity field counts as "nothing available" instead of
        # aborting the whole polling round with a KeyError.
        doseCapacity = session.get(DOSES_TO_CHECK, 0)
        if session.get('available_capacity', 0) <= THRESHOLD or doseCapacity <= THRESHOLD:
            continue
        if session['vaccine'] not in VACCINE_CHOICE or session['min_age_limit'] not in AGE_LIMIT:
            continue
        matches.append({
            "availableCapacity": doseCapacity,
            "minAgeLimit": session['min_age_limit'],
            "vaccine": session['vaccine'],
            "slots": session['slots'],
            "date": session['date']
        })
    return matches


def _formatSlots(slots):
    """Render `slots` (code -> list of centre dicts) as (full report, short summary)."""
    CENTER_TEMPLATE = """{0}
{1}
{2} {3}
Vacconation : {4}
Slot Details :
"""
    SLOTDETAILS_TEMPLATE = """\tDate: {0}
\tAvailavble Capacity: {1}
\tMin Age: {2}
\tVaccine: {3}
"""
    reportParts = []
    notificationParts = []
    for key, centers in slots.items():
        reportParts.append("\n----------" + str(key) + "----------\n")
        for center in centers:
            reportParts.append(CENTER_TEMPLATE.format(
                center['centerName'], center['centerAddress'], center['centerDistrict'],
                center['centerPinCode'], center['centerFeeType']))
            # Session lines are emitted per centre, so one centre's sessions
            # are never repeated under the next centre.
            for session in center['sessions']:
                reportParts.append(SLOTDETAILS_TEMPLATE.format(
                    session['date'], session['availableCapacity'],
                    session['minAgeLimit'], session['vaccine']))
            notificationParts.append("{0} {1}\n".format(center['centerName'], center['centerPinCode']))
    return "".join(reportParts), "".join(notificationParts)


def queryCowin():
    """Query the calendar endpoint for every configured code and report open slots.

    Depending on USE_PINCODES, either PINCODES_TO_SEARCH or
    DISTRICT_CODES_TO_SEARCH is queried, starting from today's date. Sessions
    are filtered with THRESHOLD, DOSES_TO_CHECK, VACCINE_CHOICE and AGE_LIMIT.

    A code whose request fails or whose response is not valid JSON is logged
    and skipped, so the remaining codes are still checked.

    Returns:
        A tuple ``(formattedString, notificationString)``. The first element is
        a detailed multi-line report grouped by code; the second holds one
        "<centre name> <pincode>" line per matching centre. Both are empty
        strings when nothing matches.
    """
    if USE_PINCODES:
        urlTemplate, codes = URL_PINCODE, PINCODES_TO_SEARCH
    else:
        urlTemplate, codes = URL_DISTRICT, DISTRICT_CODES_TO_SEARCH

    headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
    dateStr = datetime.now().strftime("%d-%m-%Y")

    slots = {}
    for code in codes:
        urlToPing = urlTemplate.format(code, dateStr)
        print(urlToPing)
        try:
            # Without a timeout a stalled connection would block the poller forever.
            r = requests.get(urlToPing, headers=headers, timeout=30)
            print(r.status_code, urlToPing)
            centers = json.loads(r.text).get('centers', [])
        except (requests.RequestException, ValueError, AttributeError) as exc:
            # ValueError: body is not JSON; AttributeError: JSON is not an object.
            print("Skipping {0}: {1}".format(code, exc))
            continue

        for center in centers:
            sessions = _matchingSessions(center)
            if not sessions:
                continue
            # Accumulate every matching centre under its code.
            slots.setdefault(code, []).append({
                "centerName": center['name'],
                "centerAddress": center['address'],
                "centerDistrict": center['district_name'],
                "centerPinCode": center['pincode'],
                "centerFeeType": center['fee_type'],
                "sessions": sessions
            })

    return _formatSlots(slots)
if __name__ == "__main__":
    # Poll forever: query, report/notify when slots exist, then wait
    # TIME_INTERVAL seconds. Ctrl-C (KeyboardInterrupt) stops the script because
    # only Exception subclasses are caught below.
    while True:
        try:
            result, notificationString = queryCowin()
            if not result:
                print ("\n----------------------------------------------------------------------------")
                print ("No Available Slots @ "+ str(datetime.now()))
                print ("----------------------------------------------------------------------------\n")
            else:
                print (result)
                if ALLOW_DESKTOP_NOTIFICATION:
                    # Imported lazily so the library is only needed when enabled.
                    from pynotifier import Notification
                    Notification(
                        title='COWIN Availability Notification',
                        description=notificationString,
                        duration=10, # Duration in seconds
                        urgency='critical'
                    ).send()
                if SEND_MESSAGE:
                    # Imported lazily so the library is only needed when enabled.
                    from twilio.rest import Client
                    account_sid = ""
                    auth_token = ""
                    try:
                        # Client creation is inside the try so that missing
                        # credentials are reported as a messaging failure.
                        client = Client(account_sid, auth_token)
                        notificationString += " "+ "https://selfregistration.cowin.gov.in/"
                        message = client.messages.create(body = notificationString, from_ = "+14439918570", to="+919886680325")
                        print ("Message Sent: {0}".format(message.sid))
                    except Exception as exc:
                        print ("Error sending message: {0}".format(exc))
        except Exception as exc:
            print ("SOME ERROR: {0}".format(exc))
        # Sleep after every round, including failed ones, so an error can never
        # turn the loop into a tight retry storm against the API.
        time.sleep(TIME_INTERVAL)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment