Last active
January 3, 2022 02:39
-
-
Save idriszmy/a51dccc031837d4ae5309c5a3f36d842 to your computer and use it in GitHub Desktop.
Controlling relay via Telegram using Maker Pi Pico and CircuitPython.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Controlling relay via Telegram using Maker Pi Pico and CircuitPython. | |
Items: | |
- Maker Pi Pico | |
https://my.cytron.io/p-maker-pi-pico | |
- ESP8266 ESP-01 WiFi Module | |
https://my.cytron.io/p-esp-01-wifi-serial-transceiver-module-esp8266 | |
- Grove - Relay | |
https://my.cytron.io/p-grove-relay | |
- USB Micro B Cable | |
https://my.cytron.io/p-usb-micro-b-cable | |
Libraries required from bundle (https://circuitpython.org/libraries): | |
- adafruit_espatcontrol | |
- adafruit_requests.mpy | |
- simpleio.mpy | |
References: | |
- https://core.telegram.org/bots/api | |
- https://github.com/shurillu/CTBot | |
Last update: 27 Dec 2021 | |
""" | |
import time | |
import board | |
import digitalio | |
import simpleio | |
import busio | |
import adafruit_requests as requests | |
import adafruit_espatcontrol.adafruit_espatcontrol_socket as socket | |
from adafruit_espatcontrol import adafruit_espatcontrol | |
import json | |
# Get wifi details and more from a secrets.py file | |
try: | |
from secrets import secrets | |
except ImportError: | |
print("All secret keys are kept in secrets.py, please add them there!") | |
raise | |
# Telegram API url. | |
API_URL = "https://api.telegram.org/bot" + secrets["telegram_bot_token"] | |
NOTE_G4 = 392 | |
NOTE_C5 = 523 | |
buzzer = board.GP18 | |
relay = digitalio.DigitalInOut(board.GP3) | |
relay.direction = digitalio.Direction.OUTPUT | |
# Initialize UART connection to the ESP8266 WiFi Module. | |
RX = board.GP17 | |
TX = board.GP16 | |
uart = busio.UART(TX, RX, receiver_buffer_size=2048) # Use large buffer as we're not using hardware flow control. | |
esp = adafruit_espatcontrol.ESP_ATcontrol(uart, 115200, debug=False) | |
requests.set_socket(socket, esp) | |
print("\nResetting ESP module") | |
esp.soft_reset() | |
def init_bot(): | |
get_url = API_URL | |
get_url += "/getMe" | |
r = requests.get(get_url) | |
return r.json()['ok'] | |
first_read = True | |
update_id = 0 | |
def read_message(): | |
global first_read | |
global update_id | |
get_url = API_URL | |
get_url += "/getUpdates?limit=1&allowed_updates=[\"message\",\"callback_query\"]" | |
if first_read == False: | |
get_url += "&offset={}".format(update_id) | |
r = requests.get(get_url) | |
#print(r.json()) | |
try: | |
update_id = r.json()['result'][0]['update_id'] | |
message = r.json()['result'][0]['message']['text'] | |
chat_id = r.json()['result'][0]['message']['chat']['id'] | |
#print("Update ID: {}".format(update_id)) | |
print("Chat ID: {}\tMessage: {}".format(chat_id, message)) | |
first_read = False | |
update_id += 1 | |
simpleio.tone(buzzer, NOTE_G4, duration=0.1) | |
simpleio.tone(buzzer, NOTE_C5, duration=0.1) | |
return chat_id, message | |
except (IndexError) as e: | |
#print("No new message") | |
return False, False | |
def send_message(chat_id, message): | |
get_url = API_URL | |
get_url += "/sendMessage?chat_id={}&text={}".format(chat_id, message) | |
r = requests.get(get_url) | |
#print(r.json()) | |
print("Connecting to a network...") | |
esp.connect(secrets) | |
if init_bot() == False: | |
print("Telegram bot failed.") | |
else: | |
print("Telegram bot ready!\n") | |
simpleio.tone(buzzer, NOTE_C5, duration=0.1) | |
while True: | |
try: | |
while not esp.is_connected: | |
print("Reconnecting...") | |
esp.connect(secrets) | |
chat_id, message_in = read_message() | |
if message_in == "/start": | |
send_message(chat_id, "Welcome to Telegram Bot!") | |
elif message_in == "RELAY ON": | |
relay.value = True | |
send_message(chat_id, "Relay is activated.") | |
elif message_in == "RELAY OFF": | |
relay.value = False | |
send_message(chat_id, "Relay is deactivated.") | |
else: | |
send_message(chat_id, "Command is not available.") | |
except (ValueError, RuntimeError, adafruit_espatcontrol.OKError) as e: | |
#print("Failed, retrying\n", e) | |
continue |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Secret Keys. | |
secrets = { | |
"ssid" : "my_wifi_ssid", | |
"password" : "my_wifi_password", | |
"telegram_bot_token" : "my_telegram_bot_token", | |
"telegram_chat_id" : "my_telegram_chat_id", | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment