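What does "ResourceManager application state storage" mean? As we know, YARN has two essential daemons: the ResourceManager and the NodeManager. The ResourceManager not only communicates with the ApplicationMasters running on the cluster's nodes but also exchanges heartbeats with every NodeManager, so naturally many applications register with it, each with an ApplicationMaster that tracks the application through its whole lifecycle. Because the ResourceManager plays such a central role, its state (RMState) needs to be persisted. Otherwise, if the ResourceManager process exits abnormally, the application state is lost and the restarted ResourceManager cannot recover and rerun the applications that were submitted before the crash.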
So which application data does RMState actually hold? "Application state" is only a loose umbrella term; the figure below shows what it contains.
Figure: RMState data structure
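As the figure shows, the structure is a layered multi-way tree, similar to the layered execution-state diagram of a running MapReduce job. In short: at the top sits a single RMState; it contains a number of ApplicationState entries, and each ApplicationState in turn contains the states of that application's attempts (ApplicationAttemptState). Besides this application tree, the code below also persists the RM delegation-token secret-manager state and the AMRMToken secret-manager state as part of RMState.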
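To make the tree concrete, here is a minimal Java sketch of its shape. The classes `RMStateSketch`, `AppState` and `AttemptState` are hypothetical stand-ins rather than Hadoop types, and IDs are plain strings; the two map levels correspond to the `rmState.appState` and `appState.attempts` maps used in the store code later on.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for Hadoop's RMState, ApplicationStateData and
// ApplicationAttemptStateData, keyed by ID strings for readability.
class RMStateSketch {
    // Leaf: one attempt of an application.
    static final class AttemptState {
        final String attemptId;
        AttemptState(String attemptId) { this.attemptId = attemptId; }
    }

    // Middle level: one application and its attempts (cf. appState.attempts).
    static final class AppState {
        final String appId;
        final Map<String, AttemptState> attempts = new LinkedHashMap<>();
        AppState(String appId) { this.appId = appId; }
    }

    // Root: every application known to the ResourceManager (cf. rmState.appState).
    final Map<String, AppState> appState = new LinkedHashMap<>();

    // Adds an attempt under its application, creating the application node if needed.
    void addAttempt(String appId, String attemptId) {
        appState.computeIfAbsent(appId, AppState::new)
                .attempts.put(attemptId, new AttemptState(attemptId));
    }

    // Counts the leaves of the tree.
    int totalAttempts() {
        int n = 0;
        for (AppState app : appState.values()) {
            n += app.attempts.size();
        }
        return n;
    }

    public static void main(String[] args) {
        RMStateSketch state = new RMStateSketch();
        state.addAttempt("application_1_0001", "appattempt_1_0001_000001");
        state.addAttempt("application_1_0001", "appattempt_1_0001_000002");
        state.addAttempt("application_1_0002", "appattempt_1_0002_000001");
        System.out.println(state.appState.size() + " applications, "
                + state.totalAttempts() + " attempts");
    }
}
```

Running `main` prints `2 applications, 3 attempts`.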
The YARN source already ships implementations of this store, among them FileSystemRMStateStore and ZKRMStateStore, which persist the data to HDFS and to ZooKeeper, respectively.
Below is a short description of my own implementation, which persists the state data to Redis instead, for anyone interested.
First, create the implementation class RedisRMStateStore, which extends RMStateStore from the Hadoop source:
public class RedisRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(RedisRMStateStore.class);
protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 2);
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = "AMRMTokenSecretManagerNode";
protected static final String EPOCH_NODE = "EpochNode";
protected static final String RM_APP_ROOT = "RMAppRoot";
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
protected static final String RM_VERSION_NODE = "RMVersionNode";
protected RedisUtil redisUtil = null;
private String redisIP = null;
......
}
The initInternal() method of RedisRMStateStore checks whether the Redis connection address is configured in yarn-site.xml:
@Override
protected void initInternal(Configuration conf) throws Exception {
redisIP = conf.get(YarnConf.RM_REDIS_ADDRESS);
if (null == redisIP){
throw new YarnRuntimeException("No server address specified for " +
"redis state store for Resource Manager recovery. " +
YarnConf.RM_REDIS_ADDRESS + " is not configured.");
}
}
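The same fail-fast idea can be sketched without Hadoop, using `java.util.Properties` in place of `Configuration`. The key string `yarn.resourcemanager.redis.address` is a hypothetical value chosen for illustration, since the article does not show what `YarnConf.RM_REDIS_ADDRESS` expands to. Unlike the check above, this sketch also rejects a blank value.

```java
import java.util.Properties;

// Hypothetical key name: the article does not show the string behind YarnConf.RM_REDIS_ADDRESS.
class RedisAddressCheck {
    static final String RM_REDIS_ADDRESS = "yarn.resourcemanager.redis.address";

    // Returns the trimmed address, or fails fast when it is missing or blank so a
    // misconfigured ResourceManager stops at startup rather than at its first write.
    static String requireRedisAddress(Properties conf) {
        String address = conf.getProperty(RM_REDIS_ADDRESS);
        if (address == null || address.trim().isEmpty()) {
            throw new IllegalStateException("No server address specified for "
                    + "redis state store for Resource Manager recovery. "
                    + RM_REDIS_ADDRESS + " is not configured.");
        }
        return address.trim();
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty(RM_REDIS_ADDRESS, " 192.168.1.10 ");
        System.out.println(requireRedisAddress(conf));
    }
}
```

Failing inside initInternal() means a misconfigured ResourceManager stops during startup instead of failing later on its first state write.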
All Redis client operations are encapsulated in a RedisUtil helper class, which RedisRMStateStore uses directly:
@Override
protected synchronized void startInternal() throws Exception {
redisUtil = new RedisUtil(redisIP);
}
@Override
protected synchronized void closeInternal() throws Exception {
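// redisUtil is still null if startInternal() never ran (e.g. initInternal() failed)
if (redisUtil != null) {
redisUtil.jClose();
}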
}
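The article does not show RedisUtil itself. As an assumption, its contract can be inferred from the calls RedisRMStateStore makes; the sketch below writes it down as a hypothetical `RedisOps` interface plus an in-memory fake, which can be handy for unit-testing the store without a Redis server. The Redis command noted next to each method is only a guess at what the real helper wraps.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: method names are inferred from RedisRMStateStore's calls;
// the fake keeps everything in memory instead of talking to Redis.
class InMemoryRedisOps {
    interface RedisOps {
        boolean existKey(byte[] key);                                // EXISTS (presumably)
        byte[] loadDataBykey(byte[] key);                            // GET
        void storeString(byte[] key, byte[] value);                  // SET
        void storeDataToMap(byte[] key, byte[] field, byte[] value); // HSET
        byte[] loadDataFromMapByKey(byte[] key, byte[] field);       // HGET
        Map<byte[], byte[]> loadAllDataFromMap(byte[] key);          // HGETALL
        void removeDataFromMapBykey(byte[] key, byte[] field);       // HDEL
        void delKeys(List<byte[]> keys);                             // DEL
        void jClose();
    }

    // byte[] uses identity equals/hashCode, so keys are kept as ISO-8859-1
    // strings, which map every byte to exactly one char and back.
    private static String s(byte[] b) { return new String(b, StandardCharsets.ISO_8859_1); }
    private static byte[] b(String s) { return s.getBytes(StandardCharsets.ISO_8859_1); }

    static final class Fake implements RedisOps {
        private final Map<String, byte[]> strings = new HashMap<>();
        private final Map<String, Map<String, byte[]>> hashes = new HashMap<>();

        public boolean existKey(byte[] key) {
            return strings.containsKey(s(key)) || hashes.containsKey(s(key));
        }
        public byte[] loadDataBykey(byte[] key) { return strings.get(s(key)); }
        public void storeString(byte[] key, byte[] value) { strings.put(s(key), value); }
        public void storeDataToMap(byte[] key, byte[] field, byte[] value) {
            // Like HSET: creates the hash on first write, overwrites an existing field.
            hashes.computeIfAbsent(s(key), k -> new LinkedHashMap<>()).put(s(field), value);
        }
        public byte[] loadDataFromMapByKey(byte[] key, byte[] field) {
            Map<String, byte[]> h = hashes.get(s(key));
            return h == null ? null : h.get(s(field));
        }
        public Map<byte[], byte[]> loadAllDataFromMap(byte[] key) {
            Map<byte[], byte[]> out = new LinkedHashMap<>();
            Map<String, byte[]> h = hashes.get(s(key));
            if (h != null) {
                for (Map.Entry<String, byte[]> e : h.entrySet()) {
                    out.put(b(e.getKey()), e.getValue());
                }
            }
            return out;
        }
        public void removeDataFromMapBykey(byte[] key, byte[] field) {
            Map<String, byte[]> h = hashes.get(s(key));
            if (h != null) {
                h.remove(s(field));
                if (h.isEmpty()) {
                    hashes.remove(s(key)); // Redis drops a hash once its last field is gone
                }
            }
        }
        public void delKeys(List<byte[]> keys) {
            for (byte[] k : keys) {
                strings.remove(s(k));
                hashes.remove(s(k));
            }
        }
        public void jClose() { }
    }

    public static void main(String[] args) {
        RedisOps redis = new Fake();
        redis.storeDataToMap(b("RMAppRoot"), b("application_1_0001"), b("v1"));
        redis.storeDataToMap(b("RMAppRoot"), b("application_1_0001"), b("v2"));
        System.out.println(s(redis.loadDataFromMapByKey(b("RMAppRoot"), b("application_1_0001"))));
    }
}
```

Running `main` prints `v2`: a second `storeDataToMap` on the same field overwrites the first. Assuming the real helper behaves like HSET, this is why the store can use the same call for both storing and updating a record.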
The remaining methods of RedisRMStateStore store, load, update, or delete RMState data. Instead of explaining every line, the source of each method is listed below for reference:
@Override
protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* Derived class use this method to load the version information from state
* store.
*/
@Override
protected synchronized Version loadVersion() throws Exception {
Version version = null;
if (redisUtil.existKey(RM_VERSION_NODE.getBytes())){
byte[] bytes = redisUtil.loadDataBykey(RM_VERSION_NODE.getBytes());
version = new VersionPBImpl(VersionProto.parseFrom(bytes));
}
return version;
}
/**
* Derived class use this method to store the version information.
*/
@Override
protected synchronized void storeVersion() throws Exception {
byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
redisUtil.storeString(RM_VERSION_NODE.getBytes(), data);
}
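`loadVersion()` and `storeVersion()` are only the storage half; the decision is made by the base class. Below is a hedged sketch of that rule, modeled on `RMStateStore.checkVersion()` as I understand it: a missing version is treated as current, a version with the same major number is compatible and gets overwritten with `CURRENT_VERSION_INFO`, and a different major number is rejected. `SimpleVersion` and `VersionCheck` are hypothetical names; Hadoop uses its own `Version` record.

```java
// Hypothetical stand-in for Hadoop's Version record; the rule mirrors
// RMStateStore.checkVersion(): a missing version counts as current, and two
// versions are compatible when their major numbers match.
class VersionCheck {
    static final class SimpleVersion {
        final int major;
        final int minor;

        SimpleVersion(int major, int minor) {
            this.major = major;
            this.minor = minor;
        }

        boolean isCompatibleTo(SimpleVersion other) {
            return major == other.major;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SimpleVersion)) {
                return false;
            }
            SimpleVersion v = (SimpleVersion) o;
            return major == v.major && minor == v.minor;
        }

        @Override
        public int hashCode() {
            return 31 * major + minor;
        }
    }

    enum Outcome { UNCHANGED, STORE_CURRENT, INCOMPATIBLE }

    // loaded: what loadVersion() returned (null if the key is absent).
    static Outcome check(SimpleVersion loaded, SimpleVersion current) {
        if (current.equals(loaded)) {
            return Outcome.UNCHANGED;            // stored version already current
        }
        SimpleVersion effective = (loaded == null) ? current : loaded;
        return effective.isCompatibleTo(current)
                ? Outcome.STORE_CURRENT          // i.e. call storeVersion()
                : Outcome.INCOMPATIBLE;          // Hadoop raises an exception here
    }

    public static void main(String[] args) {
        SimpleVersion current = new SimpleVersion(1, 2); // like CURRENT_VERSION_INFO
        System.out.println(check(null, current));
        System.out.println(check(new SimpleVersion(1, 0), current));
        System.out.println(check(new SimpleVersion(2, 0), current));
    }
}
```

Running `main` prints `STORE_CURRENT`, `STORE_CURRENT` and `INCOMPATIBLE` on three lines, so bumping only the minor number (as in `Version.newInstance(1, 2)`) keeps old stores readable.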
/**
* Get the current epoch of RM and increment the value.
*/
@Override
public synchronized long getAndIncrementEpoch() throws Exception {
byte[] epochKey = EPOCH_NODE.getBytes();
long currentEpoch = 0;
if (redisUtil.existKey(epochKey)){
// Load the current epoch
byte[] bytes = redisUtil.loadDataBykey(epochKey);
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(bytes));
currentEpoch = epoch.getEpoch();
}
// Whether or not the key existed, store epoch + 1 for the next start
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
redisUtil.storeString(epochKey, storeData);
return currentEpoch;
}
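The contract of `getAndIncrementEpoch()` is "return the stored value, then persist value + 1", with 0 for a fresh store. The sketch below shows just that contract against a `HashMap` standing in for Redis; this is a hypothetical simplification, since the real store keeps a serialized `Epoch` protobuf rather than a `Long`.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the Redis key space; "EpochNode" matches EPOCH_NODE.
class EpochSketch {
    private final Map<String, Long> store = new HashMap<>();

    // Returns the stored epoch (0 when the key does not exist yet) and
    // persists epoch + 1, so each ResourceManager start sees a larger value.
    synchronized long getAndIncrementEpoch() {
        long current = store.getOrDefault("EpochNode", 0L);
        store.put("EpochNode", current + 1);
        return current;
    }

    public static void main(String[] args) {
        EpochSketch s = new EpochSketch();
        for (int start = 0; start < 3; start++) {
            System.out.println("RM start " + start + " -> epoch " + s.getAndIncrementEpoch());
        }
    }
}
```

Reading and then writing is not atomic across processes; `synchronized` only protects a single ResourceManager. Redis's `INCR` command increments atomically, but it operates on integer-valued strings, so using it would mean storing the epoch as a plain number instead of protobuf bytes.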
/**
* Blocking API
* The derived class must recover state from the store and return a new
* RMState object populated with that state
* This must not be called on the dispatcher thread
*/
@Override
public synchronized RMState loadState() throws Exception {
RMStates rmState = new RMStates();
long start = System.currentTimeMillis();
loadRMDTSecretManagerState(rmState);
loadRMAppState(rmState);
loadAMRMTokenSecretManagerState(rmState);
long end = System.currentTimeMillis();
LOG.info("Time taken to load RMState (ms):\t" + (end - start));
return rmState;
}
private void loadAMRMTokenSecretManagerState(RMStates rmState) throws Exception {
if (redisUtil.existKey(AMRMTOKEN_SECRET_MANAGER_NODE.getBytes())){
byte[] data = redisUtil.loadDataFromMapByKey(AMRMTOKEN_SECRET_MANAGER_NODE.getBytes(), AMRMTOKEN_SECRET_MANAGER_NODE.getBytes());
AMRMTokenSecretManagerStatePBImpl stateData = new AMRMTokenSecretManagerStatePBImpl(AMRMTokenSecretManagerStateProto.parseFrom(data));
rmState.amrmTokenSecretManagerState = AMRMTokenSecretManagerState.newInstance(stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
}
}
private void loadRMDTSecretManagerState(RMStates rmState) throws Exception {
if (redisUtil.existKey(RM_DT_SECRET_MANAGER_ROOT.getBytes())){
Map<byte[], byte[]> map = redisUtil.loadAllDataFromMap(RM_DT_SECRET_MANAGER_ROOT.getBytes());
for (Map.Entry<byte[], byte[]> entry : map.entrySet()){
String childNodeName = new String(entry.getKey());
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)){
rmState.dtSequenceNumber = Integer.parseInt(childNodeName.split("_")[1]);
continue;
}
byte[] childData = entry.getValue();
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
DelegationKey key = new DelegationKey();
key.readFields(fsIn);
rmState.masterKeyState.add(key);
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)){
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
identifier.readFields(fsIn);
long renewDate = fsIn.readLong();
rmState.delegationTokenState.put(identifier, renewDate);
}
}
}
}
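The entries of the `RMDTSecretManagerRoot` hash are told apart purely by field-name prefix. A small sketch of that classification follows. The three prefix values are assumed to match the constants `RMStateStore` defines (verify them against your Hadoop version), and `DtFieldClassifier` is a hypothetical name. It parses the sequence number with `substring` after the prefix rather than `split("_")[1]`, which only works while the prefix contains a single underscore.

```java
// Hypothetical helper; prefix values assumed to match RMStateStore's constants.
class DtFieldClassifier {
    static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
    static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
    static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = "RMDTSequenceNumber_";

    enum Kind { MASTER_KEY, TOKEN, SEQUENCE_NUMBER, UNKNOWN }

    // Decides what a field of the RMDTSecretManagerRoot hash holds from its name alone.
    static Kind classify(String field) {
        if (field.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
            return Kind.SEQUENCE_NUMBER; // the number itself is encoded in the name
        }
        if (field.startsWith(DELEGATION_KEY_PREFIX)) {
            return Kind.MASTER_KEY;      // value: DelegationKey Writable bytes
        }
        if (field.startsWith(DELEGATION_TOKEN_PREFIX)) {
            return Kind.TOKEN;           // value: identifier bytes + renew date
        }
        return Kind.UNKNOWN;
    }

    // Parses the number after the prefix, e.g. "RMDTSequenceNumber_42" -> 42.
    static int sequenceNumberOf(String field) {
        return Integer.parseInt(field.substring(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX.length()));
    }

    public static void main(String[] args) {
        for (String f : new String[] {"DelegationKey_7", "RMDelegationToken_42", "RMDTSequenceNumber_42"}) {
            System.out.println(f + " -> " + classify(f));
        }
    }
}
```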
private void loadRMAppState(RMStates rmState) throws Exception {
List<ApplicationAttemptStateData> attempts =
new ArrayList<ApplicationAttemptStateData>();
if (redisUtil.existKey(RM_APP_ROOT.getBytes())){
Map<byte[], byte[]> map = redisUtil.loadAllDataFromMap(RM_APP_ROOT.getBytes());
for (Map.Entry<byte[], byte[]> entry : map.entrySet()){
String childNodeName = new String(entry.getKey());
byte[] childData = entry.getValue();
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(ApplicationStateDataProto.parseFrom(childData));
ApplicationId appId = appState.getApplicationSubmissionContext().getApplicationId();
rmState.appState.put(appId, appState);
} else if (childNodeName.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)){
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
attempts.add(attemptState);
}
}
for (ApplicationAttemptStateData attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationStateData appState = rmState.appState.get(appId);
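if (appState == null) {
// An attempt left behind by an already-removed application; skip it
// instead of failing with a NullPointerException (assert is off by default)
LOG.warn("Skipping attempt " + attemptState.getAttemptId() + " with no application state");
continue;
}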
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
}
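Because applications and attempts are stored as sibling fields of one hash, and hash iteration order is arbitrary, the loader needs two passes. The sketch below reproduces that logic with plain strings; `AppAttemptGrouping` is a hypothetical name, and the ID formats (`application_<clusterTimestamp>_<id>`, `appattempt_<clusterTimestamp>_<id>_<attempt>`) are YARN's string forms of `ApplicationId` and `ApplicationAttemptId`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of loadRMAppState()'s two passes; values are ignored.
class AppAttemptGrouping {
    // appattempt_<ts>_<app>_<attempt> -> application_<ts>_<app>
    static String appIdOf(String attemptId) {
        String body = attemptId.substring("appattempt_".length());
        return "application_" + body.substring(0, body.lastIndexOf('_'));
    }

    // Groups the field names of the RMAppRoot hash into application -> attempts.
    static Map<String, List<String>> group(Iterable<String> fieldNames) {
        Map<String, List<String>> apps = new LinkedHashMap<>();
        List<String> attempts = new ArrayList<>();
        // Pass 1: an attempt may come before its application, so buffer
        // attempts until every application is known.
        for (String name : fieldNames) {
            if (name.startsWith("application_")) {
                apps.putIfAbsent(name, new ArrayList<>());
            } else if (name.startsWith("appattempt_")) {
                attempts.add(name);
            }
        }
        // Pass 2: attach each attempt to its application; orphans are skipped.
        for (String attempt : attempts) {
            List<String> owner = apps.get(appIdOf(attempt));
            if (owner == null) {
                System.err.println("Skipping orphan attempt " + attempt);
                continue;
            }
            owner.add(attempt);
        }
        return apps;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList(
                "appattempt_1500000000000_0001_000001",
                "application_1500000000000_0001",
                "application_1500000000000_0002");
        System.out.println(group(fields));
    }
}
```

An attempt whose application is missing is skipped with a warning here. A bare `assert` only runs when the JVM is started with `-ea`; otherwise the failed lookup returns null and the next line throws a NullPointerException.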
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application.
*
* @param appId
* @param appStateData
*/
@Override
protected synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateData) throws Exception {
byte[] data = appStateData.getProto().toByteArray();
redisUtil.storeDataToMap(RM_APP_ROOT.getBytes(), appId.toString().getBytes(), data);
}
@Override
protected synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateData) throws Exception {
byte[] data = appStateData.getProto().toByteArray();
redisUtil.storeDataToMap(RM_APP_ROOT.getBytes(), appId.toString().getBytes(), data);
}
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application attempt
*
* @param attemptId
* @param attemptStateData
*/
@Override
protected synchronized void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateData attemptStateData) throws Exception {
byte[] data = attemptStateData.getProto().toByteArray();
redisUtil.storeDataToMap(RM_APP_ROOT.getBytes(), attemptId.toString().getBytes(), data);
}
@Override
protected synchronized void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateData attemptStateData) throws Exception {
byte[] data = attemptStateData.getProto().toByteArray();
redisUtil.storeDataToMap(RM_APP_ROOT.getBytes(), attemptId.toString().getBytes(), data);
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of an
* application and its attempts
*
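* @param appState
*/
@Override
protected synchronized void removeApplicationStateInternal(ApplicationStateData appState) throws Exception {
ApplicationId appId = appState.getApplicationSubmissionContext().getApplicationId();
byte[] appRoot = RM_APP_ROOT.getBytes();
// Attempts are separate fields of the same hash, so delete them as well
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
redisUtil.removeDataFromMapBykey(appRoot, attemptId.toString().getBytes());
}
redisUtil.removeDataFromMapBykey(appRoot, appId.toString().getBytes());
}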
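With this layout, removing an application also means deleting its attempt fields; otherwise they stay in `RMAppRoot` as orphans and resurface on the next `loadState()`. Below is a hypothetical sketch over a plain `Map` that finds the attempts by ID prefix (the store itself already has their IDs in `appState.attempts`, so it does not need to scan).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: RMAppRoot modeled as a Map from field name to value.
class RemoveAppSketch {
    // Removes an application field plus all of its attempt fields.
    static void removeApplication(Map<String, byte[]> appRoot, String appId) {
        appRoot.remove(appId);
        // application_<ts>_<app> -> appattempt_<ts>_<app>_ ; the trailing '_' keeps
        // application_1_1000 from matching attempts of application_1_10000.
        String attemptPrefix = "appattempt_" + appId.substring("application_".length()) + "_";
        appRoot.keySet().removeIf(field -> field.startsWith(attemptPrefix));
    }

    public static void main(String[] args) {
        Map<String, byte[]> appRoot = new HashMap<>();
        appRoot.put("application_1_0001", new byte[0]);
        appRoot.put("appattempt_1_0001_000001", new byte[0]);
        appRoot.put("appattempt_1_0001_000002", new byte[0]);
        appRoot.put("application_1_0002", new byte[0]);
        appRoot.put("appattempt_1_0002_000001", new byte[0]);
        removeApplication(appRoot, "application_1_0001");
        System.out.println(appRoot.keySet());
    }
}
```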
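/**
* Blocking API
* Derived classes must implement this method to store the state of
* RMDelegationToken and sequence number
*
* @param rmDTIdentifier
* @param renewDate
*/
@Override
protected synchronized void storeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception {
// A newly issued token: also record its sequence number as the latest one
storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, false);
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of RMDelegationToken
*
* @param rmDTIdentifier
*/
@Override
protected synchronized void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String keyname = DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber();
redisUtil.removeDataFromMapBykey(RM_DT_SECRET_MANAGER_ROOT.getBytes(), keyname.getBytes());
}
/**
* Blocking API
* Derived classes must implement this method to update the state of
* RMDelegationToken and sequence number
*
* @param rmDTIdentifier
* @param renewDate
*/
@Override
protected synchronized void updateRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception {
// A renewal: overwrite the token entry but leave the sequence number alone
storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
}
private void storeOrUpdateRMDelegationTokenState(RMDelegationTokenIdentifier identifier, Long renewDate, boolean isUpdate) throws Exception {
byte[] root = RM_DT_SECRET_MANAGER_ROOT.getBytes();
String keyName = DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber();
// Value layout: the identifier's Writable bytes followed by the renew date
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
identifier.write(fsOut);
fsOut.writeLong(renewDate);
redisUtil.storeDataToMap(root, keyName.getBytes(), os.toByteArray());
if (!isUpdate) {
String latestSequenceNumber = DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + identifier.getSequenceNumber();
redisUtil.storeDataToMap(root, latestSequenceNumber.getBytes(), String.valueOf(identifier.getSequenceNumber()).getBytes());
// loadRMDTSecretManagerState() reads the sequence number from this field's name,
// so remove older sequence-number fields and leave exactly one
Map<byte[], byte[]> entries = redisUtil.loadAllDataFromMap(root);
for (byte[] field : entries.keySet()) {
String name = new String(field);
if (name.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX) && !name.equals(latestSequenceNumber)) {
redisUtil.removeDataFromMapBykey(root, field);
}
}
}
}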
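The value stored for each token is just the identifier's `Writable` bytes followed by an 8-byte renew date, and `loadRMDTSecretManagerState()` reads them back in the same order. A self-contained round trip of that layout, with a hypothetical `FakeIdentifier` standing in for `RMDelegationTokenIdentifier`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class TokenValueRoundTrip {
    // Hypothetical stand-in for RMDelegationTokenIdentifier's Writable methods.
    static final class FakeIdentifier {
        int sequenceNumber;
        String owner;

        void write(DataOutput out) throws IOException {
            out.writeInt(sequenceNumber);
            out.writeUTF(owner);
        }

        void readFields(DataInput in) throws IOException {
            sequenceNumber = in.readInt();
            owner = in.readUTF();
        }
    }

    // Value layout: identifier bytes, then the 8-byte renew date.
    static byte[] encode(FakeIdentifier id, long renewDate) {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(os)) {
            id.write(out);
            out.writeLong(renewDate);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return os.toByteArray();
    }

    // Reads the fields back in the same order; returns the renew date.
    static long decode(byte[] value, FakeIdentifier into) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value))) {
            into.readFields(in);
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeIdentifier id = new FakeIdentifier();
        id.sequenceNumber = 42;
        id.owner = "alice";
        byte[] value = encode(id, 1500000000000L);
        FakeIdentifier back = new FakeIdentifier();
        long renewDate = decode(value, back);
        System.out.println(value.length + " bytes, seq=" + back.sequenceNumber
                + ", owner=" + back.owner + ", renewDate=" + renewDate);
    }
}
```

Running `main` prints `19 bytes, seq=42, owner=alice, renewDate=1500000000000`: 4 bytes for the int, 2 + 5 for the `writeUTF` string, and 8 for the long. If the read order ever diverges from the write order, the renew date is silently decoded from the wrong bytes.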
/**
* Blocking API
* Derived classes must implement this method to store the state of
* DelegationToken Master Key
*
* @param masterKey
*/
@Override
protected synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {
String keyname = DELEGATION_KEY_PREFIX + masterKey.getKeyId();
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
masterKey.write(fsOut);
redisUtil.storeDataToMap(RM_DT_SECRET_MANAGER_ROOT.getBytes(), keyname.getBytes(), os.toByteArray());
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of
* DelegationToken Master Key
*
* @param masterKey
*/
@Override
protected synchronized void removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {
String keyname = DELEGATION_KEY_PREFIX + masterKey.getKeyId();
redisUtil.removeDataFromMapBykey(RM_DT_SECRET_MANAGER_ROOT.getBytes(), keyname.getBytes());
}
/**
* Blocking API Derived classes must implement this method to store or update
* the state of AMRMToken Master Key
*
* @param amrmTokenSecretManagerState
* @param isUpdate
*/
@Override
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) {
AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray();
redisUtil.storeDataToMap(AMRMTOKEN_SECRET_MANAGER_NODE.getBytes(), AMRMTOKEN_SECRET_MANAGER_NODE.getBytes(), stateData);
}
/**
* Derived classes must implement this method to delete the state store
*
* @throws Exception
*/
@Override
public void deleteStore() throws Exception {
List<byte[]> keys = new ArrayList<byte[]>();
keys.add(RM_DT_SECRET_MANAGER_ROOT.getBytes());
keys.add(RM_APP_ROOT.getBytes());
keys.add(RM_VERSION_NODE.getBytes());
// The AMRMToken state is written under AMRMTOKEN_SECRET_MANAGER_NODE
keys.add(AMRMTOKEN_SECRET_MANAGER_NODE.getBytes());
keys.add(EPOCH_NODE.getBytes());
redisUtil.delKeys(keys);
}