поток s3 в dynamodb с помощью fast-csv: вставлены не все данные

#amazon-s3 #aws-lambda #stream #amazon-dynamodb #fast-csv

#amazon-s3 #aws-lambda #поток #amazon-dynamodb #быстрый csv

Вопрос:

Когда файл csv загружается в мою корзину s3, мой лямбда-код будет запущен для вставки моих данных в DynamoDB. Мне нужен поток, потому что файл слишком большой, чтобы его можно было загрузить как полный объект.

 const batchWrite = async (clientDynamoDB, itemsToProcess) => {
    const ri = {};
    ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
    const params = { RequestItems: ri };
    await clientDynamoDB.batchWriteItem(params).promise();
};

function runStreamPromiseAsync(stream, clientDynamoDB) {
    return new Promise((resolve, reject) => {
        const sizeChunk = 25;
        let itemsToProcess = [];

        stream
            .pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
            .on("data", (row) => {
                stream.pause();
                itemsToProcess.push(row);

                if (itemsToProcess.length === sizeChunk) {
                    batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
                        stream.resume();
                    });
                    itemsToProcess = [];
                }
            })
            .on("error", (err) => {
                console.log(err);
                reject("Error");
            })
            .on("end", () => {
                stream.pause();
                console.log("end");
                batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
                    resolve("OK");
                });
            });
    });
}

module.exports.main = async (event, context, callback) => {

    context.callbackWaitsForEmptyEventLoop = false;

    const AWS = require('aws-sdk');
    const s3 = new AWS.S3();

    const object = event.Records[0].s3;
    const bucket = object.bucket.name;
    const file = object.object.key;

    const agent = new https.Agent({
        keepAlive: true
    });
    const client = new AWS.DynamoDB({
        httpOptions: {
            agent
        }
    });

    try {
        //get Stream csv data
        const stream = s3
            .getObject({
                Bucket: bucket,
                Key: file
            })
            .createReadStream()
            .on('error', (e) => {
                console.log(e);
            });

        await runStreamPromiseAsync(stream, client);
    } catch (e) {
        console.log(e);
    }
};
  

Когда мой файл состоит из 1000 строк, все вставляется, но когда у меня 5000 строк, моя функция вставляет только около 3000 строк, и это число является случайным… Иногда больше, иногда меньше..

Итак, я хотел бы понять, чего мне здесь не хватает?

Я также прочитал эту статью, но, честно говоря, даже если вы приостановите второй поток, первый все еще выполняется.. Так что, если у кого-то есть какие-либо идеи о том, как это сделать, мы будем очень признательны!

Спасибо

Ответ №1:

Я выяснил, почему он не был полностью обработан, это потому, что обратный batchWriteItem вызов может возвращать необработанные элементы. Поэтому я меняю функцию batchWrite , а также runPromiseStreamAsync немного, потому что у меня могут быть обработаны не все элементы itemsToProcess .

В любом случае, вот полный код :

 const batchWrite = (client, itemsToProcess) => {
    const ri = {};
    ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
    const items = { RequestItems: ri };
    const processItemsCallback = function(err, data) {
        return new Promise((resolve, reject) => {
            if(!data || data.length === 0){
                return resolve();
            }
            if(err){
                return reject(err);
            }
            let params = {};
            params.RequestItems = data.UnprocessedItems;
            return client.batchWriteItem(params, processItemsCallback);
        });
    };
    return client.batchWriteItem(items, processItemsCallback );
};

function runStreamPromiseAsync(stream, clientDynamoDB) {
    return new Promise((resolve, reject) => {
        const sizeChunk = 25;
        let itemsToProcess = [];
        let arrayPromise = [];

        stream
            .pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
            .on("error", (err) => {
                console.log(err);
                reject("Error");
            })
            .on('data', data => {
                itemsToProcess.push(data);
                if(itemsToProcess.length === sizeChunk){
                    arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
                    itemsToProcess = [];
                }
            })
            .on('end', () => {
                if(itemsToProcess.length !== 0){
                    arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
                }
                resolve(Promise.all(arrayPromise).catch(e => {
                    reject(e)
                }));
            });
    });
}