Почему OpenMP застревает при 10% потреблении процессора для этого кода fft?

#c #parallel-processing #openmp

#c #параллельная обработка #openmp

Вопрос:

Я немного застрял с этой проблемой.. Я создал простой код, который выполняет несколько fft (fft — быстрое преобразование Фурье) по сигналу. fft — это обычная функция, используемая во многих приложениях, но на всякий случай я вставил код для fft ниже для справки…

Сигнал содержит 16384 элемента — вызовите сигнал X. Для каждого подмножества сигнала размером 4096 я хочу запустить fft, т.Е. 1-й fft предназначен для X[0]-X[4095], 2-й fft предназначен для X [1] -X[4096], 3-й fft предназначен для X[2]-X[4097] и т.д.

Приведенный ниже код «должен» обрабатывать 2048 fft параллельно при каждом запуске цикла for с использованием OpenMP, чтобы, надеюсь, сэкономить время вычислений… Однако, когда я проверил Performance monitor, он застревает при 10% загрузке процессора (с использованием процессора Xeon)

Я использовал #pragma omp parallel один раз в коде, поскольку я не собираюсь выполнять какой-либо параллелизм внутри функции fft. Я просто хочу распределить функцию fft между потоками.

Это странно, потому что я создал другую версию кода на Python, и она загружает процессор на 100% (используя пакет Python joblib), а код Python выполняется намного быстрее.

Я ввел весь приведенный ниже код, чтобы он был воспроизводимым и мог быть скомпилирован с использованием g -fopenmp. Он имеет все вспомогательные функции и бпф..

 #include <iostream>
#include <complex>
#include <cstdlib>
#include <vector>
#include <iostream>
#include <omp.h>
#include <fstream>
#include <condition_variable>
using namespace std::complex_literals;

int N = 4096;
int T = 4096*4;
int iterations = 10;
double length = std::log2(N)   1.0;
std::vector<std::complex<double>> signalx(T); //source signal
std::vector<std::complex<double>> fundamental_freq(length); //just a vector of twiddle factors

//some helper functions
void get_even_elements(std::vector<std::complex<double>> amp;inpt, std::vector<std::complex<double>> amp;out) {for (int i = 0; i < inpt.size()-1; i = i   2) {out[i/2] = std::move(inpt[i]);}}
void get_odd_elements(std::vector<std::complex<double>> amp;inpt, std::vector<std::complex<double>> amp;out) {for (int i = 1; i < inpt.size(); i = i   2) {out[i/2] = std::move(inpt[i]);}}
void attach(std::vector<std::complex<double>> amp;a, std::vector<std::complex<double>> amp;b, std::vector<std::complex<double>> amp;out) 
    {
    std::move(a.begin(), a.end(), out.begin());
    std::move(b.begin(), b.end(), out.begin() a.size());
    }
void add_vectors(std::vector<std::complex<double>> amp;x, std::vector<std::complex<double>> amp;y, std::vector<std::complex<double>> amp;z) {for (int i = 0; i < x.size(); i  ) {z[i] = x[i]   y[i];}}
void sub_vectors(std::vector<std::complex<double>> amp;y, std::vector<std::complex<double>> amp;x, std::vector<std::complex<double>> amp;z) {for (int i = 0; i < x.size(); i  ) {z[i] = y[i] - x[i];}}

