156 lines
6.4 KiB
Python
156 lines
6.4 KiB
Python
import os
|
||
from datetime import timedelta, datetime, time
|
||
from random import choice, seed
|
||
from uuid import uuid4
|
||
|
||
import humanize
|
||
import markovify
|
||
from django.db import models
|
||
from django.utils.timezone import localdate, now, make_aware, localtime
|
||
from jsoneditor.forms import JSONEditor
|
||
from jsonfield import JSONField
|
||
from telegram import Update, Chat, User, InlineQueryResultArticle, InputTextMessageContent
|
||
from telegram.error import BadRequest
|
||
from telegram.ext import Dispatcher, CallbackContext, MessageHandler, Filters, CommandHandler, InlineQueryHandler
|
||
from telegram.utils.helpers import mention_html
|
||
|
||
from bots.models import TelegramBotModuleConfig
|
||
|
||
|
||
class CyberLinaBotModuleConfig(TelegramBotModuleConfig):
|
||
first_part = JSONField(default='[]')
|
||
second_part = JSONField(default='[]')
|
||
third_part = JSONField(default='[]')
|
||
emoji = JSONField(default='[]')
|
||
already_ran = JSONField(default='[]')
|
||
welcome_reactions = JSONField(default='[]')
|
||
inline_reactions = JSONField(default='[]')
|
||
_mtg_data = JSONField(default={}, blank=True, null=True)
|
||
_mtg_corpus = models.TextField(null=True, blank=True)
|
||
mtg_train = models.TextField(null=True, blank=True)
|
||
|
||
MODULE_NAME = 'Киберлиночка'
|
||
CUSTOM_WIDGETS = {
|
||
'first_part': JSONEditor(),
|
||
'second_part': JSONEditor(),
|
||
'third_part': JSONEditor(),
|
||
'emoji': JSONEditor(),
|
||
'already_ran': JSONEditor(),
|
||
'welcome_reactions': JSONEditor(),
|
||
'inline_reactions': JSONEditor(),
|
||
}
|
||
EXCLUDE_FIELDS = '_mtg_data', '_mtg_corpus',
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
super().__init__(*args, **kwargs)
|
||
try:
|
||
self.markovify = markovify.Text.from_dict(self._mtg_data)
|
||
except:
|
||
pass
|
||
|
||
def message_handler(self, update: Update, ctx: CallbackContext):
|
||
if not update.effective_chat or not update.effective_user:
|
||
return
|
||
CyberLinaUser.from_tg_obj(self, update.effective_chat, update.effective_user)
|
||
|
||
def goodmorning_handler(self, update: Update, ctx: CallbackContext):
|
||
if not all([self.first_part, self.second_part,
|
||
self.third_part, self.emoji]):
|
||
return update.effective_message.reply_text('Я не настроена :c')
|
||
seed(os.urandom(128))
|
||
self.message_handler(update, ctx)
|
||
chat = self.chats.get(chat_id=update.effective_chat.id)
|
||
if chat.last_run and (chat.last_run >= localdate() or
|
||
chat.last_run + timedelta(1) == localdate() and localtime().hour < 6):
|
||
humanize.i18n.activate('ru_RU')
|
||
time_left = make_aware(datetime.combine(chat.last_run + timedelta(1), time(6, 0))) - now()
|
||
return update.effective_message.reply_text(
|
||
choice(self.already_ran).format(
|
||
name=chat.last_choice.name,
|
||
time=humanize.naturaldelta(time_left)
|
||
)
|
||
)
|
||
while True:
|
||
user = chat.users.order_by('?').first() # type: CyberLinaUser
|
||
if not user:
|
||
return update.effective_message.reply_text('Нет известных юзеров в чате')
|
||
try:
|
||
member = ctx.bot.get_chat_member(chat.chat_id, user.user_id)
|
||
CyberLinaUser.from_tg_obj(self, update.effective_chat, member.user)
|
||
break
|
||
except BadRequest:
|
||
user.delete()
|
||
msg = '{}, {}! {}, {} {}'.format(
|
||
choice(self.first_part),
|
||
choice(self.second_part),
|
||
mention_html(user.user_id, user.name),
|
||
choice(self.third_part),
|
||
choice(self.emoji),
|
||
)
|
||
update.effective_chat.send_message(msg, parse_mode='html')
|
||
chat.last_run = localdate()
|
||
chat.last_choice = user
|
||
chat.save()
|
||
|
||
def inline_query_handler(self, update: Update, ctx: CallbackContext):
|
||
if not self.inline_reactions:
|
||
return
|
||
seed(os.urandom(128))
|
||
markov_msg = ''
|
||
for i in range(3):
|
||
markov_msg += (self.markovify.make_sentence(tries=100) or '') + ' '
|
||
results = [
|
||
InlineQueryResultArticle(
|
||
id=uuid4(),
|
||
title='Не нажимай >_<',
|
||
input_message_content=InputTextMessageContent(choice(self.inline_reactions))
|
||
),
|
||
InlineQueryResultArticle(
|
||
id=uuid4(),
|
||
title='ФлаБеПроЛейка',
|
||
input_message_content=InputTextMessageContent(markov_msg)
|
||
),
|
||
]
|
||
update.inline_query.answer(results, cache_time=0)
|
||
|
||
def build_dispatcher(self, dispatcher: Dispatcher):
|
||
dispatcher.add_handler(CommandHandler('goodmorning', self.goodmorning_handler))
|
||
dispatcher.add_handler(MessageHandler(Filters.all, self.message_handler))
|
||
dispatcher.add_handler(InlineQueryHandler(self.inline_query_handler))
|
||
return dispatcher
|
||
|
||
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
|
||
if self.mtg_train:
|
||
self._mtg_corpus += '\n' + self.mtg_train
|
||
self._mtg_data = markovify.Text(self._mtg_corpus).to_dict()
|
||
self.mtg_train = None
|
||
super().save(force_insert, force_update, using, update_fields)
|
||
|
||
|
||
class CyberLinaChat(models.Model):
|
||
config = models.ForeignKey(CyberLinaBotModuleConfig, on_delete=models.CASCADE, related_name='chats')
|
||
name = models.TextField()
|
||
chat_id = models.BigIntegerField(db_index=True)
|
||
last_run = models.DateField(null=True, blank=True)
|
||
last_choice = models.ForeignKey('CyberLinaUser', on_delete=models.SET_NULL, null=True, blank=True, related_name='+')
|
||
|
||
class Meta:
|
||
unique_together = 'config', 'chat_id',
|
||
|
||
|
||
class CyberLinaUser(models.Model):
|
||
chat = models.ForeignKey(CyberLinaChat, on_delete=models.CASCADE, related_name='users')
|
||
user_id = models.BigIntegerField(db_index=True)
|
||
name = models.TextField()
|
||
|
||
@staticmethod
|
||
def from_tg_obj(config: CyberLinaBotModuleConfig, chat: Chat, user: User):
|
||
chat_title = chat.title or user.full_name
|
||
chat, _ = CyberLinaChat.objects.update_or_create(config=config, chat_id=chat.id,
|
||
defaults={'name': chat_title})
|
||
CyberLinaUser.objects.update_or_create(chat=chat, user_id=user.id,
|
||
defaults={'name': user.full_name})
|
||
|
||
class Meta:
|
||
unique_together = 'chat', 'user_id',
|