Как фильтровать плохие записи при записи в таблицу RDS (Postgre) с помощью задания ETL

#postgresql #amazon-web-services #aws-glue #aws-glue-data-catalog #aws-glue-spark

Вопрос:

Я выполняю обработку ETL клеем, которая в основном выполняет следующее —

  1. Чтение файла из S3 (через каталог клея)
  2. Передача данных (добавление/удаление столбцов)
  3. Запись данных в таблицу RDS postgre (также через каталог клея)
 args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SRC_DB', 'SRC_TABLE', 'TGT_DB', 'TGT_TABLE'])

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = args['SRC_DB'], table_name = args['SRC_TABLE'], transformation_ctx = "DataSource0")

Transform0 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"sparkDataSource": DataSource0}, transformation_ctx = "Transform0")

DataSink0 = glueContext.write_dynamic_frame.from_catalog(frame = Transform0, database = args['TGT_DB'], table_name = args['TGT_TABLE'], transformation_ctx = "DataSink0")
 

Чего я хочу добиться здесь, так это отфильтровать плохие записи (например, в любой записи для одного из столбцов, если длина значения данных превышает длину, определенную в каталоге данных или таблице RDS) и вставить эти записи в какую — либо другую таблицу или файл S3 и продолжить обработку без исключения. Чтобы я мог сообщить о плохих записях команде источника.,

Что здесь происходит, если после преобразования есть какие-либо неверные данные (несоответствие типа данных столбца или длины), задание ETL клея прерывается с исключением пакета.

Ответ №1:

Вы можете следовать приведенному ниже подходу для решения этой проблемы:

  1. Используйте карту клея.примените с помощью UDF, где вы можете передать динамическую запись и рассчитать длину в качестве нового столбца. После этого вы можете вернуть динамический кадр с дополнительной записью. Обратитесь к этой ссылке для примера UDF.
        d["date"] = datetime.today() 
       return d 
       datasource1 = Map.apply(frame = datasource0, f = addDate)```
    
     
  2. Теперь используйте splitrows преобразование клея для достижения этой цели, передав столбец длины в comparison_dict. Ниже приведен пример с условиями и возвращением двух динамических кадров в виде коллекции в зависимости от условия.
 
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100", 
    "<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')