Реализация сопрограммы на python

#python #c #python-asyncio

#python #c #python-asyncio

Вопрос:

Допустим, у меня есть функция C result_type compute(input_type input) , которую я сделал доступной для python с помощью cython. Мой код на python выполняет несколько вычислений, подобных этому:

 
def compute_total_result()
  inputs = ...
  total_result = ...

  for input in inputs:
    result = compute_python_wrapper(input)
    update_total_result(total_result)

  return total_result
  

Поскольку вычисления занимают много времени, я внедрил пул потоков C (например, этот) и написал функцию std::future<result_type> compute_threaded(input_type input) , которая возвращает будущее, которое становится готовым, как только пул потоков будет выполнен. выполнение.

Что я хотел бы сделать, так это использовать эту функцию C и в python. Простым способом сделать это было бы обернуть std::future<result_type> , включая его get() функцию, дождаться всех результатов, подобных этому:

 
def compute_total_results_parallel()
  inputs = ...
  total_result = ...
  futures = []
  for input in inputs:
    futures.append(compute_threaded_python_wrapper(input))

  for future in futures:
    update_total_result(future.get())
  
  return total_result
  

Я полагаю, что в этом случае это работает достаточно хорошо, но очень быстро становится очень сложным, потому что мне приходится передавать фьючерсы.

Однако я думаю, что концептуально ожидание этих результатов C ничем не отличается от ожидания файлового или сетевого ввода-вывода.

Для облегчения операций ввода-вывода разработчики python ввели async await ключевые слова / . Если бы мой compute_threaded_python_wrapper был частью asyncio , я мог бы просто переписать его как

 
async def compute_total_results_async()
  inputs = ...
  total_result = ...
  for input in inputs:
    result = await compute_threaded_python_wrapper(input)
    update_total_result(total_result)

  return total_result
  

И я мог бы выполнить весь код через result = asyncio.run(compute_total_results_async()) .

Существует множество руководств по асинхронному программированию на python, но большинство из них касаются использования сопрограмм, где основой, по-видимому, является некоторый вызов asyncio пакета, в основном вызываемый asyncio.sleep(delay) как прокси для ввода-вывода.

Мой вопрос: (Как) я могу реализовать сопрограммы в python, включив python в await обернутый будущий объект (есть некоторые упоминания о __await__ методе, возвращающем итератор)?

Ответ №1:

Во-первых, необходимо исправить неточность в вопросе:

Если бы мой compute_threaded_python_wrapper был частью asyncio, я мог бы просто переписать его как […]

Перезапись неверна: await означает «дождитесь завершения вычисления», поэтому цикл, как написано, будет выполнять код последовательно. Перезапись, которая фактически выполняет задачи параллельно, будет выглядеть примерно так:

 # a direct translation of the "parallel" version
def compute_total_results_async()
    inputs = ...
    total_result = ...
    tasks = []
    # first spawn all the tasks
    for input in inputs:
        tasks.append(
            asyncio.create_task(compute_threaded_python_wrapper(input))
        )
    # and then await them
    for task in tasks:
        update_total_result(await task)
    return total_result
  

Этот шаблон spawn-all-await-all настолько уникален, что asyncio предоставляет вспомогательную функцию, asyncio.gather() , что делает его намного короче, особенно в сочетании с пониманием списка:

 # a more idiomatic version
def compute_total_results_async()
    inputs = ...
    total_result = ...
    results = await asyncio.gather(
        *[compute_threaded_python_wrapper(input) for input in inputs]
    )
    for result in results:
        update_total_result(result)
    return total_result
  

С этим покончено, мы можем перейти к главному вопросу:

Мой вопрос: (Как) я могу реализовать сопрограммы в python, позволяя python ожидать обернутого объекта future (есть некоторые упоминания о __await__ методе, возвращающем итератор)?

Да, ожидаемые объекты реализуются с использованием итераторов, которые дают указание на приостановку. Но это слишком низкоуровневый инструмент для того, что вам действительно нужно. Вам нужен не просто ожидаемый, а тот, который работает с циклом событий asyncio, который имеет определенные ожидания от базового итератора. Вам нужен механизм для возобновления ожидания, когда результат будет готов, где вы снова зависите от asyncio.

Asyncio уже предоставляет ожидаемые объекты, которым можно присвоить значение извне: futures. Будущее asyncio представляет асинхронное значение, которое станет доступным в какой-то момент в будущем. Они связаны, но не семантически эквивалентны фьючерсам C , и их не следует путать с многопоточными фьючерсами из модуля concurrent.futures stdlib.

Чтобы создать ожидаемый объект, который активируется чем-то, что происходит в другом потоке, вам нужно создать future, а затем запустить свою задачу вне потока, указав ей пометить future как завершенную, когда она завершит выполнение. Поскольку фьючерсы asyncio не являются потокобезопасными, это должно быть сделано с использованием call_soon_threadsafe метода цикла событий, предоставляемого asyncio для таких ситуаций. В Python это было бы сделано следующим образом:

 def run_async():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    def on_done(result):
        # when done, notify the future in a thread-safe manner
        loop.call_soon_threadsafe(future.set_result, resut)
    # start the worker in a thread owned by the pool
    pool.submit(_worker, on_done)
    # returning a future makes run_async() awaitable, and
    # passable to asyncio.gather() etc.
    return future

def _worker(on_done):
    # this runs in a different thread
    ... processing goes here ...
    result = ...
    on_done(result)
  

В вашем случае рабочий, предположительно, будет реализован в Cython в сочетании с C .

Комментарии:

1. Я ценю ваш ответ по этому вопросу, конечно, я не должен ждать отдельных результатов. Я попытался настроить MWE в соответствии с вашими предложениями. Проблема в том, что, насколько я вас понимаю, задача C , назначенная пулу потоков, должна перезваниваться в python, чтобы задать будущее, правильно? Я вижу Fatal Python error: PyThreadState_Get: no current thread , когда пытаюсь…

2. @hfhc2 Правильно. Вам также необходимо получить GIL, используя PyGILState_Ensure , прежде чем что-либо делать с python из потока, отличного от Python (и выпустить его PyGILState_Release впоследствии), подробности см. В документации . Если вы используете оболочку C для API Pythnon / C, она, вероятно, также предлагает защиту в стиле RAII для этого.

3. @hfhc2 конечно, я не должен ждать отдельных результатов — Извиняюсь, если я указал на то, что вы уже знаете. Многие новички в asyncio, особенно те, кто ранее не сталкивался с async / await с других языков, ожидают, что await это автоматически распараллелит их код, и неприятно удивлены, узнав, что он делает почти прямо противоположное .

4. Спасибо за вашу помощь, я добавил рабочий пример здесь: github.com/hfhc2/native-future-example