Создание функции Azure для выполнения ETL

#python #azure-functions #etl #azure-blob-storage

#python #azure-функции #etl #azure-blob-хранилище

Вопрос:

Я довольно новичок в функциях Azure, поэтому у меня возникли некоторые проблемы с пониманием того, какой шаг предпринять.

Я написал код на Python, который извлекает определенный файл JSON из базы данных MongoDB, выравнивает его и экспортирует в хранилище Azure Data Lake в виде CSV-файла.

Я провел некоторое исследование и решил, что могу создать функцию Azure и использовать триггер хранилища больших двоичных объектов, который может проверить, был ли загружен файл JSON в test directory и автоматически выполнить мой скрипт Python, чтобы сгладить его и экспортировать обратно в виде файла CSV.

Однако, как мне теперь изменить свой скрипт на Python таким образом, чтобы он импортировал этот файл JSON, который был загружен test directory , а не подключался к базе данных MongoDB?

 from pymongo import MongoClient
import pandas as pd
import os, uuid, sys
import collections
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from pandas import json_normalize
from datetime import datetime, timedelta

mongo_client = MongoClient("xxxxxxx")
db = mongo_client.r_db
table = db.areas

document = table.find()
mongo_docs = list(document)
mongo_docs = json_normalize(mongo_docs)
mongo_docs.to_csv("areas.csv", sep = ",", index=False) 

#print(mongo_docs)
try:  
    global service_client
        
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
        "https", "xxxx"), credential='xxxxx')
    

    file_system_client = service_client.get_file_system_client(file_system="root")

    directory_client = file_system_client.get_directory_client("testdirectory")

    file_client = directory_client.create_file("areas.csv")
    local_file = open(r"C:Usersareas.csv",'rb')

    file_contents = local_file.read()

    file_client.upload_data(file_contents, overwrite=True)

except Exception as e:
    print(e) 

 

Лучше ли создавать триггер сетки событий или триггер хранилища больших двоичных объектов для такого рода проблем?

Любая помощь или совет будут оценены.

Ответ №1:

Однако, как мне теперь изменить свой скрипт на Python таким образом, чтобы он импортировал этот файл JSON, который был загружен в test directory, а не подключался к базе данных MongoDB?

Если вы используете триггер больших двоичных объектов, сначала необходимо перенести данные в учетную запись хранилища, а затем вы можете обрабатывать данные внутри функции:

 import logging

import azure.functions as func


def main(myblob: func.InputStream):
    #just put the python script here.
    logging.info(f"Python blob trigger function processed blob n"
                 f"Name: {myblob.name}n"
                 f"Blob Size: {myblob.length} bytes")
 

Пожалуйста, будьте осторожны, чтобы не использовать привязку вывода больших двоичных объектов, вам необходимо вручную написать логику обработки, в противном случае объект на основе озера данных будет уничтожен, чтобы его больше нельзя было получать с помощью пакетов озера данных.

Лучше ли создавать триггер сетки событий или триггер хранилища больших двоичных объектов для такого рода проблем?

Любая помощь или совет будут оценены.

Если ваше требование является единственным (то есть оно должно выполняться только при передаче данных из озера данных), тогда вы можете использовать триггер blob-объектов. Преимущество сетки событий заключается в том, что конечные точки могут запускаться множеством разных событий.