Как остановить цикл событий asyncio из дочернего потока?

#python #python-asyncio #python-multithreading #cryptofeed

Вопрос:

 from cryptofeed import FeedHandler
from cryptofeed.feed import Feed
from cryptofeed.defines import L2_BOOK, BID, ASK
from cryptofeed.exchange.ftx import FTX
from threading import Thread
from time import sleep


class Executor:
    def __init__(self,  feed: Feed, coin_symbol: str, fut_symbol: str):
        self.coin_symbol = coin_symbol
        self.fut_symbol = fut_symbol
        self.feed = feed

        self.fh = FeedHandler()
        self.loop = None

        self._coin_top_book: dict = {}
        self._fut_top_book: dict = {}

    async def _book_update(self, feed, symbol, book, timestamp, receipt_timestamp):
        if symbol == self.coin_symbol:
            self._coin_top_book[BID] = book[BID].peekitem(-1)
            self._coin_top_book[ASK] = book[ASK].peekitem(0)
        elif symbol == self.fut_symbol:
            self._fut_top_book[BID] = book[BID].peekitem(-1)
            self._fut_top_book[ASK] = book[ASK].peekitem(0)

    def start_feed(self):
        self.fh.add_feed(self.feed(symbols=[self.fut_symbol, self.coin_symbol], channels=[L2_BOOK],
                                   callbacks={L2_BOOK: self._book_update}))
        self.fh.run()

    def shoot(self):
        # give the orderbooks time to be populated
        while len(self._coin_top_book) == 0 or len(self._fut_top_book) == 0:
            sleep(1)

        for i in range(5):
            print(self._coin_top_book)
            sleep(1)  # do some stuff

        self.fh.stop()

    def run(self):
        th1 = Thread(target=self.shoot)
        th1.start()

        self.start_feed()


def main():
    g = Executor(feed=FTX, coin_symbol='SOL-USD', fut_symbol='SOL-PERP')
    g.run()


if __name__ == '__main__':
    main()
 

Поэтому в моей нынешней попытке остановить эту программу я звоню self.fh.stop() , когда внутри все закончено shoot() . Однако я получаю эту ошибку:

 Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/mc/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/Users/mc/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mc/Library/Application Support/JetBrains/PyCharmCE2021.2/scratches/scratch_1.py", line 43, in shoot
    self.fh.stop()
  File "/Users/mc/.virtualenvs/crypto/lib/python3.9/site-packages/cryptofeed/feedhandler.py", line 175, in stop
    loop = asyncio.get_event_loop()
  File "/Users/mc/.pyenv/versions/3.9.1/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.
 

Предположительно, это связано с тем, что я пытаюсь получить доступ к циклу событий из дочернего потока, в то время как он существует только в родительском потоке. Однако я не знаю, как с этим правильно справиться.

Комментарии:

1. «FeedHandler.stop» принимает аргумент «цикл». Просто извлеките и сохраните цикл при создании объекта «FeedHandler», а затем используйте его для остановки.

2. @MichaelButscher Я действительно пытался это сделать, но это не сработало. Предположительно, я получаю цикл с помощью » asyncio.get_event_loop()` и сохраняю его. Но куда мне это позвонить?

3. В «start_feed», где вы вызываете «FeedHandler.run».

4. @MichaelButscher Это то, что я пробовал. Выдает мне эту ошибку: «Файл» /Users/mc/.virtualenvs/crypto/lib/python3.9/site-packages/cryptofeed/feedhandler.py», строка 193, в стоп-цикле.run_until_complete(asyncio.gather(*задачи shutdown_tasks)) Файл «uvloop/loop.pyx», строка 1488, в файле uvloop.loop.Loop.run_until_полный файл «uvloop/loop.pyx», строка 1481, в файле uvloop.loop.Loop.run_until_полный файл «uvloop/loop.pyx», строка 1370, в файле uvloop.loop.Loop.run_forever «uvloop/loop.pyx», строка 511, в файле uvloop.loop.Loop._run Ошибка выполнения: этот цикл событий уже запущен.`