Распараллеливание для цикла в R

#r #parallel-processing #data-manipulation

#r #параллельная обработка #обработка данных

Вопрос:

Я пытаюсь научиться использовать параллельную обработку в R. Снимок данных и кода приведен ниже.

Создание приблизительного набора данных

 library(truncnorm)
#Creating a mock dataframe
Market =c('City1','City2','City3','City4','City5','City2','City4','City1','City3','City5')
Car_type = c('A','A','A','A','A','B','B','B','B','B')
Variable1=c(.34,.19,.85,.27,.32,.43,.22,.56,.17,.11)
Car_purchased = c(1,0,0,1,0,1,0,0,1,1)
Market_data = data.frame(Market,Car_type,Variable1,Car_purchased)    
Market_data2=do.call("rbind", replicate(100, Market_data, simplify = FALSE)) 
#Create a bigger dataset
Market_data2$Final_value = 0 #create a column of for future calculation
empty_list = list()
  

Написание функции и выполнение функции

 Car_Value=function(data){
  market_list=unique(Market_data2$Market)
  for (m in market_list){
    market_subset = Market_data2[which(Market_data2$Market==m),]
    for (i in 1:nrow(market_subset)){
      if(market_subset[i,'Car_purchased']==1){
        market_subset[i,'Final_value'] = rtruncnorm(1,a=-10,b=0,mean=max(market_subset$Variable1),sd=1)
      } else{
        market_subset[i,'Final_value'] = rtruncnorm(1,a=-10,b=0,mean = market_subset[i,'Variable1'],sd=1)
      }
    }
    empty_list=rbind(empty_list,market_subset)
  }
  return(empty_list)
}

get_value = Car_Value(data=Market_data2)
  

В приведенном выше примере всего существует 5 «Market» для автомобилей и 2 «Car_type». Потребители могли покупать автомобили на любом рынке. Я должен вычислить значение («Final_value») из заданного усеченного нормального распределения. Это значение зависит только от значения переменной 1 для данного рынка. Вот почему я использую внешний цикл for. Среднее значение усеченного нормального распределения зависит от значения переменной 1 (max(Variable1) на рынке, если Car_purchased ==1, или заданного значения, если Car_purchased ==0). Эта версия кода выполняется отлично (хотя и не оптимизирована по скорости).

Проблема

Следующее, что я хотел бы сделать, это использовать параллельную обработку для внешнего цикла for, т. Е. для цикла по рынкам, поскольку конечное значение market зависит только от наблюдений внутри рынка.

К сожалению, я знаю только, как реализовать параллельную обработку для каждой строки набора данных. Например. мой код (представленный ниже) присваивает 1-ю строку 1-му ядру, 2-ю строку 2-му ядру и так далее.Это неэффективно и занимает много времени, поскольку каждая строка должна создавать подмножество, а затем находить максимальное значение подмножества.

Моя неэффективная версия

 library(parallel)
library(foreach)
library(doParallel)
library(iterators)
library(utils)
library(truncnorm)

cl=parallel::makeCluster(4,type="PSOCK") 
registerDoParallel(cl)
clusterEvalQ(cl, {library(truncnorm)})

Car_Value_Parallel <- function(market_data){
  output <- foreach(x = iter(market_data, by = "row"), .combine = rbind) %dopar% {
    market_subset = market_data[which(market_data$Market==x$Market),]
    if(x['Car_purchased']==1){
      x['Final_value'] = rtruncnorm(1,a=-10,b=0,mean=max(market_subset$Variable1),sd=1)
    } else{
      x['Final_value'] = rtruncnorm(1,a=-10,b=0,mean = x['Variable1'],sd=1)
    }
    return(x)
  }
  output
}

get_value_parallel = Car_Value_Parallel(market_data = Market_data2)
stopCluster(cl)
  

Это крайне неэффективно, если я запускаю его на наборе данных размером > 100 КБ (мой фактический набор данных составляет около 1,2 миллиона строк). Однако я не смог реализовать распараллеливание на рыночном уровне, где параллельное вычисление будет следующим: Запустите вычисление для City1 в 1-м ядре, City2 во 2-м ядре и так далее.Может кто-нибудь, пожалуйста, помочь? Приветствуется любая помощь. Спасибо.

P.S. Мои извинения за длинный вопрос. Я просто хотел показать все версии кода, которые я использовал.

Ответ №1:

Я не вижу причин продолжать параллельную обработку с вашим набором данных. Вместо этого загляните в такие пакеты, как dplyr или data.table , для более эффективного решения.

Насколько я понимаю вашу проблему, для каждого из них Market вы хотите применить rtruncnorm для создания переменной, Final_value где средний аргумент rtruncnorm функции зависит от переменной Car_purchased .

Мы можем выполнить это без необходимости цикла for, используя dplyr .

 library(truncnorm)
library(dplyr)

# Creating a mock dataframe
Market <- c("City1", "City2", "City3", "City4", "City5", "City2", "City4", "City1", "City3", "City5")
Variable1 <- c(.34, .19, .85, .27, .32, .43, .22, .56, .17, .11)
Car_purchased <- c(1, 0, 0, 1, 0, 1, 0, 0, 1, 1)
Market_data <- data.frame(Market, Car_type, Variable1, Car_purchased)
Market_data2 <- replicate(100, Market_data, simplify = FALSE) %>% bind_rows()
#Create a bigger dataset
Market_data2$Final_value = 0 #create a column of for future calculation
empty_list = list()

Car_Value2 <- function(data) {
  data %>%
    group_by(Market) %>%
    mutate(
      Final_value = if_else(
        Car_purchased == 1,
        rtruncnorm(1, a = -10, b = 0, mean = max(Variable1), sd = 1),
        rtruncnorm(1, a = -10, b = 0, mean = Variable1, sd = 1)
      )
    )
}


microbenchmark::microbenchmark(
  Car_Value(Market_data2),
  Car_Value2(Market_data2),
  times = 100
)
#> Unit: milliseconds
#>                      expr       min        lq      mean   median        uq
#>   Car_Value(Market_data2) 66.109304 68.043575 69.030763 68.56569 69.681255
#>  Car_Value2(Market_data2)  1.073318  1.101578  1.204737  1.17583  1.230687
#>        max neval cld
#>  89.497035   100   b
#>   3.465425   100  a


# Even bigger dataframe
Market_data3 <- replicate(120000, Market_data, simplify = FALSE) %>% bind_rows()


microbenchmark::microbenchmark(
  Car_Value2(data = Market_data3),
  times = 100 
)
#> Unit: milliseconds
#>                             expr      min       lq     mean   median
#>  Car_Value2(data = Market_data3) 338.4615 341.7134 375.8769 397.7133
#>        uq      max neval
#>  399.8733 412.5134   100
  

Создано 2019-03-10 пакетом reprex (версия 0.2.1)