Как фиксировать ошибки в сопрограммах событий телемарафона?

#python #python-3.x #python-asyncio #telegram #telethon

#питон #python-3.x #python-asyncio #телеграмма #телемарафон

Вопрос:

Я использую следующий скрипт для прослушивания новых сообщений из общедоступных телеграмм-каналов и групп.

 import configparser from telethon.errors import SessionPasswordNeededError from telethon import TelegramClient, events, sync from telethon.tl.functions.messages import (GetHistoryRequest) from telethon.tl.types import ( PeerChannel )  api_id = 'xxxxxx' api_hash = 'xxxxxxxxxxxxxxxxxxxxxxx'  #target channels that you want to listen to: input_channels = ('https://t.me/xxxxxx','https://t.me/xxxx','https://t.me/xxxxxxx')  #create a client client = TelegramClient('anon', api_id, api_hash)  # Listen to messages from target channels  @client.on(events.NewMessage(chats=input_channels))  async def newMessageListener(event):  # Get message text   newMessage = event.message.message    print(newMessage)   with client:   client.run_until_disconnected()  

Когда канал закрыт, я получаю следующую ошибку: Ошибка значения: Ни у одного пользователя нет имени пользователя «closed_channel_name» в качестве имени пользователя, и я перестаю получать какие-либо данные.

Есть ли способ определить недопустимые каналы?

До сих пор я нашел следующее, что могло бы определить действительный канал, но, возможно, есть лучший способ:

 client.start() result = client.get_entity('https://t.me/xxxxxx')  

Ответ №1:

Следующее работает для обнаружения ошибок (внутри сопрограммы) на телемарафоне

 import asyncio from telethon import TelegramClient, events  session_name = 'anon' api_id = 'xxxxxx' api_hash = 'xxxxxxxxxxxxxxxxxxxxx'  chat_list = ('https://t.me/xxxxxxxx', 'https://t.me/xxxxxxxxxx')   async def main():  async with TelegramClient(session_name, api_id, api_hash) as client:  @client.on(events.NewMessage(chats=chat_list))  async def handler(event):  print(event.message.message)   await client.run_until_disconnected()   def custom_exception_handler(loop, context):  # first, handle with default handler  loop.default_exception_handler(context)   exception = context.get('exception')  if isinstance(exception, ValueError):  print(context['exception'])  loop.stop()   loop = asyncio.get_event_loop()  # Set custom handler loop.set_exception_handler(custom_exception_handler) loop.create_task(main()) loop.run_forever()