telegram_bots/bots/modules/ping.py
2021-03-11 23:43:54 +03:00

84 lines
3.4 KiB
Python

from django.db import models
from telegram import Update
from telegram.error import BadRequest
from telegram.ext import Dispatcher, CallbackContext, CommandHandler
from telegram.utils.helpers import mention_html
from bots.models import TelegramBotModuleConfig
class PingBotCategory(models.Model):
chat_id = models.BigIntegerField(db_index=True)
cat_name = models.CharField(max_length=64, db_index=True)
user_ids = models.JSONField(null=True)
def add_user(self, uid: int):
user_ids = set(self.user_ids or [])
user_ids.add(uid)
self.user_ids = list(user_ids)
def remove_user(self, uid: int):
user_ids = set(self.user_ids or [])
user_ids.remove(uid)
self.user_ids = list(user_ids)
class Meta:
unique_together = 'chat_id', 'cat_name',
class PingBotModuleConfig(TelegramBotModuleConfig):
no_users_text = models.TextField(default='No users found')
message_template = models.TextField(default='{mentions}')
MODULE_NAME = 'Ping'
@staticmethod
def get_category(update: Update, name: str):
obj, created = PingBotCategory.objects.get_or_create(chat_id=update.effective_chat.id, cat_name=name)
return obj
def i_am_handler(self, update: Update, ctx: CallbackContext):
_, category = update.effective_message.text.split(maxsplit=1)
cat = self.get_category(update, category)
cat.add_user(update.effective_user.id)
cat.save()
update.effective_message.reply_text('Done')
def i_am_not_handler(self, update: Update, ctx: CallbackContext):
_, category = update.effective_message.text.split(maxsplit=1)
cat = self.get_category(update, category)
cat.remove_user(update.effective_user.id)
cat.save()
update.effective_message.reply_text('Done')
def ping_handler(self, update: Update, ctx: CallbackContext):
_, category = update.effective_message.text.split(maxsplit=1)
cat = self.get_category(update, category)
if not cat.user_ids:
return update.effective_message.reply_text(self.no_users_text)
mentions = []
for uid in cat.user_ids:
try:
member = ctx.bot.get_chat_member(update.effective_chat.id, uid)
mentions.append(mention_html(uid, '#{} [{}]'.format(uid, member.user.full_name)))
except BadRequest:
cat.remove_user(uid)
update.effective_message.reply_html(self.message_template.format(mentions='; '.join(mentions)))
def categories_handler(self, update: Update, ctx: CallbackContext):
categories = []
for category in PingBotCategory.objects.filter(chat_id=update.effective_chat.id): # type: PingBotCategory
if category.user_ids:
categories.append('- {} ({})'.format(category.cat_name, len(category.user_ids)))
if not categories:
update.effective_message.reply_text('No categories for this chat')
else:
update.effective_message.reply_text('\n'.join(categories))
def build_dispatcher(self, dispatcher: Dispatcher):
dispatcher.add_handler(CommandHandler('iam', self.i_am_handler))
dispatcher.add_handler(CommandHandler('iamnot', self.i_am_not_handler))
dispatcher.add_handler(CommandHandler('ping', self.ping_handler))
dispatcher.add_handler(CommandHandler('categories', self.categories_handler))
return dispatcher