HDFS架构原理
HDFS架构原理
HDFS架构剖析
HDFS整体概述
HDFS是Hadoop Distribute File System 的简称,意为:Hadoop分布式文件系统。是Hadoop核心组件之一,作为大数据生态圈最底层的分布式存储服务而存在。HDFS解决的问题就是大数据如何存储,它是横跨在多台计算机上的文件存储系统并且具有高度的容错能力。
HDFS集群遵循主从架构。每个集群包括一个主节点和多个从节点。在内部,文件分为一个或多个块,每个块根据复制因子存储在不同的从节点计算机上。主节点存储和管理文件系统名称空间,即有关文件块的信息,例如块位置,权限等。从节点存储文件的数据块。主从各司其职,互相配合,共同对外提供分布式文件存储服务。当然内部细节对于用户来说是透明的。
角色介绍
概述
HDFS遵循主从架构。每个群集包括一个主节点和多个从节点。其中:
NameNode是主节点,负责存储和管理文件系统元数据信息,包括namespace目录结构、文件块位置信息等;DataNode是从节点,负责存储文件具体的数据块。
两种角色各司其职,共同协调完成分布式的文件存储服务。
SecondaryNameNode是主角色的辅助角色,帮助主角色进行元数据的合并。
Namenode
NameNode是Hadoop分布式文件系统的核心,架构中的主角色。它维护和管理文件系统元数据,包括名称空间目录树结构、文件和块的位置信息、访问权限等信息。基于此,NameNode成为了访问HDFS的唯一入口。
内部通过内存和磁盘两种方式管理元数据。其中磁盘上的元数据文件包括Fsimage内存元数据镜像文件和edits log(Journal)编辑日志。
在Hadoop2之前,NameNode是单点故障。Hadoop 2中引入的高可用性。Hadoop集群体系结构允许在集群中以热备配置运行两个或多个NameNode。
Datanode
DataNode是Hadoop HDFS中的从角色,负责具体的数据块存储。DataNode的数量决定了HDFS集群的整体数据存储能力。通过和NameNode配合维护着数据块。
Secondarynamenode
除了DataNode和NameNode之外,还有另一个守护进程,它称为secondary NameNode。充当NameNode的辅助节点,但不能替代NameNode。
当NameNode启动时,NameNode合并Fsimage和edits log文件以还原当前文件系统名称空间。如果edits log过大不利于加载,Secondary NameNode就辅助NameNode从NameNode下载Fsimage文件和edits log文件进行合并。
HDFS重要特性
主从架构
HDFS采用master/slave架构。一般一个HDFS集群是有一个Namenode和一定数目的Datanode组成。Namenode是HDFS主节点,Datanode是HDFS从节点,两种角色各司其职,共同协调完成分布式的文件存储服务。
分块机制
HDFS中的文件在物理上是分块存储(block)的,块的大小可以通过配置参数来规定,参数位于hdfs-default.xml中:dfs.blocksize。默认大小是128M(134217728)。
副本机制
为了容错,文件的所有block都会有副本。每个文件的block大小(dfs.blocksize)和副本系数(dfs.replication)都是可配置的。应用程序可以指定某个文件的副本数目。副本系数可以在文件创建的时候指定,也可以在之后通过命令改变。
默认dfs.replication的值是3,也就是会额外再复制2份,连同本身总共3份副本。
Namespace
HDFS支持传统的层次型文件组织结构。用户可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。
Namenode负责维护文件系统的namespace名称空间,任何对文件系统名称空间或属性的修改都将被Namenode记录下来。
HDFS会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
。
元数据管理
在HDFS中,Namenode管理的元数据具有两种类型:
- 文件自身属性信息
文件名称、权限,修改时间,文件大小,复制因子,数据块大小。
- 文件块位置映射信息
记录文件块和DataNode之间的映射信息,即哪个块位于哪个节点上。
数据块存储
文件的各个block的具体存储管理由DataNode节点承担。每一个block都可以在多个DataNode上存储。
HDFS Web Interfaces
Web Interfaces介绍
除了命令行界面之外,Hadoop还为HDFS提供了Web用户界面。用户可以通过Web界面操作文件系统并且获取和HDFS相关的状态属性信息。
HDFS Web地址是http://nn_host:port/,默认端口号9870。
模块功能解读
Overview
Overview是总揽模块,默认的主页面。展示了HDFS一些最核心的信息。
Summary
NameNode Journal Status
NameNode Storage
DFS Storage Types
Datanodes
Datanodes模块主要记录了HDFS集群中各个DataNode的相关状态信息。
Datanode Volume Failures
此模块记录了DataNode卷故障信息。
Snapshot
Snapshot模块记录HDFS文件系统的快照相关信息,包括哪些文件夹创建了快照和总共有哪些快照。
Satartup progress
Startup Progress模块记录了HDFS集群启动的过程信息,执行步骤和每一步所做的事和用时。
Utilities
Utilities模块算是用户使用最多的模块了,里面包括了文件浏览、日志查看、配置信息查看等核心功能。
Browse the file system
该模块可以说是我们在开发使用HDFS过程中使用最多的模块了,提供了一种Web页面浏览操作文件系统的能力,在某些场合下,比使用命令操作更加直观方便。
Logs、Log Level
Configruation
该模块可以列出当前集群成功加载的所谓配置文件属性,可以从这里来进行判断用户所设置的参数属性是否成功加载生效,如果此处没有,需要检查配置文件或者重启集群加载。
HDFS读写流程
因为namenode维护管理了文件系统的元数据信息,这就造成了不管是读还是写数据都是基于NameNode开始的,也就是说NameNode成为了HDFS访问的唯一入口。入口地址是:http://nn_host:8020。
写数据流程
Pipeline管道、ACK应答响应
Pipeline,中文翻译为管道。这是HDFS在上传文件写数据过程中采用的一种数据传输方式。客户端将数据块写入第一个数据节点,第一个数据节点保存数据之后再将块复制到第二个数据节点,后者保存后将其复制到第三个数据节点。通俗描述pipeline的过程就是:Client->A->B->C。
为什么datanode之间采用pipeline线性传输,而不是一次给三个datanode拓扑式传输呢?因为数据以管道的方式,顺序的沿着一个方向传输,这样能够充分利用每个机器的带宽,避免网络瓶颈和高延迟时的连接,最小化推送所有数据的延时。在线性推送模式下,每台机器所有的出口宽带都用于以最快的速度传输数据,而不是在多个接受者之间分配宽带。
ACK (Acknowledge character)即是确认字符,在数据通信中,接收方发给发送方的一种传输类控制字符。表示发来的数据已确认接收无误。在pipeline管道传输数据的过程中,传输的反方向会进行ACK校验,确保数据传输安全。
具体流程
- HDFS客户端通过对DistributedFileSystem 对象调用create()请求创建文件。
- DistributedFileSystem对namenode进行RPC调用,请求上传文件。namenode执行各种检查判断:目标文件是否存在、父目录是否存在、客户端是否具有创建该文件的权限。检查通过,namenode就会为创建新文件记录一条记录。否则,文件创建失败并向客户端抛出一个IOException。
- DistributedFileSystem为客户端返回FSDataOutputStream输出流对象。由此客户端可以开始写入数据。FSDataOutputStream是一个包装类,所包装的是DFSOutputStream。
- 在客户端写入数据时,DFSOutputStream将它分成一个个数据包(packet 默认64kb),并写入一个称之为数据队列(data queue)的内部队列。DFSOutputStream有一个内部类做DataStreamer,用于请求NameNode挑选出适合存储数据副本的一组DataNode。这一组DataNode采用pipeline机制做数据的发送。默认是3副本存储。
- DataStreamer将数据包流式传输到pipeline的第一个datanode,该DataNode存储数据包并将它发送到pipeline的第二个DataNode。同样,第二个DataNode存储数据包并且发送给第三个(也是最后一个)DataNode。
- DFSOutputStream也维护着一个内部数据包队列来等待DataNode的收到确认回执,称之为确认队列(ack queue),收到pipeline中所有DataNode确认信息后,该数据包才会从确认队列删除。
- 客户端完成数据写入后,将在流上调用close()方法关闭。该操作将剩余的所有数据包写入DataNode pipeline,并在联系到NameNode告知其文件写入完成之前,等待确认。
- 因为namenode已经知道文件由哪些块组成(DataStream请求分配数据块),因此它仅需等待最小复制块即可成功返回。
- 数据块最小复制是由参数
dfs.namenode.replication.min
指定,默认是1.
默认3副本存储策略
默认副本存储策略是由BlockPlacementPolicyDefault指定。策略如下:
第一块副本:优先客户端本地,否则随机
第二块副本:不同于第一块副本的不同机架。
第三块副本:第二块副本相同机架不同机器。
读数据流程
具体流程
- 客户端通过调用DistributedFileSystem对象上的open()来打开希望读取的文件。
- DistributedFileSystem使用RPC调用namenode来确定文件中前几个块的块位置。对于每个块,namenode返回具有该块副本的datanode的地址,并且datanode根据块与客户端的距离进行排序。注意此距离指的是网络拓扑中的距离。比如客户端的本身就是一个DataNode,那么从本地读取数据明显比跨网络读取数据效率要高。
- DistributedFileSystem将FSDataInputStream(支持文件seek定位读的输入流)返回到客户端以供其读取数据。FSDataInputStream类转而封装为DFSInputStream类,DFSInputStream管理着datanode和namenode之间的IO。
- 客户端在流上调用read()方法。然后,已存储着文件前几个块DataNode地址的DFSInputStream随即连接到文件中第一个块的最近的DataNode节点。通过对数据流反复调用read()方法,可以将数据从DataNode传输到客户端。
- 当该块快要读取结束时,DFSInputStream将关闭与该DataNode的连接,然后寻找下一个块的最佳datanode。这些操作对用户来说是透明的。所以用户感觉起来它一直在读取一个连续的流。
- 客户端从流中读取数据时,块是按照打开DFSInputStream与DataNode新建连接的顺序读取的。它也会根据需要询问NameNode来检索下一批数据块的DataNode位置信息。一旦客户端完成读取,就对FSDataInputStream调用close()方法。
- 如果DFSInputStream与DataNode通信时遇到错误,它将尝试该块的下一个最接近的DataNode读取数据。并将记住发生故障的DataNode,保证以后不会反复读取该DataNode后续的块。此外,DFSInputStream也会通过校验和(checksum)确认从DataNode发来的数据是否完整。如果发现有损坏的块,DFSInputStream会尝试从其他DataNode读取该块的副本,也会将被损坏的块报告给namenode 。
角色职责概述
Namenode职责
a、 NameNode是HDFS的核心,集群的主角色,被称为Master。
b、 NameNode仅存储管理HDFS的元数据:文件系统namespace操作维护目录树,文件和块的位置信息。
c、 NameNode不存储实际数据或数据集。数据本身实际存储在DataNodes中。
d、 NameNode知道HDFS中任何给定文件的块列表及其位置。使用此信息NameNode知道如何从块中构建文件。
e、 NameNode并不持久化存储每个文件中各个块所在的DataNode的位置信息,这些信息会在系统启动时从DataNode汇报中重建。
f、 NameNode对于HDFS至关重要,当NameNode关闭时,HDFS / Hadoop集群无法访问。
g、 NameNode是Hadoop集群中的单点故障。
h、 NameNode所在机器通常会配置有大量内存(RAM)。
Datanode职责
a、 DataNode负责将实际数据存储在HDFS中。是集群的从角色,被称为Slave。
b、 DataNode启动时,它将自己发布到NameNode并汇报自己负责持有的块列表。
c、 根据NameNode的指令,执行块的创建、复制、删除操作。
d、 DataNode会定期(dfs.heartbeat.interval配置项配置,默认是3秒)向NameNode发送心跳,如果NameNode长时间没有接受到DataNode发送的心跳, NameNode就会认为该DataNode失效。
e、 DataNode会定期向NameNode进行自己持有的数据块信息汇报,汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时.
f、 DataNode所在机器通常配置有大量的硬盘空间。因为实际数据存储在DataNode中。
Namenode 元数据管理
元数据是什么
元数据(Metadata),又称中介数据,为描述数据的数据(data about data),主要是描述数据属性(property)的信息,用来支持如指示存储位置、历史数据、资源查找、文件记录等功能。
在HDFS中,元数据主要指的是文件相关的元数据,由NameNode管理维护。从广义的角度来说,因为NameNode还需要管理众多DataNode节点,因此DataNode的位置和健康状态信息也属于元数据。
元数据管理概述
在HDFS中,文件相关元数据具有两种类型:
- 文件自身属性信息
文件名称、权限,修改时间,文件大小,复制因子,数据块大小。
- 文件块位置映射信息
记录文件块和DataNode之间的映射信息,即哪个块位于哪个节点上。
按存储形式分为内存元数据和元数据文件两种,分别存在内存和磁盘上。
内存元数据
为了保证用户操作元数据交互高效,延迟低,NameNode把所有的元数据都存储在内存中,我们叫做内存元数据。内存中的元数据是最完整的,包括文件自身属性信息、文件块位置映射信息。
但是内存的致命问题是,断点数据丢失,数据不会持久化。因此NameNode又辅佐了元数据文件来保证元数据的安全完整。
磁盘元数据文件
fsimage 内存镜像文件
是内存元数据的一个持久化的检查点。但是fsimage中仅包含Hadoop文件系统中文件自身属性相关的元数据信息,但不包含文件块位置的信息。文件块位置信息只存储在内存中,是由datanode启动加入集群的时候,向namenode进行数据块的汇报得到的,并且后续间断指定时间进行数据块报告。
持久化的动作是一种数据从内存到磁盘的IO过程。会对namenode正常服务造成一定的影响,不能频繁的进行持久化。
Edits log编辑日志
为了避免两次持久化之间数据丢失的问题,又设计了Edits log编辑日志文件。文件中记录的是HDFS所有更改操作(文件创建,删除或修改)的日志,文件系统客户端执行的更改操作首先会被记录到edits文件中。
加载元数据顺序
fsimage和edits文件都是经过序列化的,在NameNode启动的时候,它会将fsimage文件中的内容加载到内存中,之后再执行edits文件中的各项操作,使得内存中的元数据和实际的同步,存在内存中的元数据支持客户端的读操作,也是最完整的元数据。
当客户端对HDFS中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,相应的元数据会更新到内存元数据中。因为fsimage文件一般都很大(GB级别的很常见),如果所有的更新操作都往fsimage文件中添加,这样会导致系统运行的十分缓慢。
HDFS这种设计实现着手于:一是内存中数据更新、查询快,极大缩短了操作响应时间;二是内存中元数据丢失风险颇高(断电等),因此辅佐元数据镜像文件(fsimage)+编辑日志文件(edits)的备份机制进行确保元数据的安全。
NameNode维护整个文件系统元数据。因此,元数据的准确管理,影响着HDFS提供文件存储服务的能力。
元数据管理相关目录文件
元数据存储目录
在Hadoop的HDFS首次部署好配置文件之后,并不能马上启动使用,而是先要对文件系统进行格式化操作:hdfs namenode -format
在这里要注意两个概念,一个是format之前,HDFS在物理上还不存在;二就是此处的format并不是指传统意义上的本地磁盘格式化,而是一些清除与准备工作。其中就会创建元数据本地存储目录和一些初始化的元数据相关文件。
namenode元数据存储目录由参数:dfs.namenode.name.dir
指定,格式化完成之后,将会在$dfs.namenode.name.dir/current
目录下创建如下的文件:
其中的dfs.namenode.name.dir是在hdfs-site.xml文件中配置的,默认值如下:
dfs.namenode.name.dir属性可以配置多个目录,各个目录存储的文件结构和内容都完全一样,相当于备份,这样做的好处是当其中一个目录损坏了,也不会影响到hadoop的元数据,特别是当其中一个目录是NFS(网络文件系统Network File System,NFS)之上,即使你这台机器损坏了,元数据也得到保存。
元数据相关文件
VERSION
- namespaceID/clusterID/blockpoolID
这些都是HDFS集群的唯一标识符。标识符被用来防止DataNodes意外注册到另一个集群中的namenode上。这些标识在联邦(federation)部署中特别重要。联邦模式下,会有多个NameNode独立工作。每个的NameNode提供唯一的命名空间(namespaceID),并管理一组唯一的文件块池(blockpoolID)。clusterID将整个集群结合在一起作为单个逻辑单元,在集群中的所有节点上都是一样的。
- storageType
说明这个文件存储的是什么进程的数据结构信息
如果是DataNode,storageType=DATA_NODE。
- cTime
NameNode存储系统创建时间,首次格式化文件系统这个属性是0,当文件系统升级之后,该值会更新到升级之后的时间戳;
- layoutVersion
HDFS元数据格式的版本。添加需要更改元数据格式的新功能时,请更改此数字。当前的HDFS软件使用比当前版本更新的布局版本时,需要进行HDFS升级。
seen_txid
包含最后一个checkpoint的最后一个事务ID。这不是NameNode接受的最后一个事务ID。该文件不会在每个事务上更新,而只会在checkpoint或编辑日志记录上更新。该文件的目的是尝试识别edits启动期间是否丢失的文件。如果 edits目录被意外删除,然后自上一个checkpoint以来的所有事务都将消失,NameNode仅从最近的一次fsimage加载启动。为了防止这种情况,NameNode启动还检查 seen_txid以确认它至少可以加载该数目的事务。如果无法验证装入事务,它将中止启动。
fsimage相关
元数据镜像文件。每个fsimage文件还有一个对应的.md5文件,其中包含MD5校验和,HDFS使用该文件来防止磁盘损坏文件异常。
Edits log相关
已完成且不可修改的编辑日志。这些文件中的每个文件都包含文件名定义的范围内的所有编辑日志事务。在HA高可用性部署中,主备namenode之间可以通过edits log进行数据同步。
Fsimage、editslog查看
Fsimage
fsimage文件是Hadoop文件系统元数据的一个永久性的检查点,包含Hadoop文件系统中的所有目录和文件idnode的序列化信息;对于文件来说,包含的信息有修改时间、访问时间、块大小和组成一个文件块信息等;而对于目录来说,包含的信息主要有修改时间、访问控制权限等信息。
oiv是offline image viewer的缩写,用于将fsimage文件的内容转储到指定文件中以便于阅读,该工具还提供了只读的WebHDFS API以允许离线分析和检查hadoop集群的命名空间。
oiv在处理非常大的fsimage文件时是相当快的,如果该工具不能够处理fsimage,它会直接退出。该工具不具备向后兼容性,比如使用hadoop-2.4版本的oiv不能处理hadoop-2.3版本的fsimage,只能使用hadoop-2.3版本的oiv。就像它的名称所提示的(offline),oiv不需要hadoop集群处于运行状态。
命令:hdfs oiv -i fsimage_0000000000000000050 -p XML -o fsimage.xml
edits log
edits log文件存放的是Hadoop文件系统的所有更新操作记录日志,文件系统客户端执行的所有写操作首先会被记录到edits文件中。
NameNode起来之后,HDFS中的更新操作会重新写到edits文件中,因为fsimage文件一般都很大(GB级别的很常见),如果所有的更新操作都往fsimage文件中添加,这样会导致系统运行的十分缓慢,但是如果往edits文件里面写就不会这样,每次执行写操作之后,且在向客户端发送成功代码之前,edits文件都需要同步更新。如果一个文件比较大,使得写操作需要向多台机器进行操作,只有当所有的写操作都执行完成之后,写操作才会返回成功,这样的好处是任何的操作都不会因为机器的故障而导致元数据的不同步。
oev是offline edits viewer(离线edits查看器)的缩写,该工具不需要hadoop集群处于运行状态。
命令:hdfs oev -i edits_0000000000000000011-0000000000000000025 -o edits.xml
在输出文件中,每个RECORD记录了一次操作,示例如下:
SecondaryNamenode
SNN职责概述
NameNode职责是管理元数据信息,DataNode的职责是负责数据具体存储,那么SecondaryNameNode的作用是什么?对很多初学者来说是非常迷惑的。它为什么会出现在HDFS中。从它的名字上看,它给人的感觉就像是NameNode的备份。但它实际上却不是。
当HDFS集群运行一段事件后,就会出现下面一些问题:
- edits logs会变的很大,fsimage将会变得很旧;
- namenode重启会花费很长时间,因为有很多改动要合并到fsimage文件上;
- 如果频繁进行fsimage持久化,又会影响NameNode正常服务,毕竟IO操作是一种内存到磁盘的耗精力操作
因此为了克服这个问题,需要一个易于管理的机制来帮助我们减小edit logs文件的大小和得到一个最新的fsimage文件,这样也会减小在NameNode上的压力。
SecondaryNameNode就是来帮助解决上述问题的,它的职责是合并NameNode的edit logs到fsimage文件中。
SNN checkpoint机制
概述
Checkpoint核心是把fsimage与edits log合并以生成新的fsimage的过程。此过程有两个好处:fsimage版本不断更新不会太旧、edits log文件不会太大。
流程
- 当触发checkpoint操作条件时,SNN发生请求给NN滚动edits log。然后NN会生成一个新的编辑日志文件:edits new,便于记录后续操作记录。
- 同时SNN会将edits文件和fsimage复制到本地(使用HTTP GET方式)。
- SNN首先将fsimage载入到内存,然后一条一条地执行edits文件中的操作,使得内存中的fsimage不断更新,这个过程就是edits和fsimage文件合并。合并结束,SNN将内存中的数据dump生成一个新的fsimage文件。
- SNN将新生成的Fsimage new文件复制到NN节点。
- 至此刚好是一个轮回,等待下一次checkpoint触发SecondaryNameNode进行工作,一直这样循环操作。
触发机制
Checkpoint触发条件受两个参数控制,可以通过core-site.xml进行配置:
dfs.namenode.checkpoint.period=3600 //两次连续的checkpoint之间的时间间隔。默认1小时
dfs.namenode.checkpoint.txns=1000000 //最大没有执行checkpoint事务的数量,满足将强制执行紧急checkpoint,即使尚未达到检查点周期。默认100万事务数量。
从上面的描述我们可以看出,SecondaryNamenode根本就不是Namenode的一个热备,只是将fsimage和edits合并。
Namenode 元数据恢复
NameNode存储多目录
namenode元数据存储目录由参数:dfs.namenode.name.dir
指定。
dfs.namenode.name.dir**属性可以配置多个目录**,各个目录存储的文件结构和内容都完全一样,相当于备份,这样做的好处是当其中一个目录损坏了,也不会影响到hadoop的元数据,特别是当其中一个目录是NFS(网络文件系统Network File System,NFS)之上,即使你这台机器损坏了,元数据也得到保存。
从SecondaryNameNode恢复
SecondaryNameNode在checkpoint的时候会将fsimage和edits log下载到自己的本机上本地存储目录下。并且在checkpoint之后也不会进行删除。
如果NameNode中的fsimage真的出问题了,还是可以用SecondaryNamenode中的fsimage替换一下NameNode上的fsimage,虽然已经不是最新的fsimage,但是我们可以将损失减小到最少!
HDFS 小文件解决方案
Hadoop Archive归档
HDFS并不擅长存储小文件,因为每个文件最少一个block,每个block的元数据都会在NameNode占用内存,如果存在大量的小文件,它们会吃掉NameNode节点的大量内存。如下所示,模拟小文件场景:
[root@node1 ~]# hadoop fs -mkdir /smallfile
[root@node1 ~]# echo 1 >1.txt
[root@node1 ~]# echo 2 >2.txt
[root@node1 ~]# echo 3 >3.txt
[root@node1 ~]# hadoop fs -put 1.txt 2.txt 3.txt /smallfile
Hadoop Archives可以有效的处理以上问题,它可以把多个文件归档成为一个文件,归档成一个文件后还可以透明的访问每一个文件。
创建Archive
Usage: hadoop archive -archiveName name -p <parent> <src>* <dest>
其中-archiveName是指要创建的存档的名称。比如test.har,archive的名字的扩展名应该是*.har。 -p参数指定文件存档文件(src)的相对路径。
举个例子:-p /foo/bar a/b/c e/f/g,这里的/foo/bar是a/b/c与e/f/g的父路径,所以完整路径为/foo/bar/a/b/c与/foo/bar/e/f/g。
例如:如果你只想存档一个目录/smallfile下的所有文件:
hadoop archive -archiveName test.har -p /smallfile /outputdir
这样就会在/outputdir目录下创建一个名为test.har的存档文件。
注意:Archive归档是通过MapReduce程序完成的,需要启动YARN集群。
查看Archive
查看归档之后的样子
首先我们来看下创建好的har文件。使用如下的命令:
hadoop fs -ls /outputdir/test.har
这里可以看到har文件包括:两个索引文件,多个part文件(本例只有一个)以及一个标识成功与否的文件。part文件是多个原文件的集合, 通过index文件可以去找到原文件。
例如上述的三个小文件1.txt 2.txt 3.txt内容分别为1,2,3。进行archive操作之后,三个小文件就归档到test.har里的part-0一个文件里。
查看归档之前的样子
在查看har文件的时候,如果没有指定访问协议,默认使用的就是hdfs://,此时所能看到的就是归档之后的样子。
此外,Archive还提供了自己的har uri访问协议。如果用har uri去访问的话,索引、标识等文件就会隐藏起来,只显示创建档案之前的原文件:
Hadoop Archives的URI是:
har://scheme-hostname:port/archivepath/fileinarchive
scheme-hostname格式为hdfs-域名:端口。
hadoop fs -ls har://hdfs-node1:8020/outputdir/test.har/
hadoop fs -ls har:///outputdir/test.har
hadoop fs -cat har:///outputdir/test.har/1.txt
提取Archive
按顺序解压存档(串行):
hadoop fs -cp har:///user/zoo/foo.har/dir1 hdfs:/user/zoo/newdir
[root@node1 ~]# hadoop fs -mkdir /smallfile1
[root@node1 ~]# hadoop fs -cp har:///outputdir/test.har/* /smallfile1
[root@node1 ~]# hadoop fs -ls /smallfile1
要并行解压存档,请使用DistCp,对应大的归档文件可以提高效率:
hadoop distcp har:///user/zoo/foo.har/dir1 hdfs:/user/zoo/newdir
hadoop distcp har:///outputdir/test.har/* /smallfile2
Archive使用注意事项
Hadoop archives是特殊的档案格式。一个Hadoop archive对应一个文件系统目录。Hadoop archive的扩展名是
.har
;创建archives本质是运行一个Map/Reduce任务,所以应该在Hadoop集群上运行创建档案的命令;
创建archive文件要消耗和原文件一样多的硬盘空间;
archive文件不支持压缩,尽管archive文件看起来像已经被压缩过;
archive文件一旦创建就无法改变,要修改的话,需要创建新的archive文件。事实上,一般不会再对存档后的文件进行修改,因为它们是定期存档的,比如每周或每日;
当创建archive时,源文件不会被更改或删除;
Sequence File
Sequence File介绍
Sequence File是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key, value>
键值对序列化到文件中。
Sequence File优缺点
优点
- 二级制格式存储,比文本文件更紧凑。
- 支持不同级别压缩(基于Record或Block压缩)。
- 文件可以拆分和并行处理,适用于MapReduce。
局限性
- 二进制格式文件不方便查看。
- 特定于hadoop,只有Java API可用于与之件进行交互。尚未提供多语言支持。
Sequence File格式
Hadoop Sequence File 是一个由二进制键/值对组成的。根据压缩类型,有3种不同的Sequence File格式:未压缩格式、record压缩格式、block压缩格式。
Sequence File由一个header和一个或多个record组成。以上三种格式均使用相同的header结构,如下所示:
前3个字节为SEQ,表示该文件是序列文件,后跟一个字节表示实际版本号(例如SEQ4或SEQ6)。Header中其他也包括key、value class名字、 压缩细节、metadata、Sync marker。Sync Marker同步标记,用于可以读取任意位置的数据。
未压缩格式
未压缩的Sequence File文件由header、record、sync三个部分组成。其中record包含了4个部分:record length(记录长度)、key length(键长)、key、value。
每隔几个record(100字节左右)就有一个同步标记。
基于record压缩格式
基于record压缩的Sequence File文件由header、record、sync三个部分组成。其中record包含了4个部分:record length(记录长度)、key length(键长)、key、compressed value(被压缩的值)。
每隔几个record(100字节左右)就有一个同步标记。
基于block压缩格式
基于block压缩的Sequence File文件由header、block、sync三个部分组成。
block指的是record block,可以理解为多个record记录组成的块。注意,这个block和HDFS中分块存储的block(128M)是不同的概念。
Block中包括:record条数、压缩的key长度、压缩的keys、压缩的value长度、压缩的values。每隔一个block就有一个同步标记。
block压缩比record压缩提供更好的压缩率。使用Sequence File时,通常首选块压缩。
Sequence File文件读写
开发环境构建
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
SequenceFileWrite
package cn.itcast.hdfs.sequence;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
/**
* @description:
* @author: Allen Woon
* @time: 2020/12/23 15:56
*/
public class SequenceFileWrite {
private static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
public static void main(String[] args) throws Exception {
//设置客户端运行身份 以root去操作访问HDFS
System.setProperty("HADOOP_USER_NAME","root");
//Configuration 用于指定相关参数属性
Configuration conf = new Configuration();
//sequence file key、value
IntWritable key = new IntWritable();
Text value = new Text();
//构造Writer参数属性
SequenceFile.Writer writer = null;
CompressionCodec Codec = new GzipCodec();
SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(new Path("hdfs://node1:8020/seq.out"));
SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(value.getClass());
SequenceFile.Writer.Option optCom = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD,Codec);
try {
writer = SequenceFile.createWriter( conf, optPath, optKey, optVal, optCom);
for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}
运行过程打印输出:
最终输出的文件结果如下:
SequenceFileRead
package cn.itcast.hdfs.sequence;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
/**
* @description:
* @author: Allen Woon
* @time: 2020/12/23 20:27
*/
public class SequenceFileRead {
public static void main(String[] args) throws IOException {
//设置客户端运行身份 以root去操作访问HDFS
System.setProperty("HADOOP_USER_NAME","root");
//Configuration 用于指定相关参数属性
Configuration conf = new Configuration();
SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(new Path("hdfs://node1:8020/seq.out"));
SequenceFile.Reader.Option option2 = SequenceFile.Reader.length(174);//这个参数表示读取的长度
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(conf,option1,option2);
Writable key = (Writable) ReflectionUtils.newInstance(
reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(
reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";//是否返回了Sync Mark同步标记
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
} finally {
IOUtils.closeStream(reader);
}
}
}
案例:使用Sequence File合并小文件
理论依据
可以使用Sequence File对小文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。例如,假设有10,000个100KB文件,那么我们可以编写一个程序将它们放入单个Sequence File中,如下所示,您可以在其中使用filename作为键,并使用content作为值。
具体实现
package cn.itcast.hdfs.sequence;
/**
* @description:
* @author: Allen Woon
* @time: 2020/12/24 11:21
*/
import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MergeSmallFilesToSequenceFile {
private Configuration configuration = new Configuration();
private List<String> smallFilePaths = new ArrayList<String>();
//定义方法用来添加小文件的路径
public void addInputPath(String inputPath) throws Exception{
File file = new File(inputPath);
//给定路径是文件夹,则遍历文件夹,将子文件夹中的文件都放入smallFilePaths
//给定路径是文件,则把文件的路径放入smallFilePaths
if(file.isDirectory()){
File[] files = FileUtil.listFiles(file);
for(File sFile:files){
smallFilePaths.add(sFile.getPath());
System.out.println("添加小文件路径:" + sFile.getPath());
}
}else{
smallFilePaths.add(file.getPath());
System.out.println("添加小文件路径:" + file.getPath());
}
}
//把smallFilePaths的小文件遍历读取,然后放入合并的sequencefile容器中
public void mergeFile() throws Exception{
Writer.Option bigFile = Writer.file(new Path("D:\\bigfile.seq"));
Writer.Option keyClass = Writer.keyClass(Text.class);
Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
//构造writer
Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
//遍历读取小文件,逐个写入sequencefile
Text key = new Text();
for(String path:smallFilePaths){
File file = new File(path);
long fileSize = file.length();//获取文件的字节数大小
byte[] fileContent = new byte[(int)fileSize];
FileInputStream inputStream = new FileInputStream(file);
inputStream.read(fileContent, 0, (int)fileSize);//把文件的二进制流加载到fileContent字节数组中去
String md5Str = DigestUtils.md5Hex(fileContent);
System.out.println("merge小文件:"+path+",md5:"+md5Str);
key.set(path);
//把文件路径作为key,文件内容做为value,放入到sequencefile中
writer.append(key, new BytesWritable(fileContent));
}
writer.hflush();
writer.close();
}
//读取大文件中的小文件
public void readMergedFile() throws Exception{
Reader.Option file = Reader.file(new Path("D:\\bigfile.seq"));
Reader reader = new Reader(configuration, file);
Text key = new Text();
BytesWritable value = new BytesWritable();
while(reader.next(key, value)){
byte[] bytes = value.copyBytes();
String md5 = DigestUtils.md5Hex(bytes);
String content = new String(bytes, Charset.forName("GBK"));
System.out.println("读取到文件:"+key+",md5:"+md5+",content:"+content);
}
}
public static void main(String[] args) throws Exception {
MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile();
//合并小文件
// msf.addInputPath("D:\\datasets\\smallfile");
// msf.mergeFile();
//读取大文件
msf.readMergedFile();
}
}