Получение ответа от брокера Кафки после создания сообщения

#apache-kafka #kafka-consumer-api #confluent-platform #kafka-producer-api #deno

Вопрос:

Мы успешно создаем тему Кафки, используя приведенный ниже код:

Таким образом, мы используем Deno.connect для создания TCP-сокета и используем KafkaJS для кодирования сообщения и успешной отправки его нашему брокеру, работающему на порту 9093. Затем мы можем посмотреть на нашего потребителя и увидеть, что сообщение успешно получено.

  /** @format */
import { Encoder } from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/encoder.js';

import { Decoder } from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/decoder.js';

import { Buffer } from 'https://deno.land/std@0.76.0/node/buffer.ts';

import { readAll, writeAll } from 'https://deno.land/std@0.105.0/io/util.ts';

import request from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/request.js';

import { readerFromStreamReader } from 'https://deno.land/std/io/mod.ts';

import {
  ProducerBatch,
  Message,
  IHeaders,
  TopicMessages,
  Broker,
  TopicOffsets,
  PartitionOffset,
} from './index.d.ts';

// import { Produce: apiKey } from '../../apiKeys'

// import { Produce: apiKey } from './protocol/requests/apiKeys'

import MessageSet from './protocol/messageSet/index.js';

import {
  Client,
  Packet,
  Event,
} from 'https://deno.land/x/tcp_socket@0.0.1/mods.ts';

let date = await new Date(Date.now()).toUTCString();

export default async function func(string: string = date) {
  console.log(typeof string);
  const conn = await Deno.connect({
    hostname: 'localhost',
    port: 9093,
    transport: 'tcp',
  });
  //console.log('Connected', conn.write);

  interface topicDataType {
    topicData: Array<{
      topic: string;
      partitions: Array<{
        partition: number;
        firstSequence?: number;
        messages: Message[];
      }>;
    }>;
  }

  //producedMessage = ({acks, timeout, topicData}: ProducerBatch) =>
  const producedMessage = ({ acks, timeout, topicData }: ProducerBatch) => ({
    apiKey: 0, //0
    apiVersion: 0,
    apiName: 'Produce',
    //expectResponse: () => acks !== 0,
    encode: async () => {
      return new Encoder()
        .writeInt16(acks)
        .writeInt32(timeout)
        .writeArray(topicData.map(encodeTopic));
    },
  });

  //topicData structure
  const td = [
    {
      topic: 'quickstart-events',
      partitions: [
        {
          partition: 0,
          firstSequence: undefined,
          messages: [
            {
              key: 'new-key6',
              value: string,
              partition: 0,
              headers: undefined,
              timestamp: Date.now(),
            },
          ],
        },
      ],
    },
  ];

  const message = producedMessage({
    acks: 0,
    timeout: 1000,
    topicData: td,
  });

  const encodeTopic = ({ topic, partitions }: any) => {
    return new Encoder()
      .writeString(topic)
      .writeArray(partitions.map(encodePartitions));
  };

  const encodePartitions = ({ partition, messages }: any) => {
    const messageSet = MessageSet({
      messageVersion: 0,
      compression: 0,
      entries: messages,
    });
    return new Encoder()
      .writeInt32(partition)
      .writeInt32(messageSet.size())
      .writeEncoder(messageSet);
  };

  const pleaseWork = await request({
    correlationId: 1,
    clientId: 'my-app',
    request: message,
  });

  console.log('GOT TO HERE SAM', pleaseWork);

  const denoVersion = await conn.write(pleaseWork.buf);

  console.log('Hello is ', denoVersion);

  conn.close();
}

func();
 

Теперь мы пытаемся сделать то же самое и создать потребителя, используя протокол Кафки таким же образом, как описано выше.

Наш первый шаг заключается в том, чтобы попытаться получить ответ после создания нашего сообщения. Мы пробовали различные методы создания прослушивающего сервера TCP с помощью Deno.listen, Deno.connect и т. Д., Но нам не повезло. Ниже приведен конгломерат большинства различных методов, которые мы пробовали:

     /** @format */

// /** @format */

// import { Application, Router, send } from 'https://deno.land/x/oak/mod.ts';

import producer from './shitProducer.ts';

import { copy } from 'https://deno.land/std@0.103.0/io/util.ts';

// const router = new Router();

// router
//   .get('/test', (ctx) => {
//     ctx.response.body = 'Hello World!';
//     producer();
//   })
//   .post('/test', (ctx) => {
//     ctx.response.body = "You've posted!";
//   })
//   .delete('/test', (ctx) => {
//     console.log('Request body ', ctx.request.body);
//     ctx.response.body = 'You deleted!';
//   });

// const app = new Application();

// app.use(router.routes());

// app.use(router.allowedMethods());

// app.use(async (ctx) => {
//   await send(ctx, ctx.request.url.pathname, {
//     root: `${Deno.cwd()}`,
//     index: 'index.html',
//   });
// });

// await app.listen({ port: 8000 });

// const server = Deno.listen({ port: 8000 });
// console.log('tcp server listening on port 8000');

// const newConn = await Deno.connect({
//   port: 9093,
//   hostname: 'localhost',
//   transport: 'tcp',
// });

// const connections: Deno.Conn[] = [newConn];

// // for await (const connection of server) {
// //   // new connection
// //   //connections.push(connection);
// //   handle_connection(connection);
// // }

// console.log('line 50');
// //producer();

// async function handle_connection(connection: Deno.Conn) {
//   console.log('in handle connection');
//   let buffer = new Uint8Array(512);
//   while (true) {
//     const count = await connection.read(buffer);
//     console.log('count', count);
//     if (!count) {
//       // connection closed
//       const index = connections.indexOf(connection);
//       connections.splice(index, 1);
//       console.log('about to break');
//       break;
//     } else {
//       // message received
//       let message = buffer.subarray(0, count);
//       for (const current_connection of connections) {
//         if (current_connection !== connection) {
//           console.log('in else');
//           await current_connection.write(message);
//         }
//       }
//     }
//   }
// }
// await handle_connection(newConn);

const listener = Deno.listen({ port: 9093 });
setInterval(() => console.log('listener', listener), 1000);
console.log('listening on 0.0.0.0:8080');
for await (const conn of listener) {
  console.log('Hello');
  copy(conn, conn).finally(() => conn.close());
}
 

Кто-нибудь знает, что нам нужно сделать, чтобы прочитать ответ, который мы должны получить от нашего брокера? Спасибо!!

Комментарии:

1. Потребители Кафки не являются TCP-серверами (они не прослушивают порты). Это клиенты, которые отправляют запросы на извлечение брокеру, аналогично тому, как производитель получает подтверждение после пакетного запроса