#javascript #node.js #mongodb #amazon-web-services #aws-sdk
Вопрос:
Я пытаюсь передать JSON из MongoDB в S3 с помощью новой версии @aws-sdk/lib-хранилища:
"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",
Попытка №1: Кажется, я неправильно использую JSONStream.stringify() :
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3Client = new S3Client({ region: env.AWS_REGION });
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
client = await MongoClient.connect(connectionString);
const db = client.db();
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
readStream.pipe(JSONStream.stringify());
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: readStream,
},
});
await upload.done();
}
catch (err) {
log.error(err);
throw err.name;
}
finally {
if (client) {
client.close();
}
}
};
Ошибка № 1:
Ошибка типа [ERR_INVALID_ARG_TYPE]: Первым аргументом должен быть аргумент типа string, Buffer, ArrayBuffer, Array или объект, подобный массиву. Полученный тип объекта в функции.из (буфера.в JS:305:9) в getDataReadable (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) в processTicksAndRejections (внутренний/процесс/task_queues.в JS:94:5) в объект.getChunkStream (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) во время загрузки изображения.__doConcurrentUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) в асинхронных обещание.все (индекс 0) в загрузки.__doMultipartUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) при загрузке.сделано (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)
Try #2, using the variable jsonStream
:
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
const jsonStream = readStream.pipe(JSONStream.stringify());
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: jsonStream,
},
});
Error #2:
ReferenceError: ReadableStream is not defined
at Object.getChunk (/…/node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30)
at Upload.__doMultipartUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24)
at Upload.done (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)
Try #3: use stream.PassThrough
:
client = await MongoClient.connect(connectionString);
const db = client.db();
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));
...
const stream = require('stream');
export const uploadStreamFile = async(fileName) => {
try{
const pass = new stream.PassThrough();
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: pass,
},
});
const res = await upload.done();
log.info('finished uploading file', fileName);
return res;
}
catch(err){
return;
}
};
Error #3:
‘dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8’
Try #4: mongodb.stream({transform: doc => JSON.stringify…}) instead of JSONStream:
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';
const s3Client = new S3Client({ region: env.AWS_REGION });
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
client = await MongoClient.connect(connectionString);
const db = client.db();
const readStream = db.collection(collectionName)
.find('{}')
.limit(5)
.stream({ transform: doc => JSON.stringify(doc) 'n' });
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: readStream,
},
});
await upload.done();
}
catch (err) {
log.error('waaaaa', err);
throw err.name;
}
finally {
if (client) {
client.close();
}
}
};
Error: #4:
Ошибка типа [ERR_INVALID_ARG_TYPE]: Первым аргументом должен быть аргумент типа string, Buffer, ArrayBuffer, Array или объект, подобный массиву. Полученный тип объекта в функции.из (буфера.в JS:305:9) в getDataReadable (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) в processTicksAndRejections (внутренний/процесс/task_queues.в JS:94:5) в объект.getChunkStream (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) во время загрузки изображения.__doConcurrentUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) в асинхронных обещание.все (индекс 0) в загрузки.__doMultipartUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) при загрузке.сделано (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)
Попробуйте #5: используйте stream.PassThrough()
и вернитесь pass
к pipe
:
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
client = await MongoClient.connect(connectionString);
const db = client.db();
const readStream = db.collection(collectionName).find('{}').limit(5).stream({ transform: doc => JSON.stringify(doc) 'n' });
readStream.pipe(uploadStreamFile());
}
catch (err) {
log.error('waaaaa', err);
throw err.name;
}
finally {
if (client) {
client.close();
}
}
};
const stream = require('stream');
export const uploadStreamFile = async() => {
try{
const pass = new stream.PassThrough();
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: pass,
},
});
await upload.done();
return pass;
}
catch(err){
log.error('pawoooooo', err);
return;
}
};
Ошибка № 5:
Ошибка типа: dest.on не является функцией в Cursor.pipe (_stream_readable.js:680:8)
Комментарии:
1. Спасибо, но произошла та же ошибка. Мне удалось передать поток fs.createWriteStream и записать его в файл, позже я могу использовать поток fs.createReadStream и передать его в свой файл «uploadStreamFile», и он будет работать. но мне не нравится это решение, так как оно приводит к тому, что мой сервер записывает данные во временный файл вместо прямой потоковой передачи ресурсов MongoDB в s3.
2. Спасибо, полная трассировка стека: «dest.on не является функцией в Stream.pipe (внутренний/потоки/наследие.js:30:8»
3. Я обновил вопрос со всеми соответствующими сообщениями об ошибках
4. Я удалил комментарии, так как они уже включены в ответ. Я надеюсь, что любая из предложенных альтернатив сработает для вас.
Ответ №1:
После просмотра ваших трассировок стека ошибок, вероятно, проблема связана с тем фактом, что драйвер MongoDB предоставляет курсор в объектном режиме, тогда Body
как параметр Upload
требует традиционного потока, подходящего для обработки Buffer
в этом случае.
Взяв свой исходный код в качестве эталона, вы можете попробовать предоставить Transform
поток для выполнения обоих требований.
Пожалуйста, рассмотрите, например, следующий код:
import { Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3Client = new S3Client({ region: env.AWS_REGION });
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
client = await MongoClient.connect(connectionString);
const db = client.db();
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
// We are creating here a Transform to adapt both sides
const toJSONTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.stringify(chunk) 'n');
callback();
}
});
readStream.pipe(toJSONTransform);
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: toJSONTransform,
},
});
await upload.done();
}
catch (err) {
log.error(err);
throw err.name;
}
finally {
if (client) {
client.close();
}
}
};
В коде toJSONTransform
мы определяем записываемую часть потока как объектный режим; напротив, читаемая часть будет пригодна для чтения из метода S3 Upload
… по крайней мере, я надеюсь на это.
Что касается второй ошибки, о которой вы сообщили , связанной с dest.on
, я сначала подумал, и я написал вам о возможности того , что ошибка была вызвана тем, что uploadStreamFile
вы возвращаете Promise
, а не поток, и вы передаете это Promise
pipe
методу, для которого требуется поток, в основном, что вы вернули неправильную переменную. Но я не понял, что ты пытаешься пройти PassThrough
поток как параметр Upload
метода: пожалуйста, имейте в виду, что этот поток не содержит никакой информации, поскольку вы не передаете ему никакой информации, содержимое считываемого потока, полученного из запроса MongoDB, никогда не передается ни обратному вызову, ни Upload
самому методу.
Комментарии:
1. Спасибо, я тоже это пробовал. Запуск кода с синтаксисом: «Тело: ReadStream.pipe(JSONStream.stringify ())» вернет следующую ошибку: «Ошибка ссылки: ReadableStream не определен»
2. Мне жаль это слышать. Пожалуйста, не могли бы вы предоставить дополнительную информацию о трассировке стека ошибок в вопросе? Я думаю, что это может иметь отношение к делу.
3. Спасибо, я обновил вопрос со всеми соответствующими сообщениями об ошибках
4. Большое вам спасибо @OronBen-Дэвид. Я обновил ответ дополнительными альтернативами. О
ReferenceError: ReadableStream is not defined
я думаю, что это связано с тем, что поток полученыJSONStream.stringify
только для записи в нашем случае, но не читается одновременно: я думаю, я неправильно понял документацию, которая гласит, что это может быть и для записи и для чтения, как преобразовать, но я боюсь, что не одновременно. Ошибка имеет смысл.5. Это здорово @ОронБен-Дэвид. Я очень рад слышать, что это сработало должным образом. Большое вам спасибо за
stream.PassThrough
то, что вы также поделились основанным решением.
Ответ №2:
Я нашел дополнительное решение , используя stream.PassThrough
, используя JSONStream, будет передавать массив объектов вместо одного за другим:
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
client = await MongoClient.connect(connectionString);
const db = client.db();
const passThroughStream = new stream.PassThrough();
const readStream = db.collection(collectionName)
.find('{}')
.stream();
readStream.on('end', () => passThroughStream.end());
readStream.pipe(JSONStream.stringify()).pipe(passThroughStream);
await uploadStreamFile('benda_mongo.json', passThroughStream);
}
catch (err) {
log.error(err);
throw err.name;
}
finally {
if (client) {
client.close();
}
}
};
export const uploadStreamFile = async(fileName, stream) => {
try{
log.info('start uploading file', fileName);
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: `${fileName}`,
Body: stream,
},
});
const res = await upload.done();
log.info('finished uploading file', fileName);
return res;
}
catch(err){
log.error(err);
return;
}
};