Тип ключа вывода редуктора MapReduce

#hadoop #mapreduce

#hadoop #mapreduce

Вопрос:

Я написал программу Map и Reduce, в которой выходной ключ и значение редуктора отличаются от входных данных или выходных данных Mapper. Я внес соответствующие изменения в класс драйвера. Вот исключение, которое я получаю при его запуске:

 INFO mapreduce.Job: Task Id : attempt_1550670375771_4211_m_000003_2, Status : FAILED Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.FloatWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1084)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:721)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at com.hirw.maxcloseprice.MyHadoopMapper.map(MyHadoopMapper.java:20)
        at com.hirw.maxcloseprice.MyHadoopMapper.map(MyHadoopMapper.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

19/04/16 22:24:50 INFO mapreduce.Job:  map 100% reduce 100% 19/04/16 22:24:50 INFO mapreduce.Job: Job job_1550670375771_4211 failed with state FAILED due to: Task failed task_1550670375771_4211_m_000001 Job failed as tasks failed. failedMaps:1 failedReduces:0
 

Это работает, когда KeyOut и ValueOut редуктора такие же, как у Mapper, но сбой, когда они разные.

Мой класс Mapper:

 public class MyHadoopMapper extends Mapper<LongWritable, Text, Text, FloatWritable>{

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        
        String[] recordItems = value.toString().split(",");
        
        String stock = recordItems[1];
        Float stockValue = Float.parseFloat(recordItems[6]);
        
        context.write(new Text(stock), new FloatWritable(stockValue));
    }
}
 

Класс редуктора:

 public class MyHadoopReducer extends Reducer<Text, FloatWritable, Text, Text> { 

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context
            ) throws IOException, InterruptedException {
        
        Float maxVal = Float.MIN_VALUE;
        for (FloatWritable stockValue : values) {
            maxVal = stockValue.get() > maxVal ? stockValue.get() : maxVal;
        }
        
        context.write(key, new Text(String.valueOf(maxVal)));
    }
    
}
 

И класс драйвера:

 public class MyHadoopDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub

        Job hadoopJob = new Job();
        hadoopJob.setJarByClass(MyHadoopDriver.class);
        hadoopJob.setJobName("MyStockPrice");
        
        FileInputFormat.addInputPath(hadoopJob, new Path("/user/hirw/input/stocks"));
        FileOutputFormat.setOutputPath(hadoopJob, new Path("stocksData"));
        
        hadoopJob.setInputFormatClass(TextInputFormat.class);
        hadoopJob.setOutputFormatClass(TextOutputFormat.class);
        
        hadoopJob.setMapperClass(MyHadoopMapper.class);
        hadoopJob.setReducerClass(MyHadoopReducer.class);
        
        hadoopJob.setCombinerClass(MyHadoopReducer.class);
        
        hadoopJob.setOutputKeyClass(Text.class);
        hadoopJob.setOutputValueClass(Text.class);
        
        System.exit(hadoopJob.waitForCompletion(true) ? 0: 1);
    }

}
 

Ответ №1:

По умолчанию тип вывода mapper — Text, в то время как вы используете FloatWritable . Это то, что говорит вам exception. Вам нужно указать тип вывода mapper следующим образом:

 job.setMapOutputValueClass(FloatWritable.class)
 

Комментарии:

1. Проблема была в объединителе. У него был другой ключ. Я вынул его, и это сработало. Спасибо за ваш ответ!!

Ответ №2:

Извлеките объединитель или напишите новый с соответствующим ключом и выводом.