Распределенная обработка — AWS Sagemaker

#python #amazon-web-services #amazon-s3 #amazon-sagemaker

Вопрос:

У меня .csv в корзине S3 есть несколько необработанных файлов. Как я могу обрабатывать их параллельно, чтобы сократить время выполнения? Смотрите комментарии о том, где мне требуется небольшая помощь. Я использую SKLearnProcessor и s3_data_distribution_type='ShardedByS3Key'

введите описание изображения здесь

 %%writefile preprocessing/preprocessing_sklearn.py
    
import pandas as pd
import argparse
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os

def process(input_data_path):
    df = pd.read_csv(input_data_path)
#     drop first col (unamed: 0)
    df = df.iloc[: , 1:]
    
    features = df.iloc[:,1:]
    headers = features.columns
    labels = df.iloc[:,0]

    scaler = StandardScaler()
    
    normalized_x_train = scaler.fit_transform(features)

    # write
    pd.DataFrame(normalized_x_train).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_features.csv')), header=False, index=False)
    pd.DataFrame(labels).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_labels.csv')), header=False, index=False)
    
if __name__ == '__main__':
    # HOW DO I MAKE THIS DYNAMIC?
    input_data_path = os.path.join("/opt/ml/processing/input", "train-data-with-header.csv")  
    process(input_data_path)
 

Мое призвание fn —

 from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import timeit

start = timeit.default_timer()
# WHAT SHOULD BE MY SOURCE?[![enter image description here][1]][1]
source = "s3://sagemaker-end-to-end/data_tuning/train/chunk_0.csv" 
source2 = "s3://sagemaker-end-to-end/data_tuning/train/"

sklearn_processor = SKLearnProcessor(framework_version='0.23-1',
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2,
                                     base_job_name = 'preprocess-sklearn'
                                    )

sklearn_processor.run(
    code='preprocessing/preprocessing_sklearn.py',
    inputs=[
        ProcessingInput(
            source=source2,
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/input')
    ],
    
    outputs=[
        ProcessingOutput(
          source='/opt/ml/processing/output/train', 
          destination= make_url(store_bucket, "preprocess_sklearn", "train")
        ),
#                                
        ProcessingOutput(
            source='/opt/ml/processing/output/test',
            destination= make_url(store_bucket, "preprocess_sklearn", "test")
        )
    ]
                     
)

stop = timeit.default_timer()

print('Time: ', stop - start) 
 

Ответ №1:

В документации AWS неясно, как они управляют горизонтальным масштабированием и объединяют результаты из нескольких экземпляров в S3. Я полагаю , что мы можем только предполагать, что SageMaker автоматически выполняет параллельную обработку с s3_data_distribution_type='ShardedByS3Key' помощью, разбивает входные данные на сегменты, назначает каждый сегмент и агрегирует выходные данные.

Единственные замечания, которые я видел до сих пор.

Для параллельной обработки данных с помощью Scikit-Learn на Amazon SageMaker Processing вы можете разделить входные объекты по ключу S3, установив s3_data_distribution_type='ShardedByS3Key' внутри ProcessingInput так, чтобы каждый экземпляр получал примерно одинаковое количество входных объектов.

Следует ли распространять данные из Amazon S3 во все экземпляры обработки с полным дублированием, или данные из Amazon S3 совместно используются ключом Amazon S3, загружая по одному фрагменту данных в каждый экземпляр обработки.

По умолчанию. S3DataType='S3Prefix' и используются файлы по указанному пути S3.

Независимо от того, используете ли вы S3Prefix или файл манифеста для данного типа данных. Если вы выберете S3Prefix , S3Uri идентифицирует префикс имени ключа. Amazon SageMaker использует для задания обработки все объекты с указанным префиксом имени ключа. Если вы выберете Файл манифеста, S3Uri идентифицирует объект, который является файлом манифеста, содержащим список ключей объектов, которые Amazon SageMaker должен использовать для задания обработки.

В примерах AWS github используется только один экземпляр instance_count=1 , поэтому я тоже не могу получить ключ к разгадке.


Остается только надеяться, что AWS улучшит свою документацию.

Дело не в том, что AWS сложнее в использовании, чем GCP, а в том, что это излишне сложно; разрозненное, разросшееся множество примитивов инфраструктуры с плохой связью между ними.

Задача приятна, запутанный беспорядок-нет, и проблема с AWS заключается в том, что большая часть вашего рабочего времени будет потрачена на распутывание их документации и изучение функций и продуктов, чтобы найти то, что вы хотите, вместо того, чтобы сосредоточиться на интересных интересных задачах.