import os
from tempfile import TemporaryDirectory
from time import sleep
from typing import Optional

from PIL import Image
from django.db import models
from django import forms
from django.forms import modelformset_factory
from jsonfield import JSONField
from telegram import Update
from telegram.ext import Dispatcher, CallbackContext, MessageHandler, Filters, CommandHandler
from telegram.utils.helpers import mention_html

from bots.models import TelegramBotModuleConfig


class PingBotCategory(models.Model):
    """A named ping category inside one chat, with the ids of subscribed users.

    A category is identified by the pair (``chat_id``, ``cat_name``).
    """

    chat_id = models.BigIntegerField(db_index=True)
    cat_name = models.CharField(max_length=64, db_index=True)
    # JSON list of user ids; may be NULL until the first user is added.
    user_ids = JSONField(null=True)

    def add_user(self, uid: int):
        """Add ``uid`` to the category (no-op if already present).

        Only the in-memory instance is changed; the caller must ``save()``.
        """
        user_ids = set(self.user_ids or [])
        user_ids.add(uid)
        # Sorted so the stored list has a deterministic order.
        self.user_ids = sorted(user_ids)

    def remove_user(self, uid: int):
        """Remove ``uid`` from the category (no-op if not present).

        Only the in-memory instance is changed; the caller must ``save()``.
        """
        user_ids = set(self.user_ids or [])
        # discard() rather than remove(): unsubscribing a user who never
        # subscribed must not raise.
        user_ids.discard(uid)
        self.user_ids = sorted(user_ids)

    class Meta:
        unique_together = ('chat_id', 'cat_name')


class PingBotModuleConfig(TelegramBotModuleConfig):
    """Bot module that lets chat members subscribe to categories and ping them.

    Commands registered by ``build_dispatcher``:

    * ``/iam <category>``     - subscribe the sender to a category
    * ``/iamnot <category>``  - unsubscribe the sender from a category
    * ``/ping <category>``    - mention every user subscribed to a category
    * ``/categories``         - list the chat's non-empty categories
    """

    # Reply sent by /ping when the category has no subscribed users.
    no_users_text = models.TextField(default='No users found')
    # Template for the /ping reply; ``{mentions}`` is replaced by the
    # space-separated HTML mentions. The reply is sent in HTML parse mode.
    message_template = models.TextField(default='{mentions}')

    MODULE_NAME = 'Ping'

    @staticmethod
    def get_category(update: Update, name: str):
        """Return the category ``name`` for the update's chat, creating it if missing."""
        obj, _created = PingBotCategory.objects.get_or_create(
            chat_id=update.effective_chat.id,
            cat_name=name,
        )
        return obj

    @staticmethod
    def _parse_category(update: Update) -> Optional[str]:
        """Return the single category argument of a command message.

        Returns ``None`` when the message text is not exactly
        ``<command> <category>``.
        """
        parts = (update.effective_message.text or '').split()
        if len(parts) != 2:
            return None
        return parts[1]

    @staticmethod
    def _reply_usage(update: Update, command: str):
        """Tell the sender how ``command`` must be invoked."""
        update.effective_message.reply_text('Usage: /{} <category>'.format(command))

    def i_am_handler(self, update: Update, ctx: CallbackContext):
        """Handle ``/iam <category>``: subscribe the sender to the category."""
        category = self._parse_category(update)
        if category is None:
            self._reply_usage(update, 'iam')
            return
        cat = self.get_category(update, category)
        cat.add_user(update.effective_user.id)
        cat.save()
        update.effective_message.reply_text('Done')

    def i_am_not_handler(self, update: Update, ctx: CallbackContext):
        """Handle ``/iamnot <category>``: unsubscribe the sender from the category."""
        category = self._parse_category(update)
        if category is None:
            self._reply_usage(update, 'iamnot')
            return
        cat = self.get_category(update, category)
        cat.remove_user(update.effective_user.id)
        cat.save()
        update.effective_message.reply_text('Done')

    def ping_handler(self, update: Update, ctx: CallbackContext):
        """Handle ``/ping <category>``: mention every user subscribed to it.

        Replies with ``no_users_text`` when the category has no users.
        """
        category = self._parse_category(update)
        if category is None:
            self._reply_usage(update, 'ping')
            return
        cat = self.get_category(update, category)
        if not cat.user_ids:
            return update.effective_message.reply_text(self.no_users_text)

        mentions = []
        for uid in cat.user_ids:
            # Look the member up to obtain the display name for the mention.
            member = ctx.bot.get_chat_member(update.effective_chat.id, uid)
            mentions.append(mention_html(uid, member.user.full_name))

        # The mentions are HTML anchors, so the reply must use HTML parse
        # mode; Markdown mode would not parse the anchor tags.
        update.effective_message.reply_html(
            self.message_template.format(mentions=' '.join(mentions))
        )

    def categories_handler(self, update: Update, ctx: CallbackContext):
        """Handle ``/categories``: list the chat's non-empty categories with user counts."""
        categories = []
        for category in PingBotCategory.objects.filter(chat_id=update.effective_chat.id):  # type: PingBotCategory
            # Categories whose user list is empty or NULL are left out.
            if category.user_ids:
                categories.append('- {} ({})'.format(category.cat_name, len(category.user_ids)))

        if not categories:
            update.effective_message.reply_text('No categories for this chat')
        else:
            update.effective_message.reply_text('\n'.join(categories))

    def build_dispatcher(self, dispatcher: Dispatcher):
        """Register this module's command handlers on ``dispatcher`` and return it."""
        dispatcher.add_handler(CommandHandler('iam', self.i_am_handler))
        dispatcher.add_handler(CommandHandler('iamnot', self.i_am_not_handler))
        dispatcher.add_handler(CommandHandler('ping', self.ping_handler))
        dispatcher.add_handler(CommandHandler('categories', self.categories_handler))
        return dispatcher