diff --git a/bots/management/commands/bot_worker.py b/bots/management/commands/bot_worker.py index 8e67514..afdd7e5 100644 --- a/bots/management/commands/bot_worker.py +++ b/bots/management/commands/bot_worker.py @@ -1,5 +1,6 @@ import logging import traceback +from concurrent.futures.thread import ThreadPoolExecutor from multiprocessing.pool import ThreadPool import sentry_sdk @@ -21,6 +22,21 @@ class Command(BaseCommand): logging.exception('Exception while processing update', exc_info=ctx.error) dispatchers = [] + + def check_updates(dispatcher): + try: + updates = dispatcher.bot.get_updates(dispatcher.last_update_id) + except TimedOut: + return + except TelegramError as e: + sentry_sdk.capture_exception(e) + traceback.print_exc() + updates = [] + + for update in updates: + pool.apply_async(dispatcher.process_update, (update,)) + dispatcher.last_update_id = update.update_id + 1 + while True: try: if not dispatchers or cache.get('bots_reset'): @@ -35,19 +51,9 @@ class Command(BaseCommand): pass cache.delete('bots_reset') - for dispatcher in dispatchers: - try: - updates = dispatcher.bot.get_updates(dispatcher.last_update_id) - except TimedOut: - continue - except TelegramError as e: - sentry_sdk.capture_exception(e) - traceback.print_exc() - updates = [] - - for update in updates: - pool.apply_async(dispatcher.process_update, (update,)) - dispatcher.last_update_id = update.update_id + 1 + with ThreadPoolExecutor() as executor: + for dispatcher in dispatchers: + executor.submit(check_updates, dispatcher) except KeyboardInterrupt: pool.terminate() pool.join()