Hadoop搭建实例

2016/11/8 posted in  课程相关  

本次课程要求自主搭建Hadoop集群环境,并进行MapReduce作业操作
本文以统计知乎用户地域分布情况为例进行统计展示

数据获取与格式说明

数据获取

实验数据采用爬虫方式从知乎进行爬取,并保存进入数据库,总数据量3383054条。为了后续处理方便,我们将数据从数据库中倒成csv文件进行存储。
数据爬取代码:知乎爬虫

数据格式

导出的数据格式为:id,name,headline,gender,school,major,address,industry,company,job

Hadoop集群搭建

Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。
本次课程实例基于ubuntu12.04、jdk1.8.111、hadoop2.7.3进行说明,其中ubuntu服务器以虚拟机的形式搭建。

  1. ubuntu虚拟机搭建

    虚拟机创建的步骤在本文中不加以介绍,如有需要请自行百度。ps.为了简便操作,我们可以先创建并配置一台虚拟机然后进行克隆。为了简便操作,可以使用sudo apt-get install ubuntu-desktop安装ubuntu的图形操作界面。

  2. 建立软件目录

    为了后续更好的管理我们按照的软件。我们使用mkdir命令,建立软件的安装目录。命令:sudo mkdir /usr/soft

  3. jdk的安装与环境变量设置

    1. 首先我们从Oracle的官网上下载jdk的压缩文件。下载地址:jdk;
    2. 使用tar -xzvf 文件名对安装包进行解压;
    3. 将解压后的文件夹移至上一步我们建立的软件安装目录。mv jdk1.8.0_111/ /usr/soft
    4. 设置java所需要的环境变量:

      1. 打开配置文件 sudo vi /etc/profile
      2. 在配置文件中加入export JAVA_HOME="/usr/soft/jdk1.8.0_111"
      3. 在配置文件中加入export PATH="$PATH:$JAVA_HOME/bin"
      4. 保存配置文件退出;
      5. 使用source命令让配置文件生效,source /etc/profile
    5. 调用javac -version命令,检查配置是否生效,安装是否成功。

  4. Hadoop的安装与环境变量设置

    1. 从Apache Hadoop网站上下载对应的Hadoop安装包。下载地址:Hadoop2.7.3
    2. 使用tar -xzvf 文件名对安装包进行解压;
    3. 将解压后的文件夹移至上一步我们建立的软件安装目录。mv Hadoop-2.7.3 /usr/soft
    4. 打开配置文件 sudo vi /etc/profile
    5. 在配置文件中加入export PATH="$PATH:$JAVA_HOME/bin:/usr/soft/hadoop-2.7.3/bin:/usr/soft/hadoop-2.7.3/sbin"
    6. 保存配置文件退出;
    7. 使用source命令让配置文件生效,source /etc/profile
    8. 调用Hadoop version命令,检查配置是否生效,安装是否成功。
  5. SSH免密码登录配置

    1. 使用sudo apt-get install ssh安装完整的ssh客户端;
    2. 使用ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa生成公私钥;
    3. 使用cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys将公钥复制;
    4. 使用ssh localhost命令检验是否可以免密码登录系统。
  6. 修改服务器的名称

    1. 使用sudo vi /etc/hostname,修改服务器的名称
    2. 重启服务器使配置生效
  7. 配置Hadoop配置文件

    Hadoop的配置文件位于其安装目录下的etc/hadoop/目录下

    1. 修改yarn-site.xml,内容如下(master代表主服务器名):

      <configuration>
          <property>
              <name>fs.defaultFS</name>
              <value>hdfs://master/</value>
          </property>
          <property>
              <name>hadoop.tmp.dir</name>
              <value>~/hadoop/data</value>
          </property>
      </configuration>
      
    2. 修改hdfs-site.xml,内容如下:

      <configuration>
          <property>
              <name>dfs.replication</name>
              <value>3</value>
          </property>
      </configuration>
      
    3. 修改mapred-site.xml(此文件需要从mapred-site.xml.template复制),内容如下:

      <configuration>
          <property>
              <name>mapreduce.framework.name</name>
              <value>yarn</value>
          </property>
      </configuration>
      
    4. 修改yarn-site.xml,内容如下:

      <configuration>
          <property>
              <name>yarn.resourcemanager.hostname</name>
              <value>master</value>
          </property>
          <property>
              <name>yarn.nodemanager.aux-services</name>
              <value>mapreduce_shuffle</value>
          </property>
          <property>
              <name>yarn.resourcemanager.address</name>
              <value>master:8032</value>
          </property>
          <property>
              <name>yarn.resourcemanager.scheduler.address</name>
              <value>master:8030</value>
          </property>
      </configuration>
      
    5. 修改slaves文件,将附属机主机名添加入内,每行一个,样例为:

      s1
      s3
      
  8. 克隆宿主机,配置hosts文件

    1. 使用VM的克隆功能,克隆虚拟机,并依次修改主机名
    2. 根据实际IP修改各个机器的hosts文件,ip查看可以使用ifconfig命令,样例如下:

      127.0.0.1  localhost    
      10.104.234.203  master
      182.254.216.245 s1
      123.207.252.11 s2
      123.207.24.98 s3
      
  9. 格式化HDFS文件系统,执行hdfs namenode -format命令,对HDFS进行格式化。

  10. 启动Hadoop集群

    1. 启动文件系统,start-dfs.sh;
    2. 启动yarn,start-yarn.sh;
    3. 可以使用jps命令,检查各个服务的启动情况

