Почему кривая моего анализа теста перестановки не гладкая?

#python #numpy #random #multiprocessing #permutation

#python #numpy #Случайный #многопроцессорность #перестановка

Вопрос:

Я использую тест перестановки (извлекая случайные подвыборки), чтобы проверить разницу между двумя экспериментами. Каждый эксперимент был проведен 100 раз (= 100 копий каждого). Каждая реплика состоит из 801 точки измерения с течением времени. Теперь я хотел бы выполнить своего рода перестановку (или привязку загрузки), чтобы проверить, сколько реплик приходится на эксперимент (и сколько (временных) точек измерения) Мне нужно получить определенный уровень надежности.

Для этой цели я написал код, из которого я извлек минимальный рабочий пример (с большим количеством жестко закодированных элементов) (пожалуйста, смотрите Ниже). Входные данные генерируются в виде случайных чисел. Здесь np.random.rand(100, 801) для 100 реплик и 801 временных точек.

Этот код работает в принципе, однако полученные кривые иногда падают не так плавно, как можно было бы ожидать при выборе случайных подвыборок 5000 раз. Вот вывод кода ниже:

Результат приведенного ниже кода

Можно видеть, что на 2 оси x есть пик, которого там не должно быть. Если я изменю случайное начальное значение с 52389 на 324235, оно исчезнет, и кривая станет гладкой. Кажется, что-то не так с тем, как выбираются случайные числа?

Почему это так? У меня есть семантически похожий код в Matlab, и там кривые полностью гладкие уже при 1000 перестановках (здесь 5000).

У меня ошибка кодирования или генератор случайных чисел numpy не подходит?

Кто-нибудь видит здесь проблему?

 import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import current_process, cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel (queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas   1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put(allResults)



def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas, nrOfPermuts]) # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel, args=(queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter))
        p.start()
        jobs.append(p)
        print('Process {} started work on time "{} ns"'.format(timesOfInterestFramesIterIdx, timesOfInterestNs[timesOfInterestFramesIterIdx]), end='n', flush=True)
    # collect the results
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        oneResult = queue.get()
        allResults[timesOfInterestFramesIterIdx, :, :] = oneResult
        print('Process number {} returned the results.'.format(timesOfInterestFramesIterIdx), end='n', flush=True)
    # hold main thread and wait for the child process to complete. then join back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5, label="nReps=" str(lineIdx 1))
        ctr =1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))  # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1] 1])  # increase x max by 2

    plt.show()


##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)
  

————- ОБНОВИТЬ —————

Изменение генератора случайных чисел с numpy на «from random import randint» не устраняет проблему:

От:

 d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
  

Для:

 d1Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
d2Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
  

— ОБНОВЛЕНИЕ 2 —

timesOfInterestNs можно просто установить в:

 timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50]
  

чтобы ускорить его на машинах с меньшим количеством ядер.

— ОБНОВЛЕНИЕ 3 —

Повторная инициализация генератора случайных начальных данных в каждом дочернем процессе (случайное начальное значение — это репликация между дочерними процессами) также не устраняет проблему:

 pid = str(current_process())
pid = int(re.split("(W)", pid)[6])
ms = int(round(time.time() * 1000))
mySeed = np.mod(ms, 4294967295)
mySeed = mySeed   25000 * pid   100 * pid   pid
mySeed = np.mod(mySeed, 4294967295)
np.random.seed(seed=mySeed)
  

— ОБНОВЛЕНИЕ 4 —
На компьютере с Windows вам понадобится:

 if __name__ == '__main__':
  

чтобы избежать рекурсивного создания подпроцессов (и сбоя).

Ответ №1:

Я думаю, это классическая ошибка многопроцессорной обработки. Ничто не гарантирует, что процессы завершатся в том же порядке, в каком они были запущены. Это означает, что вы не можете быть уверены, что инструкция allResults[timesOfInterestFramesIterIdx, :, :] = oneResult сохранит результат процесса timesOfInterestFramesIterIdx в местоположении timesOfInterestFramesIterIdx в allResults . Чтобы было понятнее, допустим, timesOfInterestFramesIterIdx равно 2, тогда у вас нет абсолютно никакой гарантии, что oneResult это результат процесса 2.

Я реализовал очень быстрое исправление ниже. Идея состоит в том, чтобы отслеживать порядок, в котором были запущены процессы, путем добавления дополнительного аргумента к groupDiffsInParallel , который затем сохраняется в очереди и, таким образом, служит идентификатором процесса при сборе результатов.

 import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                         timesOfInterestFramesIter,
                         timesOfInterestFramesIterIdx):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas   1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put({'allResults': allResults,
               'number': timesOfInterestFramesIterIdx})


