#python #docker #credentials #prefect
#python #docker #учетные данные #префект
Вопрос:
Я начал работать с prefect и пытаюсь сохранить свои результаты в облачном хранилище Google:
import prefect
from prefect.engine.results import GCSResult
from prefect.run_configs import DockerRun, LocalRun
from prefect.storage import Docker, Local
@prefect.task(checkpoint=True, result=GCSResult(bucket="redacted"))
def task1():
return 1
storage = Local(...)
run_config = LocalRun()
with prefect.Flow(
"myflow",
storage=storage,
run_config=run_config
) as flow:
results = task1()
flow.run()
При условии, что для моей переменной среды GOOGLE_APPLICATION_CREDENTIALS установлено значение key, все работает нормально.
Однако при попытке настроить мой поток я сталкиваюсь с некоторыми трудностями:
storage = Docker(...)
run_config = DockerRun(dockerfile="DockerFile")
with prefect.Flow(
"myflow",
storage=storage,
run_config=run_config
) as flow:
... # Same definition as previously
flow.register()
В таком случае, при попытке запустить мой поток с помощью агента docker (будь то на том же компьютере, с которого был зарегистрирован поток, или на другом, я получаю эту ошибку):
google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials.
Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application.
For more information, please see https://cloud.google.com/docs/authentication/getting-started
Следуя документации, я попытался установить GCP_CREDENTIALS
секрет в моем облаке Prefect.Безрезультатно, я все еще сталкиваюсь с той же ошибкой.
Я также пытался сохранить результаты в отдельной GCSUpload
задаче, но у меня все еще возникает та же ошибка.
Одним из решений, которое я вижу, было бы упаковать учетные данные внутри моего образа docker через DockerFile, однако я чувствую, что это должен быть вариант использования, когда я должен использовать секреты Prefect.
Ответ №1:
Я разработал что-то, извлекающее учетные данные с помощью PrefectSecret
задачи.
Мне пришлось создать дополнительную GCSUpload
задачу, которая взяла результат task1
, чтобы напрямую сохранить его в GCS.
Мой окончательный код выглядит так:
import prefect
from prefect.tasks.gcp.storage import GCSUpload
from prefect.tasks.secrets import PrefectSecret
from prefect.run_configs import DockerRun
from prefect.storage import Docker
retrieve_gcp_credentials = PrefectSecret("GCP_CREDENTIALS")
@prefect.task(checkpoint=True, result=GCSResult(bucket="redacted"))
def task1():
return "1"
save_results_to_gcp = GCSUpload(bucket="redacted")
storage = Docker()
run_config = DockerRun()
with prefect.Flow(
"myflow",
storage=storage,
run_config=run_config
) as flow:
credentials = retrieve_gcp_credentials()
results = task1()
save_results_to_gcp(results, credentials=credentials)
flow.run()
(Обратите внимание, что мне также пришлось изменить тип возвращаемого значения task1
, поскольку задача может загружать только строку или байты)
Это достаточно хорошо для моего варианта использования (просто сохраняйте результаты в GCS), но я оставлю вопрос открытым, если кто-то знает, как использовать GCSResult
, поскольку это также было бы полезно для кэширования.