这个问题又让我们碰到了,发生次数不频繁但是一旦发生就会造成ResourceManager服务崩溃、ZK注册watch过多等问题。不彻底解决这个问题心中一直是个梗,所以基于前两次的分析和阅读社区最新版Hadoop 3.2.1代码之后,给生产环境YARN打patch最终解决这个问题。对于疑难问题,每遇到一次就有一次不同的感悟,接下来是我本次分析和解决该问题的过程记录。前两次解决和分析该问题的记录如下:
Hadoop版本:Apache Hadoop 2.6.3ZooKeeper版本:ZooKeeper 3.4.10ResourceManager节点:主节点RM01,从节点RM02这个问题很难复现,前两次一直没找到产生该问题的原因,打了patch之后,我们在日志中发现,产生该问题主要是由于部分异常任务导致的,日志如下:
2020-04-28 10:05:54 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore:768 - Application update attemptState data size for /rmstore/ZKRMStateRoot/RMAppRoot/application_1587969707206_16259/appattempt_1587969707206_16259_000001 is 20266528. Exceed the maximum allowed 3145728 size. ApplicationAttemptState info: ApplicationAttemptState{attemptId=appattempt_1587969707206_16259_000001, diagnostics='User class threw exception: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 2.0 failed 4 times, most recent failure: Lost task 15.3 in stage 2.0 (TID 4224, datanode44.bi): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */ return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.immutable.Set hset;
/* 009 */ private boolean hasNull;
/* 010 */ private UnsafeRow result;
/* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
/* 012 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;当任务出现异常时,YARN会保存任务的异常信息,当异常信息很多时,YARN往ZK保存任务状态的数据量就会超过ZK的限制。从日志中可以看出,出现异常的Spark任务状态数据是20266528字节,也就是19MB,远远超过了我们所设置的3MB。在YARN监控界面上可以看到该任务的异常信息有20万行:


由于有了前两次发现和解决问题以及源码理解的经验,所以这次解决问题就顺手的多,去年八月份解决该问题的最终方案是调整ZK服务端和YARN客户端的jute.maxbuffer参数值为3MB,也就是调整ZK中每个ZNode能保存的最大数据量为3MB。但是这样的方案有以下明显的缺点:
ZK中保存的数据量比较大,导致ZK JVM内存紧张,极端情况下会使ZK OOM,同时也会影响ZK数据读写、数据同步以及持久化效率jute.maxbuffer属于硬配置的方式,为了使配置生效,还需要重启ZK服务和客户端YARN RM服务,对ZK服务以及依赖ZK的服务运维成本比较大。由于当前我们生产环境YARN使用的这套ZK集群还管理HBase、流式计算任务的元数据,所以重启影响还是比较大的可以看出,通过修改jute.maxbuffer方式虽然也解决了问题,但是会对ZK服务和依赖ZK的服务有影响,运维成本也比较高。于是通过追踪社区issue和阅读Hadoop 3.2.1源码,我们采取通过在yarn-site.xml增加yarn.resourcemanager.zk-max-znode-size.bytes配置的方式来解决YARN往ZK写数据量超过ZK限制的问题,该配置是在Hadoop 2.9.0版本加入的。使用这种方式,我们不需要修改ZK服务端的配置,而只需要修改YARN服务端的配置并重启YARN就能限制YARN往ZK写入的数据量,而且也提高了ZK服务的可用性。打了patch后的代码逻辑超过数据量限制的任务状态数据直接被丢弃,并打印log日志,方便日后问题追溯。打了patch后的ZKRMStateStore主要代码如下(由于篇幅原因,其余代码省略):
public class ZKRMStateStore extends RMStateStore{
private int zknodeLimit; // 保存ZNode节点数据量限制
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
// 其余部分省略
// 获取yarn-site.xml中yarn.resourcemanager.zk-max-znode-size.bytes的值
zknodeLimit = conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString();
String appDirPath = getNodePath(rmAppRoot, appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+ " at: " + nodeUpdatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
ApplicationAttemptState attemptState = getApplicationAttemptState(appAttemptId, attemptStateDataPB);
// 判断要写入的任务尝试数据信息是否超过zknodeLimit变量的值,如果没有,就执行任务尝试数据更新操作。否则,只打印info信息,不执行任务尝试数据更新操作
if (attemptStateData.length <= zknodeLimit) {
if (existsWithRetries(nodeUpdatePath, true) != null) {
setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
} else {
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
LOG.info("Application update attemptState data size for " + nodeUpdatePath + " is "
+ attemptStateData.length + ". The maximum allowed " + zknodeLimit + " size. ApplicationAttemptState info: " + attemptState.toString() + ". AppAttemptTokens length:" + attemptStateDataPB.getAppAttemptTokens().array().length
+ ". See yarn.resourcemanager.zk-max-znode-size.bytes.");
} else {
LOG.info("Application update attemptState data size for " + nodeUpdatePath + " is "
+ attemptStateData.length + ". Exceed the maximum allowed " + zknodeLimit + " size. ApplicationAttemptState info: " + attemptState.toString() + ". AppAttemptTokens length:" + attemptStateDataPB.getAppAttemptTokens().array().length
+ ". See yarn.resourcemanager.zk-max-znode-size.bytes.");
}
}
}1、YARN使用ZK来实现故障状态恢复,这里的修改会不会影响正常任务的执行和状态恢复?
不会。经过线上一段时间的运行和我们使用zkdoctor监控的数据发现,YARN存储在ZK中的正常任务的状态数据一般不会超过512K,只有部分异常任务的异常信息数据会特别大,这个异常信息数据是引起YARN向ZK写数据量超过限制的根本原因。
YARN将共享状态存储系统定义成一个RMStateStore抽象类,以保存ResourceManager故障恢复后所必需的状态信息,这些信息都是一些基本数据类型的信息,没有特别复杂的数据类型,比如字节数组。ResourceManager也不会保存已经分配给每个ApplicationMaster的资源信息和每个NodeManager的资源使用信息,这些均可通过相应的心跳汇报机制重构出来。因此,ResourceManager的HA实现是非常轻量级的。涉及到任务状态的主要类如下:
(1)Application状态信息ApplicationState:
/**
* State of an application application
* 任务状态信息类
*/
public static class ApplicationState {
final ApplicationSubmissionContext context; // 任务描述信息content
final long submitTime; // 任务提交时间
final long startTime; // 任务开始时间
final String user; // 任务提交人
Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
new HashMap<ApplicationAttemptId, ApplicationAttemptState>(); // 任务重试信息
// fields set when application completes.
RMAppState state; // 任务运行状态
String diagnostics; // 任务异常诊断信息
long finishTime; // 任务完成时间
// 省略其他代码
}(2)Application对应的每个ApplicationAttempt信息ApplicationAttemptState:
/**
* State of an application attempt
* 任务尝试状态信息类
*/
public static class ApplicationAttemptState {
final ApplicationAttemptId attemptId; // 任务尝试ID
final Container masterContainer; // 所在container的信息
final Credentials appAttemptCredentials; // 安全token
long startTime = 0; // 开始时间
long finishTime = 0; // 结束时间
// fields set when attempt completes
RMAppAttemptState state; // 运行状态
String finalTrackingUrl = "N/A"; // 任务运行日志查看地址
String diagnostics; // 任务异常诊断信息
int exitStatus = ContainerExitStatus.INVALID; // 任务退出状态
FinalApplicationStatus amUnregisteredFinalStatus; // 任务最终状态
long memorySeconds; // 任务消耗的内存总资源
long vcoreSeconds; // 任务消耗的CPU总资源
// 省略其他代码
}(3)安全令牌相关信息RMDTSecretManagerState:
/**
* 安全令牌信息
*/
public static class RMDTSecretManagerState {
// DTIdentifier -> renewDate
Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
new HashMap<RMDelegationTokenIdentifier, Long>(); // 授权令牌状态
Set<DelegationKey> masterKeyState =
new HashSet<DelegationKey>(); // master key状态
int dtSequenceNumber = 0; // 序列号
// 省略其他代码
}2、YARN出现异常时为什么会导致ZK中注册很多的watch?
YARN出现异常会进行故障转移,故障转移到standby节点,standby节点会调用RMState的loadState方法进行任务状态数据的恢复,loadState会调用ZKRMStateStore的loadRMAppState方法读取在ZK中保存的任务状态数据,在调用ZK的getData方法时会给任务状态节点和任务尝试状态节点注册watch,以监听任务状态的变化。由于任务状态节点和任务尝试状态节点是持久节点,不会因为ZK客户端连接失效而删除,且是一对多的关系,因此会导致watch数量很多。以下是加载任务状态的相关代码:
private synchronized void loadRMAppState(RMState rmState) throws Exception {
// 当/rmstore/ZKRMStateRoot/RMAppRoot/节点及其子节点被删除或创建时,watch被触发
List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
// 获取任务节点数据并注册watch,该watch当任务节点被删除或数据被更新时触发
byte[] childData = getDataWithRetries(childNodePath, true);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
// 获取任务数据
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser(),
appStateData.getState(),
appStateData.getDiagnostics(), appStateData.getFinishTime());
if (!appId.equals(appState.context.getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
}
rmState.appState.put(appId, appState);
// 获取任务重试数据
loadApplicationAttemptState(appState, appId);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
}
}我们生产环境设置在ZK中保存2万个任务状态信息,发生问题时监控发现YARN往ZK注册了10几万的watch。由于ZK的watch信息是用HashMap(key是ZNode节点的path,value是注册在ZNode上的watch集合)保存的,因此大量的watch会使这个HashMap成为JVM中的一个大对象,这个大对象会一直保存在ZK的服务器端不会被回收,直到YARN被动删除或者更新任务状态数据时才会移除相应节点的watch,ZK服务端保存watch信息的HashMap的元素数量才会相应减少。这是一个比较缓慢的过程,在这个过程中,ZK很可能因为JVM GC问题响应缓慢甚至出现OOM。去年就由于YARN出现问题往ZK注册很多watch导致ZK OOM,继而影响到依赖ZK的HBase服务出现异常。因此,我们在打patch的基础上,将YARN迁移到一套独立的ZK集群,这套ZK集群只为YARN服务,从而提高大数据基础服务的可用性。
我们监控和统计发现,正常情况下,YARN往ZK中注册的watch很少,基本上都是运行时的任务状态数据节点的watch,因此不会对ZK产生太大压力。
3、YARN向ZK写任务状态异常为什么会触发YARN故障转移?
在ZKRMStateStore与ZK交互的方法里,都会调用ZKRMStateStore.ZKAction类的runWithRetries方法进行重试,正常情况下不需要重试。如果发生异常才会触发重试逻辑,默认重试1000次,当重试1000次之后,会使用throw方式给上层调用者抛出异常,凡是以下方法都有可能抛出异常:

