# telegram_bots/bots/modules/channel_helper.py
# 148 lines, 5.4 KiB, Python

import base64
import json
import os
import tempfile
from io import BytesIO
from uuid import uuid4
import requests
from PIL import Image
from django.db import models
from telegram import Update, Bot
from telegram.ext import Dispatcher, CallbackContext, MessageHandler, Filters, CommandHandler
from jsonrpc import Dispatcher as RPCDispatcher
from djconfig import config
from bots.models import TelegramBotModuleConfig, BotUser
class ChannelHelperBotModuleConfig(TelegramBotModuleConfig):
    """Bot module that relays content into a Telegram chat, optionally via a queue.

    Content arrives either as private messages handled by ``handle_message``
    or through the JSON-RPC method ``post_photo`` (``rpc_post_photo``). When
    ``queued`` is set, content is stored as ``QueuedItem`` rows and each call
    to ``periodic_task`` sends one randomly chosen row; otherwise content is
    sent to ``chat_id`` immediately.
    """

    # Longest image side (in pixels) accepted before a photo is downscaled.
    MAX_PHOTO_SIDE = 2000
    # Seconds to wait for a remote image; without a timeout ``requests`` may
    # block indefinitely on an unresponsive server.
    PHOTO_FETCH_TIMEOUT = 30

    chat_id = models.CharField(max_length=32)
    queued = models.BooleanField(default=False)
    users = models.ManyToManyField(BotUser)

    MODULE_NAME = 'Channel helper'
    rpc_dispatcher = None

    def __init__(self, *args, **kwargs):
        """Create the model instance and register its JSON-RPC methods."""
        super().__init__(*args, **kwargs)
        self.rpc_dispatcher = RPCDispatcher()
        self.rpc_dispatcher['post_photo'] = self.rpc_post_photo

    def rpc_post_photo(self, photo, is_base64=False):
        """Post a photo to the target chat, or queue it when ``queued`` is set.

        Args:
            photo: Image URL, or base64-encoded image data when ``is_base64``
                is true.
            is_base64: Treat ``photo`` as base64 data instead of a URL.

        Returns:
            ``True`` once the photo has been sent or queued.

        Raises:
            RuntimeError: If the image cannot be downloaded or base64-decoded.
        """
        config._reload_maybe()
        bot = self.bot.get_bot()

        try:
            if is_base64:
                raw = base64.b64decode(photo)
            else:
                resp = requests.get(photo, timeout=self.PHOTO_FETCH_TIMEOUT)
                resp.raise_for_status()
                raw = resp.content
        except Exception as exc:
            # Keep the single RuntimeError contract for callers, but no longer
            # swallow KeyboardInterrupt/SystemExit, and keep the cause chained.
            raise RuntimeError('Could not load image') from exc

        im = Image.open(BytesIO(raw))  # type: Image.Image
        width, height = im.size
        limit = self.MAX_PHOTO_SIDE
        if width > limit or height > limit:
            im.thumbnail((limit, limit))
        im = im.convert('RGB')

        with tempfile.TemporaryDirectory() as tmp_dir:
            fpath = os.path.join(tmp_dir, '{}.jpg'.format(uuid4()))
            im.save(fpath)
            # Close the handle before the temporary directory is removed.
            with open(fpath, 'rb') as photo_file:
                if self.queued:
                    # Upload to the staging chat first to obtain a reusable file_id.
                    uploaded = bot.send_photo(config.tmp_uploads_chat_id, photo_file)
                    # QueuedItem.message_id is declared without null=True, so a
                    # row cannot be saved without one; use the staging message id.
                    QueuedItem(
                        config=self,
                        type='photo',
                        args=json.dumps([uploaded.photo[-1].file_id]),
                        message_id=uploaded.message_id,
                    ).save()
                else:
                    bot.send_photo(self.chat_id, photo_file)
        return True

    def periodic_task(self, bot: Bot):
        """Send one randomly chosen queued item and remove it from the queue."""
        item = self.queued_items.order_by('?').first()  # type: QueuedItem
        if item is None:
            return
        # Delete only after a successful send so a failed send keeps the item.
        item.send(bot)
        item.delete()

    @staticmethod
    def _extract_payload(message):
        """Return ``(type, args)`` for the first supported content in ``message``.

        ``type`` is the suffix of the ``Bot.send_*`` method and ``args`` the
        positional arguments that follow the chat id. Returns ``None`` when
        the message carries no supported content.
        """
        # Order matters: the first populated attribute wins.
        extractors = (
            ('audio', lambda a: [a.file_id, a.duration, a.performer, a.title]),
            ('document', lambda d: [d.file_id, d.file_name]),
            # Photos arrive as a list of sizes; the last entry is forwarded.
            ('photo', lambda sizes: [sizes[-1].file_id]),
            ('sticker', lambda s: [s.file_id]),
            ('video', lambda v: [v.file_id, v.duration]),
            ('voice', lambda v: [v.file_id, v.duration]),
            ('video_note', lambda vn: [vn.file_id, vn.duration, vn.length]),
            ('contact', lambda c: [c.phone_number, c.first_name, c.last_name]),
            ('location', lambda loc: [loc.latitude, loc.longitude]),
            ('venue', lambda v: [
                v.location.latitude, v.location.longitude,
                v.title, v.address, v.foursquare_id,
            ]),
        )
        for kind, build_args in extractors:
            content = getattr(message, kind, None)
            if content:
                return kind, build_args(content)
        if getattr(message, 'text', None):
            return 'message', [message.text_html, 'html']
        return None

    def handle_message(self, update: Update, ctx: CallbackContext):
        """Forward or queue the content of an incoming message.

        When ``users`` is non-empty, only listed users may post; everyone
        else gets a refusal reply. Messages with no supported content are
        ignored.
        """
        # An empty ``users`` set means the module is open to everybody.
        if self.users.exists() and not self.users.filter(user_id=update.effective_user.id).exists():
            update.effective_message.reply_text('GTFO')
            return

        message = update.effective_message
        payload = self._extract_payload(message)
        if payload is None:
            # Nothing sendable: an item with empty type/args could neither be
            # sent now nor later by periodic_task.
            return

        kind, args = payload
        item = QueuedItem(
            config=self,
            message_id=message.message_id,
            type=kind,
            args=json.dumps(args),
        )
        if self.queued:
            item.save()
        else:
            item.send(ctx.bot)

    def handle_delete(self, update: Update, ctx: CallbackContext):
        """Remove the queued item matching the message this command replies to.

        Only acts on commands sent in the chat identified by ``chat_id``.
        """
        message = update.effective_message
        # chat_id is stored as text (CharField) while Telegram reports an
        # integer; comparing them directly would never match.
        if str(message.chat_id) != str(self.chat_id):
            return
        replied = message.reply_to_message
        if replied is None:
            return
        # NOTE(review): handle_message stores the id of the message received
        # by the bot; confirm ids seen in this chat refer to the same messages.
        # Scope the lookup to this config's queue; filter().delete() also
        # copes with zero or several matching rows.
        deleted, _ = self.queued_items.filter(message_id=replied.message_id).delete()
        message.reply_text('Deleted' if deleted else 'Nothing to delete')

    def build_dispatcher(self, dispatcher: Dispatcher):
        """Register this module's handlers on ``dispatcher`` and return it."""
        # NOTE(review): both handlers share the default group, where only the
        # first matching handler runs, so commands sent in private chats reach
        # handle_message rather than handle_delete - confirm this is intended.
        dispatcher.add_handler(MessageHandler(Filters.private, self.handle_message))
        dispatcher.add_handler(
            CommandHandler(['delete', 'del', 'remove', 'rem'], self.handle_delete, Filters.reply)
        )
        return dispatcher
class QueuedItem(models.Model):
    """A piece of content waiting to be sent to its config's target chat."""

    config = models.ForeignKey(
        ChannelHelperBotModuleConfig,
        related_name='queued_items',
        on_delete=models.CASCADE,
    )
    # Suffix of the Bot method used for delivery, e.g. 'photo' -> bot.send_photo.
    type = models.CharField(max_length=12)
    # JSON-encoded list of positional arguments passed after the chat id.
    args = models.TextField()
    # NOTE(review): default=None without null=True - saving a row that has no
    # message_id would presumably violate NOT NULL; confirm against migrations.
    message_id = models.PositiveBigIntegerField(db_index=True, default=None)

    def send(self, bot: Bot):
        """Deliver this item to the config's chat via ``bot.send_<type>``."""
        sender = getattr(bot, 'send_' + self.type)
        target_chat = self.config.chat_id
        positional = json.loads(self.args)
        sender(target_chat, *positional)