#!/usr/bin/env python3 import logging import os import traceback from datetime import datetime, timedelta from html import escape from queue import Queue, Empty from time import sleep from threading import Thread from typing import Dict, List from uuid import uuid4 import sentry_sdk from telegram.error import Unauthorized, TelegramError from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler from telegram import Message, Update, Bot, InlineKeyboardMarkup, InlineKeyboardButton, User, InputMediaPhoto, \ InputMediaVideo, InputMediaAnimation, InputMediaAudio, InputMediaDocument from config import BOT_TOKEN, SENTRY_DSN, MANAGEMENT_CHAT, DEBUG from db import get_conn, Subscriber, PersistentMapping, commit from morj import draw_morj from send_users_list import send_users_list logging.basicConfig(level=logging.WARNING) queue = Queue() sentry_sdk.init(dsn=SENTRY_DSN) conn = get_conn() MAX_MESSAGE_LENGTH = 4096 MAX_CAPTION_LENGTH = 1024 def _notify_access_request(bot: Bot, user: User): markup = InlineKeyboardMarkup([[InlineKeyboardButton('Добавить', callback_data=f'add {user.id}')]]) bot.send_message(MANAGEMENT_CHAT, f'{escape(user.full_name)} запросил доступ', parse_mode='html', reply_markup=markup) def welcome(bot: Bot, update: Update): if DEBUG: _add_user(bot, update.effective_user.id) update.message.reply_text('Добро пожаловать (debug)') return if update.effective_user.id in conn.root.subscribers: update.message.reply_text('Вы уже являетесь участником ЛОНО') else: update.message.reply_text('Пожалуйста, обратитесь к @lono_contactbot') _notify_access_request(bot, update.message.from_user) def unsubscribe(bot: Bot, update: Update): user = _remove_user(update.message.chat_id) update.message.reply_text('Вы были отписаны от бота. ' 'Обратитесь к @lono_contactbot если вы хотите подписаться снова.') bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} отписался') def _add_user(bot, uid): user = conn.root.subscribers[uid] = Subscriber.from_chat(bot.get_chat(uid)) commit() return user def add_user(bot: Bot, update: Update, groups=(), args=()): if update.callback_query: update.callback_query.answer() if groups: if update.callback_query.message.chat.id != MANAGEMENT_CHAT: return uid = groups[0] elif args: uid = args[0] elif update.message and update.message.reply_to_message and update.message.reply_to_message.forward_from: uid = update.message.reply_to_message.forward_from.id else: return bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение') try: uid = int(uid) except (ValueError, TypeError): pass try: user = _add_user(bot, uid) if update.callback_query: update.callback_query.message.edit_reply_markup() bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был добавлен', parse_mode='html') bot.send_message(uid, 'Добро пожаловать. Снова.') except TelegramError as e: bot.send_message(MANAGEMENT_CHAT, str(e)) def _remove_user(uid): user = conn.root.subscribers[uid] del conn.root.subscribers[uid] commit() return user def remove_user(bot: Bot, update: Update, groups=(), args=()): if update.callback_query: update.callback_query.answer() if groups: if update.callback_query.message.chat.id != MANAGEMENT_CHAT: return uid = groups[0] elif args: uid = args[0] elif update.message and update.message.reply_to_message and update.message.reply_to_message.forward_from: uid = update.message.reply_to_message.forward_from.id else: return bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение') try: uid = int(uid) except (ValueError, TypeError): pass try: user = _remove_user(uid) bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален', parse_mode='html') if update.callback_query: update.callback_query.message.edit_reply_markup() except KeyError: bot.send_message(MANAGEMENT_CHAT, f'Пользователь id={uid} не был найден') def users(bot: Bot, update: Update): send_users_list() def msg(bot: Bot, update: Update): queue.put(update.message) def _sign_text(text, m: Message, limit): if not text: text = '' sign = '' if text.startswith('!sign') or text.startswith('/sign'): text = text[5:] sign = f'\n\n— {escape(m.from_user.full_name)}' return text[:limit - len(sign)] + sign def _process_media_group(bot: Bot, messages: List[Message]): if not messages: return m = messages[0] current_chat = m.chat_id users = conn.root.subscribers # type: Dict[int, Subscriber] if current_chat not in users: if DEBUG: _add_user(bot, current_chat) m.reply_text('Добро пожаловать (debug)') else: _notify_access_request(bot, m.from_user) return m.reply_text('Пожалуйста, обратитесь к @lono_contactbot') reply_to_message_internal_id = None if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward: reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id] media_group = [] for message in messages: caption = _sign_text(message.caption_html, message, MAX_CAPTION_LENGTH) if hasattr(message, 'photo') and message.photo: media_group.append(InputMediaPhoto(message.photo[-1].file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'video') and message.video: media_group.append(InputMediaVideo(message.video.file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'animation') and message.animation: media_group.append(InputMediaAnimation(message.animation.file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'document') and message.document: media_group.append(InputMediaDocument(message.document.file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'audio') and message.audio: media_group.append(InputMediaAudio(message.audio.file_id, caption=caption, parse_mode='html')) remove_uids = [] for uid, user in users.items(): sleep(.02) reply_to_message_id = None if reply_to_message_internal_id: reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id, None) try: sent_messages = bot.send_media_group(uid, media_group, reply_to_message_id=reply_to_message_id) if sent_messages: user.update_from_message(sent_messages[0]) for r in sent_messages: user.messages_forward[r.message_id] = conn.root.counter user.messages_reverse[conn.root.counter] = r.message_id except Unauthorized: remove_uids.append(uid) bot.send_message(MANAGEMENT_CHAT, f'{user.name} был удален ' f'из-за блокировки бота', parse_mode='html') except Exception: traceback.print_exc() sentry_sdk.capture_exception() conn.root.counter += len(messages) commit() for uid in remove_uids: _remove_user(uid) def users_list(bot: Bot, update: Update): current_chat = update.effective_chat.id subs = conn.root.subscribers # type: Dict[int, Subscriber] if current_chat not in subs: return messages = [f'Count: {len(subs)}\n'] for sub in subs.values(): # type: Subscriber msg = f'{sub.uid:>14} ' if sub.uid < 0: msg += str(sub.name) else: msg += f'{sub.name}' messages.append(msg) for i in range(0, len(messages), 40): update.effective_message.reply_text('\n'.join(messages[i:i+40]), parse_mode='html') def morj(bot: Bot, update: Update): text = update.effective_message.text[5:] fname = '/tmp/morj{}.png'.format(uuid4()) draw_morj(text, fname) update.effective_message.reply_document(open(fname, 'rb')) os.unlink(fname) def _process_message(bot: Bot, m: Message): if m.sticker or m.animation: delta = datetime.now() - conn.root.last_media if delta < timedelta(seconds=15): bot.send_message( m.from_user.id, 'Не вайпи, до следующей гифки/стикера осталось {} секунд'.format(15 - int(delta.total_seconds())) ) return conn.root.last_media = datetime.now() commit() current_chat = m.chat_id users = conn.root.subscribers # type: Dict[int, Subscriber] if current_chat not in users: if DEBUG: _add_user(bot, current_chat) m.reply_text('Добро пожаловать (debug)') else: _notify_access_request(bot, m.from_user) return m.reply_text('Пожалуйста, обратитесь к @lono_contactbot') text = _sign_text(m.text_html, m, MAX_MESSAGE_LENGTH) caption = _sign_text(m.caption_html, m, MAX_CAPTION_LENGTH) reply_to_message_internal_id = None if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward: reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id] remove_uids = [] for uid, user in users.items(): sleep(.02) reply_to_message_id = None if reply_to_message_internal_id: reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id, None) try: r = None if m.forward_date: r = m.forward(uid) elif hasattr(m, 'audio') and m.audio: a = m.audio r = bot.send_audio(uid, a.file_id, a.duration, a.performer, a.title, caption, reply_to_message_id=reply_to_message_id, parse_mode='html') elif hasattr(m, 'document') and m.document: d = m.document r = bot.send_document(uid, d.file_id, d.file_name, caption, reply_to_message_id=reply_to_message_id, parse_mode='html') elif hasattr(m, 'photo') and m.photo: p = m.photo r = bot.send_photo(uid, p[-1].file_id, caption, reply_to_message_id=reply_to_message_id, parse_mode='html') elif hasattr(m, 'sticker') and m.sticker: s = m.sticker r = bot.send_sticker(uid, s.file_id, reply_to_message_id=reply_to_message_id) elif hasattr(m, 'video') and m.video: v = m.video r = bot.send_video(uid, v.file_id, v.duration, caption, reply_to_message_id=reply_to_message_id, parse_mode='html') elif hasattr(m, 'voice') and m.voice: v = m.voice r = bot.send_voice(uid, v.file_id, v.duration, caption, reply_to_message_id=reply_to_message_id, parse_mode='html') elif hasattr(m, 'video_note') and m.video_note: vn = m.video_note r = bot.send_video_note(uid, vn.file_id, vn.duration, vn.length, reply_to_message_id=reply_to_message_id) elif hasattr(m, 'contact') and m.contact: c = m.contact r = bot.send_contact(uid, c.phone_number, c.first_name, c.last_name, reply_to_message_id=reply_to_message_id) elif hasattr(m, 'location') and m.location: l = m.location r = bot.send_location(uid, l.latitude, l.longitude, reply_to_message_id=reply_to_message_id) elif hasattr(m, 'venue') and m.venue: v = m.venue l = v.location r = bot.send_venue(uid, l.latitude, l.longitude, v.title, v.address, v.foursquare_id, reply_to_message_id=reply_to_message_id) elif hasattr(m, 'text') and m.text: r = bot.send_message(uid, text, 'html', reply_to_message_id=reply_to_message_id) if r: user.update_from_message(r) user.messages_forward[r.message_id] = conn.root.counter user.messages_reverse[conn.root.counter] = r.message_id except Unauthorized: remove_uids.append(uid) bot.send_message(MANAGEMENT_CHAT, f'{user.name} был удален ' f'из-за блокировки бота', parse_mode='html') except Exception: traceback.print_exc() sentry_sdk.capture_exception() conn.root.counter += 1 commit() for uid in remove_uids: _remove_user(uid) def task_queue(u: Updater): while True: if not u.running: return try: m = queue.get(timeout=1) # type: Message if m.media_group_id: media_group_id = m.media_group_id group_messages = [m] while queue.qsize() and queue.queue[0].media_group_id == media_group_id: group_messages.append(queue.get(block=False)) _process_media_group(u.bot, group_messages) else: _process_message(u.bot, m) except Empty: pass except: traceback.print_exc() sentry_sdk.capture_exception() if __name__ == '__main__': updater = Updater(BOT_TOKEN, workers=4) updater.dispatcher.add_handler(CommandHandler('start', welcome, Filters.private)) updater.dispatcher.add_handler(CommandHandler('stop', unsubscribe, Filters.private)) updater.dispatcher.add_handler(CommandHandler('add', add_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True)) updater.dispatcher.add_handler(CallbackQueryHandler(add_user, pattern=r'^add (\d+)$', pass_groups=True)) updater.dispatcher.add_handler(CommandHandler('remove', remove_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True)) updater.dispatcher.add_handler(CallbackQueryHandler(remove_user, pattern=r'^remove (\d+)$', pass_groups=True)) updater.dispatcher.add_handler(CommandHandler('users', users_list, Filters.private)) updater.dispatcher.add_handler(CommandHandler('morj', morj)) updater.dispatcher.add_handler(MessageHandler(Filters.private, msg)) updater.start_polling() tq = Thread(target=task_queue, args=(updater,)) tq.start() logging.warning('LONO has started') updater.idle() logging.warning('LONO is stopping...') commit() conn.close()