#!/usr/bin/env python3
import logging
import os
import re
import traceback
from datetime import datetime, timedelta
from html import escape
from queue import Queue, Empty
from time import sleep
from threading import Thread
from typing import Dict, List
from uuid import uuid4
import sentry_sdk
from telegram.error import Unauthorized, TelegramError
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler
from telegram import Message, Update, Bot, InlineKeyboardMarkup, InlineKeyboardButton, User, InputMediaPhoto, \
InputMediaVideo, InputMediaAnimation, InputMediaAudio, InputMediaDocument
from config import BOT_TOKEN, SENTRY_DSN, MANAGEMENT_CHAT, DEBUG
from db import get_conn, Subscriber, PersistentMapping, commit
from morj import draw_morj
from send_users_list import send_users_list
# Root logger is configured at WARNING level.
logging.basicConfig(level=logging.WARNING)
# Incoming private messages are put here by msg() and consumed by task_queue().
queue = Queue()
sentry_sdk.init(dsn=SENTRY_DSN)
# Shared database connection; the code in this file reads and writes
# conn.root.subscribers, conn.root.counter and conn.root.last_media.
conn = get_conn()
# Length limits handed to _sign_text() for message text and media captions.
MAX_MESSAGE_LENGTH = 4096
MAX_CAPTION_LENGTH = 1024
def _notify_access_request(bot: Bot, user: User):
    """Post an access request for *user* to the management chat.

    The message carries one inline button whose callback data is
    ``add <user id>``.
    """
    approve_button = InlineKeyboardButton('Добавить', callback_data=f'add {user.id}')
    keyboard = InlineKeyboardMarkup([[approve_button]])
    notice = f'{escape(user.full_name)} запросил доступ'
    bot.send_message(MANAGEMENT_CHAT, notice, parse_mode='html', reply_markup=keyboard)
def welcome(bot: Bot, update: Update):
    """Handle ``/start``.

    In DEBUG mode the sender is subscribed immediately. Otherwise existing
    subscribers are told they are already members, and everyone else is
    pointed at the contact bot while the management chat is notified.
    """
    reply = update.message.reply_text

    if DEBUG:
        _add_user(bot, update.effective_user.id)
        reply('Добро пожаловать (debug)')
        return

    if update.effective_user.id not in conn.root.subscribers:
        reply('Пожалуйста, обратитесь к @lono_contactbot')
        _notify_access_request(bot, update.message.from_user)
        return

    reply('Вы уже являетесь участником ЛОНО')
def unsubscribe(bot: Bot, update: Update):
    """Handle ``/stop``: drop the sender's chat from the subscribers.

    The sender gets a confirmation and the management chat is told who left.
    A chat that is not subscribed gets a short notice instead of triggering
    an unhandled ``KeyError``.
    """
    try:
        user = _remove_user(update.message.chat_id)
    except KeyError:
        # _remove_user() indexes the subscribers mapping directly, so an
        # unknown chat id surfaces as KeyError.
        update.message.reply_text('Вы не являетесь участником ЛОНО')
        return
    update.message.reply_text('Вы были отписаны от бота. '
                              'Обратитесь к @lono_contactbot если вы хотите подписаться снова.')
    # The name is HTML-escaped, so the message must be sent in HTML mode;
    # otherwise the entities would be shown literally.
    bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} отписался', parse_mode='html')
def _add_user(bot, uid):
    """Create a Subscriber for chat *uid*, store it and commit.

    The chat is fetched with ``bot.get_chat(uid)`` and the resulting
    subscriber is saved under key *uid*. Returns the new subscriber.
    """
    subscriber = Subscriber.from_chat(bot.get_chat(uid))
    conn.root.subscribers[uid] = subscriber
    commit()
    return subscriber