统计程序的编写与运行

本次实验所用的MapReduce程序采用Maven的方式进行构建,具体操作可以参考文章:Maven构建Hadoop工程

  1. 编写Mapper类,本次我们实现的功能从原理上与Hadoop自带的WordCount实验非常相似。

    package com.swu.count;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class CountMapper extends Mapper<Object, Text, Text, IntWritable> {
    
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
    
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // 获取每一行数据,并以逗号为基准进行分割
            String[] data = value.toString().split(",");
            // 设置word的key为地址信息
            word.set(data[5]);
            // 设置value为1
            context.write(word, one);
        }
    
    }
    
  2. 编写Reducer类。

    package com.swu.count;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class CountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        private Text keyEx = new Text();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            // 遍历value相加
            for (IntWritable val : values) {
                sum += val.get();
            }
            // 返回新的key-value
            result.set(sum);
            keyEx.set(key);
            context.write(keyEx, result);
        }
    }
    
  3. 编写Comparator类,用于第二次作业的排序。

    package com.swu.count;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    
    public class CountComparator extends IntWritable.Comparator {
    
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // TODO Auto-generated method stub
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
        public int compare(WritableComparable a, WritableComparable b) {
            // TODO Auto-generated method stub
            return -super.compare(a, b);
        }
    }
    
  4. 编写主方法。

    package com.swu.count;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import java.util.Random;
    
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: zhihuCount <in> <out>");
                System.exit(2);
            }
            Path tempDir = new Path("wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); // 定义一个临时目录
    
            Job job = new Job(conf, "zhihuCount");
            job.setJarByClass(Main.class);
            try {
                job.setMapperClass(CountMapper.class);
                job.setCombinerClass(CountReduce.class);
                job.setReducerClass(CountReduce.class);
    
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
    
                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, tempDir);
                // 先将词频统计任务的输出结果写到临时目录中,下一个排序任务以临时目录为输入目录。
                job.setOutputFormatClass(SequenceFileOutputFormat.class);
                if (job.waitForCompletion(true)) {
                    Job sortJob = new Job(conf, "sort");
                    sortJob.setJarByClass(Main.class);
                    FileInputFormat.addInputPath(sortJob, tempDir);
                    sortJob.setInputFormatClass(SequenceFileInputFormat.class);
    
                    /* InverseMapper由hadoop库提供,作用是实现map()之后的数据对的key和value交换 */
                    sortJob.setMapperClass(InverseMapper.class);
                    /* 将 Reducer 的个数限定为1, 最终输出的结果文件就是一个。 */
                    sortJob.setNumReduceTasks(1);
                    FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1]));
    
                    sortJob.setOutputKeyClass(IntWritable.class);
                    sortJob.setOutputValueClass(Text.class);
                    /*
                     * Hadoop 默认对 IntWritable 按升序排序,而我们需要的是按降序排列。 因此我们实现了一个
                     * IntWritableDecreasingComparator 类, 并指定使用这个自定义的 Comparator
                     * 类对输出结果中的 key (词频)进行排序
                     */
                    sortJob.setSortComparatorClass(CountComparator.class);
    
                    System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
                }
            } finally {
                FileSystem.get(conf).deleteOnExit(tempDir);
            }
        }
    }
    
  5. 将写好的程序编译为jar包(注意指定Main方法),上传至服务器。

  6. 执行hadoop jar XXX.jar /inputFile /outputFile,执行作业。

结果展示与说明

上述作业结束后,我们就可以通过查询输出文件得到我们的统计结果。本次实验统计结果如下:

输出文件如下:

相关代码

本次实验相关代码和所用的数据资料,均已上传至码云仓库(OSChina git)地址为:HadoopPractice