telegram_bots/bots/modules/robot.py

108 lines
5.0 KiB
Python

import os
import re
import time
from tempfile import TemporaryDirectory

import boto3
import filetype
import twitter
from django.db import models
from djconfig import config
from qrtools import QR
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton, MessageEntity
from telegram.error import BadRequest
from telegram.ext import CallbackContext, Dispatcher, MessageHandler, Filters

from bots.models import TelegramBotModuleConfig, BotUser
class RobotBotModuleConfig(TelegramBotModuleConfig):
    """Bot module that mirrors received media to S3 and resolves shared links.

    Two message handlers are registered:

    * ``save_file`` uploads an incoming photo/audio/video/document to the
      configured S3 bucket and replies with its public URL.
    * ``document_downloader`` inspects URLs in a text message and replies
      with the linked GIF or with the videos attached to a linked tweet.
    """

    # Users allowed to use the module; when the relation is empty nobody is
    # rejected.
    users = models.ManyToManyField(BotUser)

    MODULE_NAME = 'RoBOT'

    # Matches a tweet permalink and captures the numeric status id.
    _TWEET_URL_RE = re.compile(
        r'https?://(?:www\.|mobile\.)?twitter\.com/[A-Za-z0-9_]{1,15}/status/(\d+)')

    def build_dispatcher(self, dispatcher: Dispatcher):
        """Register this module's message handlers on *dispatcher* and return it."""
        media_filter = Filters.audio | Filters.photo | Filters.video | Filters.document
        link_filter = Filters.text & (Filters.entity(MessageEntity.URL) |
                                      Filters.entity(MessageEntity.TEXT_LINK))
        dispatcher.add_handler(MessageHandler(media_filter, self.save_file))
        dispatcher.add_handler(MessageHandler(link_filter, self.document_downloader))
        return dispatcher

    def _is_authorized(self, update: Update) -> bool:
        """Return True when the sender of *update* may use this module.

        An empty ``users`` relation leaves the module open to everybody;
        otherwise the sender's Telegram id must be present in it. Updates
        without a sender are rejected when the module is restricted.
        """
        if not self.users.exists():
            return True
        user = update.effective_user
        return user is not None and self.users.filter(user_id=user.id).exists()

    def save_file(self, update: Update, ctx: CallbackContext):
        """Upload the media of the incoming message to S3 and reply with its URL.

        When the message is captioned ``qr`` the file is also decoded as a QR
        code and the decoded data is prepended to the reply. Photos
        additionally get reverse-image-search buttons.
        """
        # NOTE(review): private djconfig API, presumably refreshes cached
        # config values — confirm before relying on it elsewhere.
        config._reload_maybe()
        # effective_message also covers edited messages and channel posts,
        # for which update.message is None.
        message = update.effective_message
        if not self._is_authorized(update):
            message.reply_text('GTFO')
            return

        if message.photo:
            # The last entry of the photo list is used, as in the original code.
            file_id = message.photo[-1].file_id
        elif message.audio:
            file_id = message.audio.file_id
        elif message.video:
            file_id = message.video.file_id
        elif message.document:
            file_id = message.document.file_id
        else:
            return

        file = ctx.bot.get_file(file_id)
        # The timestamp prefix keeps repeated uploads of the same name apart.
        filename = f'{int(time.time())}-{os.path.basename(file.file_path)}'
        # Media messages carry their user text in ``caption``; ``text`` is
        # only kept as a fallback.
        request = (message.caption or message.text or '').strip().lower()
        text = ''

        with TemporaryDirectory() as d:
            file_path = os.path.join(d, filename)
            file.download(file_path)
            # filetype.guess returns None for unrecognised content.
            tp = filetype.guess(file_path)
            content_type = tp.mime if tp is not None else 'application/octet-stream'
            s3 = boto3.client('s3', aws_access_key_id=config.aws_access_key_id,
                              aws_secret_access_key=config.aws_secret_access_key)
            s3.upload_file(file_path, config.s3_bucket, filename,
                           ExtraArgs={'ACL': 'public-read', 'ContentType': content_type})
            if request == 'qr':
                # Must run while the temporary file still exists.
                qr = QR(filename=file_path)
                qr.decode()
                text = f'QR:\n{qr.data}'

        file_url = f'https://{config.s3_bucket}.s3.amazonaws.com/{filename}'
        text += f'\n\n{file_url}'

        keyboard = []
        if message.photo:
            keyboard = [[
                InlineKeyboardButton('Google IS', f'https://www.google.com/searchbyimage?image_url={file_url}'),
                InlineKeyboardButton('Yandex IS', f'https://yandex.ru/images/search?rpt=imageview&url={file_url}'),
                InlineKeyboardButton('SauceNao', f'https://saucenao.com/search.php?url={file_url}')
            ]]
        message.reply_text(text, disable_web_page_preview=True,
                           reply_markup=InlineKeyboardMarkup(keyboard))

    def document_downloader(self, update: Update, ctx: CallbackContext):
        """Reply to every link in the message with the media it points to.

        ``.gif`` links are sent back as documents, tweet links are answered
        with the tweet's videos, anything else gets a "can't detect" reply.
        """
        config._reload_maybe()
        message = update.effective_message
        if not self._is_authorized(update):
            message.reply_text('GTFO')
            return

        entities = message.parse_entities([MessageEntity.URL, MessageEntity.TEXT_LINK])
        for entity, entity_text in entities.items():
            # For TEXT_LINK entities the parsed text is only the visible
            # label; the real target is stored on the entity itself.
            link = entity.url if entity.type == MessageEntity.TEXT_LINK else entity_text
            try:
                self._reply_to_link(message, link)
            except BadRequest:
                message.reply_text('Unable to retrieve file')

    def _reply_to_link(self, message, link):
        """Send the reply appropriate for a single *link*."""
        if link.endswith('.gif'):
            message.reply_document(link)
            return

        tweet_match = self._TWEET_URL_RE.match(link)
        if tweet_match is None:
            message.reply_text('Can\'t detect link type')
            return

        api = twitter.Api(config.twitter_consumer_api_key, config.twitter_consumer_api_secret,
                          config.twitter_access_token, config.twitter_access_token_secret)
        tweet = api.GetStatus(tweet_match.group(1))
        self._reply_with_tweet_videos(message, tweet)

    def _reply_with_tweet_videos(self, message, tweet):
        """Reply with the highest-bitrate MP4 of every video attached to *tweet*."""
        media_items = tweet.media
        if not media_items:
            message.reply_text('Unable to retrieve video info from tweet')
            return
        try:
            for media in media_items:
                if media.type != 'video':
                    continue
                mp4_variants = [variant for variant in media.video_info['variants']
                                if variant['content_type'] == 'video/mp4']
                if mp4_variants:
                    # max() keeps the first variant on ties, like a strict ">" scan.
                    best = max(mp4_variants, key=lambda variant: variant.get('bitrate', 0))
                    message.reply_video(best['url'])
        except (KeyError, TypeError):
            # Missing keys, or video_info being None on a video entry.
            message.reply_text('Unable to retrieve video info from tweet')