def add_user(bot: Bot, update: Update, groups=(), args=()):
    """Subscribe a user on request from the management chat.

    The target id is taken, in order of precedence, from:
      * *groups* - the regex groups of an ``add <id>`` button press
        (ignored unless the button lives in the management chat);
      * *args* - the first argument of the ``/add`` command;
      * the original author of a forwarded message that the command replies to.

    Telegram errors raised while adding or greeting the user are reported
    to the management chat as text.
    """
    query = update.callback_query
    if query:
        query.answer()

    if groups:
        if query.message.chat.id != MANAGEMENT_CHAT:
            return
        uid = groups[0]
    elif args:
        uid = args[0]
    else:
        replied_to = update.message.reply_to_message if update.message else None
        author = replied_to.forward_from if replied_to else None
        if not author:
            return bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение')
        uid = author.id

    # Values that are not valid integers are passed on unchanged.
    try:
        uid = int(uid)
    except (ValueError, TypeError):
        pass

    try:
        subscriber = _add_user(bot, uid)
        if query:
            # Calling edit_reply_markup() with no markup removes the button.
            query.message.edit_reply_markup()
        added_notice = f'{escape(subscriber.name)} был добавлен'
        bot.send_message(MANAGEMENT_CHAT, added_notice, parse_mode='html')
        bot.send_message(uid, 'Добро пожаловать. Снова.')
    except TelegramError as error:
        bot.send_message(MANAGEMENT_CHAT, str(error))
def _remove_user(uid):
    """Delete subscriber *uid*, commit, and return the removed entry.

    Raises KeyError when *uid* is not a subscriber.
    """
    subscribers = conn.root.subscribers
    removed = subscribers[uid]
    del subscribers[uid]
    commit()
    return removed
def remove_user(bot: Bot, update: Update, groups=(), args=()):
    """Unsubscribe a user on request from the management chat.

    The target id is taken, in order of precedence, from:
      * *groups* - the regex groups of a ``remove <id>`` button press
        (ignored unless the button lives in the management chat);
      * *args* - the first argument of the ``/remove`` command;
      * the original author of a forwarded message that the command replies to.

    An unknown id is reported to the management chat.
    """
    query = update.callback_query
    if query:
        query.answer()

    if groups:
        if query.message.chat.id != MANAGEMENT_CHAT:
            return
        uid = groups[0]
    elif args:
        uid = args[0]
    else:
        replied_to = update.message.reply_to_message if update.message else None
        author = replied_to.forward_from if replied_to else None
        if not author:
            return bot.send_message(MANAGEMENT_CHAT, 'Укажите ID пользователя или ответьте на его сообщение')
        uid = author.id

    # Values that are not valid integers are passed on unchanged.
    try:
        uid = int(uid)
    except (ValueError, TypeError):
        pass

    try:
        removed = _remove_user(uid)
        removed_notice = f'{escape(removed.name)} был удален'
        bot.send_message(MANAGEMENT_CHAT, removed_notice, parse_mode='html')
        if query:
            # Calling edit_reply_markup() with no markup removes the button.
            query.message.edit_reply_markup()
    except KeyError:
        bot.send_message(MANAGEMENT_CHAT, f'Пользователь id={uid} не был найден')
def users(bot: Bot, update: Update):
    """Delegate to send_users_list(); both handler arguments are unused."""
    send_users_list()
def msg(bot: Bot, update: Update):
    """Put the incoming message on the module-level queue for task_queue()."""
    incoming = update.message
    queue.put(incoming)
