Как использовать / использовать поток событий из wolkenkit-eventstore

#wolkenkit

#wolkenkit

Вопрос:

Я хочу использовать хранилище событий wolkenkit и пытался создать быстрый пример. Но я не могу просто вывести поток событий.

Упрощенный пример:

 const eventstore = require("wolkenkit-eventstore/inmemory");
const Stream = require("stream");
const uuidv4 = require("uuid/v4");
const Event = require("commands-events/dist/Event");

const main = async () => {
    await eventstore.initialize();

    const aggregateId = uuidv4();
    const event = new Event({ ... });
    event.metadata.revision = 1;

    await eventstore.saveEvents({ events: event });

    const writableStream = new Stream.Writable();
    writableStream._write = (chunk, encoding, next) => {
        console.log(chunk.toString());
                next()
    };

    const readableStream = eventstore.getUnpublishedEventStream();
    readableStream.pipe(writableStream);
};

main();
 

Насколько я понимаю, getUnpublishedEventStream возвращает читаемый поток. Я следовал этим инструкциям, но это сработало не так, как ожидалось.
Все, что я получаю, это следующая ошибка:

 (node:10988) UnhandledPromiseRejectionWarning: TypeError: readableStream.pipe is not a function
 

Ответ №1:

Согласно документации wolkenkit-eventstore, getUnpublishedEventStream это async функция, то есть вы должны вызвать ее с await помощью . В противном случае вы получите обратно не поток, а обещание (а у обещания нет pipe функции).

Итак, эта строка

 const readableStream = eventstore.getUnpublishedEventStream();
 

должно быть:

 const readableStream = await eventstore.getUnpublishedEventStream();
 

Я не рассматривал ваш код более подробно, кроме этого, но именно по этой причине вы получаете текущее сообщение об ошибке.

PS: Пожалуйста, обратите внимание, что я являюсь одним из основных разработчиков wolkenkit, поэтому, пожалуйста, отнеситесь к моему ответу с недоверием.

Комментарии:

1. Привет, Голо, спасибо за быстрый ответ. К сожалению, вы правы, я совершенно забыл, что это асинхронный метод. Я должен был заглянуть в тестовую папку репозитория раньше. Там я нашел еще один ваш пакет streamtoarray и думаю, что это может быть очень полезно.