#apache-kafka #centos #remote-access
#apache-kafka #centos #удаленный доступ
Вопрос:
У меня небольшая проблема. Я хочу подключиться к моему серверу kafka на виртуальной машине centos из скрипта Windows Java producer.
в config/server.properties у меня есть строка:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<public ip>:9092
(например: advertised.listeners=ОТКРЫТЫЙ ТЕКСТ://192.239.83.27:9092)
и я все еще не могу создавать сообщения от производителя в Windows. В виртуальном окне я установил опцию «разрешить все» в настройках сети
Невозможно запустить сервер kafka, при постоянном запуске появляется ПРЕДУПРЕЖДЕНИЕ: [идентификатор контроллера = 0, targetBrokerId = 0] Не удалось установить соединение с узлом 0. Брокер может быть недоступен (org.apache.kafka.клиенты.NetworkClient).
Пожалуйста, помогите: p
и мой Java-код в Windows:
public class TwitterProducer {
Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());
String consumerKey = "xxx";
String consumerSecret = "xxx";
String token = "xxx";
String secret = "xxx";
public TwitterProducer() {}
public static void main(String[] args) {
new TwitterProducer().run();
}
public void run()
{
logger.info("Setup");
/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
// create a twitter client
Client client = createTwitterClient(msgQueue);
// Attempts to establish a connection.
client.connect();
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();
// on a different thread, or multiple different threads....
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if(msg != null){
logger.info(msg);
producer.send(new ProducerRecord<String, String>("twitter_tweets", null, msg), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null){
logger.error("Something bad happened", e);
}
}
});
}
}
logger.info("End of application");
}
public Client createTwitterClient(BlockingQueue<String> msgQueue)
{
/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
List<String> terms = Lists.newArrayList("bitcoin");
hosebirdEndpoint.trackTerms(terms);
// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue))
; // optional: use this if you want to process client events
Client hosebirdClient = builder.build();
return hosebirdClient;
}
public KafkaProducer<String, String> createKafkaProducer()
{
String bootstrapServers = "193.239.83.27:9092";
// create the producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
}
Ответ №1:
Вы говорите, что у вас есть listeners=PLAINTEXT://0.0.0.0:9092
Но изображение показывает, что это закомментировано.
В журналах также говорится, что Kafka завершает работу, поэтому убедитесь, что brokerId является положительным числом, и убедитесь, что Zookeeper запущен
Кстати, есть источник Kafka Connect Twitter, который, похоже, делает то, что вы хотите
Комментарии:
1. да, я прокомментировал эту строку. Должен ли он быть раскомментирован??
2. вы говорите, чтобы изменить Broker_id = 0 на Broker_id = положительное число, например, 1?
3. Должен ли я изменить имя хоста с localhost на другое в etc /hosts??
4. боже, кафка так болен.. Я не могу с этим справиться. Просто я хочу иметь удаленный эластичный общедоступный IP-доступ, не более того
5. 1) свойства не загружаются, если они прокомментированы. По умолчанию не прослушивается извне 2) никогда не используйте etc /hosts для решения этой проблемы 3) Я действительно не знаю, почему ваш брокер останавливается. И я предполагаю, что вы читали это confluent.io/blog/kafka-listeners-explained