#!/usr/bin/env python3
import logging
import os
import re
import traceback
from datetime import datetime, timedelta
from hashlib import sha1
from html import escape
from queue import Queue, Empty
from time import sleep
from threading import Thread, Event
from typing import Dict, List
import sentry_sdk
from redis import Redis
from telegram.error import Unauthorized, TelegramError
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler, CallbackContext
from telegram import Message, Update, Bot, InlineKeyboardMarkup, InlineKeyboardButton, User, InputMediaPhoto, \
InputMediaVideo, InputMediaAnimation, InputMediaAudio, InputMediaDocument, PhotoSize
from config import BOT_TOKEN, SENTRY_DSN, MANAGEMENT_CHAT, DEBUG
from db import get_conn, Subscriber, commit
from send_users_list import send_users_list
# Only warnings and above are logged; the start/stop notices in main() use
# logging.warning so they remain visible at this level.
logging.basicConfig(level=logging.WARNING)
# Hand-off buffer: the `msg` handler enqueues incoming messages and the
# `task_queue` worker thread dequeues and broadcasts them.
queue = Queue()
sentry_sdk.init(dsn=SENTRY_DSN)
# Persistent store; this module reads/writes conn.root.subscribers,
# conn.root.counter and conn.root.last_media.
conn = get_conn()
# Redis client with default connection settings, used by _antispam().
redis = Redis()
# Truncation limits passed to _sign_text() for message text and media captions.
# NOTE(review): presumably these mirror Telegram's own length limits - confirm.
MAX_MESSAGE_LENGTH = 4096
MAX_CAPTION_LENGTH = 1024
def _antispam(args):
if not args:
return True
args = '|'.join(map(str, args))
digest = sha1(args.encode()).digest()
key = 'lono-' + digest.hex()
if redis.get(key):
return False
redis.set(key, '1', ex=30)
return True
def _notify_access_request(bot: Bot, user: User):
    """Post an access request for *user* to the management chat.

    The notice carries a single inline button whose callback data is
    ``add <user id>``, which matches the pattern handled by ``add_user``.
    """
    add_button = InlineKeyboardButton('Добавить', callback_data=f'add {user.id}')
    keyboard = InlineKeyboardMarkup([[add_button]])
    notice = f'{escape(user.full_name)} запросил доступ'
    bot.send_message(MANAGEMENT_CHAT, notice, parse_mode='html', reply_markup=keyboard)
def welcome(update: Update, ctx: CallbackContext):
    """Greet a user who started the bot.

    In DEBUG mode the user is subscribed immediately. Otherwise existing
    subscribers are told they are already members, and everyone else is
    pointed to the contact bot while an access request is posted to the
    management chat.
    """
    message = update.message
    if DEBUG:
        _add_user(ctx.bot, update.effective_user.id)
        message.reply_text('Добро пожаловать (debug)')
        return

    if update.effective_user.id not in conn.root.subscribers:
        message.reply_text('Пожалуйста, обратитесь к @lono_contactbot')
        _notify_access_request(ctx.bot, message.from_user)
        return

    message.reply_text('Вы уже являетесь участником ЛОНО')
def unsubscribe(update: Update, ctx: CallbackContext):
    """Remove the calling chat from the subscribers and notify the managers.

    A chat that is not subscribed gets a short notice instead of triggering
    an unhandled ``KeyError`` from ``_remove_user``.
    """
    try:
        user = _remove_user(update.message.chat_id)
    except KeyError:
        update.message.reply_text('Вы не являетесь участником ЛОНО')
        return
    update.message.reply_text('Вы были отписаны от бота. '
                              'Обратитесь к @lono_contactbot если вы хотите подписаться снова.')
    # The name is HTML-escaped, so the message has to be parsed as HTML;
    # otherwise the entities would be shown literally.
    ctx.bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} отписался',
                         parse_mode='html')
def _add_user(bot, uid):
    """Create a Subscriber for chat *uid*, store it and commit.

    :param bot: bot used to fetch the chat via ``get_chat``.
    :param uid: chat identifier, also used as the key in the subscribers map.
    :return: the newly stored Subscriber.
    """
    subscriber = Subscriber.from_chat(bot.get_chat(uid))
    conn.root.subscribers[uid] = subscriber
    commit()
    return subscriber
