For the HA installation of the Hadoop cluster itself, see the earlier post on building a Hadoop HA cluster. This post focuses on enabling YARN on that existing cluster: once YARN is configured and started, jobs can be scheduled and executed across the cluster. For details of how YARN schedules work, see the post on the YARN architecture.
yarn-site.xml
```xml
<configuration>
    <!-- Site specific YARN configuration properties -->
    <!-- Enable the shuffle service so YARN containers support the MapReduce shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- Enable ResourceManager HA -->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <!-- Addresses and ports of the ZooKeeper ensemble nodes -->
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>node1:2181,node2:2181,node3:2181</value>
    </property>
    <!-- Identify this cluster so an RM never takes over the active role of another cluster -->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster1</value>
    </property>
    <!-- Logical names of the two ResourceManagers in the HA pair -->
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <!-- Hostname of the node running the rm1 ResourceManager process -->
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>node1</value>
    </property>
    <!-- Hostname of the node running the rm2 ResourceManager process -->
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>node4</value>
    </property>
</configuration>
```
mapred-site.xml
```xml
<configuration>
    <!-- Framework used to run MR jobs: local, classic (MRv1), or yarn -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
```
This setting makes MapReduce submit jobs to YARN for scheduling instead of running them locally, so the cluster's ResourceManager takes over job scheduling and execution.
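Both yarn-site.xml and mapred-site.xml have to be identical on every node. A minimal distribution sketch, assuming Hadoop is installed under $HADOOP_HOME on all four nodes (the path and target nodes are assumptions; adjust them to your layout):

```bash
#!/bin/bash
# Copy the two edited config files from the current node (node1) to the rest
# of the cluster. $HADOOP_HOME is assumed to be the same on every node.
for node in node2 node3 node4; do
    scp "$HADOOP_HOME/etc/hadoop/yarn-site.xml" \
        "$HADOOP_HOME/etc/hadoop/mapred-site.xml" \
        "$node:$HADOOP_HOME/etc/hadoop/"
done
```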
Starting YARN
Unlike start-dfs.sh, which brings up the HDFS daemons on all nodes over SSH, start-yarn.sh only starts the ResourceManager on the node it is run from, so the second ResourceManager on node4 has to be started separately. The whole startup can be wrapped in a script:
```bash
#!/bin/bash
# Start the ZooKeeper ensemble
for node in node1 node2 node3
do
    ssh $node "source /etc/profile; zkServer.sh start"
done

sleep 1

# Start HDFS and the local ResourceManager/NodeManagers
start-dfs.sh
start-yarn.sh

# Start the second ResourceManager on node4
for node in node4
do
    ssh $node "source /etc/profile; start-yarn.sh"
done

# Print the running Java processes on every node
echo "----------------node1-jps------------------------"
jps
for node in node2 node3 node4
do
    echo "--------------------------${node}-jps-----------------"
    ssh $node "source /etc/profile; jps"
done
```
The script uses ssh to start ZooKeeper, HDFS, and YARN across the nodes, then prints the jps output of each node so we can check that all daemons are up. With that, the basic YARN setup is done and we can use the cluster.
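Besides checking jps, the YARN command line can confirm that the NodeManagers have registered with the active ResourceManager (run this on node1 or node4):

```bash
# List the NodeManagers that have registered with the active ResourceManager
yarn node -list
```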
Verifying the setup
From the configuration above we know that node1 and node4 host the two ResourceManagers, so we can check YARN through their web UIs on port 8088. If everything started correctly, the following page is shown:
Below is the UI on node1:
Because node4 has the standby role, visiting port 8088 on node4 automatically redirects to node1.
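The active/standby roles can also be queried from the command line, using the rm-ids defined in yarn-site.xml:

```bash
# Query the HA state of each ResourceManager;
# one should report "active" and the other "standby"
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2
```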
That completes the basic YARN configuration; now we can develop MapReduce jobs.
Creating and running a job
The example implements a simple word count; the main code is as follows:
pom.xml
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.5</version>
</dependency>
```
WordCountMapper
```java
package org.hadoop.learn.mp.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN:    byte offset of the current line in the input file
 * VALUEIN:  text content of the current line
 * KEYOUT:   type of the output key
 * VALUEOUT: type of the output value
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable inKey, Text inValue,
                       Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Get the text of the current line
        String line = inValue.toString();

        // Split the line on spaces and emit (word, 1) for every non-empty word
        String[] words = line.split(" ");
        for (String word : words) {
            if (word.isEmpty()) {
                continue;
            }
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
```
WordCountReducer
```java
package org.hadoop.learn.mp.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
                          Reducer<Text, LongWritable, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Total number of occurrences of the current word
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
```
WordCountMain
```java
package org.hadoop.learn.mp.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class WordCountMain {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        if (args == null || args.length != 2) {
            System.out.println("Please provide the input and output paths");
            return;
        }

        System.setProperty("HADOOP_HOME", "H:\\xianglujun\\hadoop-2.6.5");
        System.setProperty("hadoop.home.dir", "H:\\xianglujun\\hadoop-2.6.5");
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration configuration = new Configuration();
        // Run locally (remove this line to run on the YARN cluster)
        configuration.set("mapreduce.framework.name", "local");

        JobConf jobConf = new JobConf(configuration);
        // Input and output paths of the job
        FileInputFormat.addInputPath(jobConf, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(jobConf, outputPath);

        Job job = Job.getInstance(jobConf);

        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        job.setJarByClass(WordCountMain.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setJobName("wordcount");

        // Output types of the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Output types of the reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.out.println("Job completed successfully: " + b);
    }
}
```
To run the job on the cluster, the following steps are needed:
- When running on the Hadoop cluster, remove the local-run setting of:
mapreduce.framework.name
- Package the code into a jar with Maven and upload it to one of the Hadoop servers (see the sketch after this list)
- Submit the jar through YARN:
```bash
yarn jar hadoop-learn-1.0-SNAPSHOT.jar org.hadoop.learn.mp.wordcount.WordCountMain /xianglujun/tmpr/list.txt /xianglujun/tmpr/result
```
- Check the output in HDFS to verify the result (see the sketch below)
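A minimal sketch of the packaging, upload, and result check mentioned above. The jar name, the target node, and the remote directory are assumptions; the HDFS paths match the yarn jar command shown earlier:

```bash
# Build the jar (assumes a standard Maven project; artifactId assumed to be hadoop-learn)
mvn clean package

# Copy the jar to one of the cluster nodes (user and target directory are assumptions)
scp target/hadoop-learn-1.0-SNAPSHOT.jar root@node1:/root/

# After the job has finished, inspect the result in HDFS
hdfs dfs -ls /xianglujun/tmpr/result
hdfs dfs -cat /xianglujun/tmpr/result/part-r-00000
```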
That is the whole process; I hope it helps.