Кафка-Проблема с потребителем, показывающая некоторую ошибку тайм-аута

# #go #apache-kafka #kafka-consumer-api

Вопрос:

Я новичок в этой теме Apache Kafka, и я писал базовый код производителя-потребителя, и у меня возникли некоторые проблемы с кодом потребителя, после запуска zookeeper и Kafka я создал название темы «firsttopic», и я ввожу некоторые события, используя команды CLI в качестве производителя, и для извлечения этих событий в качестве потребителя я написал код go с использованием Kafka-go, который я прикрепляю ниже, и ошибка, с которой я тоже сталкиваюсь. Для Кафки я использую «github.com/segmentio/kafka-go».

 func Startkafka() {
    conf := kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092"},
        Topic:    "firsttopic",
        GroupID:  "g1",
        MaxBytes: 10,
    }
    reader := kafka.NewReader(conf)
    for {
        m, err := reader.ReadMessage((context.Background()))
        if err != nil {
            fmt.Println("Some error occured", err)
            continue
        }
        fmt.Println("Message is : ", string(m.Value))
    }}

func main() {
    go Startkafka()
    fmt.Println("Kafka has been started...")
}
 

Ошибка:
Кафка был начат…
Произошла некоторая ошибка чтения tcp 127.0.0.1:34858->127.0.1.1:9092: время ожидания ввода-вывода

Комментарии:

1. не могли бы вы добавить файл server.properties и файл etc/hosts

2. Вы нашли какое-нибудь решение?

Ответ №1:

Основываясь на ошибке, вам следует удалить 127.0.1.1 из /etc/сопоставление хостов с локальным хостом, поскольку оно должно оставаться как 127.0.0.1 во время подключения клиента к локальному хосту

В качестве альтернативы вместо этого подключитесь к 127.0.0.1:9092

Комментарии:

1. Я делал точно такой же, как вы предложили, но все равно, это не работает, хотя он не показывает мне никакой ошибки он не реагирует на любые события, которые я захожу через продюсера, с помощью командной строки команду » ОГРН/Кафка-консоль-производителя.с—темы firsttopic, брокер-список 127.0.0.1:9092 «.

2. А что, если вы воспользуетесь kafka-console-consumer --from-beginning ?

3. Или изменение идентификатора группы делает что-то по-другому?

4. на кафке-консоли-потребителе показано все, что я ввел через кафку-потребителя-производителя.

Ответ №2:

MaxBytes: 10 очень мало, вам следует увеличить его, так как это может вызвать проблему. Остальная часть кода кажется правильной. сначала вы должны попробовать подключиться с помощью Kafka-потребитель, поставляемый с установкой Kafka, независимо от того, включена ли она, которая также проверит строку подключения, которую вы используете.

Комментарии:

1. Я попытался изменить размер MaxBytes, но все равно ничего не произошло, и да, kafka-потребитель отвечает на команду CLI кафки-производителя.

Ответ №3:

В вас основная функция, которая StartKafka() вызывает в a go routine . Затем он запускается в другом стеке и запускается одновременно, в то время как основная процедура завершает выполнение.

Удалите это go routine и запустите приложение. он будет блокировать приложение и потреблять из темы Кафки.

Если вам нужно создать правильный процесс плавного завершения работы приложения, вам нужно использовать каналы или что-то в этом роде. Но следующий код просто работает для использования содержимого темы.

 func main() {
    fmt.Println("starting kafka...")
    Startkafka()
}