обнаружение аномалий с использованием параллельных вычислений в R

#r #doparallel

Вопрос:

У меня есть фрейм данных с более чем 300 миллионами строк, и я хочу обнаружить аномалию в каждой группе, которая состоит из страны и идентификаторов (каждой группы), затем я написал следующий код для обнаружения точек аномалии, однако это занимает много времени. Не могли бы вы, пожалуйста, предложить любой другой вариант, который сделает это быстрее. Формат фрейма данных:

  df <- data.frame("id" = 1:n,"country"= ("US",..),"date"=("2021-01-01",..),"value"=c(10,....)) 


    registerDoParallel()
groupColumns <- c("country","id")
system.time(temp_anom <- ddply(df, groupColumns, function(x){
  x <- x[,c('date','value')]  
  resid.q <- quantile(x$value,prob = c(0.1,0.90))
  iqr <- diff(resid.q)
  limits <- resid.q   3 * iqr * c(-1,1) 
  lower_bound <- limits[1]
  upper_bound <- limits[2]
  outlier_dip_index <- dplyr::filter(x, value < lower_bound) %>% data.frame() 
  if (nrow(outlier_dip_index) > 0) {
    outlier_dip_index$status <- "dip"}
  outlier_spike_index <- dplyr::filter(x, value > upper_bound) %>% data.frame()
  if (nrow(outlier_spike_index) > 0) {
    outlier_spike_index$status <- "spike"  
    outlier <- rbind(outlier_spike_index,outlier_dip_index)
    outlier
  }
},.paralle = T))
 

Ответ №1:

Для увеличения скорости параллельных вычислений нам необходимо указать оптимальное количество ядер в Doparallel, в данном случае оптимальное значение равно 5. Только изменив код, как показано ниже, мы сможем увидеть огромное улучшение исходного кода.

 doParallel::registerDoParallel(cores = 5)

system.time(temp_anom <- plyr::ldply(df$id, function(ids){
  title_dataset <- df[which(df$short_id == ids),]
  result_dataset <- plyr::ldply(title_dataset$country, function(iso){
    country_dataset <- title_dataset[which(title_dataset$country == iso),]
    resid.q <- quantile(country_dataset$raw_de, prob = c(0.1, 0.90))
    iqr <- diff(resid.q)
    limits <- resid.q   3 * iqr * c(-1,1) 
    temp_dataset <- data.frame(country = iso, lower_bound = limits[1], upper_bound = limits[2])
    temp_dataset
  })
  result_dataset$id <- ids
  result_dataset
}, .parallel = T))