#java #cloud #bigdata #apache-storm
#java #облако #bigdata #apache-storm
Вопрос:
Я пытаюсь выполнить топологию в кластере. Я написал топологию и скомпилировал ее в jar, а затем зарегистрировал ее в кластере. Но по какой-то причине похоже, что топология не запущена. Я просто хочу использовать Storm здесь как конвейер. Регистрация с помощью cmd: ./storm jar /tmp/storm_test.jar storm.topology.MyTopology /tmp/bigFile.log
Топология:
package storm.topology;
import storm.spouts.LineReaderSpout;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import storm.bolts.Bo<
public class MyTopology {
public static long tupleCounter = 0;
public static long endTime = 0;
public static void main(String[] args) throws Exception {
Config config = new Config();
config.put("inputFile", args[0]);
config.put(Config.TOPOLOGY_WORKERS, 4);
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader-spout", new LineReaderSpout());
builder.setBolt("boltA", new Bolt()).shuffleGrouping("line-reader-spout");
StormSubmitter.submitTopology("mytopology", config, builder.createTopology());
}
}
Болт:
package storm.bolts;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBo<
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import storm.topology.MyTopology2;
import storm.spouts.LineReaderSpout;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Bolt implements IRichBolt {
Integer id;
String name;
static long totalTime;
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void execute(Tuple input) {
//String str = input.getString(0);
MyTopology2.tupleCounter ;
if (input.getString(0).contains("END")) {
MyTopology.endTime = System.nanoTime();
System.out.println("====================================================");
System.out.println("Number of tuples: " MyTopology.tupleCounter);
totalTime = MyTopology.endTime - LineReaderSpout.startTime;
double tuplePerSec = MyTopology.tupleCounter / (totalTime / 1000000000d);
System.out.println("Test results: " NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec) "tuple/sec");
totalTime = MyTopology2.endTime - LineReaderSpout.star`enter code here`tTime;
System.out.println("Total run time: " totalTime " nsec");
System.out.println("====================================================");
PrintWriter writer;
try {
writer = new PrintWriter("/tmp/storm_results.log", "UTF-8");
writer.println("Number of tuples: " MyTopology.tupleCounter);
writer.println("Test results: "
NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec) "tuple/sec");
writer.println("Total run time: " totalTime " nsec");
writer.println("====================================================");
writer.close();
} catch (FileNotFoundException ex) {
Logger.getLogger(TestBolt.class.getName()).log(Level.SEVERE, null, ex);
} catch (UnsupportedEncodingException ex) {
Logger.getLogger(TestBolt.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
Носик:
package storm.spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class LineReaderSpout implements IRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;
public static long startTime;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("inputFile").toString());
startTime = System.nanoTime();
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file "
conf.get("inputFile"));
}
this.collector = collector;
}
@Override
public void nextTuple() {
if (completed) {
}
String str;
BufferedReader reader = new BufferedReader(fileReader);
try {
while ((str = reader.readLine()) != null) {
this.collector.emit(new Values(str), str);
}
} catch (Exception e) {
throw new RuntimeException("Error reading typle", e);
} finally {
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
@Override
public void close() {
try {
fileReader.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Ответ №1:
Вы можете использовать локальный кластер и отправить ему свою топологию, таким образом, вам не нужно настраивать производственный кластер на удаленном компьютере, код выглядит так:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Комментарии:
1. Он отлично работает с локальным кластером, но по какой-то причине не в производственном кластере, я скомпилировал jar с помощью netbeans, используя 1.6_35 jdk
2. проверьте этот документ Running-topologies-on-a-production-cluster
Ответ №2:
В производственном кластере вы не увидите журнал на консоли. Перейдите в каталог журналов storm. Там вы найдете файл рабочих журналов. Там записываются все сообщения журнала. Более того, в производственном кластере вы не видите каждое сообщение журнала. Для этого вы должны включить опцию ОТЛАДКИ в конфигурации cfg.put(Config.TOPOLOGY_DEBUG, true);