//the FFT recursion
void fft(std::vector<std::complex<double>> amp;x)
    {
        if (x.size() == 1) {x = x;}
        else {
            int size = x.size();
            std::vector<std::complex<double>> e(size/2);
            std::vector<std::complex<double>> o(size/2);
            get_even_elements(std::ref(x),std::ref(e));
            get_odd_elements(std::ref(x),std::ref(o));

            for (int q = 0; q < 2; q  ) {
                if (q == 0) {
                    fft(std::ref(e));
                    }
                else {
                    fft(std::ref(o));
                    }
                }

            std::vector<std::complex<double>> z1(size/2);
            std::vector<std::complex<double>> z2(size/2);
            std::complex<double> pf;
            std::complex<double> pe;
            std::complex<double> pp;
            int eo_size = size/2;
            int s = std::log2(size);
            double limit = std::exp2(s-1);
            if (eo_size == 1.0) {
                    z1[0] = e[0]   o[0];
                    z2[0] = e[0] - o[0];
                }
            else {
                for (double i = 0.0; i < limit; i  ) {
                    pp = std::pow(fundamental_freq[s],i);
                    z1[i] = e[i]   pp * o[i];
                    z2[i] = e[i] - pp * o[i];
                    }
                }
            e.clear();
            e.shrink_to_fit();
            o.clear();
            o.shrink_to_fit();
            std::vector<std::complex<double>> z(size);
            attach(std::ref(z1),std::ref(z2),std::ref(z));
            z1.clear();
            z1.shrink_to_fit();
            z2.clear();
            z2.shrink_to_fit();
            x = std::move(z);
            z.clear();
            z.shrink_to_fit();
        }
    }

//main loop
int main(int argc, char* argv[])
{

    //create sample signal (i.e. 0 0i, 1 0i, 2 0i, etc...)
    for (double i = 0; i < T; i  ) {
        signalx[i] = {i   0.0i};
        }

    //set W frequencies
    for (int s = 0; s < length; s  ) {
        double denominator = std::exp2(s);
        std::complex<double> exponent = -2*3.14159265359*1i / denominator;
        fundamental_freq[s] = std::exp(exponent);
        }


    //the main parallel portion
    for (int iter = 0; iter < iterations; iter  ) {
        auto t_start = std::chrono::high_resolution_clock::now();
        //create a N/2 batch of FFT outputs
        for (double i = 1; i < T-N; i = i   N/2) {
            int j, z;
            #pragma omp parallel for num_threads(6)//why does this not consume 100% CPU? 
            for (j = 0; j < N/2; j  ) {
                std::vector<std::complex<double>> input(N); 
                for (z = 0; z < N; z  ) {
                    input[z] = signalx[i j z];
                    }
                fft(std::ref(input));
                input.clear();
                input.shrink_to_fit();
                }
        }
    }
}
 

для справки, вот эквивалентный код python, который выполняется на 100% CPU с использованием пакета joblib

 from joblib import Parallel, delayed
import multiprocessing
num_cores = multiprocessing.cpu_count()
import numpy as np
import time

j = 1
T = 4096*4
N = 4096
iterations = 10
A = np.random.randn(T).astype(np.complex)
length = int(np.log2(N))
skip = int(N/2)

def dumb_fft(x):
    if x.size == 1:
        return x
    else:
        e = x[0::2]
        o = x[1::2]
        e1 = dumb_fft(e)
        o1 = dumb_fft(o)
        x = np.array([e1,o1]).reshape(-1)
        return x

fundamental_freq = []
for s in np.arange(0,length 1,1):
    fundamental_freq.append(np.exp((-2*np.pi*1*1j)/(np.power(2,s))))

power_freq = []
for s in np.arange(0,length 1,1):
    if s == 0:
        power_freq.append([1])
    else:
        hrange = np.arange(0,np.power(2,s-1),1)
        power_freq.append(np.power(fundamental_freq[s],hrange))

def stft(x):
    for i in np.arange(1,x.shape[0]-N,int(N/2)):
        batch = []
        for j in range(skip):
            sample = x[(i j):(i j N)]
            batch.append(sample)
        r = Parallel(n_jobs=num_cores)(delayed(fft)(i) for i in batch)
    return r

def fft(x):
    if x.size == 1:
        return x
    else:
        e = x[0::2]
        o = x[1::2]
        e1 = fft(e)
        o1 = fft(o)
        x = np.concatenate([e1,o1])
        z = x   0.0
        check = int(np.log2(x.size))
        half = e1.size
        z[:half] = x[:half]   power_freq[check]*x[half:]
        z[half:] = x[:half] - power_freq[check]*x[half:]
        return z

time_meter = []

