R: параллельный импорт данных с SQL Server не возвращает данные

#r #sql-server #parallel-processing

#r #sql-server #параллельная обработка

Вопрос:

Мне нужно импортировать ежемесячные данные за 8 лет, где каждый месяц содержит около 360.000 наблюдений, и в каждом из них 75 переменных. Данные хранятся в SQL Server.

Я пытался импортировать его из года в год с помощью этого кода, но окончательный фрейм данных не создается. Кроме того, r shoud определяет все годы, но каждое значение одинаково.

Это мой код:

 library(foreach)
library(doParallel)

numCores <- detectCores()

system.time({
  nr_years <- 8
  start_year <-  2010
  registerDoParallel(numCores)

  steps <- foreach::foreach(icount(nr_years), .combine = rbind) %dopar% {
    next_year <- start_year  1 
    date_before <- paste0(start_year, '-12-31', sep = '')
    date_end <- paste0(next_year, '-12-31', sep = '')

    myQuery <- RxSqlServerData(sqlQuery = sprintf("SELECT DATE,CLIENT_NO,MAT_ST
                           FROM DBO.DATA_TABLE 
                           WHERE DATE >='%s' and DATE <= '%s'", date_before, date_end), connectionString = connStr, returnDataFrame = TRUE)
    
    my_df <- rxImport(myQuery)
    #start_year <- start_year   1 
    
  }
})
  

Я протестировал код, заменив %dopar% на %do% : фреймы данных не были объединены, а переписаны (объект steps был правильным).

Однако я хочу, чтобы данные импортировались параллельно, и в конце все годы должны быть привязаны к dataframe my_df .

Примечание

Я обнаружил, что в случае сохранения %dopar% результатов последней строки foreach функции сохраняются в steps (поэтому я прокомментировал последнюю строку). Однако данные по-прежнему отсутствуют rbinded , а содержат только результаты первой итерации.

Ответ №1:

При параллельной обработке создаются частичные результаты, которые «сохраняются» в разных объектах и сохраняются только при объединении в конце. Итак, проблема заключалась в сохранении данных в объекты.

Я нашел решение, которое работает для меньшего количества переменных

 period <- function(it){
  res <- c(paste0(start_year   it, '-12-31', sep = ''), paste0(start_year   1   it, '-12-31', sep = ''))

}

system.time({
  numCores <- detectCores()
  start_year <-  2016
  registerDoParallel(numCores)

  raz2 <- foreach(i=0:3, .combine = rbind, .multicombine = TRUE) %dopar% {
    dates <- period(i)
    myQuery <- RxSqlServerData(sqlQuery = sprintf("SELECT DATE,CLIENT_NO,MAT_ST
                           FROM DBO.DATA_TABLE 
                           WHERE DATE >='%s' and DATE <= '%s'", dates[[1]], dates[[2]]), connectionString = connStr, returnDataFrame = FALSE)
    rxImport(myQuery)
  }
})
  

Однако, если я хочу импортировать 75 переменных, я получаю сообщение об ошибке:

Ошибка в { : сбой задачи 2 — «неправильное распределение» остановлено на: 34.97 7.33 392.9