@aws-sdk/lib-хранилище для потоковой передачи JSON из MongoDB в S3 с помощью JSONStream.stringify()

#javascript #node.js #mongodb #amazon-web-services #aws-sdk

Вопрос:

Я пытаюсь передать JSON из MongoDB в S3 с помощью новой версии @aws-sdk/lib-хранилища:

 "@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",
 

Попытка №1: Кажется, я неправильно использую JSONStream.stringify() :

 import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};
 

Ошибка № 1:

Ошибка типа [ERR_INVALID_ARG_TYPE]: Первым аргументом должен быть аргумент типа string, Buffer, ArrayBuffer, Array или объект, подобный массиву. Полученный тип объекта в функции.из (буфера.в JS:305:9) в getDataReadable (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) в processTicksAndRejections (внутренний/процесс/task_queues.в JS:94:5) в объект.getChunkStream (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) во время загрузки изображения.__doConcurrentUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) в асинхронных обещание.все (индекс 0) в загрузки.__doMultipartUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) при загрузке.сделано (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Try #2, using the variable jsonStream :

   const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    const jsonStream = readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: jsonStream,
      },
    });
 

Error #2:

ReferenceError: ReadableStream is not defined
at Object.getChunk (/…/node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30)
at Upload.__doMultipartUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24)
at Upload.done (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)

Try #3: use stream.PassThrough :

     client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));

...

const stream = require('stream');
export const uploadStreamFile = async(fileName) => {
  try{

    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    const res = await upload.done();
    
    log.info('finished uploading file', fileName);
    return res;
  }
  catch(err){
    return;
  }
};
 

Error #3:

‘dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8’

Try #4: mongodb.stream({transform: doc => JSON.stringify…}) instead of JSONStream:

 import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc)   'n' });
  
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
  
    await upload.done(); 
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};
 

Error: #4:

Ошибка типа [ERR_INVALID_ARG_TYPE]: Первым аргументом должен быть аргумент типа string, Buffer, ArrayBuffer, Array или объект, подобный массиву. Полученный тип объекта в функции.из (буфера.в JS:305:9) в getDataReadable (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) в processTicksAndRejections (внутренний/процесс/task_queues.в JS:94:5) в объект.getChunkStream (/…/node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) во время загрузки изображения.__doConcurrentUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) в асинхронных обещание.все (индекс 0) в загрузки.__doMultipartUpload (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) при загрузке.сделано (/…/node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Попробуйте #5: используйте stream.PassThrough() и вернитесь pass к pipe :

 export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream({ transform: doc => JSON.stringify(doc)   'n' });
    readStream.pipe(uploadStreamFile());
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


const stream = require('stream');

export const uploadStreamFile = async() => {
  try{
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    await upload.done();
    return pass;
  }
  catch(err){
    log.error('pawoooooo', err);
    return;
  }
};
 

Ошибка № 5:

Ошибка типа: dest.on не является функцией в Cursor.pipe (_stream_readable.js:680:8)

Комментарии:

1. Спасибо, но произошла та же ошибка. Мне удалось передать поток fs.createWriteStream и записать его в файл, позже я могу использовать поток fs.createReadStream и передать его в свой файл «uploadStreamFile», и он будет работать. но мне не нравится это решение, так как оно приводит к тому, что мой сервер записывает данные во временный файл вместо прямой потоковой передачи ресурсов MongoDB в s3.

2. Спасибо, полная трассировка стека: «dest.on не является функцией в Stream.pipe (внутренний/потоки/наследие.js:30:8»

3. Я обновил вопрос со всеми соответствующими сообщениями об ошибках

4. Я удалил комментарии, так как они уже включены в ответ. Я надеюсь, что любая из предложенных альтернатив сработает для вас.

Ответ №1:

После просмотра ваших трассировок стека ошибок, вероятно, проблема связана с тем фактом, что драйвер MongoDB предоставляет курсор в объектном режиме, тогда Body как параметр Upload требует традиционного потока, подходящего для обработки Buffer в этом случае.

Взяв свой исходный код в качестве эталона, вы можете попробовать предоставить Transform поток для выполнения обоих требований.

Пожалуйста, рассмотрите, например, следующий код:

 import { Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    // We are creating here a Transform to adapt both sides
    const toJSONTransform = new Transform({
      writableObjectMode: true,
      transform(chunk, encoding, callback) {
        this.push(JSON.stringify(chunk)   'n');
        callback();  
      }  
    });

    readStream.pipe(toJSONTransform);
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: toJSONTransform,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};
 

В коде toJSONTransform мы определяем записываемую часть потока как объектный режим; напротив, читаемая часть будет пригодна для чтения из метода S3 Upload … по крайней мере, я надеюсь на это.

Что касается второй ошибки, о которой вы сообщили , связанной с dest.on , я сначала подумал, и я написал вам о возможности того , что ошибка была вызвана тем, что uploadStreamFile вы возвращаете Promise , а не поток, и вы передаете это Promise pipe методу, для которого требуется поток, в основном, что вы вернули неправильную переменную. Но я не понял, что ты пытаешься пройти PassThrough поток как параметр Upload метода: пожалуйста, имейте в виду, что этот поток не содержит никакой информации, поскольку вы не передаете ему никакой информации, содержимое считываемого потока, полученного из запроса MongoDB, никогда не передается ни обратному вызову, ни Upload самому методу.

Комментарии:

1. Спасибо, я тоже это пробовал. Запуск кода с синтаксисом: «Тело: ReadStream.pipe(JSONStream.stringify ())» вернет следующую ошибку: «Ошибка ссылки: ReadableStream не определен»

2. Мне жаль это слышать. Пожалуйста, не могли бы вы предоставить дополнительную информацию о трассировке стека ошибок в вопросе? Я думаю, что это может иметь отношение к делу.

3. Спасибо, я обновил вопрос со всеми соответствующими сообщениями об ошибках

4. Большое вам спасибо @OronBen-Дэвид. Я обновил ответ дополнительными альтернативами. О ReferenceError: ReadableStream is not defined я думаю, что это связано с тем, что поток получены JSONStream.stringify только для записи в нашем случае, но не читается одновременно: я думаю, я неправильно понял документацию, которая гласит, что это может быть и для записи и для чтения, как преобразовать, но я боюсь, что не одновременно. Ошибка имеет смысл.

5. Это здорово @ОронБен-Дэвид. Я очень рад слышать, что это сработало должным образом. Большое вам спасибо за stream.PassThrough то, что вы также поделились основанным решением.

Ответ №2:

Я нашел дополнительное решение , используя stream.PassThrough , используя JSONStream, будет передавать массив объектов вместо одного за другим:

 export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const passThroughStream = new stream.PassThrough();
    const readStream = db.collection(collectionName)
      .find('{}')
      .stream();

    readStream.on('end', () => passThroughStream.end());

    readStream.pipe(JSONStream.stringify()).pipe(passThroughStream);
    await uploadStreamFile('benda_mongo.json', passThroughStream);
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


export const uploadStreamFile = async(fileName, stream) => {
  try{
    log.info('start uploading file', fileName);
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: `${fileName}`,
        Body: stream,
      },
    });

    const res = await upload.done();
    log.info('finished uploading file', fileName);
    return res;
  }
  catch(err){
    log.error(err);
    return;
  }
};