def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70,
                         80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas,
                           nrOfPermuts])  # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter 
            in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel,
                    args=(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                          timesOfInterestFramesIter,
                          timesOfInterestFramesIterIdx))
        p.start()
        jobs.append(p)
        print('Process {} started work on time "{} ns"'.format(
            timesOfInterestFramesIterIdx,
            timesOfInterestNs[timesOfInterestFramesIterIdx]),
              end='n', flush=True)
    # collect the results
    resultdict = {}
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter 
            in enumerate(timesOfInterestFrames):
        resultdict.update(queue.get())
        allResults[resultdict['number'], :, :] = resultdict['allResults']
        print('Process number {} returned the results.'.format(
            resultdict['number']), end='n', flush=True)
    # hold main thread and wait for the child process to complete. then join
    # back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40,
                                     50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5,
                      label="nReps=" str(lineIdx 1))
        ctr  = 1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))
    # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot "
                          "strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1] 1])
    # increase x max by 2

    plt.show()


# #### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)
  

Это результат, который я получаю, который, очевидно, показывает, что порядок, в котором возвращаются процессы, перетасован по сравнению с начальным порядком.

 20 cores available
Starting ...
Process 0 started work on time "0.25 ns"
Process 1 started work on time "0.5 ns"
Process 2 started work on time "1 ns"
Process 3 started work on time "2 ns"
Process 4 started work on time "3 ns"
Process 5 started work on time "4 ns"
Process 6 started work on time "5 ns"
Process 7 started work on time "10 ns"
Process 8 started work on time "20 ns"
Process 9 started work on time "30 ns"
Process 10 started work on time "40 ns"
Process 11 started work on time "50 ns"
Process 12 started work on time "60 ns"
Process 13 started work on time "70 ns"
Process 14 started work on time "80 ns"
Process 15 started work on time "90 ns"
Process 16 started work on time "100 ns"
Process number 3 returned the results.
Process number 0 returned the results.
Process number 4 returned the results.
Process number 7 returned the results.
Process number 1 returned the results.
Process number 2 returned the results.
Process number 5 returned the results.
Process number 8 returned the results.
Process number 6 returned the results.
Process number 9 returned the results.
Process number 10 returned the results.
Process number 11 returned the results.
Process number 12 returned the results.
Process number 13 returned the results.
Process number 14 returned the results.
Process number 15 returned the results.
Process number 16 returned the results.
All parallel done.
  

И полученная фигура.

Правильный вывод

Комментарии:

1. Это многое объясняет и имеет полный смысл: на моем графике выше значения во время 1 и во время 2 просто переключены, иначе кривая была бы гладкой (к сожалению, я не видел этого до вашего исправления). Просто странно, что Windows сохраняет порядок процессов, в то время как Linux этого не делает. Теперь работает как по маслу!

2. Рад, что смог помочь, многопроцессорная обработка всегда требует дополнительной осторожности, и очень легко попасть в ловушку. 🙂

3. @Patol75 Я вижу, ты налетел на эту награду, отличное решение! Я был на шаг позади тебя, лол!

Ответ №2:

не уверен, что вы все еще зациклены на этой проблеме, но я только что запустил ваш код на своем компьютере (MacBook Pro (15-дюймовый, 2018)) в Jupyter 4.4.0 , и мои графики гладкие с точно такими же начальными значениями, которые вы изначально опубликовали:

 ##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2) 
  

График

Возможно, в вашем коде нет ничего плохого и ничего особенного в исходном коде 324235, и вам просто нужно дважды проверить версии ваших модулей, поскольку любые изменения в исходном коде, внесенные в более поздние версии, могут повлиять на ваши результаты. Для справки я использую numpy 1.15.4 , matplotlib 3.0.2 и multiprocessing 2.6.2.1 .

Комментарии:

1. Хорошая мысль! Сегодня банковская ярмарка (1 мая), поэтому я не могу проверить версии пакетов на моем компьютере отдела (Fedora), но я только что запустил код дома на своем ноутбуке с Windows 7 (numpy 1.12.1, matplotlib 2.0.0, многопроцессорная обработка, похоже, встроена и не выдает версию) с 5 разными начальными значениями (1234, 4321, 111222333, 235462 и 8857345), и для всех них кривая равна идеально гладкий! Итак, я предполагаю, что это проблема либо версии numpy, либо операционной системы компьютера моего отдела.

2. @господи, я бы не исключал проблему с кодом; она может основываться на определенных предположениях, которые не гарантируются библиотеками (на разных платформах). У меня есть пики в Ubuntu 16.04 с python 3.5.2 и numpy 1.15.4.

3. Хм, есть идеи, какое предположение?

4. Только что попробовал это на компьютере Fedora. У него была numpy версия 1.12.0, и я обновил ее до 1.16.3, но проблема остается той же. Похоже, это зависит от операционной системы, а не от версии numpy…

5. @господи, рад, что Patol75 разобрался в этом; мой загруженный участок все еще продолжается.