#python #asynchronous #python-asyncio
#python #асинхронный #python-asyncio
Вопрос:
Я пытаюсь изучить asyncio и работаю над написанием класса для подключения к API Alpaca Market, чтобы я мог продолжить работу. Я хотел подключиться к двум разным частям API асинхронно, просто чтобы посмотреть, смогу ли я правильно использовать asyncio. Я использовал asyncio.run_in_executor(), потому что подключение не является ожидаемой функцией. Похоже, что программа запускает две сопрограммы асинхронно, но вторая сопрограмма завершается сбоем из-за отсутствия потока, хотя я установил для исполнителя по умолчанию значение ThreadPoolExecutor(2) . Ниже приведено то, что я получил до сих пор, плюс вывод на терминал.
import os, sys, re
from typing import Union
import csv
import asyncio
import numpy as np
import pandas as pd
from dotenv import load_dotenv
import alpaca_trade_api as alpaca
from concurrent.futures import ThreadPoolExecutor
import time
class DataHouse:
def __init__(self, env=None):
self._env = None
self._conn = None
self._rest = None
self._env_keys = ['key_id','secret_key','base_url','data_url','data_stream']
self._api_keys = dict()
if self._env is not None:
load_dotenv(self._env)
for key in self._env_keys:
val = os.getenv(key).strip()
if val is None:
raise ValueError(f"Missing {key} value in .env file")
self._api_keys[key] = val
async def consume_responses(*coro, loop):
result = await asyncio.gather(*coro, loop=loop, return_exceptions=True)
print(result)
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(2))
coros = [self.rest(loop), self.conn(loop)]
loop.run_until_complete(consume_responses(*coros, loop=loop))
def get_rest(self):
return self._rest
def get_conn(self):
return self._conn
async def rest(self, loop, executor=None):
return await loop.run_in_executor(executor, self.rest_connect)
def rest_connect(self):
print("REST BEGIN")
time.sleep(3)
print("REST END")
return alpaca.rest.REST(**{key: self._api_keys[key] for key in self._env_keys[:2]}, api_version='v2')
async def conn(self, loop, executor=None):
return await loop.run_in_executor(executor, self.conn_connect)
def conn_connect(self):
print("CONN BEGIN")
time.sleep(3)
print("CONN END")
return alpaca.stream2.StreamConn(**self._api_keys)
@property
def env(self):
return self._env
@env.setter
def env(self, env):
if isinstance(env, str) and re.search(r'.env
Затем я тестирую его, запустив
import DataHouse
test = DataHouse('paper_env.env')
И результат, который я получаю, таков
REST BEGIN
CONN BEGIN
CONN CONNECTED
REST CONNECTED
[<alpaca_trade_api.rest.REST object at 0x000002A133DFD668>, RuntimeError("There is no current event loop in thread 'ThreadPoolExecutor-0_1'.")]
Операторы печати выполнялись асинхронно, так что это хорошо, и я смог получить первое соединение с API, но результатом второго соединения является ошибка времени выполнения, и я не могу понять, почему или как это исправить.
Любая помощь приветствуется.
Затем я тестирую его, запустив
, env) is not None:
self._env = env
else:
raise ValueError("the DataHouse env variable must be set to the *.env string to the file containing API information. env = str(path_to_env.env)")
И результат, который я получаю, таков
Операторы печати выполнялись асинхронно, так что это хорошо, и я смог получить первое соединение с API, но результатом второго соединения является ошибка времени выполнения, и я не могу понять, почему или как это исправить.
Любая помощь приветствуется.