#!/usr/bin/env python3
"""Telegram bot that rebroadcasts every private message to all subscribers.

Private messages are put on ``queue`` by the dispatcher and relayed by a
single worker thread (``task_queue``).  Subscribers are managed from
``MANAGEMENT_CHAT`` with the ``/add`` and ``/remove`` commands and with the
inline button attached to access requests.
"""
import logging
import traceback
from html import escape
from queue import Queue, Empty
from time import sleep
from threading import Thread
from typing import Dict, List, Optional

import sentry_sdk
from telegram.error import Unauthorized, TelegramError
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler
from telegram import Message, Update, Bot, InlineKeyboardMarkup, InlineKeyboardButton, User, InputMediaPhoto, \
    InputMediaVideo, InputMediaAnimation, InputMediaAudio, InputMediaDocument

from config import BOT_TOKEN, SENTRY_DSN, MANAGEMENT_CHAT, DEBUG
from db import get_conn, Subscriber, PersistentMapping, commit
from send_users_list import send_users_list

logging.basicConfig(level=logging.WARNING)
queue = Queue()
sentry_sdk.init(dsn=SENTRY_DSN)
conn = get_conn()

MAX_MESSAGE_LENGTH = 4096
MAX_CAPTION_LENGTH = 1024

# Pause between two consecutive sends while broadcasting.
_SEND_DELAY = .02


def _notify_access_request(bot: Bot, user: User):
    """Tell the management chat that *user* asked for access, with an "add" button."""
    markup = InlineKeyboardMarkup([[InlineKeyboardButton('Добавить', callback_data=f'add {user.id}')]])
    bot.send_message(MANAGEMENT_CHAT, f'{escape(user.full_name)} запросил доступ',
                     parse_mode='html', reply_markup=markup)


def welcome(bot: Bot, update: Update):
    """Handle ``/start``: greet a subscriber or forward an access request."""
    if DEBUG:
        # In debug mode everybody is subscribed immediately.
        _add_user(bot, update.effective_user.id)
        update.message.reply_text('Добро пожаловать (debug)')
        return

    if update.effective_user.id in conn.root.subscribers:
        update.message.reply_text('Вы уже являетесь участником ЛОНО')
    else:
        update.message.reply_text('Пожалуйста, обратитесь к @lono_contactbot')
        _notify_access_request(bot, update.message.from_user)


def unsubscribe(bot: Bot, update: Update):
    """Handle ``/stop``: remove the sender from the subscribers and notify managers."""
    try:
        user = _remove_user(update.message.chat_id)
    except KeyError:
        # The sender is not a subscriber, so there is nothing to remove.
        update.message.reply_text('Пожалуйста, обратитесь к @lono_contactbot')
        return

    update.message.reply_text('Вы были отписаны от бота. '
                              'Обратитесь к @lono_contactbot если вы хотите подписаться снова.')
    # The name is HTML-escaped, so the message has to be sent as HTML.
    bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} отписался', parse_mode='html')


def _add_user(bot, uid):
    """Store a ``Subscriber`` built from chat *uid*, commit, and return it."""
    user = conn.root.subscribers[uid] = Subscriber.from_chat(bot.get_chat(uid))
    commit()
    return user


def _resolve_target_uid(bot: Bot, update: Update, groups, args):
    """Work out which user an ``add``/``remove`` request refers to.

    The id is taken, in order of preference, from the callback pattern
    groups, from the command arguments, or from the original sender of the
    forwarded message the command replies to.

    Returns the id converted to ``int`` when possible (the raw value
    otherwise), or ``None`` when the request must be ignored: either a
    callback pressed outside ``MANAGEMENT_CHAT``, or no target given, in
    which case a hint is sent to the management chat.
    """
    if groups:
        if update.callback_query.message.chat.id != MANAGEMENT_CHAT:
            return None
        raw_uid = groups[0]
    elif args:
        raw_uid = args[0]
    elif update.message and update.message.reply_to_message and update.message.reply_to_message.forward_from:
        raw_uid = update.message.reply_to_message.forward_from.id
    else:
        bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение')
        return None

    try:
        return int(raw_uid)
    except (ValueError, TypeError):
        # Deliberately best-effort: keep the raw value and let the caller's
        # lookup report the failure.
        return raw_uid


