Как мне пройти аутентификацию в Azure, чтобы запустить конвейерный тест фабрики данных с поведением python?

#python #azure-data-factory #python-behave

#python #azure-data-factory #python-поведение

Вопрос:

У меня есть конвейер на фабрике данных Azure, для которого я хочу написать тест на использование поведения python. На данный момент я просто хочу запустить тест локально. Следующая команда не будет выполняться прямо сейчас, поскольку я никак не прошел аутентификацию.

 get_client_from_cli_profile(DataFactoryManagementClient)
  

В сообщении об ошибке говорится, что мне нужно запустить «az login» для настройки учетной записи.

 knack.util.CLIError: Please run 'az login' to setup account.
  

Может кто-нибудь привести пример того, как я это делаю?

Функция

 Feature: Run pipeline
    Scenario: Get pipeline
        Given we get the pipeline
  

Шаг

 @given('we get the pipeline')
def get_pipeline(context):
    pipeline_name = "xxx"
    resource_group = "yyy"
    data_factory = "zzz"
    parameters={}
    pipeline = get_datafactory_pipeline(pipeline_name, resource_group, data_factory, parameters)
  

Код для получения конвейера

 from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.datafactory import DataFactoryManagementClient

def get_datafactory_pipeline(pipeline_name, resource_group, data_factory, parameters):
    return get_client_from_cli_profile(DataFactoryManagementClient().pipelines.create_run(
        resource_group_name = resource_group,
        factory_name = data_factory,
        pipeline_name = pipeline_name,
        parameters = parameters)
  

Ответ №1:

Двусторонний:

1. az login установите Azure CLI, а затем получите доступ к Azure. (ссылка для скачивания: https://learn.microsoft.com/en-us/cli/azure/install-azure-cli)

введите описание изображения здесь

2.no необходимо установить Azure CLI, но изменить свой код следующим образом:

 def get_datafactory_pipeline(subscription_id,credentials,pipeline_name, resource_group, data_factory, parameters):
    return DataFactoryManagementClient(credentials,subscription_id).pipelines.create_run(
        resource_group_name=resource_group,
        factory_name=data_factory,
        pipeline_name=pipeline_name,
        parameters=parameters)
  

и ваш шаг такой:

 @given('we get the pipeline')
def get_pipeline(context):
    subscription_id = '<Specify your Azure Subscription ID>'
    credentials = ServicePrincipalCredentials(client_id='<Active Directory application/client ID>', secret='<client secret>', tenant='<Active Directory tenant ID>')
    pipeline_name = "xxx"
    resource_group = "yyy"
    data_factory = "zzz"
    parameters={}
    pipeline = get_datafactory_pipeline(subscription_id,credentials,pipeline_name, resource_group, data_factory, parameters)