Как запустить топологию в кластере storm? Я не вижу журнал вывода

#java #cloud #bigdata #apache-storm

#java #облако #bigdata #apache-storm

Вопрос:

Я пытаюсь выполнить топологию в кластере. Я написал топологию и скомпилировал ее в jar, а затем зарегистрировал ее в кластере. Но по какой-то причине похоже, что топология не запущена. Я просто хочу использовать Storm здесь как конвейер. Регистрация с помощью cmd: ./storm jar /tmp/storm_test.jar storm.topology.MyTopology /tmp/bigFile.log

Топология:

 package storm.topology;

import storm.spouts.LineReaderSpout;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import storm.bolts.Bo<

public class MyTopology {
    public static long tupleCounter = 0;
    public static long endTime = 0;

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put("inputFile", args[0]);
        config.put(Config.TOPOLOGY_WORKERS, 4);
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("line-reader-spout", new LineReaderSpout());

        builder.setBolt("boltA", new Bolt()).shuffleGrouping("line-reader-spout");

        StormSubmitter.submitTopology("mytopology", config, builder.createTopology()); 
    }
}
 

Болт:

 package storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBo<
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import storm.topology.MyTopology2;
import storm.spouts.LineReaderSpout;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Bolt implements IRichBolt {
    Integer id;
    String name;
    static long totalTime;
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        //String str = input.getString(0);
        MyTopology2.tupleCounter  ;
        if (input.getString(0).contains("END")) {
            MyTopology.endTime = System.nanoTime();

            System.out.println("====================================================");
            System.out.println("Number of tuples: "   MyTopology.tupleCounter);

            totalTime = MyTopology.endTime - LineReaderSpout.startTime;
            double tuplePerSec = MyTopology.tupleCounter / (totalTime / 1000000000d);
            System.out.println("Test results: "   NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec)   "tuple/sec");
            totalTime = MyTopology2.endTime - LineReaderSpout.star`enter code here`tTime;
            System.out.println("Total run time: "   totalTime   " nsec");
            System.out.println("====================================================");
            PrintWriter writer;

            try {
                writer = new PrintWriter("/tmp/storm_results.log", "UTF-8");
                writer.println("Number of tuples: "   MyTopology.tupleCounter);
                writer.println("Test results: "   
NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec)   "tuple/sec");
                writer.println("Total run time: "   totalTime   " nsec");
                writer.println("====================================================");
                writer.close();
            } catch (FileNotFoundException ex) {
                Logger.getLogger(TestBolt.class.getName()).log(Level.SEVERE, null, ex);
            } catch (UnsupportedEncodingException ex) {
                Logger.getLogger(TestBolt.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}
 

Носик:

 package storm.spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class LineReaderSpout implements IRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;
    public static long startTime;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("inputFile").toString());
            startTime = System.nanoTime();
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file "
                      conf.get("inputFile"));
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (completed) {

        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((str = reader.readLine()) != null) {
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading typle", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
        try {
            fileReader.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public boolean isDistributed() {
        return false;
    }

    @Override
    public void activate() {
        // TODO Auto-generated method stub
    }

    @Override
    public void deactivate() {
        // TODO Auto-generated method stub
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
 

Ответ №1:

Вы можете использовать локальный кластер и отправить ему свою топологию, таким образом, вам не нужно настраивать производственный кластер на удаленном компьютере, код выглядит так:

     LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("mytopology", conf, builder.createTopology());
 

Комментарии:

1. Он отлично работает с локальным кластером, но по какой-то причине не в производственном кластере, я скомпилировал jar с помощью netbeans, используя 1.6_35 jdk

2. проверьте этот документ Running-topologies-on-a-production-cluster

Ответ №2:

В производственном кластере вы не увидите журнал на консоли. Перейдите в каталог журналов storm. Там вы найдете файл рабочих журналов. Там записываются все сообщения журнала. Более того, в производственном кластере вы не видите каждое сообщение журнала. Для этого вы должны включить опцию ОТЛАДКИ в конфигурации cfg.put(Config.TOPOLOGY_DEBUG, true);