def add_user(update: Update, ctx: CallbackContext):
    """Subscribe a user on behalf of the management chat.

    The target id is taken, in order of precedence, from the callback-query
    regex match, from the first command argument, or from the original
    sender of the forwarded message being replied to. Telegram errors raised
    while adding or greeting the user are reported to the management chat.
    """
    query = update.callback_query
    if query:
        query.answer()

    if ctx.match:
        # Inline buttons are only honoured when pressed in the management chat.
        if query.message.chat.id != MANAGEMENT_CHAT:
            return
        raw_uid = ctx.match.group(1)
    elif ctx.args:
        raw_uid = ctx.args[0]
    else:
        message = update.message
        replied_to = message.reply_to_message if message else None
        forwarded_from = replied_to.forward_from if replied_to else None
        if not forwarded_from:
            return ctx.bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение')
        raw_uid = forwarded_from.id

    # Values that are not numeric are passed through unchanged.
    try:
        uid = int(raw_uid)
    except (ValueError, TypeError):
        uid = raw_uid

    try:
        subscriber = _add_user(ctx.bot, uid)
        if query:
            # Drop the inline button from the request once it has been handled.
            query.message.edit_reply_markup()
        added_notice = f'{escape(subscriber.name)} был добавлен'
        ctx.bot.send_message(MANAGEMENT_CHAT, added_notice, parse_mode='html')
        ctx.bot.send_message(uid, 'Добро пожаловать. Снова.')
    except TelegramError as error:
        ctx.bot.send_message(MANAGEMENT_CHAT, str(error))
def _remove_user(uid):
    """Delete subscriber *uid* from the store, commit, and return it.

    :raises KeyError: if *uid* is not a subscriber.
    """
    subscribers = conn.root.subscribers
    removed = subscribers[uid]
    del subscribers[uid]
    commit()
    return removed
def remove_user(update: Update, ctx: CallbackContext):
    """Unsubscribe a user on behalf of the management chat.

    The target id is taken, in order of precedence, from the callback-query
    regex match, from the first command argument, or from the original
    sender of the forwarded message being replied to. If the id is not a
    known subscriber, the management chat is told so.
    """
    if update.callback_query:
        update.callback_query.answer()
    if ctx.match:
        # Inline buttons are only honoured when pressed in the management chat.
        if update.callback_query.message.chat.id != MANAGEMENT_CHAT:
            return
        # group(1) yields the captured id string. The previous groups(1)
        # returned a tuple, which int() rejected, so the lookup always failed.
        uid = ctx.match.group(1)
    elif ctx.args:
        uid = ctx.args[0]
    elif update.message and update.message.reply_to_message and update.message.reply_to_message.forward_from:
        uid = update.message.reply_to_message.forward_from.id
    else:
        return ctx.bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение')
    # Values that are not numeric are passed through unchanged and will
    # simply not be found below.
    try:
        uid = int(uid)
    except (ValueError, TypeError):
        pass
    try:
        user = _remove_user(uid)
        ctx.bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален',
                             parse_mode='html')
        if update.callback_query:
            update.callback_query.message.edit_reply_markup()
    except KeyError:
        ctx.bot.send_message(MANAGEMENT_CHAT, f'Пользователь id={uid} не был найден')
def users(update: Update, ctx: CallbackContext):
    """Handler-shaped wrapper that delegates to ``send_users_list``.

    Neither *update* nor *ctx* is used.
    """
    send_users_list()
def msg(update: Update, ctx: CallbackContext):
    """Enqueue the incoming message for the ``task_queue`` worker."""
    incoming = update.message
    queue.put(incoming)
