66 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			66 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import traceback
 | |
| from concurrent.futures.thread import ThreadPoolExecutor
 | |
| from multiprocessing.pool import ThreadPool
 | |
| 
 | |
| import sentry_sdk
 | |
| from django.core.cache import cache
 | |
| from django.core.management import BaseCommand
 | |
| from telegram import TelegramError, Update
 | |
| from telegram.error import TimedOut
 | |
| from telegram.ext import CallbackContext
 | |
| 
 | |
| from bots.models import TelegramBot
 | |
| 
 | |
| 
 | |
| class Command(BaseCommand):
 | |
|     def handle(self, *args, **options):
 | |
|         pool = ThreadPool(8)
 | |
| 
 | |
|         def error_handler(update: Update, ctx: CallbackContext):
 | |
|             sentry_sdk.capture_exception(ctx.error)
 | |
|             logging.exception('Exception while processing update', exc_info=ctx.error)
 | |
| 
 | |
|         initialized = False
 | |
|         dispatchers = []
 | |
| 
 | |
|         def check_updates(dispatcher):
 | |
|             try:
 | |
|                 updates = dispatcher.bot.get_updates(dispatcher.last_update_id)
 | |
|             except TimedOut:
 | |
|                 return
 | |
|             except TelegramError as e:
 | |
|                 sentry_sdk.capture_exception(e)
 | |
|                 traceback.print_exc()
 | |
|                 updates = []
 | |
| 
 | |
|             for update in updates:
 | |
|                 pool.apply_async(dispatcher.process_update, (update,))
 | |
|                 dispatcher.last_update_id = update.update_id + 1
 | |
| 
 | |
|         while True:
 | |
|             try:
 | |
|                 if not initialized or cache.get('bots_reset'):
 | |
|                     logging.warning('Reloading dispatchers')
 | |
|                     dispatchers = []
 | |
|                     for bot in TelegramBot.objects.filter(active=True):
 | |
|                         try:
 | |
|                             dispatcher = bot.build_dispatcher(error_handler)
 | |
|                             dispatcher.last_update_id = 0
 | |
|                             dispatchers.append(dispatcher)
 | |
|                         except TelegramError:
 | |
|                             pass
 | |
|                     cache.delete('bots_reset')
 | |
|                     initialized = True
 | |
| 
 | |
|                 with ThreadPoolExecutor() as executor:
 | |
|                     for dispatcher in dispatchers:
 | |
|                         executor.submit(check_updates, dispatcher)
 | |
|             except KeyboardInterrupt:
 | |
|                 pool.terminate()
 | |
|                 pool.join()
 | |
|                 return
 | |
|             except Exception as e:
 | |
|                 sentry_sdk.capture_exception(e)
 | |
|                 traceback.print_exc()
 |