Python 3.7: asyncio.run_in_executor() возвращает ошибку времени выполнения «Нет текущей ошибки цикла событий»

#python #asynchronous #python-asyncio

#python #асинхронный #python-asyncio

Вопрос:

Я пытаюсь изучить asyncio и работаю над написанием класса для подключения к API Alpaca Market, чтобы я мог продолжить работу. Я хотел подключиться к двум разным частям API асинхронно, просто чтобы посмотреть, смогу ли я правильно использовать asyncio. Я использовал asyncio.run_in_executor(), потому что подключение не является ожидаемой функцией. Похоже, что программа запускает две сопрограммы асинхронно, но вторая сопрограмма завершается сбоем из-за отсутствия потока, хотя я установил для исполнителя по умолчанию значение ThreadPoolExecutor(2) . Ниже приведено то, что я получил до сих пор, плюс вывод на терминал.

 import os, sys, re
from typing import Union

import csv
import asyncio
import numpy as np
import pandas as pd
from dotenv import load_dotenv
import alpaca_trade_api as alpaca
from concurrent.futures import ThreadPoolExecutor
import time



class DataHouse:

    def __init__(self, env=None):
        self._env = None
        self._conn = None
        self._rest = None
        self._env_keys = ['key_id','secret_key','base_url','data_url','data_stream']
        self._api_keys = dict()


        if self._env is not None:
            load_dotenv(self._env)
            for key in self._env_keys:
                val = os.getenv(key).strip()
                if val is None:
                    raise ValueError(f"Missing {key} value in .env file")
                self._api_keys[key] = val

        async def consume_responses(*coro, loop):
            result = await asyncio.gather(*coro, loop=loop, return_exceptions=True)
            print(result)
        loop = asyncio.get_event_loop()
        loop.set_default_executor(ThreadPoolExecutor(2))
        coros = [self.rest(loop), self.conn(loop)]
        loop.run_until_complete(consume_responses(*coros, loop=loop))

    def get_rest(self):
        return self._rest
    def get_conn(self):
        return self._conn

    async def rest(self, loop, executor=None):
        return await loop.run_in_executor(executor, self.rest_connect)
    def rest_connect(self):
        print("REST BEGIN")
        time.sleep(3)
        print("REST END")
        return alpaca.rest.REST(**{key: self._api_keys[key] for key in self._env_keys[:2]}, api_version='v2')

    async def conn(self, loop, executor=None):
        return await loop.run_in_executor(executor, self.conn_connect)
    def conn_connect(self):
        print("CONN BEGIN")
        time.sleep(3)
        print("CONN END")
        return alpaca.stream2.StreamConn(**self._api_keys)


    @property
    def env(self):
        return self._env
    @env.setter
    def env(self, env):
        if isinstance(env, str) and re.search(r'.env

Затем я тестирую его, запустив

 import DataHouse

test = DataHouse('paper_env.env')
 

И результат, который я получаю, таков

 REST BEGIN
CONN BEGIN
CONN CONNECTED
REST CONNECTED
[<alpaca_trade_api.rest.REST object at 0x000002A133DFD668>, RuntimeError("There is no current event loop in thread 'ThreadPoolExecutor-0_1'.")]
 

Операторы печати выполнялись асинхронно, так что это хорошо, и я смог получить первое соединение с API, но результатом второго соединения является ошибка времени выполнения, и я не могу понять, почему или как это исправить.

Любая помощь приветствуется.


, env) is not None:
self._env = env
else:
raise ValueError("the DataHouse env variable must be set to the *.env string to the file containing API information. env = str(path_to_env.env)")
Затем я тестирую его, запустив


И результат, который я получаю, таков


Операторы печати выполнялись асинхронно, так что это хорошо, и я смог получить первое соединение с API, но результатом второго соединения является ошибка времени выполнения, и я не могу понять, почему или как это исправить.

Любая помощь приветствуется.