Как освободить мои данные от Node.js Поток

#javascript #node.js #stream

#javascript #node.js #поток

Вопрос:

Я уже некоторое время работаю с API Java Script, но это первый раз, когда я пытаюсь выполнить выборку из активного потока, который никогда не будет генерироваться 'done' . Моя цель — получить заданное количество выборок из потока в час. Поток подключается и передает много информации, но мне не удалось перевести возвращенные данные в формат, в котором я могу выполнить дальнейшую обработку (например, я знаком с рабочим процессом data science).).

Такое ощущение, что я уже несколько дней просматриваю документы и заметил, что большинство простых примеров передают читаемый поток в файл на сервере. Это кажется неэффективным для моего приложения. (Чтобы записать его в файл только для того, чтобы снова прочитать его, чтобы выполнить дополнительную обработку, прежде чем отправлять его в браузер для рендеринга через fetch API или отправлять его в MongoDB проекта для долгосрочного хранения и глубокого анализа. Я почти уверен, что есть способ установить JSON как const или var , и я просто не знаком с этим.

Как мне перенести мои данные в saved переменную Java Script? Что мне нужно изменить или добавить в свой код, чтобы иметь возможность продолжать манипулировать и обрабатывать возвращенный JSON?

 const needle = require('needle');

const token = process.env.BEARER_TOKEN;
const streamURL = 'https://api.twitter.com/2/tweets/sample/stream';

function streamConnect() {
    const options = {
        timeout: 2000,
    };

    const stream = needle.get(
        streamURL,
        {
            headers: {
                Authorization: `Bearer ${token}`,
            },
        },
        options
    );

    stream
        .on('data', (data) => {
            try {
                const json = JSON.parse(data);
                // console.log(json);
            } catch (e) {
                // Keep alive signal received. Do nothing.
            }
        })
        .on('error', (error) => {
            if (error.code === 'ETIMEDOUT') {
                stream.emit('timeout');
            }
        });

    return stream;
}

function getTweetSample() {
    const s = streamConnect();
    const chunks = [];
    s.on('readable', () => {
        let chunk;
        while (null !== (chunk = s.read())) {
            chunks.push(chunk);
        }
    });
    setInterval(() => {
        s.destroy();
    }, 3000);
    return chunks;
}

const saved = API.getTweetSample();
console.log('saved: ', saved);

// Above returns
// "saved: []"

// Expecting 
// "saved:
{
{
  data: {
    id: '1301578967443337***',
    text: 'See bones too so sure your weight perfect!'
  }
}
{
  data: {
    id: '1301578980001230***
    text: 'Vcs perderam a Dona Maria, ela percebeu q precisa trabalhar e crescer na vida, percebeu q paga 40% de imposto no consumo enquanto políticos q dizem lutar por ela, estão usufruindo dos direitos q ela nunca vai ter 👍 Trabalho escravo é ter q trabalhar pra vcs'
  }
}
...... // 20 examples
}"
  

Отредактировано 2020-09-07

Это пример полезной нагрузки ответа:

 PassThrough {
  _readableState: ReadableState {
    objectMode: false,
    highWaterMark: 16384,
    buffer: BufferList { head: null, tail: null, length: 0 },
    length: 0,
    pipes: null,
    pipesCount: 0,
    flowing: true,
    ended: false,
    endEmitted: false,
    reading: false,
    sync: false,
    ....
}
  

Комментарии:

1. Правильно ли это резюмирует вашу проблему: 1) Подключитесь к удаленному JSON API. 2) Загрузить / потоковые данные из API и потоковый анализ ответа 3) корректно завершить поток после получения 20 образцов?

2. Да, я бы сказал, что это точно, с дополнительным 4-м шагом, когда данные возвращаются в const или var для дальнейшей обработки.

3. Можете ли вы предоставить минимальный пример полезной нагрузки данных, возвращаемых API? Структура данных JSON важна для определения того, как поток данных должен быть проанализирован.

Ответ №1:

Три шага для решения проблемы:

  1. Данные должны быть извлечены в виде тела потокового HTTP-ответа
  2. Поток ответов должен быть проанализирован анализатором JSON, поскольку данные передаются из ответа
  3. Поток должен завершиться после того, как 20 элементов будут проанализированы анализатором JSON

Пример кода из OP уже иллюстрирует, как решить (1).

Существует множество библиотек для оперативного анализа потока данных JSON для решения задачи (2). Мое личное предпочтение stream-json , поскольку для этого требуется только одна строка кода в нашем конвейере.

Наконец, (3) потребует от кода завершения входящего потока до его завершения. Это приведет к тому, что nodejs выдаст ERR_STREAM_PREMATURE_CLOSE ошибку, которая может быть обработана целевым оператором catch.

Объединение этих шагов станет чем-то вроде следующего исполняемого файла POC. У меня нет токена API Twitter, но я думаю, что это сработает:

 const stream = require('stream');
const util = require('util');
const got = require('got');
const StreamValues = require("stream-json/streamers/StreamValues.js");

(async () => {
  const token = "<YOUR API TOKEN>";

  const dataStream = got.stream('https://api.twitter.com/2/tweets/sample/stream', {
    headers: { "Authorization": `Bearer ${token}` },
  });

  // This array will by filled by JSON parsed objects from the HTTP response
  const dataPoints = [];
    
  await util.promisify(stream.pipeline)(
    // This readable stream [dataStream] will emit the incoming HTTP body as string data
    dataStream,
    // The string data is then JSON parsed on the fly by [stream-json]
    StreamValues.withParser(),
    // Finally, we iterate over the the JSON objects and push them to the [dataPoints] array.
    async function(source){
      for await (const parsedObject of source){
        dataPoints.push( parsedObject.value );

        if( dataPoints.length === 20 ){
          // When we reach 20 data points, the stream is forcefully terminated
          dataStream.destroy();
          return;
        }
      }
    }
  )
    // Prematurely terminating the stream will cause nodejs to emit a [ERR_STREAM_PREMATURE_CLOSE] 
    // error. If it is OK to return more than 20 elements, you could try to remove the 
    // [return] statement on L28;
    .catch(error => (error.code !== "ERR_STREAM_PREMATURE_CLOSE" amp;amp; Promise.reject(error)));
}())
  .catch(console.error);