telegram_bots/bots/modules/channel_helper.py

144 lines
5.0 KiB
Python

import base64
import json
import os
import tempfile
from io import BytesIO
from uuid import uuid4
import requests
from PIL import Image
from django.db import models
from telegram import Update, Bot
from telegram.ext import Dispatcher, CallbackContext, MessageHandler, Filters
from jsonrpc import Dispatcher as RPCDispatcher
from djconfig import config
from bots.models import TelegramBotModuleConfig
class ChannelHelperUser(models.Model):
    """A Telegram user that can be attached to a channel helper config.

    ``ChannelHelperBotModuleConfig.users`` points at this model and matches
    incoming messages against ``user_id``.
    """

    # Human-readable label; also used as the string form of the row.
    name = models.CharField(max_length=32)
    # Numeric user id, indexed because lookups filter on it.
    user_id = models.BigIntegerField(db_index=True)

    def __str__(self):
        """Return the user's display name."""
        return self.name
class ChannelHelperBotModuleConfig(TelegramBotModuleConfig):
    """Bot module that relays content to a target chat.

    Content arrives either through the ``post_photo`` JSON-RPC method or as
    private messages sent to the bot.  When ``queued`` is set, content is
    stored as ``QueuedItem`` rows and delivered one at a time by
    ``periodic_task``; otherwise it is sent to ``chat_id`` immediately.
    """

    # Destination chat for relayed content.
    chat_id = models.CharField(max_length=32)
    # When True, content is stored and sent later instead of immediately.
    queued = models.BooleanField(default=False)
    # Allow-list of senders; an empty list means everybody is accepted.
    users = models.ManyToManyField(ChannelHelperUser)

    MODULE_NAME = 'Channel helper'

    # Images larger than this (in either dimension) are downscaled.
    MAX_PHOTO_SIDE = 2000
    # Seconds to wait on the remote server when downloading a photo by URL.
    PHOTO_FETCH_TIMEOUT = 30

    rpc_dispatcher = None

    def __init__(self, *args, **kwargs):
        """Initialise the model and register the RPC methods it exposes."""
        super().__init__(*args, **kwargs)
        self.rpc_dispatcher = RPCDispatcher()
        self.rpc_dispatcher['post_photo'] = self.rpc_post_photo

    def rpc_post_photo(self, photo, is_base64=False):
        """Post a photo to the target chat, or queue it when ``queued`` is set.

        Args:
            photo: Either a URL to download or, when ``is_base64`` is true,
                the base64-encoded image bytes.
            is_base64: Selects how ``photo`` is interpreted.

        Returns:
            ``True`` once the photo was sent or queued.

        Raises:
            RuntimeError: The image bytes could not be decoded or downloaded.
        """
        config._reload_maybe()
        bot = self.bot.get_bot()

        try:
            if is_base64:
                raw = base64.b64decode(photo)
            else:
                # Without a timeout a stalled server would block this call
                # forever.
                resp = requests.get(photo, timeout=self.PHOTO_FETCH_TIMEOUT)
                resp.raise_for_status()
                raw = resp.content
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate; the original
            # error is kept as the cause for debugging.
            raise RuntimeError('Could not load image') from exc

        im = Image.open(BytesIO(raw))  # type: Image.Image
        width, height = im.size
        if width > self.MAX_PHOTO_SIDE or height > self.MAX_PHOTO_SIDE:
            im.thumbnail((self.MAX_PHOTO_SIDE, self.MAX_PHOTO_SIDE))
        # JPEG cannot store alpha or palette modes.
        im = im.convert('RGB')

        with tempfile.TemporaryDirectory() as d:
            fpath = os.path.join(d, '{}.jpg'.format(uuid4()))
            im.save(fpath)
            # Close the handle before the temporary directory is removed.
            with open(fpath, 'rb') as fh:
                if self.queued:
                    # Upload once to a scratch chat so that only the
                    # resulting file_id has to be stored in the queue.
                    m = bot.send_photo(config.tmp_uploads_chat_id, fh)
                    i = QueuedItem(
                        config=self,
                        type='photo',
                        args=json.dumps([m.photo[-1].file_id]),
                    )
                    i.save()
                else:
                    bot.send_photo(self.chat_id, fh)
        return True

    def periodic_task(self, bot: Bot):
        """Send one randomly chosen queued item, then remove it from the queue.

        The item is deleted only after ``send`` returns, so a failed send
        leaves it queued.
        """
        i = self.queued_items.order_by('?').first()  # type: QueuedItem
        if i is not None:
            i.send(bot)
            i.delete()

    @staticmethod
    def _extract_payload(m):
        """Map a message to ``(item_type, args)``, or ``None`` if unsupported.

        ``item_type`` is the suffix of the ``Bot.send_*`` method to call and
        ``args`` are the positional arguments following the chat id.
        """
        audio = getattr(m, 'audio', None)
        if audio:
            return 'audio', [audio.file_id, audio.duration, audio.performer, audio.title]

        document = getattr(m, 'document', None)
        if document:
            return 'document', [document.file_id, document.file_name]

        photo = getattr(m, 'photo', None)
        if photo:
            # Last entry is the size that was forwarded originally.
            return 'photo', [photo[-1].file_id]

        sticker = getattr(m, 'sticker', None)
        if sticker:
            return 'sticker', [sticker.file_id]

        video = getattr(m, 'video', None)
        if video:
            return 'video', [video.file_id, video.duration]

        voice = getattr(m, 'voice', None)
        if voice:
            return 'voice', [voice.file_id, voice.duration]

        video_note = getattr(m, 'video_note', None)
        if video_note:
            return 'video_note', [video_note.file_id, video_note.duration, video_note.length]

        contact = getattr(m, 'contact', None)
        if contact:
            return 'contact', [contact.phone_number, contact.first_name, contact.last_name]

        # Venue must be tested before location: Telegram also fills in
        # ``location`` on venue messages, so the reverse order would turn
        # every venue into a bare location.
        venue = getattr(m, 'venue', None)
        if venue:
            return 'venue', [
                venue.location.latitude,
                venue.location.longitude,
                venue.title,
                venue.address,
                venue.foursquare_id,
            ]

        location = getattr(m, 'location', None)
        if location:
            return 'location', [location.latitude, location.longitude]

        if getattr(m, 'text', None):
            return 'message', [m.text_html, 'html']

        return None

    def handle_message(self, update: Update, ctx: CallbackContext):
        """Relay a private message to the target chat, or queue it.

        Senders not on a non-empty ``users`` allow-list are rejected with a
        reply.  Messages of a kind that cannot be relayed are ignored.
        """
        if self.users.exists() and not self.users.filter(user_id=update.effective_user.id).exists():
            update.effective_message.reply_text('GTFO')
            return

        payload = self._extract_payload(update.effective_message)
        if payload is None:
            # Nothing to relay; building an item with an empty type would
            # fail on send and, when queued, keep failing on every retry.
            return

        item_type, item_args = payload
        i = QueuedItem(config=self, type=item_type, args=json.dumps(item_args))
        if self.queued:
            i.save()
        else:
            i.send(ctx.bot)

    def build_dispatcher(self, dispatcher: Dispatcher):
        """Register the private-message handler and return ``dispatcher``."""
        dispatcher.add_handler(MessageHandler(Filters.private, self.handle_message))
        return dispatcher
class QueuedItem(models.Model):
    """One stored piece of content awaiting delivery to a module's chat."""

    # Owning module config; its ``chat_id`` is the delivery target.
    config = models.ForeignKey(
        ChannelHelperBotModuleConfig,
        on_delete=models.CASCADE,
        related_name='queued_items',
    )
    # Suffix of the ``Bot.send_*`` method used for delivery.
    type = models.CharField(max_length=12)
    # JSON-encoded list of positional arguments that follow the chat id.
    args = models.TextField()

    def send(self, bot: Bot):
        """Deliver this item by calling ``bot.send_<type>(chat_id, *args)``."""
        sender = getattr(bot, 'send_' + self.type)
        target_chat = self.config.chat_id
        positional = json.loads(self.args)
        sender(target_chat, *positional)