Завершить фрейм данных в sparklyr

#r #apache-spark #dplyr #tidyr #sparklyr

#r #apache-spark #dplyr #tidyr #sparklyr

Вопрос:

Я пытаюсь воспроизвести tidyr:complete функцию в sparklyr. У меня есть фрейм данных с некоторыми отсутствующими значениями, и я должен заполнить эти строки. В dplyr / tidyr я могу сделать:

 data <- tibble(
  "id" = c(1,1,2,2),
  "dates" = c("2020-01-01", "2020-01-03", "2020-01-01", "2020-01-03"),
  "values" = c(3,4,7,8))

# A tibble: 4 x 3
     id dates      values
  <dbl> <chr>       <dbl>
1     1 2020-01-01      3
2     1 2020-01-03      4
3     2 2020-01-01      7
4     2 2020-01-03      8

data %>% 
  mutate(dates = as_date(dates)) %>% 
  group_by(id) %>% 
  complete(dates = seq.Date(min(dates), max(dates), by="day"))

# A tibble: 6 x 3
# Groups:   id [2]
     id dates      values
  <dbl> <date>      <dbl>
1     1 2020-01-01      3
2     1 2020-01-02     NA
3     1 2020-01-03      4
4     2 2020-01-01      7
5     2 2020-01-02     NA
6     2 2020-01-03      8
  

Однако complete функция не существует в sparklyr .

 data_spark %>% 
  mutate(dates = as_date(dates)) %>% 
  group_by(id) %>% 
  complete(dates = seq.Date(min(dates), max(dates), by="day"))

Error in UseMethod("complete_") : 
no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
  

Есть ли способ установить UDF или добиться аналогичного результата?

Спасибо

Ответ №1:

Под капотом tidyr::complete просто выполняется полное объединение, за которым следует необязательное заполнение NA. Вы можете воспроизвести его эффекты, используя sdf_copy_to для создания нового sdf, который представляет собой всего один столбец seq.Date между вашей начальной и конечной датой, а затем выполнить сопоставление full_join между ним и вашим набором данных.

Комментарии:

1. Да, но в моем случае последовательность дат, а также объединяемый фрейм данных будут разными для каждой группы. Есть ли способ эффективно определить другое объединение для каждой группы?

Ответ №2:

Вот метод, который выполняет всю работу в Spark.

 library(sparklyr)

sc <- spark_connect(master = "local")

data <- tibble(
  id = c(1, 1, 2, 2),
  dates = c("2020-01-02", "2020-01-04", "2020-01-01", "2020-01-03"),
  values = c(1, 2, 3, 4)
)

data_spark <- copy_to(sc, data)
  

Нам нужно сгенерировать все комбинации dates и id . Для этого нам нужно знать общее количество дней и первую дату.

 days_info <-
  data_spark %>%
  summarise(
    first_date = min(dates),
    total_days = datediff(max(dates), min(dates))
  ) %>%
  collect()
days_info
#> # A tibble: 1 x 2
#>   first_date total_days
#>   <chr>           <int>
#> 1 2020-01-01          3
  

sdf_seq может использоваться для генерации последовательности в Spark. Это можно использовать для получения комбинаций dates и id .

 dates_id_combinations <- 
  sdf_seq(
    sc,
    from = 0,
    to = days_info$total_days,
    repartition = 1
  ) %>%
  transmute(
    dates = date_add(local(days_info$first_date), id),
    join_by = TRUE
  ) %>%
  full_join(data_spark %>% distinct(id) %>% mutate(join_by = TRUE)) %>%
  select(dates, id)
dates_id_combinations
#> # Source: spark<?> [?? x 2]
#>   dates         id
#>   <date>     <dbl>
#> 1 2020-01-01     1
#> 2 2020-01-01     2
#> 3 2020-01-02     1
#> 4 2020-01-02     2
#> 5 2020-01-03     1
#> 6 2020-01-03     2
#> 7 2020-01-04     1
#> 8 2020-01-04     2
  

full_join исходный фрейм данных и комбинированный фрейм данных. Затем фильтруйте на основе min / max date для каждой группы.

 data_spark %>%
  group_by(id) %>%
  mutate(first_date = min(dates), last_date = max(dates)) %>%
  full_join(dates_id_combinations) %>%
  filter(dates >= min(first_date), dates <= max(last_date)) %>%
  arrange(id, dates) %>%
  select(id, dates)
#> # Source:     spark<?> [?? x 2]
#> # Groups:     id
#> # Ordered by: id, dates
#>      id dates     
#>   <dbl> <chr>     
#> 1     1 2020-01-02
#> 2     1 2020-01-03
#> 3     1 2020-01-04
#> 4     2 2020-01-01
#> 5     2 2020-01-02
#> 6     2 2020-01-03