Ноутбук AWS Sagemaker прерывается «Не удается найти учетные данные»

#jupyter-notebook #boto3 #amazon-sagemaker #dask-distributed #python-s3fs

Вопрос:

Я пытаюсь использовать Dask для получения нескольких файлов (JSON) из AWS S3 в память в ноутбуке Sagemaker Jupyter. Когда я отправляю 10 или 20 работников, все проходит гладко. Однако, когда я отправляю 100 сотрудников, от 30% до 50% из них сталкиваются со следующей ошибкой: «Не удалось найти учетные данные».

Сначала я пытался использовать Boto3. Чтобы попытаться устранить эту проблему, я переключился на S3FS, но происходит та же ошибка.

Рабочие, которые ошибаются с помощью NoCredentialError, являются случайными, если я повторю эксперимент, как и точное количество неудачных загрузок.

Sagemaker обрабатывает все учетные данные AWS через свою роль IAM, поэтому у меня нет доступа к парам ключей или чему-либо еще. Файл ~/.aws/config содержит только расположение по умолчанию — ничего о учетных данных.

Похоже, это очень распространенное использование Dask, поэтому очевидно, что он способен выполнять такую задачу — где я ошибаюсь?

Любая помощь будет очень признательна! Код и обратная связь ниже. В этом примере 29 работников потерпели неудачу из-за учетных данных. Спасибо, Патрик

 import boto3
import json
import logging
import multiprocessing
from dask.distributed import Client, LocalCluster
import s3fs
import os

THREADS_PER_DASK_WORKER = 4
CPU_COUNT = multiprocessing.cpu_count()
HTTP_SUCCESSFUL_REQUEST_CODE = 200

S3_BUCKET_NAME = '-redacted-'

keys_100 = ['-redacted-']
keys_10 = ['-redacted-']

def dispatch_workers(workers):

    cluster_workers = min(len(workers), CPU_COUNT)
    cluster = LocalCluster(n_workers=cluster_workers, processes=True,
                           threads_per_worker=THREADS_PER_DASK_WORKER)
    client = Client(cluster)

    data = []
    data_futures = []

    for worker in workers:
        data_futures.append(client.submit(worker))

    for future in data_futures:
        try:
            tmp_flight_data = future.result()
            if future.status == 'finished':
                data.append(tmp_flight_data)
            else:
                logging.error(f"Future status = {future.status}")
        except Exception as err:
            logging.error(err)

    del data_futures

    cluster.close()
    client.close()

    return data

def _get_object_from_bucket(key):

    s3 = s3fs.S3FileSystem(anon=False)# uses default credentials
    with s3.open(os.path.join(S3_BUCKET_NAME,key)) as f:
        return json.loads(f.read())

def get_data(keys):

    objects = dispatch_workers(
        [lambda key=key: _get_object_from_bucket(key) for key in keys]
    )
    return objects
    
data = get_data(keys_100)
 

Output:

 ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials
ERROR:root:Unable to locate credentials