#snowflake-cloud-data-platform
Вопрос:
У меня есть следующие инкрементные транзакционные данные, поступающие из формата S3 AVRO.
{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-18 00:00:00:00",
"op_ts": "2018-05-18 00:00:00:00",
"op_type": "I",
"pos": "00000000001123",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
},
{
"after": {
"COM_PCT": null,
"DEPT_ID": 11,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-19 00:00:00:00",
"op_ts": "2018-05-19 00:00:00:00",
"op_type": "U",
"pos": "00000000001124",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
},
{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-20 00:00:00:00",
"op_ts": "2018-05-20 00:00:00:00",
"op_type": "U",
"pos": "00000000001125",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}
Первая транзакция — это транзакция вставки для того же первичного ключа,
вторые две — транзакции обновления,
Я не могу использовать потоковый канал для обработки инкрементных обновлений, есть ли способ преобразовать это в структурированную таблицу и показать только последнюю транзакцию вставки / обновления для этого первичного ключа?
Ответ №1:
Я предположил, что файл содержит изменения только для одной таблицы, если нет, то вам нужно отфильтровать изменения только для определенной таблицы. Я также предположил, что данные находятся в типе variant в таблице, но это никак не влияет на решение, как вы извлекаете данные.
Я предлагаю это решение:
- В самом начале вы должны использовать функцию QUALIFY и отфильтровать данные только до последней версии записи.
- Затем вы можете выполнить операцию СЛИЯНИЯ, чтобы вставить или обновить запись.
- Если ваши данные также допускают операции УДАЛЕНИЯ, то они должны быть включены в код.
Пример данных:
CREATE OR REPLACE TABLE SAMPLE_RAW (samples variant);
INSERT INTO SAMPLE_RAW
SELECT parse_json('{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-18 00:00:00:00",
"op_ts": "2018-05-18 00:00:00:00",
"op_type": "I",
"pos": "00000000001123",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}')
UNION ALL
SELECT parse_json('{
"after": {
"COM_PCT": null,
"DEPT_ID": 11,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-19 00:00:00:00",
"op_ts": "2018-05-19 00:00:00:00",
"op_type": "U",
"pos": "00000000001124",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}')
UNION ALL
SELECT parse_json('{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-20 00:00:00:00",
"op_ts": "2018-05-20 00:00:00:00",
"op_type": "U",
"pos": "00000000001125",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}');
Решение:
WITH src AS (
SELECT TO_TIMESTAMP(s.samples:op_ts::string, 'YYYY-MM-DD HH24:MI:SS:FF') AS op_ts
, s.samples:op_type::string AS op_type
, s.samples:after:COM_PCT::string AS COM_PCT
, s.samples:after:DEPT_ID As DEPT_ID
, s.samples:after:EMAIL::string As EMAIL
, s.samples:after:EMPLOYEE_ID As EMPLOYEE_ID
, s.samples:after:FIRST_NAME::string As FIRST_NAME
, s.samples:after:LAST_NAME::string AS LAST_NAME
, TO_TIMESTAMP(s.samples:after:HIRE::string, 'YYYY-MM-DD HH24:MI:SS') As HIRE
, s.samples:after:MANAGER_ID AS MANAGER_ID
FROM SAMPLE_RAW AS s
QUALIFY ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) = 1
)
MERGE INTO HR.EMPLOYEE AS trg USING src ON trg.EMPLOYEE_ID = src.EMPLOYEE_ID
WHEN MATCHED AND src.op_type = 'U' THEN UPDATE SET trg.EMAIL = src.EMAIL ...
WHEN MATCHED AND src.op_type = 'D' THEN DELETE
WHEN NOT MATCHED THEN INSERT (EMAIL, FIRST_NAME, LAST_NAME, ...) VALUES (src.EMAIL, src.FIRST_NAME, src.LAST_NAME, ...)
Комментарии:
1. Спасибо вам за это, Майкл, это именно то, что я искал: D, есть ли шанс, что вы можете помочь с частью удаления и сделать это как процедуру с таблицей ввода и таблицей вывода в качестве параметра?
2. Я добавил поддержку операций УДАЛЕНИЯ. Чтобы поместить все в процедуру и автоматизировать ее, вам нужна где-то структура, вам нужно хранить информацию о таблицах, столбцах и типах данных, и вы можете создать генератор на основе того, что я дал.