#java #hadoop #mapreduce #hadoop2 #mapr
#java #hadoop #mapreduce #hadoop2 #mapr
Вопрос:
Ниже приведены мои конфигурации кластера Mapr (небезопасные).
MapR version - 6.1
Os - Ubuntu 16.04
Hadoop version - 2.7.0
Nodes - Single node
core-site.xml:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hadoop.proxyuser.mapr.hosts</name>
<value>*</value>
<description>The superuser mapr can connect from any host to impersonate a user</description>
</property>
<property>
<name>hadoop.proxyuser.mapr.groups</name>
<value>*</value>
<description>Allow the superuser mapr to impersonate any member of any group</description>
</property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
</configuration>
mapred-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapreduce.jobhistory.address</name>
<value>non-sec-mapr:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>non-sec-mapr:19888</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.https.address</name>
<value>non-sec-mapr:19890</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
yarn-site.xml
<?xml version="1.0"?>
<configuration>
<!-- Resource Manager MapR HA Configs -->
<property>
<name>yarn.resourcemanager.ha.custom-ha-enabled</name>
<value>true</value>
<description>MapR Zookeeper based RM Reconnect Enabled. If this is true, set the failover proxy to be the class MapRZKBasedRMFailoverProxyProvider</description>
</property>
<property>
<name>yarn.client.failover-proxy-provider</name>
<value>org.apache.hadoop.yarn.client.MapRZKBasedRMFailoverProxyProvider</value>
<description>Zookeeper based reconnect proxy provider. Should be set if and only if mapr-ha-enabled property is true.</description>
</property>
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
<description>RM Recovery Enabled</description>
</property>
<property>
<name>yarn.resourcemanager.ha.custom-ha-rmaddressfinder</name>
<value>org.apache.hadoop.yarn.client.MapRZKBasedRMAddressFinder</value>
</property>
<property>
<description>Indicate to clients whether Timeline service is enabled or not. If enabled, the TimelineClient library used by end-users will post entities and events to the Timeline server.</description>
<name>yarn.timeline-service.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.timeline-service.hostname</name>
<value>non-sec-mapr</value>
</property>
<property>
<description>The setting that controls whether yarn system metrics is published on the timeline server or not by RM.</description>
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.timeline-service.http-cross-origin.enabled</name>
<value>true</value>
</property>
</configuration>
Наше требование заключается в том, что пользователи будут передавать свое задание mapreduce в виде jar, а наше приложение загрузит jar и отправит его в кластер mapr. Ниже приведен наш код.
Example mapreduce job:
import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configuration;
public class WordCount
{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
{
private static final IntWritable one;
private Text word;
static {
one = new IntWritable(1);
}
public Map() {
this.word = new Text();
}
public void map(final LongWritable key, final Text value, final Mapper.Context context) throws IOException, InterruptedException {
final String line = value.toString();
final StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
this.word.set(tokenizer.nextToken());
context.write((Object)this.word, (Object)Map.one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(final Text key, final Iterable<IntWritable> values, final Reducer.Context context) throws IOException, InterruptedException {
int sum = 0;
for (final IntWritable val : values) {
sum = val.get();
}
context.write((Object)key, (Object)new IntWritable(sum));
}
}
}
We have created a jar for the above code; let’s assume the jar is named wordcount.jar.
Below is the logic we have written in our application, which loads the jar created above (wordcount.jar), creates a new MapReduce job object, and submits it to the MapR cluster.
/**
 * Loads a user-supplied MapReduce jar (wordcount.jar), builds a Job configured
 * from the inlined *-site.xml documents, and submits it to the MapR cluster,
 * blocking until the job completes.
 *
 * NOTE(review): the container log shows the stock MapTask$MapOutputBuffer
 * collector running and then an NPE inside sameVolRename()/RawLocalFileSystem.
 * On MapR the map-side output normally goes through the MapRFs-based output
 * path (the log does print MapRFsOutputFile) — verify that the mapred-site.xml
 * string shipped here matches the cluster's shuffle/collector configuration,
 * since a mismatch there is the likely cause of the failure at spill-merge time.
 */
public void submitMapReduceJar()
{
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    try {
        final URL resolvedJarUrl = new File("/home/mapr/mapreduce/wordcount.jar").toURI().toURL();
        final URL[] urls = new URL[] { resolvedJarUrl };
        // try-with-resources guarantees the classloader is closed even when
        // submission fails part-way through.
        try (URLClassLoader loader = new URLClassLoader(urls, getClass().getClassLoader())) {
            final Job joblocal = Job.getInstance();
            final String hdfs = "<hdfs-site.xml>";
            final String core = "<core-site.xml>";
            final String mapred = "<mapred-site.xml>";
            final String yarn = "<yarn-site.xml>";
            // Explicit UTF-8: the original getBytes() used the platform default
            // charset, which silently varies between hosts.
            joblocal.getConfiguration().addResource(new ByteArrayInputStream(yarn.getBytes(StandardCharsets.UTF_8)), "yarn-site.xml");
            joblocal.getConfiguration().addResource(new ByteArrayInputStream(hdfs.getBytes(StandardCharsets.UTF_8)), "hdfs-site.xml");
            joblocal.getConfiguration().addResource(new ByteArrayInputStream(core.getBytes(StandardCharsets.UTF_8)), "core-site.xml");
            joblocal.getConfiguration().addResource(new ByteArrayInputStream(mapred.getBytes(StandardCharsets.UTF_8)), "mapred-site.xml");
            joblocal.setJar("/home/mapr/mapreduce/wordcount.jar");
            joblocal.setJobName("WordCount3");
            joblocal.setOutputKeyClass(loader.loadClass("org.apache.hadoop.io.Text"));
            joblocal.setOutputValueClass(loader.loadClass("org.apache.hadoop.io.IntWritable"));
            // The user jar's classes are only known at runtime, so the unchecked
            // casts are unavoidable; scope the suppression to each declaration.
            @SuppressWarnings("unchecked")
            final Class<? extends org.apache.hadoop.mapreduce.Mapper> mapper =
                    (Class<? extends org.apache.hadoop.mapreduce.Mapper>) loader.loadClass("WordCount$Map");
            joblocal.setMapperClass(mapper);
            @SuppressWarnings("unchecked")
            final Class<? extends org.apache.hadoop.mapreduce.Reducer> reducer =
                    (Class<? extends org.apache.hadoop.mapreduce.Reducer>) loader.loadClass("WordCount$Reduce");
            joblocal.setReducerClass(reducer);
            @SuppressWarnings("unchecked")
            final Class<? extends org.apache.hadoop.mapreduce.InputFormat> inputFormat =
                    (Class<? extends org.apache.hadoop.mapreduce.InputFormat>) loader.loadClass("org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
            joblocal.setInputFormatClass(inputFormat);
            @SuppressWarnings("unchecked")
            final Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormat =
                    (Class<? extends org.apache.hadoop.mapreduce.OutputFormat>) loader.loadClass("org.apache.hadoop.mapreduce.lib.output.TextOutputFormat");
            joblocal.setOutputFormatClass(outputFormat);
            FileInputFormat.addInputPath(joblocal, new Path("/user/mapr/input"));
            FileOutputFormat.setOutputPath(joblocal, new Path("/user/mapr/outjar2"));
            // Job.getConfiguration() is backed by a JobConf in Hadoop 2, so this
            // cast works; plain Configuration.set is used below where possible.
            ((JobConf) joblocal.getConfiguration()).setNumMapTasks(1);
            joblocal.getConfiguration().set("fs.defaultFS", "maprfs://non-sec-mapr:7222");
            // "mapred.job.tracker" is the deprecated MRv1 key; with
            // mapreduce.framework.name=yarn the RM address comes from yarn-site.xml.
            joblocal.getConfiguration().set("mapred.job.tracker", "non-sec-mapr:8032");
            joblocal.setNumReduceTasks(1);
            joblocal.waitForCompletion(true);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for job completion", e);
    } catch (Exception e) {
        // BUG FIX: the original declared no checked exceptions yet called APIs
        // throwing IOException/ClassNotFoundException/MalformedURLException/
        // InterruptedException, so it did not compile. Wrapping preserves the
        // original no-throws signature for callers.
        throw new IllegalStateException("Failed to submit MapReduce job", e);
    }
}
Используя приведенную выше логику, мы можем отправить задание, и в журналах мы видим, что map и reduce выполнены на 100%, но в конце мы получаем исключение NullPointerException.
Ниже приведены журналы из контейнера приложения:
2021-01-21 05:00:53,766 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1611205037383_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@4bb33f74)
2021-01-21 05:00:53,935 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2021-01-21 05:00:54,093 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /tmp/hadoop-mapr/nm-local-dir/usercache/mapr/appcache/application_1611205037383_0001
2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.mapred.Task: mapOutputFile class: org.apache.hadoop.mapred.MapRFsOutputFile
2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: File Output Committer Algorithm version is 1
2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2021-01-21 05:00:54,359 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
2021-01-21 05:00:54,434 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: maprfs://non-sec-mapr:7222/user/mapr/input/1word.txt:0 11
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 100
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 83886080
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 104857600
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396; length = 6553600
2021-01-21 05:00:54,471 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: flush: Spilling map output
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 20; bufvoid = 104857600
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214392(104857568); length = 5/6553600
2021-01-21 05:00:54,540 INFO [main] org.apache.hadoop.mapred.MapTask: Finished spill 0
2021-01-21 05:00:54,541 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2021-01-21 05:00:54,542 INFO [main] org.apache.hadoop.mapred.MapTask: kvbuffer is null. Skipping flush.
2021-01-21 05:00:54,543 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:657)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:88)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sameVolRename(MapTask.java:1962)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1829)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1522)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:732)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:802)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:346)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2021-01-21 05:00:54,546 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2021-01-21 05:00:54,547 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete maprfs://non-sec-mapr:7222/user/mapr/ou4/_temporary/1/_temporary/attempt_1611205037383_0001_m_000000_0
Пожалуйста, помогите нам решить эту проблему.