#python #python-3.x #parallel-processing #multiprocessing #python-multiprocessing
#python #python-3.x #параллельная обработка #многопроцессорность #python -многопроцессорность
Вопрос:
Я пытаюсь одновременно точно запустить две функции Python на двух ядрах. Каждый процесс выполняет очень длинный цикл (теоретически бесконечный цикл). Важно, чтобы они оставались синхронизированными в одно и то же время, даже малейшая задержка может вызвать проблемы в долгосрочной перспективе.
Я думаю, моя проблема в том, что я запускаю их последовательно следующим образом
# define the processes and assign them functions
first_process = multiprocessing.Process(name='p1', target='first_function')
second_process = multiprocessing.Process(name='p2', target='second_function')
# start the processes
first_process.start()
second_process.start()
Я напечатал time.time()
в начале каждой функции, чтобы измерить разницу во времени. Результат стал:
first function time: 1553812298.9244068
second function time: 1553812298.9254067
разница составляет 0.0009999275207519531
секунды. Как упоминалось ранее, это различие окажет значительное влияние на долгосрочную перспективу.
Подводя итог, как запустить две функции на двух разных ядрах в точно одно и то же время? Если Python не способен на это, какие другие варианты я должен проверить?
Комментарии:
1. Не могли бы вы синхронизировать их с помощью модулей
time
илиdatetime
? Например, заставить их ждать начала выполнения до того же времени?2. Как вы гарантируете, что они работают на разных ядрах, пожалуйста? Сколько у вас ядер и на какой ОС вы работаете? Вы говорите, что они должны запускаться одновременно, затем вы говорите, что они должны оставаться синхронизированными в течение длительного времени — что, пожалуйста? Какая ошибка допустима? И, наконец, просто чтобы быть любопытным, как / почему возникает это требование?
3. Это не проблема Python, а аппаратная проблема, проблема планировщика ОС или более общая проблема параллельной обработки. Можете ли вы предоставить описание, почему эти процессы должны выполняться идеально параллельно? Потому что, вероятно, есть лучшее решение, которого вы пытаетесь достичь (что практически невозможно во многих ситуациях и по многим причинам)?
Ответ №1:
То, что вы просите, на самом деле не то, что должна предоставлять обычная ОС. У вас есть вмешательство в планирование операционной системы, миграции ядра, различные тактовые частоты через термодинамику процессора, различные попадания в кэш и промахи и так далее. Можно повысить приоритеты процессов и привязать процессы к определенным ядрам (для этого загляните в psutil), но вряд ли вы сможете увидеть стабильные улучшения от этого. Ваша ОС обычно выполняет работу лучше, чем вы могли бы здесь.
Для действительно жестких ограничений в реальном времени вам придется изучить RTOSes. Кроме того, вам пришлось бы выбрать язык среднего уровня (например, C / C ), который позволяет осуществлять мелкозернистое управление памятью (уменьшить дорогостоящие пропуски процессора в кэше). Возможно, вы просите о чем-то, что вам все равно следует сделать по-другому (проблема XY), поэтому, когда я продолжаю показывать вам, как получить некоторую синхронизацию, не понимайте это как одобрение всего вашего подхода к любой проблеме, которую вы действительно пытаетесь решить здесь.
Предпочтительным оружием здесь является multiprocessing.Barrier
. Это примитив синхронизации, который позволяет указать количество исполнителей (потоков / процессов), которые необходимо вызвать .wait()
в экземпляре барьера. Когда вызвано указанное количество исполнителей wait()
, барьер освобождает всех ожидающих исполнителей одновременно. Таким образом, все исполнители могут быть синхронизированы для каждой такой барьерной операции.
Обратите внимание, что одной такой операции недостаточно для того, о чем вы просите. Факторы операционной системы, о которых я упоминал ранее, всегда будут возвращать хаос, и время процессора снова будет отличаться от этой точки синхронизации. Это означает, что вам придется повторять синхронизацию через определенные интервалы снова и снова. Конечно, это будет стоить вам некоторой пропускной способности. Более короткие интервалы синхронизации означают меньшее расхождение в среднем.
Ниже вы видите две функции, реализующие этот метод. syncstart_foo
синхронизируется только один раз (как ответ @blhsing), sync_foo
делает это каждую sync_interval
итерацию. Когда все итерации выполнены, функции возвращаются time.time()
к родительской, где вычисляется временная дельта.
import time
from multiprocessing import Process, Barrier, Queue
def syncstart_foo(outqueue, barrier, n_iter):
barrier.wait() # synchronize only once at start
for _ in range(int(n_iter)):
pass # do stuff
outqueue.put(time.time())
def sync_foo(outqueue, barrier, n_iter, sync_interval):
for i in range(int(n_iter)):
if i % sync_interval == 0: # will sync first time for i==0
barrier.wait()
# do stuff
outqueue.put(time.time())
Вспомогательные функции для запуска бенчмарка:
def test_sync():
"""Run test for `sync_foo`."""
special_args = (SYNC_INTERVAL,)
_run_test(sync_foo, special_args)
def test_syncstart():
"""Run test for `syncstart_foo`."""
_run_test(syncstart_foo)
def _run_test(f, special_args=None):
outqueue = Queue()
barrier = Barrier(N_WORKERS)
args = (outqueue, barrier, N_ITER)
if special_args:
args = special_args
pool = [Process(target=f, args=args) for _ in range(N_WORKERS)]
print(f'starting test for {f.__name__}')
for p in pool:
p.start()
results = [outqueue.get() for _ in range(N_WORKERS)]
for p in pool:
p.join()
print(f"delta: {(abs(results[1] - results[0])) * 1e3:>{6}.{2}f} ms")
print("-" * 60)
Главная запись:
if __name__ == '__main__':
N_WORKERS = 2
N_ITER = 50e6 # 1e6 == 1M
SYNC_INTERVAL = 250_000 # synchronize every x iterations
for _ in range(5):
test_syncstart()
test_sync()
Пример вывода:
starting test for syncstart_foo
delta: 28.90 ms
------------------------------------------------------------
starting test for sync_foo
delta: 1.38 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 70.33 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.33 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 4.45 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.17 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 168.80 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.30 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 79.42 ms
------------------------------------------------------------
starting test for sync_foo
delta: 1.24 ms
------------------------------------------------------------
Process finished with exit code 0
Вы можете видеть, что одной синхронизации, как это делает syncstart_foo
, недостаточно.
Ответ №2:
Вы можете назначить multiprocessing.Queue
объект для каждого из процессов и при запуске функции для процесса поместить элемент в очередь для другого процесса с помощью multiprocessing.Queue.put
, а затем немедленно попытаться удалить из очереди его собственную очередь с помощью multiprocessing.Queue.get
. Поскольку multiprocessing.Queue.get
блокируется до тех пор, пока в очереди не появится элемент, это эффективно синхронизирует два процесса:
import multiprocessing
import time
def func(queue_self, queue_other):
queue_other.put(None)
queue_self.get()
print(time.time())
q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=func, args=(q1, q2))
p2 = multiprocessing.Process(target=func, args=(q2, q1))
if __name__ == '__main__':
p1.start()
p2.start()
Пример вывода:
1553814412.7520192
1553814412.7520192
Комментарии:
1. Это гениально!