Создание KafkaSpout с идентификатором группы потребителей

#apache-kafka #apache-storm #kafka-consumer-api #kafka-producer-api

#apache-kafka #apache-storm #kafka-consumer-api #kafka-producer-api

Вопрос:

С некоторых пор я много пытался понять, как создать группу kafkaspouts, подписавшихся на одну тему. Я нашел несколько источников, в которых идентификатор в spoutconfig может использоваться в качестве идентификатора группы. Моя главная проблема в том, как мне узнать, действуют ли созданные носики как группа. Мне также интересно, является ли paralellism_hint setSpout() тем, который создает группу paralellism_hint из носиков. Пожалуйста, просветите меня.

Мой код выглядит примерно так

 private KafkaSpout buildKafkaSpout(String zkTopic, String zkRoot, String groupId) {

        String zkConnString = "localhost:2181";
        BrokerHosts hosts = new ZkHosts(zkConnString);
        SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, zkTopic,zkRoot, groupId );
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaSpoutConfig);
        return kafkaSpout;
    }

private void buildTopologyForGroupOne(TopologyBuilder builder ) {

        String zkTopic = "topic1";
        String zkRoot = "/topic1";
        String groupId = "group1";
        List<String> zkSpoutIds = new ArrayList<String>();
        zkSpoutIds.add("word_count-spout");
        zkSpoutIds.add("total_word_count-spout");

        for(String spoutId:zkSpoutIds ){
            KafkaSpout kafkaSpout = buildKafkaSpout(zkTopic, zkRoot, groupId);
            builder.setSpout(spoutId.concat("_" groupId), kafkaSpout,2);
        }
        builder.setBolt("word_split-bolt_group_1",new SplitBolt()).shuffleGrouping("word_count-spout" "_" groupId);
        builder.setBolt("split_count-bolt_group_1",new CountBolt()).shuffleGrouping("word_split-bolt_group_1");
        builder.setBolt("total_word_count-bolt_group_1",new TotalWordCountBolt()).shuffleGrouping("total_word_count-spout" "_" groupId);

    }
 

Теперь, как мне узнать, действуют ли два созданных мной потока (word_count-spout, total_word_count-spout) как группа. Действуя как группа, я имею в виду, что если будет создан новый spout, zookeeper изменит порядок разделов.

Заранее спасибо

Комментарии:

1. Наконец-то найден источник. KafkaSpout с идентификатором потребителя может быть создан с помощью новой версии storm apache-storm-1.0.2.

Ответ №1:

storm-kafka использует SimpleConsumer вместо высокоуровневого пользовательского API, поэтому group.id это не важно, поскольку предназначено для высокоуровневого потребителя, который использует этот параметр для реализации высокой доступности и отработки отказа.

Итак, возвращаясь к вашему вопросу, я предпочитаю думать, что что-то не так произойдет с вашим кодом во время выполнения, поскольку два экземпляра потребителя будут претендовать на одну и ту же блокировку пути zk.

Комментарии:

1. Я намерен использовать идентификатор группы для KafkaSpout. Вы хотели сказать, что это невозможно?

2. Для повышения производительности вы можете задать номер рабочего / номер исполнителя / номер задачи в терминах Storm для достижения лучшей пропускной способности. Настройка group.id не оказывает никакого влияния на KafkaSpout. Кроме того, это должен быть идентификатор клиента, который сообщает ZK, где хранить смещение, а не идентификатор группы, который вы считаете.

3. Спасибо за разъяснение. Моя цель — разрешить использование одного вывода spouts несколькими болтами, что можно сделать. Но проблема в такой реализации заключается в том, что если одному болту не удается подтвердить кортеж, кортеж воспроизводится для каждого другого болта также с помощью spout. Я хочу избежать этого. Я был бы очень благодарен, если бы вы могли предложить мне выход. Я пытался использовать имена потоков, чтобы найти решение. Пока не повезло.

4. Я помню, что Storm воспроизводит поток только там, где существует неудачный кортеж, который не может быть подтвержден. И вы можете объявить несколько потоков, чтобы разные болты подписывались на разные потоки. При этом вы можете избежать ситуации, о которой вы беспокоитесь.

5. Я пробовал то же самое. Но проблема в том, что, несмотря на то, что болты подтверждают, что носик не может подтвердить, и сообщения постоянно воспроизводятся, из-за чего я не могу проверить, что происходит, если один болт выходит из строя. А также я не могу создать spout, который считывает из тем, как это делает KafkaSpout. Пожалуйста, любая помощь в этом направлении.