java.lang.Строка не может быть преобразована в java.util.Отображается ошибка списка при передаче данных из R-болта в следующий болт

#r #integration #apache-storm

#r #интеграция #apache-storm

Вопрос:

Я использую R-болт для обработки своих данных.Я хочу передать обработанные данные из R-болта в мой следующий болт, т. Е. CountBolt.Но при передаче из R-болта, java.lang.Строка не может быть преобразована в java.util.Отображается ошибка списка.

  ERROR o.a.s.t.ShellBolt - Halting process: ShellBolt died. Command: [Rscript, permute.R], ProcessInfo pid:7169, name:python-split-sentence exitCode:-1, errorString: 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
7055 [Thread-77] ERROR o.a.s.d.executor - 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
7145 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.task - Emitting: sentence-spout default [the cow jumped over the moon]
7145 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 25 tuple: source: sentence-spout:27, stream: default, id: {}, [the cow jumped over the moon]]
7148 [Thread-70-python-split-sentence-executor[25 25]] INFO  o.a.s.d.executor - Processing received message FOR 25 TUPLE: source: sentence-spout:27, stream: default, id: {}, [the cow jumped over the moon]
7149 [Thread-70-python-split-sentence-executor[25 25]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.execute(ShellBolt.java:175) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
Caused by: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.die(ShellBolt.java:292) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt.access$400(ShellBolt.java:72) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:364) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
7150 [Thread-70-python-split-sentence-executor[25 25]] ERROR o.a.s.d.executor - 
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.execute(ShellBolt.java:175) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
Caused by: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.die(ShellBolt.java:292) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt.access$400(ShellBolt.java:72) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:364) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
7246 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.task - Emitting: sentence-spout default [an apple a day keeps the doctor away]
7246 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 24 tuple: source: sentence-spout:27, stream: default, id: {}, [an apple a day keeps the doctor away]]
7248 [Thread-30-python-split-sentence-executor[24 24]] INFO  o.a.s.d.executor - Processing received message FOR 24 TUPLE: source: sentence-spout:27, stream: default, id: {}, [an apple a day keeps the doctor away]
7248 [Thread-30-python-split-sentence-executor[24 24]] INFO  o.a.s.d.executor - Execute done TUPLE source: sentence-spout:27, stream: default, id: {}, [an apple a day keeps the doctor away] TASK: 24 DELTA: -1
7254 [Thread-75] ERROR o.a.s.t.ShellBolt - Halting process: ShellBolt died. Command: [Rscript, /home/uvionics/Downloads/Dixon/Rstorm/src/jvm/udacity/storm/resources/permute.R], ProcessInfo pid:7170, name:python-split-sentence exitCode:-1, errorString: 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
7254 [Thread-75] ERROR o.a.s.d.executor - 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
  

Приведенный ниже код представляет код для SentenceWordCountTopology

 package ***.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBo<
import org.apache.storm.topology.IRichBo<
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBo<
import org.apache.storm.topology.base.BaseRichBo<
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import org.apache.storm.utils.Utils;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import ***.storm.spout.RandomSentenceSpout;
import ***.storm.SplitSentence;

public class SentenceWordCountTopology {

  private SentenceWordCountTopology() { }

  static class CountBolt extends BaseRichBolt {

    // To output tuples from this bolt to the next stage bolts, if any
    private OutputCollector collector;
    PrintWriter fOut;
    String outFile;

    // Map to store the count of the words
    private Map<String, Integer> countMap;

    @Override
    public void execute(Tuple tuple)
    {
         try {
              if(fOut == null) {
                   fOut = new PrintWriter(new FileWriter(outFile));
               }

              String key = tuple.getStringByField("key");
              String value = tuple.getStringByField("value");

              //fOut.println("FROM STORM:"   count   ":"   tuple.toString()); 
              fOut.println("FROM STORM:"   ":("   key   ","   value   ")");
              fOut.flush();

      } catch (IOException ioe) {
             System.out.println("Writing to file:"   outFile   " failed");
      }
        System.err.println("                   "   tuple);

        countMap = new HashMap<String, Integer>();

      //Syntax to get the word from the 1st column of incoming tuple
      String word = tuple.getString(0);

      // check if the word is present in the map
      if (countMap.get(word) == null) {

      // not present, add the word with a count of 1
      countMap.put(word, 1);
      } else {

      // already there, hence get the count
      Integer val = countMap.get(word);

      // increment the count and save it to the map
      countMap.put(word,   val);

    }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
    {
      outputFieldsDeclarer.declare(new Fields("word","count"));
    }

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        outFile = config.get((Object)"ALLFILE").toString(); 

    }
  }