def _sign_text(text, m: Message, limit):
if not text:
text = ''
text = re.sub(r'.*?', '\\1', text)
sign = ''
if text.startswith('!sign') or text.startswith('/sign'):
text = text[5:]
sign = f'\n\n— {escape(m.from_user.full_name)}'
return text[:limit - len(sign)] + sign
def _process_media_group(bot: Bot, messages: List[Message]):
if not messages:
return
m = messages[0]
current_chat = m.chat_id
users = conn.root.subscribers # type: Dict[int, Subscriber]
if current_chat not in users:
if DEBUG:
_add_user(bot, current_chat)
m.reply_text('Добро пожаловать (debug)')
else:
_notify_access_request(bot, m.from_user)
return m.reply_text('Пожалуйста, обратитесь к @lono_contactbot')
reply_to_message_internal_id = None
if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward:
reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id]
media_group = []
for message in messages:
caption = _sign_text(message.caption_html, message, MAX_CAPTION_LENGTH)
if hasattr(message, 'photo') and message.photo:
media_group.append(InputMediaPhoto(message.photo[-1].file_id, caption=caption, parse_mode='html'))
elif hasattr(message, 'video') and message.video:
media_group.append(InputMediaVideo(message.video.file_id, caption=caption, parse_mode='html'))
elif hasattr(message, 'animation') and message.animation:
media_group.append(InputMediaAnimation(message.animation.file_id, caption=caption, parse_mode='html'))
elif hasattr(message, 'document') and message.document:
media_group.append(InputMediaDocument(message.document.file_id, caption=caption, parse_mode='html'))
elif hasattr(message, 'audio') and message.audio:
media_group.append(InputMediaAudio(message.audio.file_id, caption=caption, parse_mode='html'))
remove_uids = []
for uid, user in users.items():
sleep(.02)
reply_to_message_id = None
if reply_to_message_internal_id:
reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id, None)
try:
sent_messages = bot.send_media_group(uid, media_group, reply_to_message_id=reply_to_message_id)
if sent_messages:
user.update_from_message(sent_messages[0])
for r in sent_messages:
user.messages_forward[r.message_id] = conn.root.counter
user.messages_reverse[conn.root.counter] = r.message_id
except Unauthorized:
remove_uids.append(uid)
bot.send_message(MANAGEMENT_CHAT, f'{user.name} был удален '
f'из-за блокировки бота', parse_mode='html')
except Exception:
traceback.print_exc()
sentry_sdk.capture_exception()
conn.root.counter += len(messages)
commit()
for uid in remove_uids:
_remove_user(uid)
def users_list(update: Update, ctx: CallbackContext):
    """Reply with the list of subscribers, 40 lines per message.

    Only answers when the requesting chat is itself a subscriber. The first
    line is the subscriber count, followed by one line per subscriber with
    the id right-aligned to 14 characters and the name.
    """
    current_chat = update.effective_chat.id
    subs = conn.root.subscribers  # type: Dict[int, Subscriber]
    if current_chat not in subs:
        return
    messages = [f'Count: {len(subs)}\n']
    for sub in subs.values():  # type: Subscriber
        # NOTE(review): this line was an unterminated f-string, and both name
        # branches rendered identically, so the markup appears to have been
        # lost. <code> around the padded id and a tg://user link for
        # non-negative ids are a reconstruction - confirm.
        line = f'<code>{sub.uid:>14}</code> '
        # Names are escaped because the reply is parsed as HTML.
        name = escape(str(sub.name))
        if sub.uid < 0:
            line += name
        else:
            line += f'<a href="tg://user?id={sub.uid}">{name}</a>'
        messages.append(line)
    for i in range(0, len(messages), 40):
        update.effective_message.reply_text('\n'.join(messages[i:i + 40]), parse_mode='html')
def _select_sender(bot: Bot, m: Message, text, caption):
    """Choose how to re-send *m*.

    :return: ``(func, args, hash_args, kwargs)``. ``func`` is called as
        ``func(chat_id, *args, **kwargs)`` and is ``None`` for unsupported
        message types. ``hash_args`` is an alternative anti-spam fingerprint,
        or ``None`` to fingerprint ``args`` themselves.
    """
    html = dict(parse_mode='html')
    if m.forward_date:
        return m.forward, [], None, {}
    if m.audio:
        a = m.audio
        return bot.send_audio, [a.file_id, a.duration, a.performer, a.title, caption], None, html
    if m.document:
        d = m.document
        return bot.send_document, [d.file_id, d.file_name, caption], None, html
    if m.photo:
        # The last PhotoSize entry is sent. The fingerprint uses
        # file_unique_id rather than file_id.
        p = m.photo[-1]
        return bot.send_photo, [p.file_id, caption], [p.file_unique_id, caption], html
    if m.sticker:
        return bot.send_sticker, [m.sticker.file_id], None, {}
    if m.video:
        v = m.video
        return bot.send_video, [v.file_id, v.duration, caption], None, html
    if m.voice:
        v = m.voice
        return bot.send_voice, [v.file_id, v.duration, caption], None, html
    if m.video_note:
        vn = m.video_note
        return bot.send_video_note, [vn.file_id, vn.duration, vn.length], None, {}
    if m.contact:
        c = m.contact
        return bot.send_contact, [c.phone_number, c.first_name, c.last_name], None, {}
    if m.location:
        loc = m.location
        return bot.send_location, [loc.latitude, loc.longitude], None, {}
    if m.venue:
        v = m.venue
        loc = v.location
        return bot.send_venue, [loc.latitude, loc.longitude, v.title, v.address, v.foursquare_id], None, {}
    if m.text:
        return bot.send_message, [text, 'html'], None, {}
    return None, [], None, {}


