#jupyter-notebook #boto3 #amazon-sagemaker #dask-distributed #python-s3fs
Вопрос:
Я пытаюсь использовать Dask для получения нескольких файлов (JSON) из AWS S3 в память в ноутбуке Sagemaker Jupyter. Когда я отправляю 10 или 20 работников, все проходит гладко. Однако, когда я отправляю 100 сотрудников, от 30% до 50% из них сталкиваются со следующей ошибкой: «Не удалось найти учетные данные».
Сначала я пытался использовать Boto3. Чтобы попытаться устранить эту проблему, я переключился на S3FS, но происходит та же ошибка.
Рабочие, которые ошибаются с помощью NoCredentialError, являются случайными, если я повторю эксперимент, как и точное количество неудачных загрузок.
Sagemaker обрабатывает все учетные данные AWS через свою роль IAM, поэтому у меня нет доступа к парам ключей или чему-либо еще. Файл ~/.aws/config содержит только расположение по умолчанию — ничего о учетных данных.
Похоже, это очень распространенное использование Dask, поэтому очевидно, что он способен выполнять такую задачу — где я ошибаюсь?
Любая помощь будет очень признательна! Код и обратная связь ниже. В этом примере 29 работников потерпели неудачу из-за учетных данных. Спасибо, Патрик
import boto3
import json
import logging
import multiprocessing
from dask.distributed import Client, LocalCluster
import s3fs
import os
THREADS_PER_DASK_WORKER = 4
CPU_COUNT = multiprocessing.cpu_count()
HTTP_SUCCESSFUL_REQUEST_CODE = 200
S3_BUCKET_NAME = '-redacted-'
keys_100 = ['-redacted-']
keys_10 = ['-redacted-']
def dispatch_workers(workers):
cluster_workers = min(len(workers), CPU_COUNT)
cluster = LocalCluster(n_workers=cluster_workers, processes=True,
threads_per_worker=THREADS_PER_DASK_WORKER)
client = Client(cluster)
data = []
data_futures = []
for worker in workers:
data_futures.append(client.submit(worker))
for future in data_futures:
try:
tmp_flight_data = future.result()
if future.status == 'finished':
data.append(tmp_flight_data)
else:
logging.error(f"Future status = {future.status}")
except Exception as err:
logging.error(err)
del data_futures
cluster.close()
client.close()
return data
def _get_object_from_bucket(key):
s3 = s3fs.S3FileSystem(anon=False)# uses default credentials
with s3.open(os.path.join(S3_BUCKET_NAME,key)) as f:
return json.loads(f.read())
def get_data(keys):
objects = dispatch_workers(
[lambda key=key: _get_object_from_bucket(key) for key in keys]
)
return objects
data = get_data(keys_100)
Output:
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials