#apache-storm
#apache-storm
Вопрос:
Я тестирую случай, используя Storm 1.2.2 и Kafka 2.x в качестве моего носика. Итак, я создал LocalCluster только для целей тестирования.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("MYKAFKAIP:9092", "storm-test-dpi").build()), 1);
builder.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("kafkaBoltTest", new Config(), builder.createTopology());
Utils.sleep(10000);
После инициализации этого приложения я получил следующее:
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.10.1.0
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : 3402a74efb23d1d4
И после множества ошибок:
9664 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.s.k.s.KafkaSpout - Initialization complete
9703 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9714 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9742 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9756 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9767 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9781 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9806 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
Я думаю, что эта проблема связана с версией Kafka, как вы можете видеть, в журнале отображается версия «0.10.1.0», но моя версия Kafka «2.x».
Это мой pom.xml:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${version.storm}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${version.storm}</version>
</dependency>
Где ${version.storm}
это 1.2.2
Ответ №1:
Предполагается, что вы также должны объявить версию kafka-clients
, которую вы используете. storm-kafka-client
POM устанавливает kafka-clients
область действия на provided
. Это означает, что kafka-clients
не будет включено при сборке. Мы делаем это, чтобы вы могли легко обновляться.
Причина, по которой он даже работает для вас, заключается в том, что вы используете LocalCluster в некотором тестовом коде, где provided
присутствуют зависимости.
Добавьте это в свой POM, и это должно сработать:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>your-kafka-version-here</version>
</dependency>