# #go #apache-kafka #kafka-consumer-api
Вопрос:
Я новичок в этой теме Apache Kafka, и я писал базовый код производителя-потребителя, и у меня возникли некоторые проблемы с кодом потребителя, после запуска zookeeper и Kafka я создал название темы «firsttopic», и я ввожу некоторые события, используя команды CLI в качестве производителя, и для извлечения этих событий в качестве потребителя я написал код go с использованием Kafka-go, который я прикрепляю ниже, и ошибка, с которой я тоже сталкиваюсь. Для Кафки я использую «github.com/segmentio/kafka-go».
func Startkafka() {
conf := kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "firsttopic",
GroupID: "g1",
MaxBytes: 10,
}
reader := kafka.NewReader(conf)
for {
m, err := reader.ReadMessage((context.Background()))
if err != nil {
fmt.Println("Some error occured", err)
continue
}
fmt.Println("Message is : ", string(m.Value))
}}
func main() {
go Startkafka()
fmt.Println("Kafka has been started...")
}
Ошибка:
Кафка был начат…
Произошла некоторая ошибка чтения tcp 127.0.0.1:34858->127.0.1.1:9092: время ожидания ввода-вывода
Комментарии:
1. не могли бы вы добавить файл server.properties и файл etc/hosts
2. Вы нашли какое-нибудь решение?
Ответ №1:
Основываясь на ошибке, вам следует удалить 127.0.1.1 из /etc/сопоставление хостов с локальным хостом, поскольку оно должно оставаться как 127.0.0.1 во время подключения клиента к локальному хосту
В качестве альтернативы вместо этого подключитесь к 127.0.0.1:9092
Комментарии:
1. Я делал точно такой же, как вы предложили, но все равно, это не работает, хотя он не показывает мне никакой ошибки он не реагирует на любые события, которые я захожу через продюсера, с помощью командной строки команду » ОГРН/Кафка-консоль-производителя.с—темы firsttopic, брокер-список 127.0.0.1:9092 «.
2. А что, если вы воспользуетесь
kafka-console-consumer --from-beginning
?3. Или изменение идентификатора группы делает что-то по-другому?
4. на кафке-консоли-потребителе показано все, что я ввел через кафку-потребителя-производителя.
Ответ №2:
MaxBytes: 10 очень мало, вам следует увеличить его, так как это может вызвать проблему. Остальная часть кода кажется правильной. сначала вы должны попробовать подключиться с помощью Kafka-потребитель, поставляемый с установкой Kafka, независимо от того, включена ли она, которая также проверит строку подключения, которую вы используете.
Комментарии:
1. Я попытался изменить размер MaxBytes, но все равно ничего не произошло, и да, kafka-потребитель отвечает на команду CLI кафки-производителя.
Ответ №3:
В вас основная функция, которая StartKafka()
вызывает в a go routine
. Затем он запускается в другом стеке и запускается одновременно, в то время как основная процедура завершает выполнение.
Удалите это go routine
и запустите приложение. он будет блокировать приложение и потреблять из темы Кафки.
Если вам нужно создать правильный процесс плавного завершения работы приложения, вам нужно использовать каналы или что-то в этом роде. Но следующий код просто работает для использования содержимого темы.
func main() {
fmt.Println("starting kafka...")
Startkafka()
}