Hadoop> Ошибка ввода класса Mapper

#hadoop #map #reduce

#hadoop #Карта #уменьшить

Вопрос:

Содержимое используемого мной входного текстового файла

 1   "Come 
1   "Defects," 
1   "I 
1   "Information 
1   "J" 
2   "Plain 
5   "Project 
1   "Right 
1   "Viator"
  

цифры с левой стороны и слова с правой стороны разделены табуляцией
Но когда я выполняю функцию mapper ниже

 public static class SortingMapper extends Mapper<Text, Text, Pair, NullWritable> 
{
    private Text word = new Text();
    private IntWritable freq = new IntWritable();
    
    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        String[] words = line.split("t");
        
        freq = new IntWritable(Integer.parseInt(words[0]));
        word.set(words[1]);     
        context.write(new Pair(word, freq), NullWritable.get());}}
public static class FirstPartitioner extends Partitioner<Pair, NullWritable>
{
    @Override
    public int getPartition(Pair p, NullWritable n, int numPartitions)
    {
        String word = p.getFirst().toString();
        
        char first = word.charAt(0);
        char middle = 'n';
        
        if(middle < first)
        {
            return 0;
        }
        else 
            return 1 % numPartitions; //why does % need???
    }
}

public static class KeyComparator extends WritableComparator
{   
    
    protected KeyComparator()
    {
        super(Pair.class, true);
    }
    
    @Override
    public int compare(WritableComparable w1, WritableComparable w2)
    {
        Pair v1 = (Pair) w1;
        Pair v2 = (Pair) w2;
        
        /*
         * since we already count word in the first MR we only need to sort the list by frequency
         * so no need to compare Text again
        int cmp = Pair.compare(v1.getFirst(), v2.getFirst());
        if(cmp != 0) {  return cmp; }
        */ 
        
        return -1 * v1.compareTo(v2);
        //possible error: it compares Text first and then compare IntWritable 
    }
}

public static class GroupComparator extends WritableComparator
{
    protected GroupComparator()
    {
        super(Pair.class, true);
    }
    
    @Override
    public int compare(WritableComparable w1, WritableComparable w2)
    {
        Pair v1 = (Pair) w1;
        Pair v2 = (Pair) w2;
        return v1.getFirst().compareTo(v2.getFirst());
        //this compareTo is under binarycomparable
    }
}

public static class SortingReducer extends Reducer<Pair, NullWritable, Pair, NullWritable>
{
    @Override
    public void reduce(Pair p, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
    {
        System.out.println("sortingReducer");
        context.write(p, NullWritable.get());
    }
}

public static void main(String[] args) throws Exception
{
    
    Configuration conf2 = new Configuration();
    //String[] otherArgs2 = new GenericOptionsParser(conf1, args).getRemainingArgs();
    
    ControlledJob cJob2 = new ControlledJob(conf2);
    //conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
    cJob2.setJobName("Sorting");
    
    Job job2 = cJob2.getJob();
    
    job2.setJarByClass(Sorting.class);
    
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    
    job2.setMapperClass(SortingMapper.class);
    job2.setPartitionerClass(FirstPartitioner.class);
    job2.setSortComparatorClass(KeyComparator.class);
    job2.setGroupingComparatorClass(GroupComparator.class);
    job2.setReducerClass(SortingReducer.class);
    
    job2.setOutputKeyClass(Pair.class);
    job2.setOutputValueClass(NullWritable.class);
    
    job2.setOutputFormatClass(TextOutputFormat.class);
    
    FileInputFormat.addInputPath(job2, new Path("hdfs:///tmp/inter/part-r-        
00000.txt"));
    FileOutputFormat.setOutputPath(job2, new Path(args[0]));
    
    job2.waitForCompletion(true);
    
}
  

Затем я получил несколько ошибок ниже

 Error: java.lang.NumberFormatException: For input string: ""Come"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:481)
    at java.lang.Integer.parseInt(Integer.java:527)
    at Sorting$SortingMapper.map(Sorting.java:98)
    at Sorting$SortingMapper.map(Sorting.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)               
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
  

Я предполагаю, что в String[] words есть проблемы, но я не могу понять, что именно нужно исправить. Я был бы признателен, если бы вы помогли мне исправить ошибки.

Дополнительно

Я обнаружил, что использовал

  job2.setInputFormatClass(KeyValueTextInputFormat.class); 

     
  

в основной функции, которая разделяет ключ и значение с помощью разделителя табуляции, я просто изменил

 String line = value.toString();

        String[] words = line.split("t");
        
        freq = new IntWritable(Integer.parseInt(words[0]));
        word.set(words[1]);     
  

в

 String num = key.toString();
        freq = new IntWritable(Integer.parseInt(num));
        word = value;
        context.write(new Pair(word, freq), NullWritable.get());
                                                                                  
  

он успешно запущен, но вывод странный.

 Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
Sorting$Pair@5b5b072f
....
  

Мой ожидаемый результат

 5   "Project 
2   "Plain 
1   "Come 
1   "Defects," 
1   "I 
1   "Information 
1   "J" 
1   "Right 
1   "Viator" 
  

ухудшило ли это изменение?

Комментарии:

1. У вас также есть reduce или просто mapper?

2. У меня есть reduce и другие классы. Я запишу весь свой код.

3. Можете ли вы показать это. Похоже, вам просто нужно вызвать toString() свой выходной объект. Или какое-либо другое значение / поле / средство получения объекта рядом с самим объектом. Результат, который вы видите, — это адрес памяти объекта.

4. Я обновил его. Спасибо

Ответ №1:

Вам просто нужно переопределить toString ваш Pair объект и вернуть все, что вы хотите, в качестве конечного результата для каждой записи.

Что-то вроде этого…

 class Pair {

    ... 

    @Override
    public String toString() {
        return freq   " "   word;
    }
}
  

Комментарии:

1. Теперь класс pair имеет (текстовые, записываемые) параметры. Так вы имеете в виду, что мне нужно изменить их на string и int?