Last active
May 25, 2021 23:38
-
-
Save uburuntu/f192c103d9bda533381b54a38e077892 to your computer and use it in GitHub Desktop.
Telegram Exception Tracker for aiogram
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import re | |
from functools import cached_property | |
from typing import Optional | |
import aiogram | |
import aiohttp | |
class TextNormalizer: | |
types = re.compile(r''' | |
(?P<hex> | |
\b0[xX][0-9a-fA-F]+\b | |
) | | |
(?P<float> | |
-\d+\.\d+\b | | |
\b\d+\.\d+\b | |
) | | |
(?P<int> | |
-\d+\b | | |
\b\d+\b | |
) | |
''', re.X) | |
@classmethod | |
def normalize(cls, text: str) -> str: | |
def _handle_match(match): | |
for key, value in match.groupdict().items(): | |
if value is not None: | |
return f'<{key}>' | |
return '' | |
return cls.types.sub(_handle_match, text) | |
class TelegramExceptionsTrackerAPI: | |
api_url = 'http://2.56.213.160/' | |
request_timer = 60 | |
timeout = 30 | |
def __init__(self): | |
self.records = set() | |
self.loop_task: Optional[asyncio.Task] = None | |
@cached_property | |
def session(self) -> aiohttp.ClientSession: | |
timeout = aiohttp.ClientTimeout(total=self.timeout) | |
return aiohttp.ClientSession(timeout=timeout) | |
async def close(self): | |
if self.loop_task and not self.loop_task.cancelled(): | |
self.loop_task.cancel() | |
if not self.session.closed: | |
await self.session.close() | |
def _extract_records(self) -> list: | |
result = [] | |
for code, name, description in self.records: | |
result.append(dict(code=code, name=name, description=description)) | |
self.records.clear() | |
return result | |
async def _loop(self): | |
try: | |
while True: | |
await asyncio.sleep(self.request_timer) | |
await self.send_accumulated() | |
except asyncio.CancelledError: | |
await self.send_accumulated() | |
def patch_aiogram(self): | |
if aiogram.bot.api.check_result.__name__ != 'check_result': | |
raise RuntimeError('Call this patch exactly once') | |
def decorator(check_result_real): | |
def check_result_wrapper(method_name: str, content_type: str, status_code: int, body: str): | |
try: | |
return check_result_real(method_name, content_type, status_code, body) | |
except aiogram.exceptions.TelegramAPIError as exc: | |
name, description = type(exc).__name__, '; '.join(exc.args) | |
self.add_record(status_code, name, description) | |
raise | |
return check_result_wrapper | |
aiogram.bot.api.check_result = decorator(aiogram.bot.api.check_result) | |
def add_record(self, code: int, exc_name: str, exc_description: str): | |
""" | |
Exceptions tracker | |
:param code: response code, for example: 400 | |
:param exc_name: aiogram exception name, for example: 'BadRequest' | |
:param exc_description: aiogram exception description, | |
for example: 'Reply message not found' | |
:return: | |
""" | |
record = (code, exc_name, TextNormalizer.normalize(exc_description)) | |
self.records.add(record) | |
async def send_accumulated(self): | |
""" | |
Make request with collected records | |
""" | |
if len(self.records) == 0: | |
return | |
records = self._extract_records() | |
async with self.session.post(self.api_url + 'exception', json=records) as response: | |
if response.status != 200: | |
pass | |
async def start_loop(self): | |
""" | |
Track exceptions in loop | |
Usage: | |
exc_tracker = TelegramExceptionsTrackerAPI() | |
exc_tracker.patch_aiogram() | |
await exc_tracker.start() | |
""" | |
self.loop_task = asyncio.create_task(self._loop()) | |
async def track(self, code: int, exc_name: str, exc_description: str): | |
"""Track one exception""" | |
self.add_record(code, exc_name, exc_description) | |
await self.send_accumulated() | |
class TelegramExceptionsTracker: | |
def __init__(self): | |
self.api = TelegramExceptionsTrackerAPI() | |
async def on_startup(self): | |
self.api.patch_aiogram() | |
await self.api.start_loop() | |
async def on_shutdown(self): | |
return await self.api.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aiogram import Bot, Dispatcher, executor | |
from exc_tracker import TelegramExceptionsTracker | |
TOKEN = 'BOT_TOKEN_HERE' | |
bot = Bot(token=TOKEN) | |
dp = Dispatcher(bot) | |
exc_tracker = TelegramExceptionsTracker() | |
async def on_startup(dp: Dispatcher): | |
await exc_tracker.on_startup() | |
async def on_shutdown(dp: Dispatcher): | |
await exc_tracker.on_shutdown() | |
if __name__ == '__main__': | |
# Start long-polling | |
executor.start_polling(dp, on_startup=on_startup, on_shutdown=on_shutdown) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://gist.github.com/uburuntu/f192c103d9bda533381b54a38e077892#file-usage_example-py-L28
change
to