def add_user(bot: Bot, update: Update, groups=(), args=()):
    """Handle ``/add`` and the inline "add" button: subscribe the target user."""
    if update.callback_query:
        update.callback_query.answer()

    uid = _resolve_target_uid(bot, update, groups, args)
    if uid is None:
        return

    try:
        user = _add_user(bot, uid)
        if update.callback_query:
            # Remove the button so the request cannot be approved twice.
            update.callback_query.message.edit_reply_markup()
        bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был добавлен', parse_mode='html')
        bot.send_message(uid, 'Добро пожаловать. Снова.')
    except TelegramError as e:
        bot.send_message(MANAGEMENT_CHAT, str(e))


def _remove_user(uid):
    """Delete subscriber *uid*, commit, and return it.

    Raises:
        KeyError: if *uid* is not a subscriber.
    """
    user = conn.root.subscribers[uid]
    del conn.root.subscribers[uid]
    commit()
    return user


def remove_user(bot: Bot, update: Update, groups=(), args=()):
    """Handle ``/remove`` and the inline "remove" button: unsubscribe the target user."""
    if update.callback_query:
        update.callback_query.answer()

    uid = _resolve_target_uid(bot, update, groups, args)
    if uid is None:
        return

    try:
        user = _remove_user(uid)
        bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален', parse_mode='html')
        if update.callback_query:
            update.callback_query.message.edit_reply_markup()
    except KeyError:
        bot.send_message(MANAGEMENT_CHAT, f'Пользователь id={uid} не был найден')


def users(bot: Bot, update: Update):
    """Delegate to ``send_users_list``."""
    send_users_list()


def msg(bot: Bot, update: Update):
    """Queue an incoming message for the relay worker thread."""
    queue.put(update.message)


def _sign_text(text, m: Message, limit):
    """Return *text* cut to *limit* characters, signed when the author asked for it.

    A text starting with ``!sign`` or ``/sign`` has that prefix removed and
    the HTML-escaped full name of the sender appended; the text is shortened
    so that the signature still fits into *limit*.  ``None`` is treated as an
    empty string.
    """
    if not text:
        text = ''

    sign = ''
    if text.startswith(('!sign', '/sign')):
        text = text[5:]
        sign = f'\n\n— {escape(m.from_user.full_name)}'

    return text[:limit - len(sign)] + sign


def _ensure_subscribed(bot: Bot, m: Message) -> bool:
    """Return True when the sender of *m* may broadcast.

    An unknown sender is subscribed on the spot in debug mode; otherwise the
    management chat is notified, the sender is pointed to the contact bot,
    and False is returned.
    """
    if m.chat_id in conn.root.subscribers:
        return True

    if DEBUG:
        _add_user(bot, m.chat_id)
        m.reply_text('Добро пожаловать (debug)')
        return True

    _notify_access_request(bot, m.from_user)
    m.reply_text('Пожалуйста, обратитесь к @lono_contactbot')
    return False


def _replied_internal_id(m: Message, subscribers) -> Optional[int]:
    """Return the internal id of the message *m* replies to, or None if unknown."""
    replied = m.reply_to_message
    if not replied:
        return None

    forward_map = subscribers[m.chat_id].messages_forward
    if replied.message_id in forward_map:
        return forward_map[replied.message_id]
    return None


def _local_reply_id(user, internal_id):
    """Translate an internal message id into *user*'s own message id (or None)."""
    # Compare with None explicitly: 0 is a valid internal id.
    if internal_id is None:
        return None
    return user.messages_reverse.get(internal_id, None)


def _report_blocked(bot: Bot, user):
    """Tell the management chat that *user* is removed because the bot was blocked."""
    # The name must be escaped: the message is parsed as HTML, and a send
    # error raised here would abort the broadcast to everybody else.
    bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален '
                                      f'из-за блокировки бота', parse_mode='html')


def _finish_broadcast(remove_uids, relayed_count):
    """Advance the message counter, commit, and drop subscribers that blocked the bot."""
    conn.root.counter += relayed_count
    commit()
    for uid in remove_uids:
        try:
            _remove_user(uid)
        except KeyError:
            # Already removed by a handler on another thread while broadcasting.
            pass


