Многопоточность и асинхронное выполнение с ib_insync

#python #multithreading #python-asyncio #coroutine #interactive-brokers

#python #многопоточность #python-asyncio #сопрограмма #interactive-брокеры

Вопрос:

Я сталкиваюсь с непростой проблемой при реализации многопоточности с помощью ib_insync. Моих знаний как о многопоточном, так и об асинхронном программировании, безусловно, недостаточно для решения этой проблемы. Интересно, что препятствие связано только с командой ib.sleep, а другие функции, похоже, работают просто отлично.

Ниже приведены два сценария — один без многопоточности и один с. Без многопоточности все работает нормально, но когда для параллельной работы используется многопоточность, я получаю следующее предупреждение, и код останавливается.

 RuntimeWarning: coroutine 'sleep' was never awaited
  

Чего мне не хватает? Я понимаю, что ib.sleep — это сопрограмма, и ее нужно ожидать в асинхронной функции, но я не понимаю, как она работает при прямом вызове, но сбой происходит только в потоке.
Каков наилучший способ реализовать это правильно?

 from ib_insync import * 
from concurrent.futures import ThreadPoolExecutor 
import random 
import nest_asyncio 
nest_asyncio.apply()
    
# Define IB object 
ib = IB()
    
# Connect to IB 
ib.connect('127.0.0.1', 7497, clientId=131)
    
# Define sample function with ib.sleep 
def sample_function(sleep_time):
    print('Sleeping now') 
    ib.sleep(sleep_time)
    print('Sleep completed!')
    
# Call function directly - works fine
sample_function()
    
# Run multiple threads and call function within each thread - RunTimeWarning and stops
executor = ThreadPoolExecutor(max_workers=2) 
executor.submit(sample_function, random.randint(1,20)) 
executor.submit(sample_function, random.randint(1,20))
  

Редактировать: вариант использования, который я пытаюсь решить, заключается в параллельном вызове функции с разными параметрами. Отредактировал приведенный выше пример кода соответствующим образом. Проблема здесь в том, что ib_insync — это асинхронная реализация, основанная на asyncio, поэтому запуск нескольких потоков завершается неудачно.

Комментарии:

1. Я очень сомневаюсь, что он «отлично работает» при вызове из основного потока. Существует ли какая-либо фактическая задержка между двумя «спящими» сообщениями? Если нет, то это не работает ни в том, ни в другом случае, просто случается, что в основном потоке предупреждение не отображается. Предупреждение о времени выполнения для не ожидаемых сопрограмм предоставляется на основе максимальных усилий, и нет никакой гарантии, что оно будет отправлено при любых обстоятельствах. Если вы не знаете, что делаете, вам не следует смешивать потоки и асинхронность. Если вы опишите, чего вы пытались достичь с помощью потоков, мы могли бы помочь вам сделать то же самое с помощью asyncio.

2. Я в основном пытаюсь параллельно вызвать функцию с разными параметрами. Для функции есть только один параметр, который является тикером компании, поэтому идея заключается в том, что каждый поток предназначен для одного тикера компании. Пример, которым я поделился в описании, можно рассматривать как прокси, где я хочу запускать функцию ожидания несколько раз параллельно с разным временем ожидания… Буду признателен за информацию о том, как реализовать это только с помощью asyncio.

3. Вы можете использовать await asyncio.gather(coro1(param), coro2(param), ...) для параллельного запуска нескольких сопрограмм. Пожалуйста, отредактируйте вопрос, чтобы включить информацию о вашем варианте использования.

4. Только что сделал, спасибо! Если вы хотите добавить ответ, я буду рад принять.

5. Приятно слышать, теперь я отправил ответ.

Ответ №1:

Функция не работает нормально в любом случае, просто случается, что в основном потоке предупреждение не отображается. RuntimeWarning на не ожидаемых сопрограммах предоставляется на основе наилучших усилий без гарантии, что оно будет отправлено при любых обстоятельствах.

Чтобы устранить проблему, вам следует избегать смешивания потоков и asyncio в первую очередь. Например, если вам нужно запустить несколько сопрограмм параллельно, вы можете создавать их как задачи, используя asyncio.crate_task , а затем ожидать некоторые или все из них. Или вы можете использовать asyncio.gather служебную функцию, которая сделает это за вас:

 result1, result2, ... = await asyncio.gather(coro1(), coro2(), ...)