#hadoop
#hadoop
Вопрос:
Я хочу прочитать файл csv и нормализовать данные. Если я хорошо понимаю, как работает hadoop, картограф получает данные построчно. Я нашел эту формулу для нормализации: Xnew = (X — Xmin) / (Xmax — Xmin) Поэтому мне нужно знать минимальное значение столбца и максимальное значение для нормализации. Как я могу это сделать, когда в mapper у меня есть доступ только к одной строке за раз?
Ответ №1:
Проблема с поиском максимального и минимального значения столбца в этом типе приложений заключается в области действия переменных max / min, где к ним можно получить доступ / изменить в параллельной программе, где каждый экземпляр изолирован от другого с точки зрения данных. Итак, что здесь нужно сделать, так это найти способ иметь глобальную область видимости для переменных max / min, чтобы получить доступ к их собственным экземплярам и синхронизировать их в конце каждого шага сопоставления / сокращения.
Самое близкое к этому, поддерживаемое Hadoop (на момент написания этого ответа), — это функция счетчиков, но они разработаны таким образом, чтобы только увеличивать их значения, поэтому вам нужно проявить творческий подход для достижения желаемого результата.
Хитрость здесь заключается в том, чтобы на самом деле иметь if-операторы, изменяющие максимальные и минимальные счетчики до значения столбца каждой строки (в случае, если они являются максимальными и / или минимальными), путем
- сброс счетчика до нуля путем добавления отрицательного значения самого себя, а затем
- увеличьте счетчик до значения этой конкретной строки из входного
csv
файла
Это немного утомительно, но оно выполняет работу внутри Map
функции.
Теперь, для доступа к максимальным и минимальным значениям счетчиков из Reduce
функции, мы можем просто получить их в setup
методе перед выполнением всех экземпляров reducer и использовать их для вычисления новых нормализованных значений каждой key-value
пары.
Итак, допустим, у нас есть grades.csv
файл, хранящийся в grades
каталоге в HDFS
, в котором хранятся оценки учащихся в классе начальной школы:
Jack,3
Dennis,5
Kate,10
Nancy,9
Peter,1
Zack,2
Alex,4
Yvonne,10
Violet,1
Claire,2
Мы можем найти максимальные и минимальные значения на Map
этапе, превращая каждую строку входного файла в key-value
пары, и вычислить нормализованную оценку для каждого учащегося (используя, конечно, максимальные и минимальные значения) на Reduce
этапе, как показано ниже:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Cluster;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;
public class NormGrades
{
public static enum Global_Counters
{
MAX_GRADE,
MIN_GRADE
}
/* input: <byte_offset, line_of_tweet>
* output: <student, grade>
*/
public static class Map_Normalize extends Mapper<Object, Text, Text, IntWritable>
{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
String[] columns = line.split(",");
int student_grade = Integer.parseInt(columns[1]);
int max_grade = Math.toIntExact(context.getCounter(Global_Counters.MAX_GRADE).getValue());
int min_grade = Math.toIntExact(context.getCounter(Global_Counters.MIN_GRADE).getValue());
// in order to find the maximum grade, we first set the max grade counter to 0
// by "increasing" it to the negative value of itself, and then increment by
// the new found maximum grade
if(student_grade > max_grade)
{
context.getCounter(Global_Counters.MAX_GRADE).increment(max_grade*(-1));
context.getCounter(Global_Counters.MAX_GRADE).increment(student_grade);
}
// in order to find the minimum grade, we first set the min grade counter to 0
// by "increasing" it to the negative value of itself, and then increment by
// the new found minimum grade
// the contents on this if statement will be accessed at least once in order to
// make sure that the min grade counter value is certainly higher than 0
if((student_grade < min_grade) || (min_grade == 0))
{
context.getCounter(Global_Counters.MIN_GRADE).increment(min_grade*(-1));
context.getCounter(Global_Counters.MIN_GRADE).increment(student_grade);
}
context.write(new Text(columns[0]), new IntWritable(student_grade));
}
}
/* input: <student, grade>
* output: <student, normalized_grade>
*/
public static class Reduce_Normalize extends Reducer<Text, IntWritable, Text, DoubleWritable>
{
public int max_grade, min_grade;
protected void setup(Context context) throws IOException, InterruptedException
{
Configuration conf = context.getConfiguration();
Cluster cluster = new Cluster(conf);
Job current_job = cluster.getJob(context.getJobID());
max_grade = Math.toIntExact(current_job.getCounters().findCounter(Global_Counters.MAX_GRADE).getValue());
min_grade = Math.toIntExact(current_job.getCounters().findCounter(Global_Counters.MIN_GRADE).getValue());
}
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
// each reducer instance is run for each student, so there is only one value/grade to access
int student_grade = values.iterator().next().get();
Double normalized_grade = (double) (student_grade - min_grade) / (max_grade - min_grade);
context.write(key, new DoubleWritable(normalized_grade));
}
}
public static void main(String[] args) throws Exception
{
Path input_dir = new Path("grades");
Path output_dir = new Path("normalized_grades");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir, true);
Job normalize_job = Job.getInstance(conf, "Normalize Grades");
normalize_job.setJarByClass(NormGrades.class);
normalize_job.setMapperClass(Map_Normalize.class);
normalize_job.setReducerClass(Reduce_Normalize.class);
normalize_job.setMapOutputKeyClass(Text.class);
normalize_job.setMapOutputValueClass(IntWritable.class);
normalize_job.setOutputKeyClass(Text.class);
normalize_job.setOutputValueClass(DoubleWritable.class);
TextInputFormat.addInputPath(normalize_job, input_dir);
TextOutputFormat.setOutputPath(normalize_job, output_dir);
normalize_job.waitForCompletion(true);
}
}
Результаты сохраняются, как показано в браузере HDFS на следующем скриншоте: