import os import re import time from tempfile import TemporaryDirectory import boto3 import filetype import twitter from django.db import models # from qrtools import QR from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton, MessageEntity from telegram.error import BadRequest from telegram.ext import CallbackContext, Dispatcher, MessageHandler, Filters from djconfig import config from bots.models import TelegramBotModuleConfig, BotUser class RobotBotModuleConfig(TelegramBotModuleConfig): users = models.ManyToManyField(BotUser) MODULE_NAME = 'RoBOT' def build_dispatcher(self, dispatcher: Dispatcher): dispatcher.add_handler(MessageHandler(Filters.audio | Filters.photo | Filters.video | Filters.document, self.save_file)) dispatcher.add_handler(MessageHandler(Filters.text & (Filters.entity(MessageEntity.URL) | Filters.entity(MessageEntity.TEXT_LINK)), self.document_downloader)) return dispatcher def save_file(self, update: Update, ctx: CallbackContext): config._reload_maybe() if self.users.count() and not self.users.filter(user_id=update.effective_user.id).count(): update.effective_message.reply_text('GTFO') return markup = InlineKeyboardMarkup([]) if update.message.photo: file_id = update.message.photo[-1].file_id elif update.message.audio: file_id = update.message.audio.file_id elif update.message.video: file_id = update.message.video.file_id elif update.message.document: file_id = update.message.document.file_id else: return file = ctx.bot.get_file(file_id) filename = str(int(time.time())) + '-' + os.path.basename(file.file_path) text = '' with TemporaryDirectory() as d: file_path = os.path.join(d, filename) file.download(file_path) tp = filetype.guess(file_path) s3 = boto3.client('s3', aws_access_key_id=config.aws_access_key_id, aws_secret_access_key=config.aws_secret_access_key) s3.upload_file(file_path, config.s3_bucket, filename, ExtraArgs={'ACL': 'public-read', 'ContentType': tp.mime}) # if update.message.text == 'qr': # qr = QR(filename=file_path) # qr.decode() # text = f'QR:\n{qr.data}' file_url = f'https://{config.s3_bucket}.s3.amazonaws.com/{filename}' text += f'\n\n{file_url}' if update.message.photo: markup.inline_keyboard = [[ InlineKeyboardButton('Google IS', f'https://www.google.com/searchbyimage?image_url={file_url}'), InlineKeyboardButton('Yandex IS', f'https://yandex.ru/images/search?rpt=imageview&url={file_url}'), InlineKeyboardButton('SauceNao', f'https://saucenao.com/search.php?url={file_url}') ]] update.message.reply_text(text, disable_web_page_preview=True, reply_markup=markup) def document_downloader(self, update: Update, ctx: CallbackContext): config._reload_maybe() if self.users.count() and not self.users.filter(user_id=update.effective_user.id).count(): update.effective_message.reply_text('GTFO') return links = update.effective_message.parse_entities([MessageEntity.URL, MessageEntity.TEXT_LINK]).values() for link in links: try: twitter_matches = re.match(r'https?://twitter.com/[A-Za-z0-9_]{1,15}/status/(\d+).*', link) if link.endswith('.gif'): update.message.reply_document(link) elif twitter_matches: api = twitter.Api(config.twitter_consumer_api_key, config.twitter_consumer_api_secret, config.twitter_access_token, config.twitter_access_token_secret) tweet = api.GetStatus(twitter_matches.group(1)) try: for media in tweet.media: if media.type == 'video': max_bitrate = -1 max_bitrate_link = None for variant in media.video_unfo['variants']: if variant['content_type'] == 'video/mp4' and variant['bitrate'] > max_bitrate: max_bitrate = variant['bitrate'] max_bitrate_link = variant['url'] if max_bitrate_link is not None: update.message.reply_video(max_bitrate_link) except KeyError: update.message.reply_text('Unable to retrieve video info from tweet') else: update.message.reply_text('Can\'t detect link type') except BadRequest: update.message.reply_text('Unable to retrieve file')