#amazon-s3 #aws-lambda #stream #amazon-dynamodb #fast-csv
#amazon-s3 #aws-lambda #поток #amazon-dynamodb #быстрый csv
Вопрос:
Когда файл csv загружается в мою корзину s3, мой лямбда-код будет запущен для вставки моих данных в DynamoDB. Мне нужен поток, потому что файл слишком большой, чтобы его можно было загрузить как полный объект.
const batchWrite = async (clientDynamoDB, itemsToProcess) => {
const ri = {};
ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
const params = { RequestItems: ri };
await clientDynamoDB.batchWriteItem(params).promise();
};
function runStreamPromiseAsync(stream, clientDynamoDB) {
return new Promise((resolve, reject) => {
const sizeChunk = 25;
let itemsToProcess = [];
stream
.pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
.on("data", (row) => {
stream.pause();
itemsToProcess.push(row);
if (itemsToProcess.length === sizeChunk) {
batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
stream.resume();
});
itemsToProcess = [];
}
})
.on("error", (err) => {
console.log(err);
reject("Error");
})
.on("end", () => {
stream.pause();
console.log("end");
batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
resolve("OK");
});
});
});
}
module.exports.main = async (event, context, callback) => {
context.callbackWaitsForEmptyEventLoop = false;
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const object = event.Records[0].s3;
const bucket = object.bucket.name;
const file = object.object.key;
const agent = new https.Agent({
keepAlive: true
});
const client = new AWS.DynamoDB({
httpOptions: {
agent
}
});
try {
//get Stream csv data
const stream = s3
.getObject({
Bucket: bucket,
Key: file
})
.createReadStream()
.on('error', (e) => {
console.log(e);
});
await runStreamPromiseAsync(stream, client);
} catch (e) {
console.log(e);
}
};
Когда мой файл состоит из 1000 строк, все вставляется, но когда у меня 5000 строк, моя функция вставляет только около 3000 строк, и это число является случайным… Иногда больше, иногда меньше..
Итак, я хотел бы понять, чего мне здесь не хватает?
Я также прочитал эту статью, но, честно говоря, даже если вы приостановите второй поток, первый все еще выполняется.. Так что, если у кого-то есть какие-либо идеи о том, как это сделать, мы будем очень признательны!
Спасибо
Ответ №1:
Я выяснил, почему он не был полностью обработан, это потому, что обратный batchWriteItem
вызов может возвращать необработанные элементы. Поэтому я меняю функцию batchWrite
, а также runPromiseStreamAsync
немного, потому что у меня могут быть обработаны не все элементы itemsToProcess
.
В любом случае, вот полный код :
const batchWrite = (client, itemsToProcess) => {
const ri = {};
ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
const items = { RequestItems: ri };
const processItemsCallback = function(err, data) {
return new Promise((resolve, reject) => {
if(!data || data.length === 0){
return resolve();
}
if(err){
return reject(err);
}
let params = {};
params.RequestItems = data.UnprocessedItems;
return client.batchWriteItem(params, processItemsCallback);
});
};
return client.batchWriteItem(items, processItemsCallback );
};
function runStreamPromiseAsync(stream, clientDynamoDB) {
return new Promise((resolve, reject) => {
const sizeChunk = 25;
let itemsToProcess = [];
let arrayPromise = [];
stream
.pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
.on("error", (err) => {
console.log(err);
reject("Error");
})
.on('data', data => {
itemsToProcess.push(data);
if(itemsToProcess.length === sizeChunk){
arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
itemsToProcess = [];
}
})
.on('end', () => {
if(itemsToProcess.length !== 0){
arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
}
resolve(Promise.all(arrayPromise).catch(e => {
reject(e)
}));
});
});
}