From b35d2ca2518810a9b0692739cf496158d0800d8b Mon Sep 17 00:00:00 2001 From: bakatrouble Date: Sat, 20 Mar 2021 16:21:06 +0300 Subject: [PATCH] optimize worker --- bots/management/commands/bot_worker.py | 57 +++++++++++++++----------- bots/models.py | 9 +++- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/bots/management/commands/bot_worker.py b/bots/management/commands/bot_worker.py index 2a5eadb..dd7c178 100644 --- a/bots/management/commands/bot_worker.py +++ b/bots/management/commands/bot_worker.py @@ -2,6 +2,7 @@ import logging import traceback from concurrent.futures.thread import ThreadPoolExecutor from multiprocessing.pool import ThreadPool +from time import sleep import sentry_sdk from django.core.cache import cache @@ -15,50 +16,58 @@ from bots.models import TelegramBot class Command(BaseCommand): def handle(self, *args, **options): - pool = ThreadPool(8) + # pool = ThreadPool(8) def error_handler(update: Update, ctx: CallbackContext): sentry_sdk.capture_exception(ctx.error) logging.exception('Exception while processing update', exc_info=ctx.error) initialized = False - dispatchers = [] + updaters = [] - def check_updates(dispatcher): - try: - updates = dispatcher.bot.get_updates(dispatcher.last_update_id) - except TimedOut: - return - except TelegramError as e: - sentry_sdk.capture_exception(e) - traceback.print_exc() - updates = [] - - for update in updates: - pool.apply_async(dispatcher.process_update, (update,)) - dispatcher.last_update_id = update.update_id + 1 + # def check_updates(dispatcher): + # try: + # updates = dispatcher.bot.get_updates(dispatcher.last_update_id) + # except TimedOut: + # return + # except TelegramError as e: + # sentry_sdk.capture_exception(e) + # traceback.print_exc() + # updates = [] + # + # for update in updates: + # pool.apply_async(dispatcher.process_update, (update,)) + # dispatcher.last_update_id = update.update_id + 1 while True: try: if not initialized or cache.get('bots_reset'): logging.warning('Reloading dispatchers') - dispatchers = [] + for updater in updaters: + updater.stop() + updaters.clear() for bot in TelegramBot.objects.filter(active=True): try: - dispatcher = bot.build_dispatcher(error_handler) - dispatcher.last_update_id = 0 - dispatchers.append(dispatcher) + updater = bot.build_updater(error_handler) + updaters.append(updater) + updater.start_polling() + # dispatcher = bot.build_dispatcher(error_handler) + # dispatcher.last_update_id = 0 + # dispatchers.append(dispatcher) except TelegramError: pass cache.delete('bots_reset') initialized = True - with ThreadPoolExecutor() as executor: - for dispatcher in dispatchers: - executor.submit(check_updates, dispatcher) + # with ThreadPoolExecutor() as executor: + # for dispatcher in dispatchers: + # executor.submit(check_updates, dispatcher) + sleep(1) except KeyboardInterrupt: - pool.terminate() - pool.join() + for updater in updaters: + updater.stop() + # pool.terminate() + # pool.join() return except Exception as e: sentry_sdk.capture_exception(e) diff --git a/bots/models.py b/bots/models.py index d45c3ed..d23af66 100644 --- a/bots/models.py +++ b/bots/models.py @@ -4,7 +4,7 @@ from django.contrib.contenttypes.models import ContentType from django.db import models from django.utils import timezone from telegram import Bot -from telegram.ext import Dispatcher +from telegram.ext import Dispatcher, Updater class TelegramBot(models.Model): @@ -24,6 +24,13 @@ class TelegramBot(models.Model): def get_bot(self): return Bot(self.bot_token) + def build_updater(self, error_handler): + updater = Updater(self.bot_token) + updater.dispatcher.add_error_handler(error_handler) + updater.bot.get_me() + self.config.build_dispatcher(updater.dispatcher) + return updater + def build_dispatcher(self, error_handler): bot = self.get_bot() bot.get_me()