def _sign_text(text, m: Message, limit):
if not text:
text = ''
text = re.sub(r'.*?', '\\1', text)
sign = ''
if text.startswith('!sign') or text.startswith('/sign'):
text = text[5:]
sign = f'\n\n— {escape(m.from_user.full_name)}'
return text[:limit - len(sign)] + sign
def _process_media_group(bot: Bot, messages: List[Message]):
    """Relay an album (messages sharing one media group) to every subscriber.

    :param bot: bot used for sending.
    :param messages: the album items; the first one supplies the sender chat
        and the reply target. An empty list is a no-op.

    A sender who is not subscribed is asked to contact the contact bot and the
    management chat is notified; in DEBUG mode the sender is subscribed on the
    fly instead. Subscribers that blocked the bot are removed after the
    broadcast. Other per-recipient errors are reported and do not stop the
    broadcast.
    """
    if not messages:
        return
    m = messages[0]
    current_chat = m.chat_id
    users = conn.root.subscribers  # type: Dict[int, Subscriber]
    if current_chat not in users:
        if DEBUG:
            _add_user(bot, current_chat)
            m.reply_text('Добро пожаловать (debug)')
        else:
            _notify_access_request(bot, m.from_user)
            return m.reply_text('Пожалуйста, обратитесь к @lono_contactbot')

    # Translate the sender-side id of the replied-to message into the internal
    # id, so it can be mapped back to each recipient's own copy below.
    reply_to_message_internal_id = None
    if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward:
        reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id]

    media_group = []
    for message in messages:
        caption = _sign_text(message.caption_html, message, MAX_CAPTION_LENGTH)
        if getattr(message, 'photo', None):
            # The last photo size is the one that gets relayed.
            media_group.append(InputMediaPhoto(message.photo[-1].file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'video', None):
            media_group.append(InputMediaVideo(message.video.file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'animation', None):
            media_group.append(InputMediaAnimation(message.animation.file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'document', None):
            media_group.append(InputMediaDocument(message.document.file_id, caption=caption, parse_mode='html'))
        elif getattr(message, 'audio', None):
            media_group.append(InputMediaAudio(message.audio.file_id, caption=caption, parse_mode='html'))

    # Removal is deferred so the subscribers mapping is not modified while it
    # is being iterated.
    remove_uids = []
    for uid, user in users.items():
        sleep(.02)
        reply_to_message_id = None
        if reply_to_message_internal_id:
            reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id, None)
        try:
            sent_messages = bot.send_media_group(uid, media_group, reply_to_message_id=reply_to_message_id)
            if sent_messages:
                user.update_from_message(sent_messages[0])
                # Every item of the album is recorded under the same internal
                # id; messages_reverse ends up pointing at the last item.
                for r in sent_messages:
                    user.messages_forward[r.message_id] = conn.root.counter
                    user.messages_reverse[conn.root.counter] = r.message_id
        except Unauthorized:
            remove_uids.append(uid)
            # The name must be escaped: the notice is sent in HTML mode, and an
            # error raised inside this handler would abort the whole broadcast.
            bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален '
                                              f'из-за блокировки бота', parse_mode='html')
        except Exception:
            traceback.print_exc()
            sentry_sdk.capture_exception()
    conn.root.counter += len(messages)
    commit()
    for uid in remove_uids:
        _remove_user(uid)
def users_list(bot: Bot, update: Update):
    """Reply with the subscriber count and one line per subscriber.

    Only chats that are themselves subscribed get an answer; anyone else is
    silently ignored. The listing is sent in chunks of 40 lines per message.
    """
    current_chat = update.effective_chat.id
    subs = conn.root.subscribers  # type: Dict[int, Subscriber]
    if current_chat not in subs:
        return
    lines = [f'Count: {len(subs)}\n']
    for sub in subs.values():  # type: Subscriber
        # NOTE(review): in the source this literal was split across two lines
        # (a syntax error) and the ``sub.uid < 0`` branch produced the same
        # text as its ``else``; markup seems to have been lost. Rebuilt as a
        # monospace id followed by the name - confirm the intended format.
        # The name is escaped because the reply is sent in HTML mode.
        lines.append(f'<code>{sub.uid:>14}</code> {escape(str(sub.name))}')
    for start in range(0, len(lines), 40):
        update.effective_message.reply_text('\n'.join(lines[start:start + 40]), parse_mode='html')
def morj(bot: Bot, update: Update):
    """Handle ``/morj <text>``: render the text with draw_morj() and reply with the image.

    The image is written to a uniquely named temporary file. The file handle
    is closed and the file removed even when drawing or sending fails.
    """
    # Everything after the first six characters of the message is the text.
    text = update.effective_message.text[6:]
    fname = '/tmp/morj{}.png'.format(uuid4())
    try:
        draw_morj(text, fname)
        with open(fname, 'rb') as image:
            update.effective_message.reply_photo(image)
    finally:
        # draw_morj() may have failed before creating the file.
        if os.path.exists(fname):
            os.unlink(fname)
def _copy_message(bot: Bot, m: Message, uid, text, caption, reply_to_message_id):
    """Send a copy of *m* to chat *uid* and return the sent message.

    Forwarded messages are forwarded as-is; everything else is re-sent by
    file id / field values according to the first matching content type.
    Returns ``None`` when *m* has none of the handled content types.
    """
    if m.forward_date:
        return m.forward(uid)
    if getattr(m, 'audio', None):
        a = m.audio
        return bot.send_audio(uid, a.file_id, a.duration, a.performer, a.title, caption,
                              reply_to_message_id=reply_to_message_id, parse_mode='html')
    if getattr(m, 'document', None):
        d = m.document
        return bot.send_document(uid, d.file_id, d.file_name, caption, reply_to_message_id=reply_to_message_id,
                                 parse_mode='html')
    if getattr(m, 'photo', None):
        # The last photo size is the one that gets relayed.
        return bot.send_photo(uid, m.photo[-1].file_id, caption, reply_to_message_id=reply_to_message_id,
                              parse_mode='html')
    if getattr(m, 'sticker', None):
        return bot.send_sticker(uid, m.sticker.file_id, reply_to_message_id=reply_to_message_id)
    if getattr(m, 'video', None):
        v = m.video
        return bot.send_video(uid, v.file_id, v.duration, caption, reply_to_message_id=reply_to_message_id,
                              parse_mode='html')
    if getattr(m, 'voice', None):
        v = m.voice
        return bot.send_voice(uid, v.file_id, v.duration, caption, reply_to_message_id=reply_to_message_id,
                              parse_mode='html')
    if getattr(m, 'video_note', None):
        vn = m.video_note
        return bot.send_video_note(uid, vn.file_id, vn.duration, vn.length,
                                   reply_to_message_id=reply_to_message_id)
    if getattr(m, 'contact', None):
        c = m.contact
        return bot.send_contact(uid, c.phone_number, c.first_name, c.last_name,
                                reply_to_message_id=reply_to_message_id)
    if getattr(m, 'location', None):
        loc = m.location
        return bot.send_location(uid, loc.latitude, loc.longitude, reply_to_message_id=reply_to_message_id)
    if getattr(m, 'venue', None):
        v = m.venue
        loc = v.location
        return bot.send_venue(uid, loc.latitude, loc.longitude, v.title, v.address, v.foursquare_id,
                              reply_to_message_id=reply_to_message_id)
    if getattr(m, 'text', None):
        return bot.send_message(uid, text, 'html', reply_to_message_id=reply_to_message_id)
    return None


def _process_message(bot: Bot, m: Message):
    """Relay a single message to every subscriber.

    Stickers and animations are rate limited: if the previous one was relayed
    less than 15 seconds ago (tracked in ``conn.root.last_media``), the sender
    is told how long to wait and nothing is relayed.

    A sender who is not subscribed is asked to contact the contact bot and the
    management chat is notified; in DEBUG mode the sender is subscribed on the
    fly instead. Subscribers that blocked the bot are removed after the
    broadcast. Other per-recipient errors are reported and do not stop the
    broadcast.
    """
    if m.sticker or m.animation:
        delta = datetime.now() - conn.root.last_media
        if delta < timedelta(seconds=15):
            bot.send_message(
                m.from_user.id,
                'Не вайпи, до следующей гифки/стикера осталось {} секунд'.format(15 - int(delta.total_seconds()))
            )
            return
        conn.root.last_media = datetime.now()
        commit()

    current_chat = m.chat_id
    users = conn.root.subscribers  # type: Dict[int, Subscriber]
    if current_chat not in users:
        if DEBUG:
            _add_user(bot, current_chat)
            m.reply_text('Добро пожаловать (debug)')
        else:
            _notify_access_request(bot, m.from_user)
            return m.reply_text('Пожалуйста, обратитесь к @lono_contactbot')

    text = _sign_text(m.text_html, m, MAX_MESSAGE_LENGTH)
    caption = _sign_text(m.caption_html, m, MAX_CAPTION_LENGTH)

    # Translate the sender-side id of the replied-to message into the internal
    # id, so it can be mapped back to each recipient's own copy below.
    reply_to_message_internal_id = None
    if m.reply_to_message and m.reply_to_message.message_id in users[current_chat].messages_forward:
        reply_to_message_internal_id = users[current_chat].messages_forward[m.reply_to_message.message_id]

    # Removal is deferred so the subscribers mapping is not modified while it
    # is being iterated.
    remove_uids = []
    for uid, user in users.items():
        sleep(.02)
        reply_to_message_id = None
        if reply_to_message_internal_id:
            reply_to_message_id = user.messages_reverse.get(reply_to_message_internal_id, None)
        try:
            r = _copy_message(bot, m, uid, text, caption, reply_to_message_id)
            if r:
                user.update_from_message(r)
                # Remember both directions of the mapping between this
                # recipient's message id and the internal id.
                user.messages_forward[r.message_id] = conn.root.counter
                user.messages_reverse[conn.root.counter] = r.message_id
        except Unauthorized:
            remove_uids.append(uid)
            # The name must be escaped: the notice is sent in HTML mode, and an
            # error raised inside this handler would abort the whole broadcast.
            bot.send_message(MANAGEMENT_CHAT, f'{escape(user.name)} был удален '
                                              f'из-за блокировки бота', parse_mode='html')
        except Exception:
            traceback.print_exc()
            sentry_sdk.capture_exception()
    conn.root.counter += 1
    commit()
    for uid in remove_uids:
        _remove_user(uid)
def task_queue(u: Updater):
    """Worker loop: take messages off the module-level queue and relay them.

    Runs until ``u.running`` becomes false; the one-second ``get`` timeout
    bounds how long it takes to notice that. A message with a
    ``media_group_id`` is processed together with the messages of the same
    group that are already waiting at the head of the queue.

    Errors raised while processing are printed and reported to Sentry, and
    the loop carries on with the next message.
    """
    while u.running:
        try:
            m = queue.get(timeout=1)  # type: Message
            if m.media_group_id:
                media_group_id = m.media_group_id
                group_messages = [m]
                # Only items that are already queued are collected; album
                # parts that arrive later are handled as a separate group.
                while queue.qsize() and queue.queue[0].media_group_id == media_group_id:
                    group_messages.append(queue.get(block=False))
                _process_media_group(u.bot, group_messages)
            else:
                _process_message(u.bot, m)
        except Empty:
            continue
        except Exception:
            # Deliberately not a bare ``except``: SystemExit and
            # KeyboardInterrupt must not be swallowed here.
            traceback.print_exc()
            sentry_sdk.capture_exception()
if __name__ == '__main__':
    updater = Updater(BOT_TOKEN, workers=4)
    dispatcher = updater.dispatcher

    # Handlers are registered in this exact order; the catch-all handler for
    # private messages comes last, after all command handlers.
    for handler in (
        CommandHandler('start', welcome, Filters.private),
        CommandHandler('stop', unsubscribe, Filters.private),
        CommandHandler('add', add_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True),
        CallbackQueryHandler(add_user, pattern=r'^add (\d+)$', pass_groups=True),
        CommandHandler('remove', remove_user, Filters.chat(MANAGEMENT_CHAT), pass_args=True),
        CallbackQueryHandler(remove_user, pattern=r'^remove (\d+)$', pass_groups=True),
        CommandHandler('users', users_list, Filters.private),
        CommandHandler('morj', morj),
        MessageHandler(Filters.private, msg),
    ):
        dispatcher.add_handler(handler)

    updater.start_polling()
    # The relay worker is started after polling because task_queue() exits as
    # soon as it sees ``updater.running`` false.
    tq = Thread(target=task_queue, args=(updater,))
    tq.start()
    logging.warning('LONO has started')
    updater.idle()
    logging.warning('LONO is stopping...')
    # Let the worker finish its current broadcast before the database
    # connection it uses is committed and closed.
    tq.join()
    commit()
    conn.close()