#apache-kafka #kafka-consumer-api #confluent-platform #kafka-producer-api #deno
Вопрос:
Мы успешно создаем тему Кафки, используя приведенный ниже код:
Таким образом, мы используем Deno.connect для создания TCP-сокета и используем KafkaJS для кодирования сообщения и успешной отправки его нашему брокеру, работающему на порту 9093. Затем мы можем посмотреть на нашего потребителя и увидеть, что сообщение успешно получено.
/** @format */
import { Encoder } from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/encoder.js';
import { Decoder } from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/decoder.js';
import { Buffer } from 'https://deno.land/std@0.76.0/node/buffer.ts';
import { readAll, writeAll } from 'https://deno.land/std@0.105.0/io/util.ts';
import request from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/request.js';
import { readerFromStreamReader } from 'https://deno.land/std/io/mod.ts';
import {
ProducerBatch,
Message,
IHeaders,
TopicMessages,
Broker,
TopicOffsets,
PartitionOffset,
} from './index.d.ts';
// import { Produce: apiKey } from '../../apiKeys'
// import { Produce: apiKey } from './protocol/requests/apiKeys'
import MessageSet from './protocol/messageSet/index.js';
import {
Client,
Packet,
Event,
} from 'https://deno.land/x/tcp_socket@0.0.1/mods.ts';
let date = await new Date(Date.now()).toUTCString();
export default async function func(string: string = date) {
console.log(typeof string);
const conn = await Deno.connect({
hostname: 'localhost',
port: 9093,
transport: 'tcp',
});
//console.log('Connected', conn.write);
interface topicDataType {
topicData: Array<{
topic: string;
partitions: Array<{
partition: number;
firstSequence?: number;
messages: Message[];
}>;
}>;
}
//producedMessage = ({acks, timeout, topicData}: ProducerBatch) =>
const producedMessage = ({ acks, timeout, topicData }: ProducerBatch) => ({
apiKey: 0, //0
apiVersion: 0,
apiName: 'Produce',
//expectResponse: () => acks !== 0,
encode: async () => {
return new Encoder()
.writeInt16(acks)
.writeInt32(timeout)
.writeArray(topicData.map(encodeTopic));
},
});
//topicData structure
const td = [
{
topic: 'quickstart-events',
partitions: [
{
partition: 0,
firstSequence: undefined,
messages: [
{
key: 'new-key6',
value: string,
partition: 0,
headers: undefined,
timestamp: Date.now(),
},
],
},
],
},
];
const message = producedMessage({
acks: 0,
timeout: 1000,
topicData: td,
});
const encodeTopic = ({ topic, partitions }: any) => {
return new Encoder()
.writeString(topic)
.writeArray(partitions.map(encodePartitions));
};
const encodePartitions = ({ partition, messages }: any) => {
const messageSet = MessageSet({
messageVersion: 0,
compression: 0,
entries: messages,
});
return new Encoder()
.writeInt32(partition)
.writeInt32(messageSet.size())
.writeEncoder(messageSet);
};
const pleaseWork = await request({
correlationId: 1,
clientId: 'my-app',
request: message,
});
console.log('GOT TO HERE SAM', pleaseWork);
const denoVersion = await conn.write(pleaseWork.buf);
console.log('Hello is ', denoVersion);
conn.close();
}
func();
Теперь мы пытаемся сделать то же самое и создать потребителя, используя протокол Кафки таким же образом, как описано выше.
Наш первый шаг заключается в том, чтобы попытаться получить ответ после создания нашего сообщения. Мы пробовали различные методы создания прослушивающего сервера TCP с помощью Deno.listen, Deno.connect и т. Д., Но нам не повезло. Ниже приведен конгломерат большинства различных методов, которые мы пробовали:
/** @format */
// /** @format */
// import { Application, Router, send } from 'https://deno.land/x/oak/mod.ts';
import producer from './shitProducer.ts';
import { copy } from 'https://deno.land/std@0.103.0/io/util.ts';
// const router = new Router();
// router
// .get('/test', (ctx) => {
// ctx.response.body = 'Hello World!';
// producer();
// })
// .post('/test', (ctx) => {
// ctx.response.body = "You've posted!";
// })
// .delete('/test', (ctx) => {
// console.log('Request body ', ctx.request.body);
// ctx.response.body = 'You deleted!';
// });
// const app = new Application();
// app.use(router.routes());
// app.use(router.allowedMethods());
// app.use(async (ctx) => {
// await send(ctx, ctx.request.url.pathname, {
// root: `${Deno.cwd()}`,
// index: 'index.html',
// });
// });
// await app.listen({ port: 8000 });
// const server = Deno.listen({ port: 8000 });
// console.log('tcp server listening on port 8000');
// const newConn = await Deno.connect({
// port: 9093,
// hostname: 'localhost',
// transport: 'tcp',
// });
// const connections: Deno.Conn[] = [newConn];
// // for await (const connection of server) {
// // // new connection
// // //connections.push(connection);
// // handle_connection(connection);
// // }
// console.log('line 50');
// //producer();
// async function handle_connection(connection: Deno.Conn) {
// console.log('in handle connection');
// let buffer = new Uint8Array(512);
// while (true) {
// const count = await connection.read(buffer);
// console.log('count', count);
// if (!count) {
// // connection closed
// const index = connections.indexOf(connection);
// connections.splice(index, 1);
// console.log('about to break');
// break;
// } else {
// // message received
// let message = buffer.subarray(0, count);
// for (const current_connection of connections) {
// if (current_connection !== connection) {
// console.log('in else');
// await current_connection.write(message);
// }
// }
// }
// }
// }
// await handle_connection(newConn);
const listener = Deno.listen({ port: 9093 });
setInterval(() => console.log('listener', listener), 1000);
console.log('listening on 0.0.0.0:8080');
for await (const conn of listener) {
console.log('Hello');
copy(conn, conn).finally(() => conn.close());
}
Кто-нибудь знает, что нам нужно сделать, чтобы прочитать ответ, который мы должны получить от нашего брокера? Спасибо!!
Комментарии:
1. Потребители Кафки не являются TCP-серверами (они не прослушивают порты). Это клиенты, которые отправляют запросы на извлечение брокеру, аналогично тому, как производитель получает подтверждение после пакетного запроса