异常会被RMStateStore的notifyStoreOperationFailed方法捕捉到,该方法很简单,主要进行以下逻辑判断:
YARN开启了HA,则触发故障转移操作HA,则判断YARN是否开启了快速失败特性,则触发RMFatalEventType.STATE_STORE_OP_FAILED事件,退出进程warn信息该方法具体代码如下:
/**
* 该方法通知RM存储操作失败,参数是引起操作失败的异常信息
* This method is called to notify the ResourceManager that the store
* operation has failed.
* @param failureCause the exception due to which the operation failed
*/
protected void notifyStoreOperationFailed(Exception failureCause) {
LOG.error("State store operation failed ", failureCause);
// 如果开启了HA,则执行故障转移操作
if (HAUtil.isHAEnabled(getConfig())) {
LOG.warn("State-store fenced ! Transitioning RM to standby");
Thread standByTransitionThread =
new Thread(new StandByTransitionThread());
standByTransitionThread.setName("StandByTransitionThread Handler");
standByTransitionThread.start();
} else if (YarnConfiguration.shouldRMFailFast(getConfig())) { // 如果没有开启HA,则判断有没有开启快速失败
LOG.fatal("Fail RM now due to state-store error!");
rmDispatcher.getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED,
failureCause));
} else { // 否则,打印跳过存储异常警告信息
LOG.warn("Skip the state-store error.");
}
}patch后的ZKRMStateStore代码GitHub地址Hadoop版本对于过长的异常诊断信息进行了截断处理