#apache-kafka #apache-storm #kafka-consumer-api #kafka-producer-api
#apache-kafka #apache-storm #kafka-consumer-api #kafka-producer-api
Вопрос:
С некоторых пор я много пытался понять, как создать группу kafkaspouts, подписавшихся на одну тему. Я нашел несколько источников, в которых идентификатор в spoutconfig может использоваться в качестве идентификатора группы. Моя главная проблема в том, как мне узнать, действуют ли созданные носики как группа. Мне также интересно, является ли paralellism_hint setSpout()
тем, который создает группу paralellism_hint из носиков. Пожалуйста, просветите меня.
Мой код выглядит примерно так
private KafkaSpout buildKafkaSpout(String zkTopic, String zkRoot, String groupId) {
String zkConnString = "localhost:2181";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, zkTopic,zkRoot, groupId );
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaSpoutConfig);
return kafkaSpout;
}
private void buildTopologyForGroupOne(TopologyBuilder builder ) {
String zkTopic = "topic1";
String zkRoot = "/topic1";
String groupId = "group1";
List<String> zkSpoutIds = new ArrayList<String>();
zkSpoutIds.add("word_count-spout");
zkSpoutIds.add("total_word_count-spout");
for(String spoutId:zkSpoutIds ){
KafkaSpout kafkaSpout = buildKafkaSpout(zkTopic, zkRoot, groupId);
builder.setSpout(spoutId.concat("_" groupId), kafkaSpout,2);
}
builder.setBolt("word_split-bolt_group_1",new SplitBolt()).shuffleGrouping("word_count-spout" "_" groupId);
builder.setBolt("split_count-bolt_group_1",new CountBolt()).shuffleGrouping("word_split-bolt_group_1");
builder.setBolt("total_word_count-bolt_group_1",new TotalWordCountBolt()).shuffleGrouping("total_word_count-spout" "_" groupId);
}
Теперь, как мне узнать, действуют ли два созданных мной потока (word_count-spout, total_word_count-spout) как группа. Действуя как группа, я имею в виду, что если будет создан новый spout, zookeeper изменит порядок разделов.
Заранее спасибо
Комментарии:
1. Наконец-то найден источник. KafkaSpout с идентификатором потребителя может быть создан с помощью новой версии storm apache-storm-1.0.2.
Ответ №1:
storm-kafka использует SimpleConsumer вместо высокоуровневого пользовательского API, поэтому group.id это не важно, поскольку предназначено для высокоуровневого потребителя, который использует этот параметр для реализации высокой доступности и отработки отказа.
Итак, возвращаясь к вашему вопросу, я предпочитаю думать, что что-то не так произойдет с вашим кодом во время выполнения, поскольку два экземпляра потребителя будут претендовать на одну и ту же блокировку пути zk.
Комментарии:
1. Я намерен использовать идентификатор группы для KafkaSpout. Вы хотели сказать, что это невозможно?
2. Для повышения производительности вы можете задать номер рабочего / номер исполнителя / номер задачи в терминах Storm для достижения лучшей пропускной способности. Настройка group.id не оказывает никакого влияния на KafkaSpout. Кроме того, это должен быть идентификатор клиента, который сообщает ZK, где хранить смещение, а не идентификатор группы, который вы считаете.
3. Спасибо за разъяснение. Моя цель — разрешить использование одного вывода spouts несколькими болтами, что можно сделать. Но проблема в такой реализации заключается в том, что если одному болту не удается подтвердить кортеж, кортеж воспроизводится для каждого другого болта также с помощью spout. Я хочу избежать этого. Я был бы очень благодарен, если бы вы могли предложить мне выход. Я пытался использовать имена потоков, чтобы найти решение. Пока не повезло.
4. Я помню, что Storm воспроизводит поток только там, где существует неудачный кортеж, который не может быть подтвержден. И вы можете объявить несколько потоков, чтобы разные болты подписывались на разные потоки. При этом вы можете избежать ситуации, о которой вы беспокоитесь.
5. Я пробовал то же самое. Но проблема в том, что, несмотря на то, что болты подтверждают, что носик не может подтвердить, и сообщения постоянно воспроизводятся, из-за чего я не могу проверить, что происходит, если один болт выходит из строя. А также я не могу создать spout, который считывает из тем, как это делает KafkaSpout. Пожалуйста, любая помощь в этом направлении.