Hadoop学习笔记-持续更新

一、Hadoop

1.大数据概述

1.1 大数据概念
  • 无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。
1.2 大数据特点 :
  • Volume (大量的):TB级数据,EB级数据
  • Velocity (高速的):处理海量数据(EB,ZB)的效率至关重要
  • Variety (多样的):结构化数据(数据库表/文本等和非结构化数据(网络日志、音频、视频、图片、地理位置等)
  • Value(低密度价值):价值密度的高低与数据总量的大小成反比
1.3 大数据部门内部组织结构

alt

2.Hadoop入门

2.1 Hadoop是什么

**Hadoop:**是由Apache基金会所开发的分布式系统基础架构,主要解决海量数据的存储和海量数据的分析计算问题;Hadoop通常指Hadoop生态圈

2.2 Hadoop发展史
  1. Hadoop创始人:Doug Cutting

  2. Google是Hadoop的思想之源

    GFS -> HDFS ; Map-Reduce -> MR ; BigTable -> HBase

  3. 三大发行版本:

    1. Apache版本,入门最合适
    2. Cloudera :内部集成了大数据框架,对应产品 CDH
    3. Hortonworks :文档较好,对应产品 HDP; 现被Cloudera收购,新产品 CDP
2.3 Hadoop优势
  1. 高可靠性:维护多个数据副本,即使某个出现故障,也不会导致数据丢失
  2. 高扩展性:在集群间分配任务数据,可以方便的扩展数以千计的节点
  3. 高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务的处理速度
  4. 高容错性:能够自动将失败的任务重新分配
2.4 Hadoop的组成

​ 1.x时代 MapReduce(计算+资源调度) HDFS(数据存储) Common(辅助工具)

​ 2.x / 3.x 时代,MapReduce(计算) Yarn(资源调度) HDFS(数据存储)


3.Hadoop架构

3.1 Hadoop概述

Hadoop Distributed File System, 是一个分布式文件系统

  1. NameNode(nm): 存储元数据,如文件名,文件目录结构,文件属性,以及每个文件的块列表和块所在的DataNode
  2. DataNode(dn):在本地文件系统存储文件块数据,以及块数据的校验和
  3. Secondary NameNode(2nn):没隔一段时间对NameNode元数据进行备份
3.2 YARN概述

Yet Another Resource Negotiator(另一种资源协调者) ,是Hadoop的资源管理器

  1. ResourceManager(RM):整个集群资源(内存、CPU)的老大
  2. NodeManager(NM):单个节点服务器的资源老大
  3. ApplicationMaster(AM):单个任务运行的老大
  4. Container:容器,相当一台独立的服务器,里面封装了任务运行时所需的资源(内存、CPU、磁盘、网络等)

3.3 MapReduce概述

计算过程分为两个阶段:Map阶段(并行处理输入数据)和 Reduce阶段(对Map结果进行汇总)

3.4 三者关系图:

3.5 大数据技术生态体系(含解释)

技术名词解释
  • Sqoop:一款开源的工具,主要用于在Hadoop、Hive 与传统的数据库(MySQL)间进行数据的传递
  • Flume:一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统;Flume支持在日志系统中定制各种数据发送,用于收集数据
  • Kafka:一种高吞吐量的分布式发布/订阅消息系统
  • Spark:当前最流行的开源大数据内存计算框架,可基于Hadoop上存储的大数据进行计算
  • Flink:当前最流行的开源大数据内存计算框架,用于实时计算的场景较多
  • Oozie:一个管理Hadoop作业(job) 的工作流程调度管理系统
  • Hbase:一个分布式、面向列的开源数据库。不同于一般的关系数据库,适用于非结构化数据存储的数据库
  • Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,提供简单的SQL查询功能
  • Zookeeper:针对大型分布式系统的可靠协调系统,提供的功能:配置维护、名字服务、分布式同步、组服务等

4.Hadoop运行环境搭建(开发重点)

4.1 运行前准备
  1. 安装虚拟机,能ping通外网(能上网),本笔记以CentOS-7.5-x86-1804

  2. 虚拟机指令列表:

    ping www.baidu.com #正常上网
    #安装epel-release,"红帽系"的操作系统提供的软件包,适用于CentOS、RHEL、等,相当于软件仓库
    yum install -y epel-release
    yum install -y net-tools #工具包集合 包含ifconfig等
    yum install -y vim	#安装vim
    systemctl stop firewalld	#关闭防火墙
    systemctl disable firewalld.service #关闭防火墙开机自启
    useradd atguigu
    passwd atguigu
    vim /etc/sudoers #添加用户,赋予root权限
    rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps #卸载虚拟机自带的JDK
    reboot
    vim /etc/sysconfig/network-scripts/ifcfg-ens33 #更改IP
    vim /etc/hostname #修改主机名
    vim /etc/hosts # 配置主机名称映射hosts文件  reboot 后再重启
    #安装Jdk,配置环境变量
    tar -zcvf jdk.tar.gz -C /opt/module/
    sudo vim /etc/profile.d/my_env.sh
    source /etc/profile
    #安装Hadoop,配置环境变量
    tar -zxvf hadoop.tar.gz -C /opt/module/
    sudo vim /etc/profile.d/my_env.sh
    source /etc/profile
    sudo reboot
    
  3. Hadoop目录结构

    重要目录:

    ​ bin : 存放Hadoop相关服务(hdfs,yarn,mapred)进行操作的脚本

    ​ etc : Hadoop的配置文件目录

    ​ lib : 存放 Hadoop 的本地库(对数据进行压缩解压缩功能)

    ​ sbin : 存放启动或停止 Hadoop 相关服务的脚本

    ​ share : 存放 Hadoop 的依赖jar包、文档。和官方案例

  4. Hadoop的运行模式

    本地模式:单机运行,生产环境不用

    伪分布式模式:单机运行,具备Hadoop集群的所有功能,一台服务器模拟一个分布式的环境

    完全分布式模式:多台服务器组成分布式环境,生产环境使用

  5. 本地运行模式代码演示

mkdir wcinput #hadoop安装目录下,创建文件夹,存放输入文件
vim wcinput/word.txt #编辑输入文件
#运行本地模式
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount wcinput wcoutput
#查看运行结果
cat wcoutput/part-r-00000
4.2 完全分布式运行模式(开发重点)
4.2.1 配置集群前准备(linux命令)
  1. 集群的每台机器都需要安装Hadoop

  2. 编写集群分发脚本

    #scp 实现服务器之间的数据拷贝 语法:scp -r $pdir/$fname $user@$host:$pdir/$fname
    scp -r /opt/module/jdk root@hadoop103:/opt/module
    scp -r root@hadoop102:/opt/module/* root@hadoop104:/opt/module
    #rsync 远程同步工具 主要用于备份和镜像,具有速度快、避免复制相同内容和支持符号链接的优点
    #语法:rsync -av $pdir/fname $user@$host:$pdir/$fname
    #-a:归档拷贝  -v:显示复制过程
    rsync -av hadoop-3.1.3/ root@hadoop103:/opt/module/hadoop-3.1.3/
    
  3. bash脚本演示

    #!/bin/bash
    if [ $# -lt 1 ]
    then
    	echo Not Enough Arguement!
    	exit;
    fi
    #遍历所有机器
    for host in hadoop102 hadoop103 hadoop104
    do
    	echo === $host ===
    	#当前主机,遍历所有文件
    	for file in $@
    	do
    		if [ -e $file ]
    		then
    			#获取父目录
    			pdir=$(cd -P $(dirname $file), pwd)
    			#获取当前文件的名称
    			fname=$(basename $file)
    			ssh $host "mkdir -p $pdir"
    			rsync -av $pdir/$fname $host:$pdir
    		else
    			echo $file does not exists!
    		fi
    	done
    done
    
    #后续使用
    chmod +x xsync #赋予可执行权限
    xsync /home/atguigu/bin	#分发该脚本
    sudo cp xsync /bin/ #复制脚本到全局目录,以便全局调用
    sudo ./bin/xsync etc/profile.d/my_env.sh	#同步环境变量配置
    
  4. 配置SSH免密登录

    1)免密登录原理

​ 2) linux命令列表

#生成公钥和私钥
cd /home/atguigu/.ssh
ssh-keygen -t rsa #连敲三个回车,会生成两个文件:id_rsa(私钥) id_rsa.pub(公钥)
#将公钥拷贝到目标机器
ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104
#.ssh目录下文件解释
#known_hosts : 记录ssh访问过计算机的公钥(public key)
#authorized_keys : 存放授权过的无密登录服务器公钥
4.2.2 集群配置
  1. 配置文件说明

    • 默认配置文件:

    core-default.xml / hdfs-default.xml / yarn-default.xml / mapred-default.xml

    1. 自定义配置文件:

    core-site.xml / hdfs-site.xml / yarn-site.xml / mapred-site.xml

    ​ 文件位置:$HADOOP_HOME/etc/hadoop

    1. 配置代码:
    <!-- 1.配置core-site.xml -->
    <configuration>
    	<!-- 指定NameNode的地址 -->
        <property>
            <name>fs.defaultFS</name>
            <value>hdfs://hadoop102:9820</value>
    	</property>
        
    	<!-- 指定hadoop数据的存储目录 -->
        <property>
            <name>hadoop.tmp.dir</name>
            <value>/opt/module/hadoop-3.1.3/data</value>
    	</property>
        
    	<!-- 配置HDFS网页登录使用的静态用户为atguigu -->
        <property>
            <name>hadoop.http.staticuser.user</name>
            <value>atguigu</value>
    	</property>
        
    	<!-- 配置该atguigu(superUser)允许通过代理访问的主机节点 -->
        <property>
            <name>hadoop.proxyuser.atguigu.hosts</name>
            <value>*</value>
    	</property>
        
    	<!-- 配置该atguigu(superUser)允许通过代理用户所属组 -->
        <property>
            <name>hadoop.proxyuser.atguigu.groups</name>
            <value>*</value>
    	</property>
        
    	<!-- 配置该atguigu(superUser)允许通过代理的用户-->
        <property>
            <name>hadoop.proxyuser.atguigu.groups</name>
            <value>*</value>
    	</property>
    </configuration>
    
    <!-- 2.配置hdfs-site.xml   -->
    <configuration>
    	<!-- nn web端访问地址-->
    	<property>
            <name>dfs.namenode.http-address</name>
            <value>hadoop102:9870</value>
        </property>
        
    	<!-- 2nn web端访问地址-->
        <property>
            <name>dfs.namenode.secondary.http-address</name>
            <value>hadoop104:9868</value>
        </property>
    </configuration>
    
    <!-- 3.配置yarn-site.xml -->
    <configuration>
    	<!-- 指定MR走shuffle -->
        <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
    	</property>
        
    	<!-- 指定ResourceManager的地址-->
        <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>hadoop103</value>
    	</property>
        
    	<!-- 环境变量的继承 -->
        <property>
            <name>yarn.nodemanager.env-whitelist</name>
            <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    	</property>
        
    	<!-- yarn容器允许分配的最大最小内存 -->
        <property>
            <name>yarn.scheduler.minimum-allocation-mb</name>
            <value>512</value>
        </property>
        <property>
            <name>yarn.scheduler.maximum-allocation-mb</name>
            <value>4096</value>
    	</property>
        
    	<!-- yarn容器允许管理的物理内存大小 -->
        <property>
            <name>yarn.nodemanager.resource.memory-mb</name>
            <value>4096</value>
    	</property>
        
    	<!-- 关闭yarn对物理内存和虚拟内存的限制检查 -->
        <property>
            <name>yarn.nodemanager.pmem-check-enabled</name>
            <value>false</value>
        </property>
        <property>
            <name>yarn.nodemanager.vmem-check-enabled</name>
            <value>false</value>
        </property>
    </configuration>
    
    <!-- 4.配置mapred-site.xml -->
    <configuration>
    	<!-- 指定MapReduce程序运行在Yarn上 -->
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
    </configuration>
    

    然后,在集群上分发配置好的配置文件:

    xsync /opt/module/hadoop-3.1.3/etc/hadoop/
    
  2. 启动集群

    1)配置works :

    vim /opt/module/hadoop-3.1.3/etc/hadoop/workers
    #新增内容
    hadoop102
    hadoop103
    hadoop104
    xsync /opt/module/hadoop-3.1.3/etc/  #同步置集群其他机器
    

    2)启动步骤

    #1.第一次启动,需要在hadoop102节点上格式化 NameNode 
    #注意:格式化NameNode会产生新的集群Id,会导致NameNode和DataNode的集群id不一致而报错
    #解决:暂停namenode和datanode进程,删除集群所有的data和logs目录,再重新格式化
    hdfs namenode -format
    #2.启动HDFS (hadoop102上)
    sbin/start-dfs.sh
    #3.启动YARN (在ResourceManager的节点上启动-hadoop103)
    sbin/start-yarn.sh
    #4.查看浏览器是否能连接
    http://hadoop102:9870 #HDFS上存储的数据信息 (HDFS-NameNode)
    http://hadoop103:8088 #YARN上运行的Job信息 (YARN-ResourceManager)
    

    3)简单使用

    hadoop fs -mkdir /input #hdfs上创建目录
    #上传小文件
    hadoop fs -put $HADOOP_HOME/wcinput/word.txt /input
    #查看上传后文件存放位置
    /opt/module/hadoop-1.3.3/data/dfs/data/current/BP.../current/finalized/...
    #从hdfs上下载
    hadoop fs -get /jdk-.tar.gz ./
    #执行wordcount程序 
    hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /inout /output
    

    4)配置历史服务器

    <!-- mapred-site.xml 添加配置 -->
    <!-- 历史服务器端地址 -->
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hadoop102:10020</value>
    </property>
    <!-- 历史服务器web端地址 -->
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>hadoop102:19888</value>
    </property>
    
    #分发配置到集群其他机器
    xsync $HADOOP_HOME/etc/hadoop/mapred-site.xml
    #hadoop102上启动历史服务器
    mapred --daemon start historyserver
    #查看历史服务器是否启动
    jps
    http://hadoop102:19888/jobhistory
    

    5)配置日志的聚集

    开启日志聚集功能,需要重新启动NodeManager、ResourceManager、HistoryServer

    <!-- 配置yarn-site.xml-->
    <!-- 开启日志聚集功能 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <!-- 设置日志聚集服务器地址 -->
    <property>  
        <name>yarn.log.server.url</name>  
        <value>http://hadoop102:19888/jobhistory/logs</value>
    </property>
    <!-- 设置日志保留时间为7天 -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property>
    
    #分发配置到集群其他机器
    xsync $HADOOP_HOME/etc/hadoop/yarn-site.xml
    #关闭 NodeManager、ResourceManager(YARN-hadoop103) / HistoryServer(hadoop102)
    sbin/stop-yarn.sh
    mapred --daemon stop historyserver
    #启动 YARN(hadoop103) / HistoryServer(hadoop102)
    start-yarn.sh
    mapred --daemon start historyserver
    #测试删除hdfs上的文件
    hadoop fs -rm -r /output
    #查看日志
    http://hadoop102:19888/jobhistory
    
4.2.3 集群脚本
  1. 集群 启动/停止 命令总结

    #HDFS
    start-dfs.sh  / stop-dfs.sh
    #YARN
    start-yarn.sh / stop-yarn.sh
    #HDFS各个组件
    hdfs --deamon start/stop namenode/datanode/secondarynamenode
    #YARN各个组件
    hdfs --deamon start/stop resourcemanager/nodemanager
    
  2. 编写启动脚本

    #!/bin/bash
    if [ $# -lt 1 ]
    then
        echo "No Args Input..."
        exit ;
    fi
    case $1 in
    "start")
    	echo " =================== 启动 hadoop集群 ==================="
        echo " --------------- 启动 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
        echo " --------------- 启动 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
        echo " --------------- 启动 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
    ;;
    "stop")
        echo " =================== 关闭 hadoop集群 ==================="
        echo " --------------- 关闭 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
        echo " --------------- 关闭 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
        echo " --------------- 关闭 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
    ;;
    *)
        echo "Input Args Error..."
    ;;
    esac
    
4.2.4 端口号总结
端口名称 Hadoop2.x Hadoop3.x
NameNode内部通信(集群内部) 8020/9000 8020/9000/9800
NameNode(http外部访问) 50070 9870
MapReduce 查看执行任务端口 8088 8088
历史服务器通信端口 19888 19888
4.2.5 集群时间同步 (了解)
  1. 服务器在公网环境,可以不采用集群时间同步,因为服务器会定时和公网时间进行校准
  2. 如果服务器在内网环境,必须配置集群时间同步,因为时间久了,会产生时间偏差,导致集群执行任务不同步
  3. 如何同步:选择集群中一台机器作为时间服务器,其他所有机器与时间服务器定时同步时间(根据任务进行周期同步)
  4. 步骤
#时间服务器配置(必须root用户下)
#查看ntpd服务状态和开机自启命令
sudo systemctl status ntpd
sudo systemctl start ntpd
sudo systemctl is-enabled ntpd
#第一步 修改ntp.conf配置文件
sudo vim /etc/ntp.conf 
#修改1 授权某些网段内的所有服务器均能从该机器查询和同步时间 (取消下面1行代码的注释)
restrict 192.168.10.0 mask 255.255.255.0 nomodify notrap
#修改2 集群在局域网中,不使用其他互联网上的时间 (注释下面4行代码)
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
#修改3 当该服务器丢失网络连接,依然可以采用本地时间作为时间服务器为其他机器提供时间同步(添加下面两行代码)
server 127.127.1.0
fudge 127.127.1.0 stratum 10

#第二步 修改时间服务器的ntps文件
sudo vim /etc/sysconfig/ntpd
SYNC_HWCLOCK=yes #添加该行代码:让硬件时间和系统时间一起同步
sudo systemctl start ntpd #重启ntpd服务
sudo systemctl enable ntpd #设置ntpd服务开机自启

#第三步 配置其他服务器
sudo systemctl stop ntpd
sudo systemctl disable ntpd  #关闭所有节点的ntp服务和自启动
sudo crontab -e

5.HDFS详解

5.1 HDFS概述
  1. 背景:随着数据量越来越大,一台机器存储不下所有数据,存储到多台机器上却不方便管理和维护,因此需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统,HDFS只是分布式文件管理系统中的一种

  2. 定义:一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,很多服务器联合实现功能,且集群中的服务器各有各的角色

  3. 使用场景:一次写入,多次读出,不支持文件的修改,适合用来做数据分析,不适合网盘应用

  4. 优点:1)高容错性 2)适合处理大数据 3)可构建在廉价机器上

  5. 缺点:1)不适合低延时数据访问 2)无法高效对大量小文件进行存储 3)不支持并发写入和随机修改

  6. HDFS组成架构
    1. NameNode(nm) : Master,是管理者;

      1)管理HDFS的名称空间 2)配置副本策略

      3)管理数据块(Block)映射信息 4)处理客服端请求

    2. DataNode : 就是Slave,NameNode下达命令,DataNode执行命令;1)存储实际的数据块 2)执行数据块 读/写操作

    3. Client : 客户端;1)文件切分,将文件切分成一个个Block,然后上传; 2)与NameNode交互,获取文件实际位置信息(DataNode中); 3)与DataNode交互,读取/写入数据; 4)提供一些命令管理HDFS(如NameNode格式化等) ; 5)通过一些命令访问HDFS(增删改查操作)

    4. Secondary NameNode(2nn) : 当NameNode挂掉时,不会马上替换NameNode并提供服务;1)辅助NN,定期合并Fsimages 和Edits,并推动给NN; 2)紧急情况下,可辅助恢复NN

    5. HDFS的文件块(Block)-------------面试

      1. HDFS的文件在物理上是分块存储(Block),由 dfs.blocksize 配置决定,默认为128MB(hadoop2.x)

      2. Block不能太大,也不能太小?(和磁盘传输速率(单位s)基本一致最好)

        1)设置太小,会增加寻址时间

        2) 设置太大,传输时间会明显大于定位时间,导致处理是会变慢

        3) HDFS块的大小设置取决于磁盘传输速率

5.2 HDFS的Shell操作(开发)

Shell命令合集

#基本语法
hadoop fs   OR   hdfs dfs
hadoop fs #展示所有命令
hadoop fs -help rm #展示 rm 该命令的参数
##上传
# -moveFromLocal 从本地剪切到HDFS
touch test1.txt  #本地目录创建文件
hadoop fs -moveFromLocal ./text1.txt /sanguo/shuguo # 本地文件 Hdfs目录
# -copyFromLocal 从本地复制到HDFS
hadoop fs -copyFromLocal ./text2.txt /sanguo/shuguo
# -appendToFile 追加一个文件到已存在文件的末尾
touch test3.txt #在文件中输入些许内容
hadoop fs -appendToFile ./test3.txt /sanguo/shuguo.test2.txt
# -put 等同于copyFromLocal,复制文件到HDFS
##下载
# -copyToLocal 从HDFS中拷贝到本地
hadoop fs -copyToLocal /sanguo/shuguo/text1.txt ./dir # HDFS文件 本地目录
# -get 等同于-copyToLocal
hadoop fs -get /sanguo/shuguo/text2.txt ./dir
# -getmerge 合并下载多个文件
hadoop fs -getmerge /sanguo/shuguo/* ./merge.txt # HDFS目录 本地文件
##HDFS的直接操作
hadoop fs -ls / #显示目录信息
hadoop fs mkdir #在HDFS上创建目录
hadoop fs -cat /sanguo/shuguo/test1.txt #查看文件
hadoop fs -cp /sanguo/shuugo/test2.txt /textn2.txt
hadoop fs -mv /sanguo/shuguo/text3.txt /
hadoop fs -tail /text3.txt #显示一个文件末尾1KB的数据
hadoop fs -rm /textn2.txt
hadoop fs -rmdir /sanguo/shuguo
hadoop fs -du -s -h /text3.txt #统计文件夹大小信息(***后续查询更多***)
hadoop fs -setrep 10 /test3.txt #设置文件副本数
#记录在NameNode元数据中,若集群只有3台机器,则只有3副本,若集群增加到10台,则副本数更新到10
5.3 HDFS的客户端操作
  1. Windows下安装Hadoop开发环境

    1)下载安装包 2)配置环境变量 3)创建maven项目,导入依赖 4)写代码

  2. 日后更新

5.4 HDFS的读写流程(面试)
写数据流程

image-20220323123938653

解释说明:
1. 客户端通过 Distributed FileSystem 模块向NameNode请求上传文件(/user/atguigu/ss.avi),NameNode检查目标文件是否存在,父目录是否存在
2.NameNode响应,向客户端回复信息
3.客户端向NameNode请求第一个Block发送哪里(哪一个DataNode)
4.NameNode返回第一个Block存储的节点(包括多个副本节点),dn1,dn2,dn3
5.客户端通过 FSDataOutputStream 模块请求dn1上传数据,dn1收到请求会请求dn2,dn2会请求dn3,最后通信管道建立完成
6.通信管道建立后,逐级应答客户端;dn3应答dn2,dn2应答dn1,dn1应答客户端
7.客户端向dn1上传第一个Block(先从磁盘读取数据放到本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2收到后就会传给dn3,dn1每传一个packet,会放一个应答队列等待应答(逐级应答)
8.当第一个Block传输完成后,客户端会再次请求NameNode上传下一个Block(步骤3-7)
  1. 网络拓扑-节点计算
    1. HDFS写数据过程中,NameNode会选择距离上传数据最近距离的DataNode接受数据
    2. 节点距离:两个节点到达最近的共同祖先的距离总和
  2. 机架感知(副本节点选择)

    image-20220323130005428

读数据流程

image-20220323130038617

解释说明:
1,2.客户端向 NameNode 请求下载文件,NameNode通过查询元数据,找到文件所在块的DataNode地址,返回给客户端
3.挑选一台DataNode服务器(就近原则,然后随机),请求读取数据
4.DataNode开始向客户端传送数据(从磁盘读取数据,以Packet为单位来做校验)
5.客户端以Packet为单位接收,先在本地缓存,后再写入目标文件
5.5 NN和2NN的工作机制
5.6 DataNode的工作机制

6. MapReduce详解

6.1 MapReduce概述
  1. 定义

    MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

  2. 优点

    易于编程;有良好的扩展性;高容错性;适合PB级以上的海量数据的离线处理

  3. 缺点

    不擅长实时计算;不擅长流式计算;不擅长有向无环图(DAG)计算

  4. 核心思想

    一个完整的MapReduce程序在分布式运行时有三个实例进程:
    1.MrAppMaster:负责整个程序的过程调度以及状态协调
    2.MapTask:负责Map阶段的整个数据处理流程
    3.ReduceTask:负责Reduce阶段的整个数据处理流程
    
    一个MapReduce程序中Map阶段和Reduce阶段都只有一个,如果逻辑复杂,有多个MapReduce程序,则会串行运行
    一个Map阶段中,能并发运行MapTask,互不干扰
    一个Reduce阶段,也能并发ReduceTask,互不干扰,但是数据依赖于Map阶段所有MapTask并发实例的输出
    
6.2 MapReduce编程规范
6.2.1 常用数据序列化类型

image-20220323132501660

数据序列化类型:
BooleanWritable		ByteWritable
IntWritable		LongWritable
FloatWritable	DoubleWritable 
Text	MapWritable		ArrayWritable	NullWritable
6.2.2 WordCount案例实操
用户编写的程序分为三个部分:Mapper、Reducer、Driver
Mapper阶段
1) 用户自定义Mapper要继承提供的父类
2) Mapper的输入数据/输出数据都是KV对的形式(K、V的类型可自定义)
3) Mapper中的业务逻辑写在map()方法中,每个<K,v>都会调用一次map()方法
Reducer阶段
1) 用户自定义Reducer要继承提供的父类
2) Reducer的输入类型对应Mapper的输出类型,也是KV对
3) Reducer的业务逻辑写在reduce()方法中,每一组相同k的<K,V>组会调用一次reduce()方法
Driver阶段
相当于YARN集群的客户端,用于提交整个程序到YARN集群,提交的是封装了MapReducer程序相关运行参数的job对象
//准备阶段 添加依赖(hadoop-client)
//Mapper类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
        // 1 获取一行
		String line = value.toString();
		// 2 切割
		String[] words = line.split(" ");
		// 3 输出
		for (String word : words) {
			k.set(word);
			context.write(k, v);
		}
	}
}
//Reducer类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	int sum;
	IntWritable v = new IntWritable();
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {	
		// 1 累加求和
		sum = 0;
		for (IntWritable count : values) {
			sum += count.get();
		}
		// 2 输出
         v.set(sum);
		context.write(key,v);
	}
}
//Driver驱动类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordcountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// 1 获取配置信息以及获取job对象
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		// 2 关联本Driver程序的jar
		job.setJarByClass(WordcountDriver.class);
		// 3 关联Mapper和Reducer的jar
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);
		// 4 设置Mapper输出的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// 5 设置最终输出kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 6 设置输入和输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// 7 提交job
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
#启动集群,运行程序
sbin/start-dfs.sh #hadoop102上
sbin/start-yarn.sh #hadoop103上
hadoop jar wc.jar com.test.WordCountDriver /test/input /test/output
6.2.3 Hadoop序列化
6.3 MapReduce框架原理
6.4 MapReduce开发总结
1.输入数据接口:InputFormat
	1) 默认使用的实现类是:TextInputFormat
	2) TextInputFormat的功能逻辑是:一次读一行文本;然后将该行的起始偏移量作为key;行内容作为value返回。
	3) CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
2.逻辑处理接口:Mapper 
	用户根据业务需求实现其中三个方法:map() setup() cleanup () 
3.Partitioner 分区
	1) 有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces 来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
	2) 如果业务上有特别的需求,可以自定义分区。
4.Comparable 排序
	1) 当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接
口,重写其中的 compareTo()方法。
	2) 部分排序:对最终输出的每一个文件进行内部排序。
	3) 全排序:对所有数据进行排序,通常只有一个 Reduce。 (4)二次排序:排序的条件有两个。
5.Combiner 合并
	Combiner 合并可以提高程序执行效率,减少 IO 传输。但是使用时必须不能影响原有的业务处理结果。
6.逻辑处理接口:Reducer
	用户根据业务需求实现其中三个方法:reduce() setup() cleanup () 
7.输出数据接口:OutputFormat
	1) 默认实现类是 TextOutputFormat,功能逻辑是:将每一个 KV 对,向目标文本文件输出一行。
	2) 用户还可以自定义 OutputFormat

7.YARN详解

8.优化

二、 Zookeeper

1. Zookeeper入门

1.1 概述

Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。基于观察者模式,接受观察者的注册,一旦数据发生变化,Zookeeper就会通知已经注册的观察者。

Zookeeper = 文件系统 + 通知机制

1.2 特点
1. Zookeeper:一个Leader(领导者),多个Follower(跟随者)组成的集群

2. 集群存活一半以上的节点,Zookeeper就能运行
3. 全局数据一致,每个Server保存一份相同的数据副本
4. 更新请求顺序进行,来自同一个Client的更新请求,会该Client的按照发送顺序,依次执行
5. 数据更新原子性,一次数据更新,要么成功,要么失败
6. 实时性,在一定时间范围内,Client能读到最新数据
1.3 数据结构

Zookeeper的数据模型的结构是树形结构,每一个节点称做一个ZNode,每个ZNode默认存储1MB的数据,每个ZNode都可以通过其路径唯一表示

image-20220319151654748

1.4 应用场景

统一命名服务、统一配置管理、统一集群管理、服务器动态上下线、软负载均衡等

图示:

image-20220319152424419

image-20220319152137837

image-20220319152320366

1.5 Zookeeper安装
  1. 集群中的每一台服务器都需要安装 Zookeeper

  2. 需要安装JKD

  3. 正式安装

    #解压到指定目录
    tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/
    #修改配置
    mv zoo_sample.cfg zoo.cfg
    mkdir zkData
    vim zoo.cfg
    dataDir=/opt/module/zookeeper-3.7.5/zkData #在zoo.cfg中修改该行
    #操作Zookeeper
    bin/zkServer.sh start #启动Zookeeper
    jps #查看进程状态
    bin/zkServer.sh status #查看状态
    bin/zkCli.sh #启动客户端
    quit #退出客户端
    bin/zkServer.sh stop #停止Zookeeper
    
    
  4. 配置参数解读

    #Zookeeper中配置文件zoo.cfg 中参数含义
    tickTime=2000 #单位ms,通信心跳数,Zookeeper服务器与客户端心跳时间
    initLimit=10 #LF初始通信时限,Follower与Leader初始连接时,允许的最多心跳数
    syncLimit=5 #Leader与Follower之间最大响应时间单位
    dataDir=/dir #数据文件目录 + 数据持久化路径 ;用于保存Zookeeper中的数据
    clientPort=2181 #客户端连接端口
    

2. Zookeeper实战

2.1 分布式安装部署
启动时Linux命令合集
#解压zookeeper安装包
tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module
xsync zookeeper-3.5.7.tar.gz/ #同步到集群的其他服务器
#配置服务器编号
mkdir -p zkData
touch zkData/myid
vim myid#hadoop102,103,104 的myid文件填写不同myid(如:2,3,4)
#修改zoo.cfg
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
dataDir=/opt/module/zookeeper-3.5.7/zkData #修改该行
server.A=B:C:D #增加该行,集群多少台机器,就添加多少行
A:一个数字,也就是myid的值
B:服务器的地址,host(ip)
C:Follower与Leader交换信息的端口;2888
D:执行选举新Leader时服务器互相通信的端口;3888
#分发配置到集群的其他机器 xsync
bin/zkServer.sh start #启动集群;每台服务器均需输入该命令启动,可写脚本
客户端命令行操作
bin/zkCli.sh #启动客户端
#以下是启动客户端后,操作zookeeper的命令列表
help #显示所有操作命令
ls path #查看当前路径节点的子节点,-w:监听子节点变化,-s:附加次级信息(详细信息)
create #普通创建 -s:含有序列 -e:临时节点(重启或者超时就小时)
get path #获取节点的值 -w:监听节点的变化 -s:附加次级信息
set xx #设置节点的具体值
stat #查看节点的状态
delete #删除节点
deleteall #递归删除节点

#具体操作
ls / #[zookeeper]
create /sanguo "diaochan"
create /sanguo/shuguo "loubei"
get /sanguo # diaochan
get -s /sanguo/shuguo # liubei ……
create -e /sanguo/wuguo "zhouyu"
ls /sanguo # [wuguo,shuguo]  (重启后,wuguo会被删除)
##创建带序号的节点,序号从0开始递增
create /sanguo/w "c" # 创建成功
create /sanguo/w "c" # 创建失败 Node already exists:/sanguo/w
create -s /sanguo/w "c" # 创建成功 /sanguo/0000000000
create -s /sanguo/w "c" # 创建成功 /sanguo/0000000001
create -s /sanguo/w "c" # 创建成功 /sanguo/0000000002
##修改节点数据值
set /sanguo/w "newc"

##节点的值变化监听
get -w /sanguo  #hadoop104键入该命令,监听/sanguo
set /sanguo "xishi" #hadoop103上修改/sanguo的值
#hadoop104主机会收到数据变化的监听
##节点的子节点变化监听(路径变化)
ls -w /sanguo #hadoop104主机注册监听/sanguo
create /sanguo/jin "simayi" #hadoop103上创建子节点
#hadoop104主机会收到子节点变化的监听

delete /sanguo/jin #删除节点
deleteall /sanguo #递归删除节点
stat /sanguo #查看节点状态
API应用(IDEA环境)

Loading

3. Zookeeper原理

三、Hadoop源码编译

全部评论

相关推荐

评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务