#java #maven #apache-kafka #apache-storm
#java #maven #apache-kafka #apache-storm
Вопрос:
В настоящее время у меня есть кластер, который выглядит как узел zoo keeper 2, узел Kafka 3, узел storm 3. Моя топология Я настроил zookeeper с помощью загрузчика файлов свойств, я загрузил локальный режим zookeeper ip / ports, но режим кластера не работает.
Структура моего проекта maven такова
проект
-
src/main/java
-
src/main/test
-
src/main/ресурсы
внутри каталога ресурсов у меня есть файлы zoo.config и log4j.config
Моя топология
public class Mytopology { public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException, FileNotFoundException, IOException {
/** PropertiesConfigurator is used to configure logger from properties file */
Properties prop = new Properties();
PropertyConfigurator.configure("src/main/resources/log4j.properties");
String zoo_cluster=null;
int zoo_cluster_timeout_ms;
/** Topology definition */
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", new KafkaSpout(),1);
builder.setBolt("bolt1",new bolt1(),1).shuffleGrouping("kafka");
builder.setBolt("bolt2",new bolt2(),1).shuffleGrouping("bolt1");
/** create a storm Config object*/
Config config = new Config();
prop.load(new FileInputStream("src/main/resources/zookeeper-config.properties"));
zoo_cluster= prop.getProperty("zookeeper.connect");
String zoo_cluster_timeout= prop.getProperty("consumer.timeout.ms");
zoo_cluster_timeout_ms=Integer.parseInt(zoo_cluster_timeout);
config.put("kafka.zookeeper.connect",zoo_cluster);
config.put("kafka.consumer.timeout.ms",zoo_cluster_timeout_ms);
/** submit topology into cluster */
if (args != null amp;amp; args.length > 2) {
StormSubmitter.submitTopology(args[2], config,builder.createTopology());
System.out.println("Topology submitted into Storm Cluster ........");
}
/** submit topology into local */
else if(args != null amp;amp; args.length > 1) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("storm-local", config,
builder.createTopology());
System.out.println("Topology submitted into Local Mode........");
Utils.sleep(100000);
}
}
}
Пожалуйста, помогите мне! как прочитать файл свойств в режиме кластера как конфигурацию zookeeper, так и журналы ошибок
Комментарии:
1. неясно, какой компонент не может прочитать конфигурации zk / log4j. Kafka? Шторм?
2. Штормовая топология я настраиваю конфигурацию zookeeper.put («kafka.zookeeper.connect», zoo_cluster);