#node.js #kafka-producer-api #kafkajs
#node.js #kafka-producer-api #kafkaj
Вопрос:
Я настраиваю сервер в nodejs, который использует kafka. Для этого я использую KAFKAJ. Проблема здесь в том, что я не хочу, чтобы сервер отключался каждый раз, когда кто-то отправляет post или считывает значение из kafka.
До сих пор я пробовал это для чтения данных
const readData = async (receiver, stream_name) => {
return new Promise((resolve, reject) => {
consumer.connect()
consumer.subscribe({ topic: stream_name })
.catch(e => {
console.log(stream_name)
console.log('failed to subscribe to topic')
console.log('err: ', e)
reject(e)
})
consumer.run({
eachMessage: async ({ topic, message }) => {
receiver(`- ${topic} ${message.timestamp} ${message.key}#${message.value}`)
resolve('ok')
}
})
.then(e => {
console.log('reading')
resolve('read')
})
.catch(e => {
console.log('messed up', e)
reject('fail')
})
setTimeout(() => {
console.log('egging')
return 0
}, 10000)
})
}
и это для создания данных
const pushData = async payload => {
return new Promise((resolve, reject) => {
producer.send(payload)
.then( e => {
resolve(e)
})
.catch(e => {
console.log('Error pushing data to kafka broker:n', e)
reject(e)
})
})
}
// run ({String topic, {String key, JSON value[]} messages} payload)
const putData = async payload => {
console.log('Connecting to kafka server...')
await producer.connect()
const a = await pushData(payload)
console.log('Data sent: ', a)
await producer.disconnect()
process.abort()
}
Этот код работает хорошо, но его нужно записать, чтобы выйти из метода.
Я хотел решение, в котором я мог бы либо завершить процесс / поток, в котором выполняется интерфейс kafka, либо обычный выход из метода.
Приветствуется любая помощь.