optimize worker
This commit is contained in:
		@@ -2,6 +2,7 @@ import logging
 | 
			
		||||
import traceback
 | 
			
		||||
from concurrent.futures.thread import ThreadPoolExecutor
 | 
			
		||||
from multiprocessing.pool import ThreadPool
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
import sentry_sdk
 | 
			
		||||
from django.core.cache import cache
 | 
			
		||||
@@ -15,50 +16,58 @@ from bots.models import TelegramBot
 | 
			
		||||
 | 
			
		||||
class Command(BaseCommand):
 | 
			
		||||
    def handle(self, *args, **options):
 | 
			
		||||
        pool = ThreadPool(8)
 | 
			
		||||
        # pool = ThreadPool(8)
 | 
			
		||||
 | 
			
		||||
        def error_handler(update: Update, ctx: CallbackContext):
 | 
			
		||||
            sentry_sdk.capture_exception(ctx.error)
 | 
			
		||||
            logging.exception('Exception while processing update', exc_info=ctx.error)
 | 
			
		||||
 | 
			
		||||
        initialized = False
 | 
			
		||||
        dispatchers = []
 | 
			
		||||
        updaters = []
 | 
			
		||||
 | 
			
		||||
        def check_updates(dispatcher):
 | 
			
		||||
            try:
 | 
			
		||||
                updates = dispatcher.bot.get_updates(dispatcher.last_update_id)
 | 
			
		||||
            except TimedOut:
 | 
			
		||||
                return
 | 
			
		||||
            except TelegramError as e:
 | 
			
		||||
                sentry_sdk.capture_exception(e)
 | 
			
		||||
                traceback.print_exc()
 | 
			
		||||
                updates = []
 | 
			
		||||
 | 
			
		||||
            for update in updates:
 | 
			
		||||
                pool.apply_async(dispatcher.process_update, (update,))
 | 
			
		||||
                dispatcher.last_update_id = update.update_id + 1
 | 
			
		||||
        # def check_updates(dispatcher):
 | 
			
		||||
        #     try:
 | 
			
		||||
        #         updates = dispatcher.bot.get_updates(dispatcher.last_update_id)
 | 
			
		||||
        #     except TimedOut:
 | 
			
		||||
        #         return
 | 
			
		||||
        #     except TelegramError as e:
 | 
			
		||||
        #         sentry_sdk.capture_exception(e)
 | 
			
		||||
        #         traceback.print_exc()
 | 
			
		||||
        #         updates = []
 | 
			
		||||
        #
 | 
			
		||||
        #     for update in updates:
 | 
			
		||||
        #         pool.apply_async(dispatcher.process_update, (update,))
 | 
			
		||||
        #         dispatcher.last_update_id = update.update_id + 1
 | 
			
		||||
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                if not initialized or cache.get('bots_reset'):
 | 
			
		||||
                    logging.warning('Reloading dispatchers')
 | 
			
		||||
                    dispatchers = []
 | 
			
		||||
                    for updater in updaters:
 | 
			
		||||
                        updater.stop()
 | 
			
		||||
                    updaters.clear()
 | 
			
		||||
                    for bot in TelegramBot.objects.filter(active=True):
 | 
			
		||||
                        try:
 | 
			
		||||
                            dispatcher = bot.build_dispatcher(error_handler)
 | 
			
		||||
                            dispatcher.last_update_id = 0
 | 
			
		||||
                            dispatchers.append(dispatcher)
 | 
			
		||||
                            updater = bot.build_updater(error_handler)
 | 
			
		||||
                            updaters.append(updater)
 | 
			
		||||
                            updater.start_polling()
 | 
			
		||||
                            # dispatcher = bot.build_dispatcher(error_handler)
 | 
			
		||||
                            # dispatcher.last_update_id = 0
 | 
			
		||||
                            # dispatchers.append(dispatcher)
 | 
			
		||||
                        except TelegramError:
 | 
			
		||||
                            pass
 | 
			
		||||
                    cache.delete('bots_reset')
 | 
			
		||||
                    initialized = True
 | 
			
		||||
 | 
			
		||||
                with ThreadPoolExecutor() as executor:
 | 
			
		||||
                    for dispatcher in dispatchers:
 | 
			
		||||
                        executor.submit(check_updates, dispatcher)
 | 
			
		||||
                # with ThreadPoolExecutor() as executor:
 | 
			
		||||
                #     for dispatcher in dispatchers:
 | 
			
		||||
                #         executor.submit(check_updates, dispatcher)
 | 
			
		||||
                sleep(1)
 | 
			
		||||
            except KeyboardInterrupt:
 | 
			
		||||
                pool.terminate()
 | 
			
		||||
                pool.join()
 | 
			
		||||
                for updater in updaters:
 | 
			
		||||
                    updater.stop()
 | 
			
		||||
                # pool.terminate()
 | 
			
		||||
                # pool.join()
 | 
			
		||||
                return
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                sentry_sdk.capture_exception(e)
 | 
			
		||||
 
 | 
			
		||||
@@ -4,7 +4,7 @@ from django.contrib.contenttypes.models import ContentType
 | 
			
		||||
from django.db import models
 | 
			
		||||
from django.utils import timezone
 | 
			
		||||
from telegram import Bot
 | 
			
		||||
from telegram.ext import Dispatcher
 | 
			
		||||
from telegram.ext import Dispatcher, Updater
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TelegramBot(models.Model):
 | 
			
		||||
@@ -24,6 +24,13 @@ class TelegramBot(models.Model):
 | 
			
		||||
    def get_bot(self):
 | 
			
		||||
        return Bot(self.bot_token)
 | 
			
		||||
 | 
			
		||||
    def build_updater(self, error_handler):
 | 
			
		||||
        updater = Updater(self.bot_token)
 | 
			
		||||
        updater.dispatcher.add_error_handler(error_handler)
 | 
			
		||||
        updater.bot.get_me()
 | 
			
		||||
        self.config.build_dispatcher(updater.dispatcher)
 | 
			
		||||
        return updater
 | 
			
		||||
 | 
			
		||||
    def build_dispatcher(self, error_handler):
 | 
			
		||||
        bot = self.get_bot()
 | 
			
		||||
        bot.get_me()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user