#postgresql #amazon-web-services #aws-glue #aws-glue-data-catalog #aws-glue-spark
Вопрос:
Я выполняю обработку ETL клеем, которая в основном выполняет следующее —
- Чтение файла из S3 (через каталог клея)
- Передача данных (добавление/удаление столбцов)
- Запись данных в таблицу RDS postgre (также через каталог клея)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SRC_DB', 'SRC_TABLE', 'TGT_DB', 'TGT_TABLE'])
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = args['SRC_DB'], table_name = args['SRC_TABLE'], transformation_ctx = "DataSource0")
Transform0 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"sparkDataSource": DataSource0}, transformation_ctx = "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_catalog(frame = Transform0, database = args['TGT_DB'], table_name = args['TGT_TABLE'], transformation_ctx = "DataSink0")
Чего я хочу добиться здесь, так это отфильтровать плохие записи (например, в любой записи для одного из столбцов, если длина значения данных превышает длину, определенную в каталоге данных или таблице RDS) и вставить эти записи в какую — либо другую таблицу или файл S3 и продолжить обработку без исключения. Чтобы я мог сообщить о плохих записях команде источника.,
Что здесь происходит, если после преобразования есть какие-либо неверные данные (несоответствие типа данных столбца или длины), задание ETL клея прерывается с исключением пакета.
Ответ №1:
Вы можете следовать приведенному ниже подходу для решения этой проблемы:
- Используйте карту клея.примените с помощью UDF, где вы можете передать динамическую запись и рассчитать длину в качестве нового столбца. После этого вы можете вернуть динамический кадр с дополнительной записью. Обратитесь к этой ссылке для примера UDF.
d["date"] = datetime.today() return d datasource1 = Map.apply(frame = datasource0, f = addDate)```
- Теперь используйте
splitrows
преобразование клея для достижения этой цели, передав столбец длины в comparison_dict. Ниже приведен пример с условиями и возвращением двух динамических кадров в виде коллекции в зависимости от условия.
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100",
"<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')