for t in range(iterations):
    t0 = time.time()
    fft(A[0:N])
    x = stft(A); #print(x)
    t1 = time.time()
    print(t1-t0)
    time_meter.append(t1-t0)
time_meter = np.array(time_meter)
print(time_meter)
 

Комментарии:

1. У вас есть несколько потоков, записывающих в input и output и много выделений памяти, которые вам не нужны (вы можете использовать std::move или std::swap в местах).

2. спасибо, я обновил код, чтобы сделать ввод и вывод частных переменных в #pragma omp параллельными, но загрузка ЦП остается прежней

3. теперь код обновлен для включения std::move , и я удалил другие выделения памяти… но загрузка ЦП по-прежнему не очень высока

4. Вы забыли включить OpenMP в настройках вашего компилятора? Этого #pragma недостаточно.

5. Да, я скомпилировал код с использованием g с помощью -fopenmp под Cygwin

Ответ №1:

Я думаю, это из-за рекурсивной функции fft, описанной выше. В этом коде я использую цикл for для параллельного БПФ, который выполняется в O (logN). Теперь он потребляет около 100% ЦП с OpenMP. Еще не оптимизирован (по-прежнему содержит много ненужных выделений памяти, но теперь он работает намного быстрее, чем Python). Не знал, что OpenMP чувствителен к типу функции в своих потоках.

 #include <iostream>
#include <complex>
#include <cstdlib>
#include <vector>
#include <iostream>
#include <omp.h>
#include <fstream>
#include <condition_variable>
using namespace std::complex_literals;

int N = 4096;
int T = 4096*4;
int iterations = 10;
double length = std::log2(N)   1.0;
int length_plus = length   1;
std::vector<std::complex<double>> signalx(T); //source signal
std::vector<std::complex<double>> fundamental_freq(length); //just a vector of twiddle factors
std::vector<std::vector<std::complex<double>>> power_freq;

//some helper functions
void get_even_elements(std::vector<std::complex<double>> amp;inpt, std::vector<std::complex<double>> amp;out) {for (int i = 0; i < inpt.size()-1; i = i   2) {out[i/2] = std::move(inpt[i]);}}
void get_odd_elements(std::vector<std::complex<double>> amp;inpt, std::vector<std::complex<double>> amp;out) {for (int i = 1; i < inpt.size(); i = i   2) {out[i/2] = std::move(inpt[i]);}}
void attach(std::vector<std::complex<double>> amp;a, std::vector<std::complex<double>> amp;b, std::vector<std::complex<double>> amp;out) 
    {
    std::move(a.begin(), a.end(), out.begin());
    std::move(b.begin(), b.end(), out.begin() a.size());
    }
void add_vectors(std::vector<std::complex<double>> amp;x, std::vector<std::complex<double>> amp;y, std::vector<std::complex<double>> amp;z) {for (int i = 0; i < x.size(); i  ) {z[i] = x[i]   y[i];}}
void sub_vectors(std::vector<std::complex<double>> amp;y, std::vector<std::complex<double>> amp;x, std::vector<std::complex<double>> amp;z) {for (int i = 0; i < x.size(); i  ) {z[i] = y[i] - x[i];}}

void dumb_get_even_elements(std::vector<double> amp;inpt, std::vector<double> amp;out) {for (int i = 0; i < inpt.size()-1; i = i   2) {out[i/2] = inpt[i];}}
void dumb_get_odd_elements(std::vector<double> amp;inpt, std::vector<double> amp;out) {for (int i = 1; i < inpt.size(); i = i   2) {out[i/2] = inpt[i];}}
void dumb_attach(std::vector<double> amp;a, std::vector<double> amp;b, std::vector<double> amp;out) {
    std::copy(a.begin(), a.end(), out.begin());
    std::copy(b.begin(), b.end(), out.begin() a.size());
}

