import logging
import time
import traceback
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool

import sentry_sdk
from django.core.cache import cache
from django.core.management import BaseCommand
from telegram import TelegramError, Update
from telegram.error import TimedOut
from telegram.ext import CallbackContext

from bots.models import TelegramBot


class Command(BaseCommand):
    """Poll Telegram for updates on behalf of every active ``TelegramBot``.

    The command loops until interrupted. On each pass it (re)builds one
    dispatcher per active bot when none are loaded or when the
    ``bots_reset`` cache key is set, fetches pending updates for all
    dispatchers in parallel, and hands each update to a shared worker pool
    for processing.
    """

    # Size of the shared thread pool that runs ``dispatcher.process_update``.
    WORKER_THREADS = 8
    # Seconds to wait before re-querying the database when no dispatcher
    # could be loaded, so an empty bot table does not become a busy loop.
    IDLE_DELAY = 1.0

    def handle(self, *args, **options):
        """Run the polling loop until a ``KeyboardInterrupt`` is received.

        Args:
            *args: Unused positional arguments from the management framework.
            **options: Unused command options.

        Returns:
            None. The method only returns after ``KeyboardInterrupt``, once
            the worker pool has been terminated and joined.
        """
        pool = ThreadPool(self.WORKER_THREADS)

        def error_handler(update: Update, ctx: CallbackContext):
            """Report an error raised while a dispatcher handled an update."""
            sentry_sdk.capture_exception(ctx.error)
            logging.exception('Exception while processing update', exc_info=ctx.error)

        def report_failure(error, message):
            """Send an otherwise-unobserved background exception to Sentry and the log."""
            sentry_sdk.capture_exception(error)
            logging.error(message, exc_info=error)

        def report_worker_failure(error):
            """``error_callback`` for the worker pool: surface escaped exceptions."""
            report_failure(error, 'Unhandled exception in update worker')

        def load_dispatchers():
            """Build a fresh dispatcher list for all active bots.

            A bot whose dispatcher cannot be built because of a
            ``TelegramError`` is skipped (with a warning) so that one broken
            bot does not prevent the others from being served.
            """
            loaded = []
            for bot in TelegramBot.objects.filter(active=True):
                try:
                    dispatcher = bot.build_dispatcher(error_handler)
                except TelegramError as e:
                    logging.warning('Could not build dispatcher for bot %r', bot, exc_info=e)
                    continue
                # Start from offset 0; it is advanced as updates are consumed.
                dispatcher.last_update_id = 0
                loaded.append(dispatcher)
            return loaded

        def check_updates(dispatcher):
            """Fetch pending updates for one dispatcher and queue them for processing."""
            try:
                updates = dispatcher.bot.get_updates(dispatcher.last_update_id)
            except TimedOut:
                # Must precede the TelegramError clause, as in the original
                # handler order; a timeout is ignored and retried next pass.
                return
            except TelegramError as e:
                sentry_sdk.capture_exception(e)
                traceback.print_exc()
                return
            for update in updates:
                pool.apply_async(
                    dispatcher.process_update,
                    (update,),
                    error_callback=report_worker_failure,
                )
                # Next fetch starts right after the update just queued.
                dispatcher.last_update_id = update.update_id + 1

        dispatchers = []
        while True:
            try:
                if not dispatchers or cache.get('bots_reset'):
                    logging.warning('Reloading dispatchers')
                    # Build into a fresh list and swap it in only on success,
                    # so a failure midway never leaves a partial list behind.
                    dispatchers = load_dispatchers()
                    cache.delete('bots_reset')

                if not dispatchers:
                    # Nothing to poll: back off instead of spinning on the DB.
                    time.sleep(self.IDLE_DELAY)
                    continue

                # Leaving the ``with`` block waits for every poll to finish.
                with ThreadPoolExecutor() as executor:
                    futures = [
                        executor.submit(check_updates, dispatcher)
                        for dispatcher in dispatchers
                    ]
                for future in futures:
                    error = future.exception()
                    if error is not None:
                        report_failure(error, 'Exception while checking updates')
            except KeyboardInterrupt:
                pool.terminate()
                pool.join()
                return
            except Exception as e:
                # Top-level boundary: report and keep the poller alive.
                sentry_sdk.capture_exception(e)
                traceback.print_exc()