Снежинка — как я могу запросить метаданные потока и сохранить в таблице

#snowflake-cloud-data-platform

#snowflake-cloud-data-platform

Вопрос:

У меня есть разные потоки, но некоторые из потоков устаревают. Чтобы избежать их устаревания, я хочу запустить некоторый процесс, который может считывать свойство «показать поток» «устаревшим после», если осталось всего 1 день, запустите процесс для обновления потока.

Ответ №1:

Чтобы достичь своей цели, вы должны зафиксировать результаты SHOW STREAMS https://docs.snowflake.com/en/sql-reference/sql/show-streams.html . Вы можете начать создавать хранимую процедуру, которая запускает ее и возвращает ее результат в виде результирующего набора, используя TABLE(RESULT_SCAN(LAST_QUERY_ID())) аналогично следующему, который может быть обогащен параметром для временного окна, которое вы хотите проверить (ваш «1 день остался) и последующий CREATE OR REPLACE STREAM .

Пожалуйста, обратите внимание, что это не полное решение вашей проблемы, а только половина, поскольку оно не включает в себя действия, необходимые для повторного создания устаревших потоков.

 CREATE OR REPLACE PROCEDURE sp_show_stream_stale() 
RETURNS VARIANT NOT NULL 
LANGUAGE Javascript  
EXECUTE AS Caller 
AS 
$ 
var sql_command0 = snowflake.createStatement({ sqlText:`show streams in database`});

var sql_command1 = snowflake.createStatement({ sqlText:`SELECT "created_on"
                                                            , "name"
                                                            , "database_name"
                                                            , "schema_name"
                                                            , "owner"
                                                            , "comment"
                                                            , "table_name"
                                                            , "type"
                                                            , "stale"
                                                            , "mode"
                                                            , "stale_after" 
                                                        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))`});

try { 
    sql_command0.execute();
    var db = sql_command1.execute(); 
    var json_rows = {}; 
    var array_of_rows = []; 
    var COLUMNS = ["created_on","name","database_name","schema_name","owner", "comment", "table_name", "type", "stale", "mode", "stale_after"]; 
    var row_num = 1;
    while (db.next()) {  
        json_rows = {}; 
        for (var col_num = 0; col_num < COLUMNS.length; col_num = col_num   1) { 
            var col_name = COLUMNS[col_num]; 
            json_rows[col_name] = db.getColumnValue(col_num   1);
        } 
        array_of_rows.push(json_rows); 
          row_num; 
    }
    return array_of_rows; 
}
catch (err) { 
    return "Failed: "   err;
}
$;
 

Поскольку результирующий набор представляет собой один JSON, вы можете запустить хранимую процедуру и вскоре после следующего оператора SELECT получить результирующий набор в табличном формате.

   CALL sp_show_stream_stale();

  SELECT value:created_on::datetime as "created_on",
    value:name::string as "name",
    value:database_name::string  as "database_name",
    value:schema_name::string as "schema_name",
    value:owner::string as "owner",
    value:comment::string as "comment",
    value:table_name::string as "table_name",
    value:type::string as "type",
    value:stale::string as "stale",
    value:mode::string as "mode",
    value:stale_after::datetime as "stale_after"
  FROM (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())))
      , LATERAL FLATTEN(Input => sp_show_stream_stale)    
  WHERE DATEDIFF(Day, current_timestamp, value:stale_after::datetime) <= 1 ;
 

Комментарии:

1. спасибо, Франческо, запрос и объяснение помогли мне.