#r #apache-spark #dplyr #tidyr #sparklyr
#r #apache-spark #dplyr #tidyr #sparklyr
Вопрос:
Я пытаюсь воспроизвести tidyr:complete
функцию в sparklyr. У меня есть фрейм данных с некоторыми отсутствующими значениями, и я должен заполнить эти строки. В dplyr / tidyr я могу сделать:
data <- tibble(
"id" = c(1,1,2,2),
"dates" = c("2020-01-01", "2020-01-03", "2020-01-01", "2020-01-03"),
"values" = c(3,4,7,8))
# A tibble: 4 x 3
id dates values
<dbl> <chr> <dbl>
1 1 2020-01-01 3
2 1 2020-01-03 4
3 2 2020-01-01 7
4 2 2020-01-03 8
data %>%
mutate(dates = as_date(dates)) %>%
group_by(id) %>%
complete(dates = seq.Date(min(dates), max(dates), by="day"))
# A tibble: 6 x 3
# Groups: id [2]
id dates values
<dbl> <date> <dbl>
1 1 2020-01-01 3
2 1 2020-01-02 NA
3 1 2020-01-03 4
4 2 2020-01-01 7
5 2 2020-01-02 NA
6 2 2020-01-03 8
Однако complete
функция не существует в sparklyr
.
data_spark %>%
mutate(dates = as_date(dates)) %>%
group_by(id) %>%
complete(dates = seq.Date(min(dates), max(dates), by="day"))
Error in UseMethod("complete_") :
no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
Есть ли способ установить UDF или добиться аналогичного результата?
Спасибо
Ответ №1:
Под капотом tidyr::complete
просто выполняется полное объединение, за которым следует необязательное заполнение NA. Вы можете воспроизвести его эффекты, используя sdf_copy_to
для создания нового sdf, который представляет собой всего один столбец seq.Date
между вашей начальной и конечной датой, а затем выполнить сопоставление full_join
между ним и вашим набором данных.
Комментарии:
1. Да, но в моем случае последовательность дат, а также объединяемый фрейм данных будут разными для каждой группы. Есть ли способ эффективно определить другое объединение для каждой группы?
Ответ №2:
Вот метод, который выполняет всю работу в Spark.
library(sparklyr)
sc <- spark_connect(master = "local")
data <- tibble(
id = c(1, 1, 2, 2),
dates = c("2020-01-02", "2020-01-04", "2020-01-01", "2020-01-03"),
values = c(1, 2, 3, 4)
)
data_spark <- copy_to(sc, data)
Нам нужно сгенерировать все комбинации dates
и id
. Для этого нам нужно знать общее количество дней и первую дату.
days_info <-
data_spark %>%
summarise(
first_date = min(dates),
total_days = datediff(max(dates), min(dates))
) %>%
collect()
days_info
#> # A tibble: 1 x 2
#> first_date total_days
#> <chr> <int>
#> 1 2020-01-01 3
sdf_seq
может использоваться для генерации последовательности в Spark. Это можно использовать для получения комбинаций dates
и id
.
dates_id_combinations <-
sdf_seq(
sc,
from = 0,
to = days_info$total_days,
repartition = 1
) %>%
transmute(
dates = date_add(local(days_info$first_date), id),
join_by = TRUE
) %>%
full_join(data_spark %>% distinct(id) %>% mutate(join_by = TRUE)) %>%
select(dates, id)
dates_id_combinations
#> # Source: spark<?> [?? x 2]
#> dates id
#> <date> <dbl>
#> 1 2020-01-01 1
#> 2 2020-01-01 2
#> 3 2020-01-02 1
#> 4 2020-01-02 2
#> 5 2020-01-03 1
#> 6 2020-01-03 2
#> 7 2020-01-04 1
#> 8 2020-01-04 2
full_join
исходный фрейм данных и комбинированный фрейм данных. Затем фильтруйте на основе min
/ max
date для каждой группы.
data_spark %>%
group_by(id) %>%
mutate(first_date = min(dates), last_date = max(dates)) %>%
full_join(dates_id_combinations) %>%
filter(dates >= min(first_date), dates <= max(last_date)) %>%
arrange(id, dates) %>%
select(id, dates)
#> # Source: spark<?> [?? x 2]
#> # Groups: id
#> # Ordered by: id, dates
#> id dates
#> <dbl> <chr>
#> 1 1 2020-01-02
#> 2 1 2020-01-03
#> 3 1 2020-01-04
#> 4 2 2020-01-01
#> 5 2 2020-01-02
#> 6 2 2020-01-03