  public static void main(String[] args) throws Exception
  {
    // create the topology
      Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(1);
                config.put("ALLFILE",(Object)"/path/new.txt");

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sentence-spout", new RandomSentenceSpout(), 1);
    builder.setBolt("python-split-sentence", new SplitSentence(), 10).shuffleGrouping("sentence-spout");
    builder.setBolt("count-bolt", new CountBolt(), 15).shuffleGrouping("python-split-sentence");

    LocalCluster cluster=new LocalCluster();
    cluster.submitTopology("pc", config, builder.createTopology());
    Thread.sleep(40000);
    cluster.shutdown();
    StormSubmitter.submitTopology("pc", config, builder.createTopology());

  }
}
  

Приведенный ниже код представляет собой код для разделения

 package ***.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBo<
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBo<
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBo<
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

  public class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
        super("Rscript", "/path/permute.R");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields());
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  

Приведенный ниже код представляет собой код permute.R

 #!/usr/bin/Rscript

library(permute,quietly=TRUE)
# library(Storm,quietly=TRUE)
source("/path/Storm.R")

storm = Storm$new();

storm$lambda = function(s) {

  t = s$tuple;

  words = strsplit(unlist(t$input[1]),"\s ",perl=TRUE)[[1]];
  words1 = toJSON(words)
  words2 = charToRaw(words1)

  t$output = vector(mode="character",length=1);
  t$output[1] = paste(c("PERMUTE",words[shuffle(length(words))]),collapse=" ");

  s$emit(t);

};

 storm$run();
  

Ответ №1:

Опубликованная вами трассировка стека содержит исключение, исходящее из https://github.com/apache/storm/blob/v1.2.2/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java#L135 . Эта строка пытается прочитать свойство «tuple» в вашем spout.

Storm multilang взаимодействует с вашим bolt через stdin и stdout. Когда ваш процесс bolt отправляет кортеж, он должен записать что-то вроде следующего в стандартный вывод:

 {
    "command": "emit",
    // The ids of the tuples this output tuples should be anchored to
    "anchors": ["1231231", "-234234234"],
    // The id of the stream this tuple was emitted to. Leave this empty to emit to default stream.
    "stream": "1",
    // If doing an emit direct, indicate the task to send the tuple to
    "task": 9,
    // All the values in this tuple
    "tuple": ["field1", 2, 3]
}
  

Обратите внимание, что поле «кортеж» является списком.

Проблема, которую вы видите, заключается в том, что когда ваш bolt выполняет запись в стандартный вывод, он записывает строку в поле «кортеж». Вам нужно будет взглянуть на пакет Storm R, чтобы выяснить, почему это происходит.

Поскольку протокол multilang основан на тексте, вы можете отладить свой bolt, вручную запустив свою R-программу и записав в нее сообщения через stdin. Протокол описан в http://storm.apache.org/releases/1.1.2/Multilang-protocol.html , смотрите заголовок «Болты».

Оригинальный ответ:

Есть пара неправильных вещей:

  • Вам нужно указать, какое имя вы хотите, чтобы у выходного поля было. Так declarer.declare(new Fields()); и должно быть declarer.declare(new Fields("key", "value")); . Просто чтобы объяснить, для чего это, имена выходных полей используются в нижестоящих болтах для ссылки на поле и в группировке полей для разделения по полю. Поля вывода в разделенном болте должны соответствовать полям, которые вы прочитали в CountBolt (т. Е. «ключ» и «значение»)

  • Обновление: Не обращайте внимания на этот момент, это неправильно, вы не передаете должным образом. emit Сигнатурой метода является emit(Tuple, List<Object>) , где кортеж является привязкой (используется для привязки нового кортежа к входному кортежу для подтверждения), а список содержит значения, которые вы хотите, чтобы содержал новый кортеж. Существуют другие emit перегрузки, но они следуют этой основной идее. Поскольку вы настроили свою топологию таким образом, чтобы разделенный болт передавался в болт count, вы хотите передать ключ и значение из разделенного болта. Например emit("myKey", 52) .

Комментарии:

1. Спасибо за ответ. Но формат для передачи из R-болта — это s $ emit(t), верно? Мы по-прежнему получаем ошибку org.apache.storm.multilang. Исключение NoOutputException: Канал для подпроцесса, похоже, поврежден! Выходные данные не прочитаны .

2. Извините, я был неправ. Похоже, синтаксис для интеграции R соответствует вашему описанию, следуя руководству по cran.r-project.org/web/packages/Storm .

3. Есть идеи о том, как сделать интеграцию R и storm безупречной?

4. Да, убедитесь, что пакет R соответствует протоколу multilang, на который я ссылался выше. Вам нужно будет взглянуть на код R-пакета, чтобы выяснить, почему это иногда не происходит.