Created
September 28, 2025 17:33
-
-
Save toriato/92b9e98b9fdbd7cd47f51a1255253674 to your computer and use it in GitHub Desktop.
웅나웅나나
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import signal | |
| import logging | |
| import asyncio | |
| import re | |
| from os import environ | |
| import uvloop | |
| from discord import Intents, Guild, Member, Message, RawMessageUpdateEvent | |
| from discord.abc import Messageable | |
| from discord.ext.commands import Bot, when_mentioned | |
| from discord.ext.tasks import loop | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| ANTI_PATTERN = re.compile(r'[^ㅇ우운웅ㄴㅏ나난.,!?$#`*~\-\s]') | |
| ALLOWED_GUILD_IDS = map(int, environ['ALLOWED_GUILD_IDS'].split(',')) | |
| TARGET_CHANNEL_ID = int(environ['TARGET_CHANNEL_ID']) | |
| class CustomBot(Bot): | |
| __slots__ = ( | |
| 'message_bucket', | |
| ) | |
| message_bucket: asyncio.Queue[Message] | |
| @loop(seconds=1) | |
| async def flush_message_bucket(self): | |
| messages: set[Message] = set() | |
| while self.message_bucket.qsize() > 0: | |
| try: | |
| messages.add(self.message_bucket.get_nowait()) | |
| except asyncio.QueueEmpty: | |
| break | |
| if not messages: | |
| return | |
| channel = self.get_channel(TARGET_CHANNEL_ID) | |
| if not isinstance(channel, Messageable): | |
| logging.error('target channel is messagable') | |
| return | |
| author_mentions = { | |
| message.author.mention | |
| for message in messages | |
| if not message.author.bot | |
| } | |
| logging.info(f'deleting {len(messages)} message(s)') | |
| async def wrap_delete(): | |
| try: | |
| await channel.delete_messages(messages) | |
| except: | |
| logging.exception('failed to flush message bucket') | |
| async def wrap_response(): | |
| if not author_mentions: | |
| return | |
| mentions = ', '.join(author_mentions) | |
| title = '님들' if len(author_mentions) > 1 else '님' | |
| try: | |
| response_message = await channel.send(f'{mentions}{title} 웅나세여?') | |
| await asyncio.sleep(5) | |
| await self.message_bucket.put(response_message) | |
| except: | |
| logging.exception('failed to response to authors') | |
| asyncio.create_task(wrap_delete()) | |
| asyncio.create_task(wrap_response()) | |
| async def sanitze_message(self, message: Message): | |
| if message.author.bot: | |
| return | |
| if isinstance(message.author, Member) and message.author.guild_permissions.manage_messages: | |
| return | |
| if message.channel.id != TARGET_CHANNEL_ID: | |
| return | |
| content = message.content.strip() | |
| if content: | |
| try: | |
| next(ANTI_PATTERN.finditer(content)) | |
| except StopIteration: | |
| return | |
| await self.message_bucket.put(message) | |
| async def setup_hook(self): | |
| self.message_bucket = asyncio.Queue() | |
| async def on_ready(self): | |
| for guild in filter(lambda x: x.id not in ALLOWED_GUILD_IDS, self.guilds): | |
| try: | |
| logger.info(f'leaving {guild}') | |
| await guild.leave() | |
| except: | |
| logger.exception(f'failed to leave {guild}') | |
| try: | |
| channel = self.get_channel(TARGET_CHANNEL_ID) | |
| if not isinstance(channel, Messageable): | |
| logging.error('target channel is messagable') | |
| return | |
| async for message in channel.history(limit=100): | |
| await self.sanitze_message(message) | |
| except: | |
| logger.exception('failed to fetch history') | |
| finally: | |
| self.flush_message_bucket.start() | |
| logger.info('client is ready to roll') | |
| async def on_guild_join(self, guild: Guild): | |
| if guild.id not in ALLOWED_GUILD_IDS: | |
| try: | |
| logger.info(f'leaving {guild}') | |
| await guild.leave() | |
| except: | |
| logger.exception(f'failed to leave {guild}') | |
| async def on_message(self, message: Message): | |
| await self.sanitze_message(message) | |
| async def on_raw_message_edit(self, event: RawMessageUpdateEvent): | |
| await self.sanitze_message(event.message) | |
| async def main(): | |
| loop = asyncio.get_running_loop() | |
| intents = Intents.default() | |
| intents.message_content = True | |
| async with CustomBot(when_mentioned, intents=intents) as bot: | |
| def on_signal(*_: int): | |
| logger.info(f'signal received, shutting down client gracefully') | |
| loop.create_task(bot.close()) | |
| for s in {signal.SIGINT, signal.SIGTERM}: | |
| loop.add_signal_handler(s, on_signal) | |
| await bot.start(environ['DISCORD_TOKEN']) | |
| if __name__ == '__main__': | |
| uvloop.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment