как преобразовать данные инкрементного обновления в структурированную таблицу в snowflake

#snowflake-cloud-data-platform

Вопрос:

У меня есть следующие инкрементные транзакционные данные, поступающие из формата S3 AVRO.

 {
  "after": {
    "COM_PCT": null,
    "DEPT_ID": 30,
    "EMAIL": "AKHOO",
    "EMPLOYEE_ID": 115,
    "FIRST_NAME": "ALEX",
    "LAST_NAME": "TIM",
    "HIRE": "1995-05-18 00:00:00",
    "MANAGER_ID": 114
  },
  "before": {},
  "current_ts": "2018-05-18 00:00:00:00",
  "op_ts": "2018-05-18 00:00:00:00",
  "op_type": "I",
  "pos": "00000000001123",
  "primary_keys": ["EMPLOYEE_ID"],
  "table": "HR.EMPLOYEE"
},
{
    "after": {
      "COM_PCT": null,
      "DEPT_ID": 11,
      "EMAIL": "AKHOO",
      "EMPLOYEE_ID": 115,
      "FIRST_NAME": "ALEX",
      "LAST_NAME": "TIM",
      "HIRE": "1995-05-18 00:00:00",
      "MANAGER_ID": 114
    },
    "before": {},
    "current_ts": "2018-05-19 00:00:00:00",
    "op_ts": "2018-05-19 00:00:00:00",
    "op_type": "U",
    "pos": "00000000001124",
    "primary_keys": ["EMPLOYEE_ID"],
    "table": "HR.EMPLOYEE"
  },
  {
      "after": {
        "COM_PCT": null,
        "DEPT_ID": 30,
        "EMAIL": "AKHOO",
        "EMPLOYEE_ID": 115,
        "FIRST_NAME": "ALEX",
        "LAST_NAME": "TIM",
        "HIRE": "1995-05-18 00:00:00",
        "MANAGER_ID": 114
      },
      "before": {},
      "current_ts": "2018-05-20 00:00:00:00",
      "op_ts": "2018-05-20 00:00:00:00",
      "op_type": "U",
      "pos": "00000000001125",
      "primary_keys": ["EMPLOYEE_ID"],
      "table": "HR.EMPLOYEE"
    }

 

Первая транзакция — это транзакция вставки для того же первичного ключа,
вторые две — транзакции обновления,
Я не могу использовать потоковый канал для обработки инкрементных обновлений, есть ли способ преобразовать это в структурированную таблицу и показать только последнюю транзакцию вставки / обновления для этого первичного ключа?

Ответ №1:

Я предположил, что файл содержит изменения только для одной таблицы, если нет, то вам нужно отфильтровать изменения только для определенной таблицы. Я также предположил, что данные находятся в типе variant в таблице, но это никак не влияет на решение, как вы извлекаете данные.

Я предлагаю это решение:

  • В самом начале вы должны использовать функцию QUALIFY и отфильтровать данные только до последней версии записи.
  • Затем вы можете выполнить операцию СЛИЯНИЯ, чтобы вставить или обновить запись.
  • Если ваши данные также допускают операции УДАЛЕНИЯ, то они должны быть включены в код.

Пример данных:

 CREATE OR REPLACE TABLE SAMPLE_RAW (samples variant);                  

INSERT INTO SAMPLE_RAW
SELECT parse_json('{
    "after": {
      "COM_PCT": null,
      "DEPT_ID": 30,
      "EMAIL": "AKHOO",
      "EMPLOYEE_ID": 115,
      "FIRST_NAME": "ALEX",
      "LAST_NAME": "TIM",
      "HIRE": "1995-05-18 00:00:00",
      "MANAGER_ID": 114
    },
    "before": {},
    "current_ts": "2018-05-18 00:00:00:00",
    "op_ts": "2018-05-18 00:00:00:00",
    "op_type": "I",
    "pos": "00000000001123",
    "primary_keys": ["EMPLOYEE_ID"],
    "table": "HR.EMPLOYEE"
  }')
UNION ALL
SELECT parse_json('{
      "after": {
        "COM_PCT": null,
        "DEPT_ID": 11,
        "EMAIL": "AKHOO",
        "EMPLOYEE_ID": 115,
        "FIRST_NAME": "ALEX",
        "LAST_NAME": "TIM",
        "HIRE": "1995-05-18 00:00:00",
        "MANAGER_ID": 114
      },
      "before": {},
      "current_ts": "2018-05-19 00:00:00:00",
      "op_ts": "2018-05-19 00:00:00:00",
      "op_type": "U",
      "pos": "00000000001124",
      "primary_keys": ["EMPLOYEE_ID"],
      "table": "HR.EMPLOYEE"
    }')
UNION ALL                
SELECT parse_json('{
        "after": {
          "COM_PCT": null,
          "DEPT_ID": 30,
          "EMAIL": "AKHOO",
          "EMPLOYEE_ID": 115,
          "FIRST_NAME": "ALEX",
          "LAST_NAME": "TIM",
          "HIRE": "1995-05-18 00:00:00",
          "MANAGER_ID": 114
        },
        "before": {},
        "current_ts": "2018-05-20 00:00:00:00",
        "op_ts": "2018-05-20 00:00:00:00",
        "op_type": "U",
        "pos": "00000000001125",
        "primary_keys": ["EMPLOYEE_ID"],
        "table": "HR.EMPLOYEE"
      }');
 

Решение:

 WITH src AS (
  SELECT TO_TIMESTAMP(s.samples:op_ts::string, 'YYYY-MM-DD HH24:MI:SS:FF') AS op_ts
       , s.samples:op_type::string AS op_type
       , s.samples:after:COM_PCT::string AS COM_PCT
       , s.samples:after:DEPT_ID As DEPT_ID
       , s.samples:after:EMAIL::string As EMAIL
       , s.samples:after:EMPLOYEE_ID As EMPLOYEE_ID
       , s.samples:after:FIRST_NAME::string As FIRST_NAME
       , s.samples:after:LAST_NAME::string AS LAST_NAME
       , TO_TIMESTAMP(s.samples:after:HIRE::string, 'YYYY-MM-DD HH24:MI:SS') As HIRE
       , s.samples:after:MANAGER_ID AS MANAGER_ID  
    FROM SAMPLE_RAW AS s
 QUALIFY ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) = 1
)  
MERGE INTO HR.EMPLOYEE AS trg USING src ON trg.EMPLOYEE_ID = src.EMPLOYEE_ID
WHEN MATCHED AND src.op_type = 'U' THEN UPDATE SET trg.EMAIL = src.EMAIL ...
WHEN MATCHED AND src.op_type = 'D' THEN DELETE
WHEN NOT MATCHED THEN INSERT (EMAIL, FIRST_NAME, LAST_NAME, ...) VALUES (src.EMAIL, src.FIRST_NAME, src.LAST_NAME, ...)
 

Ссылка: УТОЧНЕНИЕ, СЛИЯНИЕ

Комментарии:

1. Спасибо вам за это, Майкл, это именно то, что я искал: D, есть ли шанс, что вы можете помочь с частью удаления и сделать это как процедуру с таблицей ввода и таблицей вывода в качестве параметра?

2. Я добавил поддержку операций УДАЛЕНИЯ. Чтобы поместить все в процедуру и автоматизировать ее, вам нужна где-то структура, вам нужно хранить информацию о таблицах, столбцах и типах данных, и вы можете создать генератор на основе того, что я дал.