Kafka: доступ к серверу kafka на виртуальной машине из Windows через общедоступный ip

#apache-kafka #centos #remote-access

#apache-kafka #centos #удаленный доступ

Вопрос:

У меня небольшая проблема. Я хочу подключиться к моему серверу kafka на виртуальной машине centos из скрипта Windows Java producer.

в config/server.properties у меня есть строка:

 listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://<public ip>:9092
  

(например: advertised.listeners=ОТКРЫТЫЙ ТЕКСТ://192.239.83.27:9092)

и я все еще не могу создавать сообщения от производителя в Windows. В виртуальном окне я установил опцию «разрешить все» в настройках сети

Невозможно запустить сервер kafka, при постоянном запуске появляется ПРЕДУПРЕЖДЕНИЕ: [идентификатор контроллера = 0, targetBrokerId = 0] Не удалось установить соединение с узлом 0. Брокер может быть недоступен (org.apache.kafka.клиенты.NetworkClient).

Пожалуйста, помогите: p

введите описание изображения здесь

и мой Java-код в Windows:

 public class TwitterProducer {

Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

String consumerKey = "xxx";
String consumerSecret = "xxx";
String token = "xxx";
String secret = "xxx";

public TwitterProducer() {}

public static void main(String[] args) {
    new TwitterProducer().run();
}

public void run()
{
    logger.info("Setup");
    /** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
    BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

    // create a twitter client
    Client client = createTwitterClient(msgQueue);
    // Attempts to establish a connection.
    client.connect();

    // create a kafka producer
    KafkaProducer<String, String> producer = createKafkaProducer();

    // on a different thread, or multiple different threads....
    while (!client.isDone()) {
        String msg = null;
        try {
            msg = msgQueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            client.stop();
        }
        if(msg != null){
            logger.info(msg);
            producer.send(new ProducerRecord<String, String>("twitter_tweets", null, msg), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null){
                        logger.error("Something bad happened", e);
                    }
                }
            });
        }
    }
    logger.info("End of application");
}

public Client createTwitterClient(BlockingQueue<String> msgQueue)
{
    /** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
    Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
    StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
    // Optional: set up some followings and track terms

    List<String> terms = Lists.newArrayList("bitcoin");
    hosebirdEndpoint.trackTerms(terms);


    // These secrets should be read from a config file
    Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

    ClientBuilder builder = new ClientBuilder()
            .name("Hosebird-Client-01")                              // optional: mainly for the logs
            .hosts(hosebirdHosts)
            .authentication(hosebirdAuth)
            .endpoint(hosebirdEndpoint)
            .processor(new StringDelimitedProcessor(msgQueue))
             ;                          // optional: use this if you want to process client events

    Client hosebirdClient = builder.build();
    return hosebirdClient;
}

public KafkaProducer<String, String> createKafkaProducer()
{

    String bootstrapServers = "193.239.83.27:9092";



    // create the producer properties
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // create the producer
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    return producer;
}
  

}

Ответ №1:

Вы говорите, что у вас есть listeners=PLAINTEXT://0.0.0.0:9092

Но изображение показывает, что это закомментировано.

В журналах также говорится, что Kafka завершает работу, поэтому убедитесь, что brokerId является положительным числом, и убедитесь, что Zookeeper запущен

Кстати, есть источник Kafka Connect Twitter, который, похоже, делает то, что вы хотите

Комментарии:

1. да, я прокомментировал эту строку. Должен ли он быть раскомментирован??

2. вы говорите, чтобы изменить Broker_id = 0 на Broker_id = положительное число, например, 1?

3. Должен ли я изменить имя хоста с localhost на другое в etc /hosts??

4. боже, кафка так болен.. Я не могу с этим справиться. Просто я хочу иметь удаленный эластичный общедоступный IP-доступ, не более того

5. 1) свойства не загружаются, если они прокомментированы. По умолчанию не прослушивается извне 2) никогда не используйте etc /hosts для решения этой проблемы 3) Я действительно не знаю, почему ваш брокер останавливается. И я предполагаю, что вы читали это confluent.io/blog/kafka-listeners-explained