std::vector<double> dumb_fft(std::vector<double> x)
    {
        if (x.size() == 1) {return x;}
        else {
            int size = x.size();
            std::vector<double> e(size/2);
            std::vector<double> o(size/2);
            dumb_get_even_elements(std::ref(x),std::ref(e));
            dumb_get_odd_elements(std::ref(x),std::ref(o));
            std::vector<double> e1 = dumb_fft(e);
            std::vector<double> o1 = dumb_fft(o);
            std::vector<double> z(size);
            dumb_attach(std::ref(e1),std::ref(o1),std::ref(z));
            return z;
        }
    }

void compute_fft(std::vector<std::complex<double>> amp;x, int s) 
{
    std::vector<std::complex<double>> out(N);
    int m = std::exp2(s);

    #pragma omp parallel for
    for (int i = 0; i < N; i  ) {
        int j = i % m;
        if (j < m/2) {
            int idx = j;
            std::complex<double> xr = power_freq[s][idx]*x[i m/2];
            out[i] = x[i]   xr;
            }
        else {
            int idx = j-m/2;
            std::complex<double> xr = power_freq[s][idx]*x[i];
            out[i] = x[i-m/2] - xr;
            }
        }
    x = std::move(out);
    out.clear();
    out.shrink_to_fit();
}

//main loop
int main(int argc, char* argv[])
{

    //create sample signal (i.e. 0 0i, 1 0i, 2 0i, etc...)
    for (double i = 0; i < T; i  ) {
        signalx[i] = {i   0.0i};
        }

    //set W frequencies
    for (int s = 0; s < length; s  ) {
        double denominator = std::exp2(s);
        std::complex<double> exponent = -2*3.14159265359*1i / denominator;
        fundamental_freq[s] = std::exp(exponent);
        }

    for (int s = 0; s < length_plus; s  ) {
            power_freq.push_back({});
            if (s == 0) {
                power_freq[s].push_back({1.0});
                }
            else {
                std::vector<double> hrange;
                double range = std::pow(2,s-1);
                hrange.resize(range);

                for (int i = 0; i < range; i  ) {
                    hrange[i] = i;
                    }

                std::complex<double> base = fundamental_freq[s];
                for (int i = 0; i < hrange.size(); i  ) {
                   std::complex<double> power_freq_input = std::pow(base,hrange[i]);
                   power_freq[s].push_back(power_freq_input);
                }
            }
        }

    std::vector<double> dumb_fft_input(N);
    for (double p = 0.0; p < N; p  ) {dumb_fft_input[p] = p;}
    std::vector<double> ordering = dumb_fft(dumb_fft_input);

    std::ofstream myfile;
    myfile.open ("output_cplusplus_parallel_batch.csv");

    //the main parallel portion
    for (int iter = 0; iter < iterations; iter  ) {
        auto t_start = std::chrono::high_resolution_clock::now();
        //create a N/2 batch of FFT outputs
        int bs = N/2;

        for (double i = 1; i < T-(N-1); i = i   bs) {
            int j, z;

            #pragma omp parallel for num_threads(11) private(j,z) //now consumes a lot of CPU
            for (j = 0; j < bs; j  ) {
                std::vector<std::complex<double>> input(N); 
                std::vector<std::complex<double>> input2(N);

                for (z = 0; z < N; z  ) {
                    input2[z] = signalx[i j z];
                    }

                for (z = 0; z < N; z  ) {
                    input[z] = std::move(input2[ordering[z]]);
                    }

                int num_stages = (int)std::log2(N) 1;

                for (int k = 1; k < num_stages; k  ) 
                {
                    compute_fft(std::ref(input),k);
                }

                //for (int k = 0; k < input.size(); k  ) {
                //  std::cout << input[k];
                //  std::cout << "n";
                //  }
                //std::cout << "n";
                input.clear();
                input.shrink_to_fit();
                input2.clear();
                input2.shrink_to_fit();
                }

        }
        auto t_end = std::chrono::high_resolution_clock::now();
        std::cout << "n";
        std::cout << "finishedn";
        std::cout << "elapsed time: ";

        auto duration = std::chrono::duration<double,std::milli>(t_end-t_start).count()/1000.0;
        myfile << duration;
        myfile << "n";
        std::cout << duration;
        std::cout << "n";
    }
    myfile.close();
}