import os from datetime import timedelta, datetime, time from random import choice, seed from uuid import uuid4 import humanize import markovify from django.db import models from django.utils.timezone import localdate, now, make_aware, localtime from jsoneditor.forms import JSONEditor from telegram import Update, Chat, User, InlineQueryResultArticle, InputTextMessageContent from telegram.error import BadRequest from telegram.ext import Dispatcher, CallbackContext, MessageHandler, Filters, CommandHandler, InlineQueryHandler from telegram.utils.helpers import mention_html from bots.models import TelegramBotModuleConfig class CyberLinaBotModuleConfig(TelegramBotModuleConfig): first_part = models.JSONField(default=list) second_part = models.JSONField(default=list) third_part = models.JSONField(default=list) emoji = models.JSONField(default=list) already_ran = models.JSONField(default=list) welcome_reactions = models.JSONField(default=list) inline_reactions = models.JSONField(default=list) _mtg_data = models.JSONField(default=dict, blank=True, null=True) _mtg_corpus = models.TextField(null=True, blank=True) mtg_train = models.TextField(null=True, blank=True) MODULE_NAME = 'Киберлиночка' CUSTOM_WIDGETS = { 'first_part': JSONEditor(), 'second_part': JSONEditor(), 'third_part': JSONEditor(), 'emoji': JSONEditor(), 'already_ran': JSONEditor(), 'welcome_reactions': JSONEditor(), 'inline_reactions': JSONEditor(), } EXCLUDE_FIELDS = '_mtg_data', '_mtg_corpus', def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) try: self.markovify = markovify.Text.from_dict(self._mtg_data) except: pass def message_handler(self, update: Update, ctx: CallbackContext): if not update.effective_chat or not update.effective_user: return CyberLinaUser.from_tg_obj(self, update.effective_chat, update.effective_user) def goodmorning_handler(self, update: Update, ctx: CallbackContext): if not all([self.first_part, self.second_part, self.third_part, self.emoji]): return update.effective_message.reply_text('Я не настроена :c') seed(os.urandom(128)) self.message_handler(update, ctx) chat = self.chats.get(chat_id=update.effective_chat.id) if chat.last_run and (chat.last_run >= localdate() or chat.last_run + timedelta(1) == localdate() and localtime().hour < 6): humanize.i18n.activate('ru_RU') time_left = make_aware(datetime.combine(chat.last_run + timedelta(1), time(6, 0))) - now() return update.effective_message.reply_text( choice(self.already_ran).format( name=chat.last_choice.name, time=humanize.naturaldelta(time_left) ) ) while True: user = chat.users.order_by('?').first() # type: CyberLinaUser if not user: return update.effective_message.reply_text('Нет известных юзеров в чате') try: member = ctx.bot.get_chat_member(chat.chat_id, user.user_id) CyberLinaUser.from_tg_obj(self, update.effective_chat, member.user) break except BadRequest: user.delete() msg = '{}, {}! {}, {} {}'.format( choice(self.first_part), choice(self.second_part), mention_html(user.user_id, user.name), choice(self.third_part), choice(self.emoji), ) update.effective_chat.send_message(msg, parse_mode='html') chat.last_run = localdate() chat.last_choice = user chat.save() def inline_query_handler(self, update: Update, ctx: CallbackContext): if not self.inline_reactions: return seed(os.urandom(128)) markov_msg = '' for i in range(3): markov_msg += (self.markovify.make_sentence(tries=100) or '') + ' ' results = [ InlineQueryResultArticle( id=uuid4(), title='Не нажимай >_<', input_message_content=InputTextMessageContent(choice(self.inline_reactions)) ), InlineQueryResultArticle( id=uuid4(), title='ФлаБеПроЛейка', input_message_content=InputTextMessageContent(markov_msg) ), ] update.inline_query.answer(results, cache_time=0) def build_dispatcher(self, dispatcher: Dispatcher): dispatcher.add_handler(CommandHandler('goodmorning', self.goodmorning_handler)) dispatcher.add_handler(MessageHandler(Filters.all, self.message_handler)) dispatcher.add_handler(InlineQueryHandler(self.inline_query_handler)) return dispatcher def save(self, force_insert=False, force_update=False, using=None, update_fields=None): if self.mtg_train: self._mtg_corpus += '\n' + self.mtg_train self._mtg_data = markovify.Text(self._mtg_corpus).to_dict() self.mtg_train = None super().save(force_insert, force_update, using, update_fields) class CyberLinaChat(models.Model): config = models.ForeignKey(CyberLinaBotModuleConfig, on_delete=models.CASCADE, related_name='chats') name = models.TextField() chat_id = models.BigIntegerField(db_index=True) last_run = models.DateField(null=True, blank=True) last_choice = models.ForeignKey('CyberLinaUser', on_delete=models.SET_NULL, null=True, blank=True, related_name='+') class Meta: unique_together = 'config', 'chat_id', class CyberLinaUser(models.Model): chat = models.ForeignKey(CyberLinaChat, on_delete=models.CASCADE, related_name='users') user_id = models.BigIntegerField(db_index=True) name = models.TextField() @staticmethod def from_tg_obj(config: CyberLinaBotModuleConfig, chat: Chat, user: User): chat_title = chat.title or user.full_name chat, _ = CyberLinaChat.objects.update_or_create(config=config, chat_id=chat.id, defaults={'name': chat_title}) CyberLinaUser.objects.update_or_create(chat=chat, user_id=user.id, defaults={'name': user.full_name}) class Meta: unique_together = 'chat', 'user_id',