def _process_message(bot: Bot, m: Message):
    """Broadcast a single (non-album) message to every subscriber.

    Stickers and animations are limited to one per 15 seconds across all
    senders, and identical payloads are rejected by ``_antispam``.
    Non-subscribers are asked to contact the access bot (or auto-subscribed
    in DEBUG mode). Every sent copy is recorded in the recipient's
    ``messages_forward``/``messages_reverse`` maps under the current
    ``conn.root.counter`` so replies can be threaded for each recipient.
    Subscribers who blocked the bot are removed after the broadcast.
    """
    if m.sticker or m.animation:
        delta = datetime.now() - conn.root.last_media
        if delta < timedelta(seconds=15):
            bot.send_message(
                m.from_user.id,
                'Не вайпи, до следующей гифки/стикера осталось {} секунд'.format(15 - int(delta.total_seconds()))
            )
            return
        conn.root.last_media = datetime.now()
        commit()

    current_chat = m.chat_id
    users = conn.root.subscribers  # type: Dict[int, Subscriber]
    if current_chat not in users:
        if DEBUG:
            _add_user(bot, current_chat)
            m.reply_text('Добро пожаловать (debug)')
        else:
            _notify_access_request(bot, m.from_user)
            return m.reply_text('Пожалуйста, обратитесь к @lono_contactbot')

    text = _sign_text(m.text_html, m, MAX_MESSAGE_LENGTH)
    caption = _sign_text(m.caption_html, m, MAX_CAPTION_LENGTH)

    # Translate the sender-side id of the replied-to message to an internal id.
    reply_to_message_internal_id = None
    if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward:
        reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id]

    func, args, hash_args, kwargs = _select_sender(bot, m, text, caption)

    if not _antispam(hash_args or args):
        return m.reply_text('Не вайпи', quote=True)

    remove_uids = []
    if func:
        for uid, user in users.items():
            sleep(.02)
            reply_to_message_id = None
            # "is not None" so that an internal id of 0 is still honoured.
            if reply_to_message_internal_id is not None:
                reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id)
            try:
                r = func(uid, *args, **kwargs, reply_to_message_id=reply_to_message_id)
                if r:
                    user.update_from_message(r)
                    user.messages_forward[r.message_id] = conn.root.counter
                    user.messages_reverse[conn.root.counter] = r.message_id
            except Unauthorized:
                # Removal is deferred: the subscribers map is being iterated.
                remove_uids.append(uid)
                bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален '
                                                  f'из-за блокировки бота', parse_mode='html')
            except Exception:
                traceback.print_exc()
                sentry_sdk.capture_exception()
    conn.root.counter += 1
    commit()
    for uid in remove_uids:
        _remove_user(uid)
def task_queue(u: Updater, stop_signal: Event):
    """Worker loop: take queued messages and broadcast them until stopped.

    The loop ends as soon as the updater is no longer running or
    *stop_signal* is set. Both are re-checked at least once per second
    because the queue read times out after one second. Messages with a
    ``media_group_id`` are merged with any immediately following queued
    messages of the same group and sent as one album.

    :param u: the running Updater; its ``bot`` is used for sending.
    :param stop_signal: event set by ``main`` to request shutdown.
    """
    while u.running and not stop_signal.is_set():
        try:
            m = queue.get(timeout=1)  # type: Message
            if m.media_group_id:
                media_group_id = m.media_group_id
                group_messages = [m]
                # Peek at the head of the queue and keep taking messages
                # while they belong to the same album.
                while queue.qsize() and queue.queue[0].media_group_id == media_group_id:
                    group_messages.append(queue.get(block=False))
                _process_media_group(u.bot, group_messages)
            else:
                _process_message(u.bot, m)
        except Empty:
            continue
        except Exception:
            # Keep the worker alive on errors. A bare "except:" would also
            # swallow SystemExit and KeyboardInterrupt.
            traceback.print_exc()
            sentry_sdk.capture_exception()
def main():
    """Register the handlers, start polling and the broadcast worker, then shut down.

    Blocks in ``updater.idle()`` until the updater is stopped. After that the
    worker thread is signalled and joined before the final commit, so the
    store is not closed while a broadcast is still in progress.
    """
    updater = Updater(BOT_TOKEN, workers=4, use_context=True)
    dispatcher = updater.dispatcher
    dispatcher.add_handler(CommandHandler('start', welcome, Filters.private))
    dispatcher.add_handler(CommandHandler('stop', unsubscribe, Filters.private))
    dispatcher.add_handler(CommandHandler('add', add_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True))
    dispatcher.add_handler(CallbackQueryHandler(add_user, pattern=r'^add (\d+)$'))
    dispatcher.add_handler(CommandHandler('remove', remove_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True))
    dispatcher.add_handler(CallbackQueryHandler(remove_user, pattern=r'^remove (\d+)$'))
    dispatcher.add_handler(CommandHandler('users', users_list, Filters.private))
    # Registered last: any other private message is queued for broadcast.
    dispatcher.add_handler(MessageHandler(Filters.private, msg))
    updater.start_polling()

    stop_signal = Event()
    tq = Thread(target=task_queue, args=(updater, stop_signal))
    tq.start()
    logging.warning('LONO has started')

    updater.idle()
    stop_signal.set()
    logging.warning('LONO is stopping...')
    # Wait for the worker to finish its current message. Without the join,
    # commit()/close() below could run while the worker is still writing.
    tq.join()
    commit()
    conn.close()


if __name__ == '__main__':
    main()