#!/usr/bin/env python3 import json import logging import os import re import traceback from datetime import datetime, timedelta from hashlib import sha1 from html import escape from queue import Queue, Empty from time import sleep from threading import Thread, Event from typing import Dict, List import sentry_sdk from redis import Redis from telegram.error import Unauthorized, TelegramError from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler, CallbackContext from telegram import Message, Update, Bot, InlineKeyboardMarkup, InlineKeyboardButton, User, InputMediaPhoto, \ InputMediaVideo, InputMediaAnimation, InputMediaAudio, InputMediaDocument, PhotoSize from config import BOT_TOKEN, SENTRY_DSN, MANAGEMENT_CHAT, DEBUG from db import get_conn, Subscriber, commit from send_users_list import send_users_list logging.basicConfig(level=logging.WARNING) queue = Queue() sentry_sdk.init(dsn=SENTRY_DSN) conn = get_conn() redis = Redis() MAX_MESSAGE_LENGTH = 4096 MAX_CAPTION_LENGTH = 1024 def _antispam(args): if not args: return True args = '|'.join(map(str, args)) digest = sha1(args.encode()).digest() key = 'lono-' + digest.hex() if redis.get(key): return False redis.set(key, '1', ex=30) return True def _notify_access_request(bot: Bot, user: User): markup = InlineKeyboardMarkup([[InlineKeyboardButton('Добавить', callback_data=f'add {user.id}')]]) bot.send_message(MANAGEMENT_CHAT, f'{escape(user.full_name)} запросил доступ', parse_mode='html', reply_markup=markup) def welcome(update: Update, ctx: CallbackContext): if DEBUG: _add_user(ctx.bot, update.effective_user.id) update.message.reply_text('Добро пожаловать (debug)') return if update.effective_user.id in conn.root.subscribers: update.message.reply_text('Вы уже являетесь участником ЛОНО') else: update.message.reply_text('Пожалуйста, обратитесь к @lono_contactbot') _notify_access_request(ctx.bot, update.message.from_user) def unsubscribe(update: Update, ctx: CallbackContext): user = _remove_user(update.message.chat_id) update.message.reply_text('Вы были отписаны от бота. ' 'Обратитесь к @lono_contactbot если вы хотите подписаться снова.') ctx.bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} отписался') def _add_user(bot, uid): user = conn.root.subscribers[uid] = Subscriber.from_chat(bot.get_chat(uid)) commit() return user def add_user(update: Update, ctx: CallbackContext): if update.callback_query: update.callback_query.answer() if ctx.match: if update.callback_query.message.chat.id != MANAGEMENT_CHAT: return uid = ctx.match.group(1) elif ctx.args: uid = ctx.args[0] elif update.message and update.message.reply_to_message and update.message.reply_to_message.forward_from: uid = update.message.reply_to_message.forward_from.id else: return ctx.bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение') try: uid = int(uid) except (ValueError, TypeError): pass try: user = _add_user(ctx.bot, uid) if update.callback_query: update.callback_query.message.edit_reply_markup() ctx.bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} ({uid}) был добавлен', parse_mode='html') # ctx.bot.send_message(uid, 'Добро пожаловать. Снова.') except TelegramError as e: ctx.bot.send_message(MANAGEMENT_CHAT, str(e)) def _remove_user(uid): user = conn.root.subscribers[uid] del conn.root.subscribers[uid] commit() return user def remove_user(update: Update, ctx: CallbackContext): if update.callback_query: update.callback_query.answer() if ctx.match: if update.callback_query.message.chat.id != MANAGEMENT_CHAT: return uid = ctx.match.groups(1) elif ctx.args: uid = ctx.args[0] elif update.message and update.message.reply_to_message and update.message.reply_to_message.forward_from: uid = update.message.reply_to_message.forward_from.id else: return ctx.bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение') try: uid = int(uid) except (ValueError, TypeError): pass try: user = _remove_user(uid) ctx.bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален', parse_mode='html') if update.callback_query: update.callback_query.message.edit_reply_markup() except KeyError: ctx.bot.send_message(MANAGEMENT_CHAT, f'Пользователь id={uid} не был найден') def users(update: Update, ctx: CallbackContext): send_users_list() def msg(update: Update, ctx: CallbackContext): queue.put(update.message) def _sign_text(text, m: Message, limit): if not text: text = '' text = re.sub(r'.*?', '\\1', text) sign = '' if text.startswith('!sign') or text.startswith('/sign'): text = text[5:] sign = f'\n\n— {escape(m.from_user.full_name)}' return text[:limit - len(sign)] + sign def _process_media_group(bot: Bot, messages: List[Message]): if not messages: return m = messages[0] current_chat = m.chat_id users = conn.root.subscribers # type: Dict[int, Subscriber] if current_chat not in users: if DEBUG: _add_user(bot, current_chat) m.reply_text('Добро пожаловать (debug)') else: _notify_access_request(bot, m.from_user) return m.reply_text('Пожалуйста, обратитесь к @lono_contactbot') reply_to_message_internal_id = None if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward: reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id] media_group = [] for message in messages: caption = _sign_text(message.caption_html, message, MAX_CAPTION_LENGTH) if hasattr(message, 'photo') and message.photo: media_group.append(InputMediaPhoto(message.photo[-1].file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'video') and message.video: media_group.append(InputMediaVideo(message.video.file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'animation') and message.animation: media_group.append(InputMediaAnimation(message.animation.file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'document') and message.document: media_group.append(InputMediaDocument(message.document.file_id, caption=caption, parse_mode='html')) elif hasattr(message, 'audio') and message.audio: media_group.append(InputMediaAudio(message.audio.file_id, caption=caption, parse_mode='html')) remove_uids = [] for uid, user in users.items(): sleep(.02) reply_to_message_id = None if reply_to_message_internal_id: reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id, None) try: sent_messages = bot.send_media_group(uid, media_group, reply_to_message_id=reply_to_message_id) if sent_messages: user.update_from_message(sent_messages[0]) for r in sent_messages: user.messages_forward[r.message_id] = conn.root.counter user.messages_reverse[conn.root.counter] = r.message_id except Unauthorized: remove_uids.append(uid) bot.send_message(MANAGEMENT_CHAT, f'{user.name} был удален ' f'из-за блокировки бота', parse_mode='html') except Exception: traceback.print_exc() sentry_sdk.capture_exception() conn.root.counter += len(messages) commit() for uid in remove_uids: _remove_user(uid) def users_list(update: Update, ctx: CallbackContext): current_chat = update.effective_chat.id subs = conn.root.subscribers # type: Dict[int, Subscriber] if current_chat not in subs: return try: with open('fake_users.json') as f: subs = {data[0]: Subscriber(int(data[0]), data[1]) for data in json.load(f).items()} except: subs = conn.root.subscribers messages = [f'Count: {len(subs)}\n'] for sub in subs.values(): # type: Subscriber msg = f'{sub.uid:>14} ' if sub.uid < 0: msg += str(sub.name) else: msg += f'{sub.name}' messages.append(msg) for i in range(0, len(messages), 40): update.effective_message.reply_text('\n'.join(messages[i:i+40]), parse_mode='html') def _process_message(bot: Bot, m: Message): if m.sticker or m.animation: delta = datetime.now() - conn.root.last_media if delta < timedelta(seconds=15): bot.send_message( m.from_user.id, 'Не вайпи, до следующей гифки/стикера осталось {} секунд'.format(15 - int(delta.total_seconds())) ) return conn.root.last_media = datetime.now() commit() current_chat = m.chat_id users = conn.root.subscribers # type: Dict[int, Subscriber] if current_chat not in users: if DEBUG: _add_user(bot, current_chat) m.reply_text('Добро пожаловать (debug)') else: _notify_access_request(bot, m.from_user) return m.reply_text('Пожалуйста, GTFO') # if m.text and len(m.text) > 140 or m.caption and len(m.caption) > 140: # return m.reply_text('Сообщение не может содержать более 140 символов') text = _sign_text(m.text_html, m, MAX_MESSAGE_LENGTH) caption = _sign_text(m.caption_html, m, MAX_CAPTION_LENGTH) reply_to_message_internal_id = None if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward: reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id] func = None args = [] hash_args = None kwargs = {} if m.forward_date: func = m.forward elif hasattr(m, 'audio') and m.audio: a = m.audio func = bot.send_audio args = [a.file_id, a.duration, a.performer, a.title, caption] kwargs = dict(parse_mode='html') elif hasattr(m, 'document') and m.document: d = m.document func = bot.send_document args = [d.file_id, d.file_name, caption] kwargs = dict(parse_mode='html') elif hasattr(m, 'photo') and m.photo: p = m.photo func = bot.send_photo args = [p[-1].file_id, caption] hash_args = [p[-1].file_unique_id, caption] kwargs = dict(parse_mode='html') elif hasattr(m, 'sticker') and m.sticker: s = m.sticker func = bot.send_sticker args = [s.file_id] elif hasattr(m, 'video') and m.video: v = m.video func = bot.send_video args = [v.file_id, v.duration, caption] kwargs = dict(parse_mode='html') elif hasattr(m, 'voice') and m.voice: v = m.voice func = bot.send_voice args = [v.file_id, v.duration, caption] kwargs = dict(parse_mode='html') elif hasattr(m, 'video_note') and m.video_note: vn = m.video_note func = bot.send_video_note args = [vn.file_id, vn.duration, vn.length] elif hasattr(m, 'contact') and m.contact: c = m.contact func = bot.send_contact args = [c.phone_number, c.first_name, c.last_name] elif hasattr(m, 'location') and m.location: l = m.location func = bot.send_location args = [l.latitude, l.longitude] elif hasattr(m, 'venue') and m.venue: v = m.venue l = v.location func = bot.send_venue args = [l.latitude, l.longitude, v.title, v.address, v.foursquare_id] elif hasattr(m, 'text') and m.text: func = bot.send_message args = [text, 'html'] if not _antispam(hash_args or args): return m.reply_text('Не вайпи', quote=True) remove_uids = [] if func: for uid, user in users.items(): sleep(.02) reply_to_message_id = None if reply_to_message_internal_id: reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id, None) try: if func != m.forward: kwargs['reply_to_message_id'] = reply_to_message_id r = func(*([uid] + args), **kwargs) if r: user.update_from_message(r) user.messages_forward[r.message_id] = conn.root.counter user.messages_reverse[conn.root.counter] = r.message_id except Unauthorized: remove_uids.append(uid) bot.send_message(MANAGEMENT_CHAT, f'{user.name} был удален ' f'из-за блокировки бота', parse_mode='html') except Exception: traceback.print_exc() sentry_sdk.capture_exception() conn.root.counter += 1 commit() for uid in remove_uids: _remove_user(uid) def task_queue(u: Updater, stop_signal: Event): while True: if not u.running or stop_signal.is_set(): return try: m = queue.get(timeout=1) # type: Message if m.media_group_id: media_group_id = m.media_group_id group_messages = [m] while queue.qsize() and queue.queue[0].media_group_id == media_group_id: group_messages.append(queue.get(block=False)) _process_media_group(u.bot, group_messages) else: _process_message(u.bot, m) except Empty: pass except: traceback.print_exc() sentry_sdk.capture_exception() def main(): updater = Updater(BOT_TOKEN, workers=4, use_context=True) updater.dispatcher.add_handler(CommandHandler('start', welcome, Filters.private)) updater.dispatcher.add_handler(CommandHandler('stop', unsubscribe, Filters.private)) updater.dispatcher.add_handler(CommandHandler('add', add_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True)) updater.dispatcher.add_handler(CallbackQueryHandler(add_user, pattern=r'^add (\d+)$')) updater.dispatcher.add_handler(CommandHandler('remove', remove_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True)) updater.dispatcher.add_handler(CallbackQueryHandler(remove_user, pattern=r'^remove (\d+)$')) updater.dispatcher.add_handler(CommandHandler('users', users_list, Filters.private)) updater.dispatcher.add_handler(MessageHandler(Filters.private, msg)) updater.start_polling() stop_signal = Event() tq = Thread(target=task_queue, args=(updater, stop_signal)) tq.start() logging.warning('LONO has started') updater.idle() stop_signal.set() logging.warning('LONO is stopping...') commit() conn.close() if __name__ == '__main__': main()