本文会从源码的实现角度来分享关于HDFS副本存储策略的概念和实现原理,HDFS的副本存储策略涉及副本写入、副本读取、机架感知、目标端存储的好坏区分策略, 熟悉副本存储策略可以帮助我们在开发或者运维过程中,提升数据处理/读写的效率、避免集群故障的发生.
HDFS中文件是以副本的形式进行存储的, HDFS的副本放置策略的主要逻辑在于如何将副本放在合适的地方,并且副本放置好坏会影响数据读写性能的高低,同时HDFS提供了对于副本的容错机制,在副本丢失或DataNode宕机的时候会自动进行恢复.
首先 , HDFS副本放置策略的核心逻辑如下:
第一副本:放置在上传文件的 DataNode上(比如计算任务计算的时候,数据写入会选择当前节点的DataNode);如果是集群外提交,则随机挑选台磁盘不太慢、CPU不太忙的节点
第二副本: 放置在与第一个副本不同的机架的节点上(如果没有配置机架感知,则选择距离第一副本节点的就近节点)
第三副本: 与第二个副本相同机架的不同节点上(如果没有配置机架感知,则选择距离第二副本节点的就近节点)
如果还有更多的副本的话,则会选择随机节点放置
这里还会涉及到配置机架情况和没有配置机架的情况,是否配置机架感知也会影响到BPP(BlockPlacementPolicy副本策略核心类)
目前在HDFS中现有的副本防止策略类有2大继承子类,分别为BlockPlacementPolicyDefault, BlockPlacementPolicyWithNodeGroup
源码基于Hadoop3.3.0版本进行分析, 在HDFS中副本放置的核心抽象类是: BlockPlacementPolicy
,策略的具体实现类为: BlockPlacementPolicyDefault
在前面副本策略的核心逻辑的描述就是这个类的注释内容.
BlockPlacementPolicy
类的核心功能包括:
那么从源码阅读的角度,首先需要知道父类中函数用途,才能更容易去了解其子类的实现逻辑.
在BlockPlacementPolicyDefault
类中最核心的方法是 chooseTarget
,但是这里有三个同名的 chooseTarget
方法, 三个函数的区别在于storageTypes
和favoredNodes
两个参数 .
划重点: 两个参数的作用, storageTypes
是用来选择目标端的存储类型,比如RAM_DISK、SSD、DISK、ARAHIVE , 关于异构存储类型代码实现正在编写,后面也会发送,记得先关注公众号哦 ^.^
favoredNodes
表示在选择节点的时候,优先选择这些节点
class: org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault
/**
* 选择numOfReplicas个DataNode作为Block的目标节点,复制一个大小为blocksize的块
* 最后将它们以Pipeline的方式排序返回
* @param srcPath 正在调用这个chooseTargets的文件
* @param numOfReplicas 需要的额外副本数量.
* @param writer 的机器,如果不在集群中则为空。
* @param 已选定为目标的datanode。
* @param returnChosenNodes 决定是否返回chosenNodes。.
* @param excludedNodes 已排除的节点不应该在目标节点中
* @param blocksize 要写入数据的大小
* @param flags 块放置标志
* @return 数组的DatanodeDescriptor实例选择作为目标并作为管道排序。
*/
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosen,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags);
// 根据目标端的存储类型来选择
public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
return chooseTarget(srcPath, numOfReplicas, writer, chosen,
returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
}
// 根据目标端的存储类型来选择
public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
return chooseTarget(srcPath, numOfReplicas, writer, chosen,
returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
}
在了解了核心函数定义之后,下面我们来拆分一下核心方法里面的具体实现流程: 在三个方法中,可以发现根据favoredNodes
参数来区分了两段逻辑实现,一个是由favoredNodes
参数的逻辑,一个是不包含favoredNodes
的逻辑.
在chooseTarget
方法中,如果传入了favoredNodes
参数,则优先选取该参数中的节点,然后和无favoredNodes
参数的实现中,方法最终会进入到真正的chooseTarget
方法中.
真正的chooseTarget
方法实现主要的逻辑如下:
chooseTarget
方法代码实现,代码比较长,可以简单看看核心逻辑
/**这个是不包含favoredNodes参数的代码实现 */
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosenStorage,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> addBlockFlags,
EnumMap<StorageType, Integer> sTypes) {
/// 判断numOfReplicas是否为0 或者 集群中没有可选的节点
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return DatanodeStorageInfo.EMPTY_ARRAY;
}
/// 创建ExcludeNodes集合.
if (excludedNodes == null) {
excludedNodes = new HashSet<>();
}
// 计算出每个机架所允许的最大副本数
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
for (DatanodeStorageInfo storage : chosenStorage) {
// 添加localMachine和相关节点到excludedNodes
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
List<DatanodeStorageInfo> results = null;
Node localNode = null;
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
boolean avoidLocalNode = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
&& writer != null
&& !excludedNodes.contains(writer));
// 尝试排除本地节点。如果不能获得足够的节点,它就退回到默认的块放置策略。
if (avoidLocalNode) {
results = new ArrayList<>(chosenStorage);
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
if (writer != null) {
excludedNodeCopy.add(writer);
}
localNode = chooseTarget(numOfReplicas, writer,
excludedNodeCopy, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storagePolicy,
EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
if (results.size() < numOfReplicas) {
// 没有足够的节点;则后退一步
results = null;
}
}
if (results == null) {
results = new ArrayList<>(chosenStorage);
localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes,
storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
sTypes);
}
if (!returnChosenNodes) {
results.removeAll(chosenStorage);
}
// 排序形成Pipeline
return getPipeline(
(writer != null && writer instanceof DatanodeDescriptor) ? writer
: localNode,
results.toArray(new DatanodeStorageInfo[results.size()]));
}
目标节点的选择策略源码chooseTargetInOrder
方法分析:
分别为第一节点选择、第二节点选择、第三节点选择以及超过三个节点之后的选择策略,这块代码逻辑正好是符合文中开头提到的副本策略的描述
final int numOfResults = results.size();
// 1.如果目标节点数为0,则表示还没有开始选择节点
if (numOfResults == 0) {
// 1 . 先拿本地节点作为存储节点
DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
storageTypes, true);
writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
: null;
// 1. numOfReplicas 表示选择多少个Node作为存储节点,
// 如果numOfReplicas=0 , 则表示节点已经选择完成,不需要在选择了。
if (--numOfReplicas == 0) {
return writer;
}
}
// 2. 否则目标节点不为空,则取第一个作为存储
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
// 2.1 : 如果目标节点只有一个
if (numOfResults <= 1) {
// 2.2 则剩下的从其他机架进行选择
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
// 2.3 : 选择完成,返回这个writer
if (--numOfReplicas == 0) {
return writer;
}
}
// 3. 如果已经选择了两个节点以内,则开始选取第三个节点
if (numOfResults <= 2) {
// 3.1: 取出第二个DN节点
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
// 3.2: 判断DN0和DN1 是否是相同的机架,如果是相同机架,则在其他机架中选择一个节点
if (clusterMap.isOnSameRack(dn0, dn1)) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
// 3.3: 如果DN0和DN1不是一个机架,是一个新的块的话,则在DN1所在机架选择第一个节点
} else if (newBlock){
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
// 3.4: 如果DN0和DN1不是一个机架,则在本地机架选择一个节点
} else {
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
}
// 选择完成了,返回
if (--numOfReplicas == 0) {
return writer;
}
}
// 4: 最后:如果副本数已经超过2个,已经设置超过3副本的数量
// 则剩余位置在集群中随机选择放置节点
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;
Pipeline形成的逻辑
getPipeline : 返回一个节点管道。管道的形成是从写入器开始,遍历所有节点的最短路径。这基本上是一个TSP旅行商问题TSP旅行商问题百科:假设有一个旅行商人要拜访N个城市,他必须选择所要走的路径,路径的限制是每个城市只能拜访一次,而且最后要回到原来出发的城市。路径的选择目标是要求得的路径路程为所有路径之中的最小值
Pipeline类实现的核心源码如下:
int index=0;
// 如果writer请求方本身不在一个datanode上,则默认选取第一个datanode作为起始节点
if (writer == null || !clusterMap.contains(writer)) {
writer = storages[0].getDatanodeDescriptor();
}
// 遍历所有的storeages,计算最近距离目标的storage
for(; index < storages.length; index++) {
// 获取当前index下标所属的Storage为最近距离的目标storage
DatanodeStorageInfo shortestStorage = storages[index];
// 计算最短距离,getDistance 的逻辑 :返回两个节点之间的距离
//假设一个节点到其父节点的距离为1
//两个节点之间的距离是通过将它们的距离相加来计算的
//和他们最近的共同祖先。
int shortestDistance = clusterMap.getDistance(writer, shortestStorage.getDatanodeDescriptor());
int shortestIndex = index;
for(int i = index + 1; i < storages.length; i++) {
// 遍历计算当前的距离
int currentDistance = clusterMap.getDistance(writer,
storages[i].getDatanodeDescriptor());
if (shortestDistance>currentDistance) {
shortestDistance = currentDistance;
shortestStorage = storages[i];
shortestIndex = i;
}
}
//找到新的最短距离的storage,并进行下标替换
if (index != shortestIndex) {
storages[shortestIndex] = storages[index];
storages[index] = shortestStorage;
}
writer = shortestStorage.getDatanodeDescriptor();
}
}
return storages;
如何判断目标存储的好坏?
经过上面一系列的计算、选择将符合条件的节点加入到了result列表中,那么到这里这里其实还有一步很关键.在isGoodDataNode
方法中判断了目标存储的好坏.
需要判断以下几个条件:
代码逻辑如下 :
boolean isGoodDatanode(DatanodeDescriptor node,
int maxTargetPerRack, boolean considerLoad,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissioned
// 检查该节点是否在服务
if (!node.isInService()) {
logNodeIsNotChosen(node, NodeNotChosenReason.NOT_IN_SERVICE);
return false;
}
// 该节点是否是下线节点
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_STALE);
return false;
}
}
// 是否考虑目标节点的负载,如果考虑的话,就要判断当前节点负载是否比较高
if(considerLoad) {
if (excludeNodeByLoad(node)) {
return false;
}
}
// 检查目标机架是否选择了过多的节点
String rackname = node.getNetworkLocation();
int counter=1;
for(DatanodeStorageInfo resultStorage : results) {
if (rackname.equals(
resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
counter++;
}
}
// 是否满足同机架内最大副本数的限制
if (counter > maxTargetPerRack) {
logNodeIsNotChosen(node, NodeNotChosenReason.TOO_MANY_NODES_ON_RACK);
return false;
}
return true;
}
所谓副本系数是指在HDFS中可以通过hdfs.site.xml中的参数: dfs.replication
来配置的,默认的情况下是3, 也就是说每个文件在上传之后,默认会生成三份数据,三份数据的存储的策略是按照上面提到的副本策略来保存的.
当然, 这个副本系数(数量)也可以进行配置,配置的方式有以下几种:
dfs.replication
设置为 2 , 不过这种方式是一种全局策略,等于说所有文件的保存都是只有2个副本.Configuration configuration = new Configuration();
configuration.set("dfs.replication",1);
hadoop fs -setrep [-R] [-w] <numReplicas> <path>
更改文件的副本系数。如果path是一个目录,则该命令递归地更改以path为根的目录树下所有文件的副本系数 -w : 会等待复制直到完成之后才会退出,这个可能需要很长时间 -R : 标记为向后兼容 , 实际从源码层面没有任何作用对于本文副本策略的内容的学习,我们了解了副本放置策略的概念、方法、原理实现(源码层) , 最后我们来归纳总结一下设计要点:
HDFS默认使用的副本策略设计要点:
后续会持续更新一些大数据组件源码相关的内容 , 欢迎点赞、关注、在看 ^.^
本文参考:
深度剖析Hadoop HDFS - 林意群 编著