def _build_media_group(messages: List[Message]):
    """Convert album messages into ``InputMedia*`` objects, skipping unsupported ones."""
    media_group = []
    for message in messages:
        caption = _sign_text(message.caption_html, message, MAX_CAPTION_LENGTH)
        if getattr(message, 'photo', None):
            media_group.append(InputMediaPhoto(message.photo[-1].file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'video', None):
            media_group.append(InputMediaVideo(message.video.file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'animation', None):
            media_group.append(InputMediaAnimation(message.animation.file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'document', None):
            media_group.append(InputMediaDocument(message.document.file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'audio', None):
            media_group.append(InputMediaAudio(message.audio.file_id, caption=caption, parse_mode='html'))
    return media_group


def _process_media_group(bot: Bot, messages: List[Message]):
    """Relay an album (messages sharing one ``media_group_id``) to every subscriber.

    Every relayed item is recorded in the recipient's forward/reverse maps
    under its own internal id so that later replies can be threaded.
    Subscribers that blocked the bot are removed after the broadcast.
    """
    if not messages:
        return

    m = messages[0]
    if not _ensure_subscribed(bot, m):
        return

    subscribers = conn.root.subscribers  # type: Dict[int, Subscriber]
    reply_to_message_internal_id = _replied_internal_id(m, subscribers)
    media_group = _build_media_group(messages)

    remove_uids = []
    # Iterate over a snapshot: handlers running on the dispatcher thread may
    # add or remove subscribers while the broadcast is in progress.
    for uid, user in list(subscribers.items()):
        sleep(_SEND_DELAY)
        reply_to_message_id = _local_reply_id(user, reply_to_message_internal_id)

        try:
            sent_messages = bot.send_media_group(uid, media_group, reply_to_message_id=reply_to_message_id)
            if sent_messages:
                user.update_from_message(sent_messages[0])
            # Give every album item its own internal id; the counter is
            # advanced by len(messages) once the broadcast is over.
            for offset, sent in enumerate(sent_messages):
                internal_id = conn.root.counter + offset
                user.messages_forward[sent.message_id] = internal_id
                user.messages_reverse[internal_id] = sent.message_id
        except Unauthorized:
            remove_uids.append(uid)
            _report_blocked(bot, user)
        except Exception:
            traceback.print_exc()
            sentry_sdk.capture_exception()

    _finish_broadcast(remove_uids, len(messages))


def users_list(bot: Bot, update: Update):
    """Handle ``/users``: send the subscriber list to a subscriber, 40 lines per message."""
    current_chat = update.effective_chat.id
    subs = conn.root.subscribers  # type: Dict[int, Subscriber]
    if current_chat not in subs:
        return

    lines = [f'Count: {len(subs)}\n']
    for sub in subs.values():  # type: Subscriber
        # The reply is parsed as HTML, so the name has to be escaped.
        lines.append(f'{sub.uid:>14} {escape(str(sub.name))}')

    for i in range(0, len(lines), 40):
        update.effective_message.reply_text('\n'.join(lines[i:i + 40]), parse_mode='html')


def _relay_message(bot: Bot, m: Message, uid, text, caption, reply_to_message_id):
    """Send a copy of *m* to chat *uid* and return the sent message.

    Returns None when *m* carries no content type handled here.
    """
    if m.forward_date:
        return m.forward(uid)
    if getattr(m, 'audio', None):
        a = m.audio
        return bot.send_audio(uid, a.file_id, a.duration, a.performer, a.title, caption,
                              reply_to_message_id=reply_to_message_id, parse_mode='html')
    if getattr(m, 'document', None):
        d = m.document
        if d.mime_type == 'video/mp4':
            # NOTE(review): every mp4 document is replaced by this fixed
            # file id; the reason is not visible in this file - confirm
            # that this is still wanted.
            d.file_id = 'CgADAgAD-wIAAsWRKUpcsMl_cdwp3wI'
        return bot.send_document(uid, d.file_id, d.file_name, caption,
                                 reply_to_message_id=reply_to_message_id, parse_mode='html')
    if getattr(m, 'photo', None):
        p = m.photo
        return bot.send_photo(uid, p[-1].file_id, caption,
                              reply_to_message_id=reply_to_message_id, parse_mode='html')
    if getattr(m, 'sticker', None):
        s = m.sticker
        return bot.send_sticker(uid, s.file_id, reply_to_message_id=reply_to_message_id)
    if getattr(m, 'video', None):
        v = m.video
        return bot.send_video(uid, v.file_id, v.duration, caption,
                              reply_to_message_id=reply_to_message_id, parse_mode='html')
    if getattr(m, 'voice', None):
        v = m.voice
        return bot.send_voice(uid, v.file_id, v.duration, caption,
                              reply_to_message_id=reply_to_message_id, parse_mode='html')
    if getattr(m, 'video_note', None):
        vn = m.video_note
        return bot.send_video_note(uid, vn.file_id, vn.duration, vn.length,
                                   reply_to_message_id=reply_to_message_id)
    if getattr(m, 'contact', None):
        c = m.contact
        return bot.send_contact(uid, c.phone_number, c.first_name, c.last_name,
                                reply_to_message_id=reply_to_message_id)
    if getattr(m, 'location', None):
        loc = m.location
        return bot.send_location(uid, loc.latitude, loc.longitude, reply_to_message_id=reply_to_message_id)
    if getattr(m, 'venue', None):
        v = m.venue
        loc = v.location
        return bot.send_venue(uid, loc.latitude, loc.longitude, v.title, v.address, v.foursquare_id,
                              reply_to_message_id=reply_to_message_id)
    if getattr(m, 'text', None):
        return bot.send_message(uid, text, 'html', reply_to_message_id=reply_to_message_id)
    return None


def _process_message(bot: Bot, m: Message):
    """Relay a single message to every subscriber.

    The relayed copy is recorded in each recipient's forward/reverse maps
    under the current internal counter so that later replies can be
    threaded.  Subscribers that blocked the bot are removed afterwards.
    """
    if not _ensure_subscribed(bot, m):
        return

    subscribers = conn.root.subscribers  # type: Dict[int, Subscriber]
    text = _sign_text(m.text_html, m, MAX_MESSAGE_LENGTH)
    caption = _sign_text(m.caption_html, m, MAX_CAPTION_LENGTH)
    reply_to_message_internal_id = _replied_internal_id(m, subscribers)

    remove_uids = []
    # Iterate over a snapshot: handlers running on the dispatcher thread may
    # add or remove subscribers while the broadcast is in progress.
    for uid, user in list(subscribers.items()):
        sleep(_SEND_DELAY)
        reply_to_message_id = _local_reply_id(user, reply_to_message_internal_id)

        try:
            r = _relay_message(bot, m, uid, text, caption, reply_to_message_id)
            if r:
                user.update_from_message(r)
                user.messages_forward[r.message_id] = conn.root.counter
                user.messages_reverse[conn.root.counter] = r.message_id
        except Unauthorized:
            remove_uids.append(uid)
            _report_blocked(bot, user)
        except Exception:
            traceback.print_exc()
            sentry_sdk.capture_exception()

    _finish_broadcast(remove_uids, 1)


def task_queue(u: Updater):
    """Worker loop: relay queued messages until the updater stops running.

    Consecutive queued messages that share the ``media_group_id`` of the
    message just taken are collected and relayed together as one album.
    """
    while True:
        if not u.running:
            return

        try:
            m = queue.get(timeout=1)  # type: Message
            if m.media_group_id:
                media_group_id = m.media_group_id
                group_messages = [m]
                # Peek at the head of the queue without removing it.
                while queue.qsize() and queue.queue[0].media_group_id == media_group_id:
                    group_messages.append(queue.get(block=False))
                _process_media_group(u.bot, group_messages)
            else:
                _process_message(u.bot, m)
        except Empty:
            pass
        except Exception:
            # Keep the worker alive whatever a single message does, but let
            # KeyboardInterrupt / SystemExit through.
            traceback.print_exc()
            sentry_sdk.capture_exception()


if __name__ == '__main__':
    updater = Updater(BOT_TOKEN, workers=4)

    updater.dispatcher.add_handler(CommandHandler('start', welcome, Filters.private))
    updater.dispatcher.add_handler(CommandHandler('stop', unsubscribe, Filters.private))
    updater.dispatcher.add_handler(CommandHandler('add', add_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True))
    updater.dispatcher.add_handler(CallbackQueryHandler(add_user, pattern=r'^add (\d+)$', pass_groups=True))
    updater.dispatcher.add_handler(CommandHandler('remove', remove_user, Filters.chat(MANAGEMENT_CHAT),
                                                  pass_args=True))
    updater.dispatcher.add_handler(CallbackQueryHandler(remove_user, pattern=r'^remove (\d+)$', pass_groups=True))
    updater.dispatcher.add_handler(CommandHandler('users', users_list, Filters.private))
    updater.dispatcher.add_handler(MessageHandler(Filters.private, msg))

    updater.start_polling()

    tq = Thread(target=task_queue, args=(updater,))
    tq.start()

    logging.warning('LONO has started')
    updater.idle()
    logging.warning('LONO is stopping...')

    # Let the worker finish the broadcast in progress before the connection
    # it writes to is closed.
    tq.join()
    commit()
    conn.close()