Yarn ResourceManager信息状态数据基于Redis持久化存储

ResourceManager应用状态存储是什么意思呢?我们都知道yarn有两个很重要的进程:ResourceManager和NodeManager。ResourceManager不仅要与各个节点上的ApplicationMaster进行通信,还要与NodeManager进行心跳包的传输,因此在ResourceManager上自然会注册很多应用,每个应用由一个ApplicationMaster负责跟踪其整个生命周期。既然ResourceManager的角色这么重要,就有必要把ResourceManager的状态信息RMState保存下来,以免ResourceManager进程异常退出导致应用状态信息丢失,出现ResourceManager重启后无法重跑之前应用的现象。

那么在RMState中,保存的应用信息到底是哪些数据信息呢,应用状态信息只是1个笼统的概念。下面用一张图来表示。

RMState状态数据结构

可以看到,这是一张分层多叉树形状的图,类似于MapReduce作业运行的分层执行状态图。做个简单介绍:最上面是1个RMState状态,其中包含若干个ApplicationState应用状态信息,每个应用状态信息中又包含多个应用尝试(ApplicationAttempt)的状态信息。

目前yarn源码相关部分已经提供了FileSystemRMStateStore、ZKRMStateStore两个实现类来将数据分别持久化存储到hdfs和zookeeper中。

下面将小编实现的把状态信息数据存储到redis做持久化作一个简单的描述,供各位看官一览。

首先创建RedisRMStateStore实现类,该类继承于hadoop源码中的RMStateStore

/**
 * RMStateStore implementation that keeps ResourceManager recovery state in
 * Redis. This is an excerpt: the methods of the class are elided by the
 * "......" placeholder below.
 */
public class RedisRMStateStore extends RMStateStore {

public static final Log LOG = LogFactory.getLog(RedisRMStateStore.class);

// Version returned by getCurrentVersion() and serialized by storeVersion().
protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 2);

// Redis key (and map field) under which the AMRM token secret manager state is stored.
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = "AMRMTokenSecretManagerNode";

// Redis key holding the serialized epoch.
protected static final String EPOCH_NODE = "EpochNode";

// Redis map key holding application and application-attempt state.
protected static final String RM_APP_ROOT = "RMAppRoot";

// Redis map key holding delegation tokens and delegation master keys.
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";

// Redis key holding the serialized store version.
protected static final String RM_VERSION_NODE = "RMVersionNode";

// Redis client wrapper; created in startInternal(), so it is null until then.
protected RedisUtil redisUtil =null;

// Redis address read from YarnConf.RM_REDIS_ADDRESS in initInternal().
private String redisIP = null;

......

}

RedisRMStateStore类中的initInternal()方法,检查yarn-site.xml配置项中是否配置redis的链接信息

/**
 * Reads the Redis server address from the configuration and fails fast when
 * it is not usable.
 *
 * @param conf configuration that supplies {@code YarnConf.RM_REDIS_ADDRESS}
 * @throws YarnRuntimeException if the address is missing or blank
 */
@Override
protected void initInternal(Configuration conf) throws Exception {
    redisIP = conf.get(YarnConf.RM_REDIS_ADDRESS);
    // A blank value is as unusable as a missing one; reject both here instead
    // of failing later when the Redis client is created.
    if (redisIP == null || redisIP.trim().isEmpty()) {
        throw new YarnRuntimeException("No server address specified for " +
            "redis state store for Resource Manager recovery. " +
            YarnConf.RM_REDIS_ADDRESS + " is not configured.");
    }
}

关于redis相关的客户端操作都被封装到RedisUtil工具类中,直接在RedisRMStateStore类中使用

/**
 * Creates the Redis client wrapper used by the store, pointing it at the
 * address held in {@code redisIP}.
 *
 * @throws Exception if the client cannot be created
 */
@Override
protected synchronized void startInternal() throws Exception {
    this.redisUtil = new RedisUtil(this.redisIP);
}

/**
 * Closes the Redis client wrapper, if one was created.
 *
 * @throws Exception if closing the client fails
 */
@Override
protected synchronized void closeInternal() throws Exception {
    // The client only exists after startInternal(); closing a store that was
    // initialised but never started must not fail with a NullPointerException.
    if (redisUtil != null) {
        redisUtil.jClose();
    }
}

RedisRMStateStore实现类中其余的方法就是对RMState的信息状态数据进行存储、加载以及更新或者删除等相关操作,这里就不一一解释每一行代码,直接附上每个函数的源码以供参考

/**
 * Reports the version of the state layout written by this store.
 *
 * @return the constant {@code CURRENT_VERSION_INFO}
 */
@Override
protected Version getCurrentVersion() {
    final Version current = CURRENT_VERSION_INFO;
    return current;
}

/**

* Derived class use this method to load the version information from state

* store.

@Override

protected synchronized Version loadVersion() throws Exception {

Version version = null;

if (redisUtil.existKey(RM_VERSION_NODE.getBytes())){

byte[] bytes = redisUtil.loadDataBykey(RM_VERSION_NODE.getBytes());

version = new VersionPBImpl(VersionProto.parseFrom(bytes));

}

return version;

}

/**

* Derived class use this method to store the version information.

@Override

protected synchronized void storeVersion() throws Exception {

byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();

redisUtil.storeString(RM_VERSION_NODE.getBytes(), data);

}

/**

* Get the current epoch of RM and increment the value.

@Override

public synchronized long getAndIncrementEpoch() throws Exception {

long currentEpoch = 0;

if (redisUtil.existKey(EPOCH_NODE.getBytes())){

byte[] bytes = redisUtil.loadDataBykey(EPOCH_NODE.getBytes());

Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(bytes));

currentEpoch = epoch.getEpoch();

byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()

.toByteArray();

redisUtil.storeString(EPOCH_NODE.getBytes(), storeData);

} else{

byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()

.toByteArray();

redisUtil.storeString(EPOCH_NODE.getBytes(), storeData);

}

return currentEpoch;

}

/**

* Blocking API

* The derived class must recover state from the store and return a new

* RMState object populated with that state

* This must not be called on the dispatcher thread

@Override

public synchronized RMState loadState() throws Exception {

RMStates rmState = new RMStates();

long start = System.currentTimeMillis();

loadRMDTSecretManagerState(rmState);

loadRMAppState(rmState);

loadAMRMTokenSecretManagerState(rmState);

long end = System.currentTimeMillis();

LOG.info("加载RMState消耗时间:\t" + (end-start));

return rmState;

}

/**
 * Restores the AMRM token secret manager state into {@code rmState}.
 * Does nothing when the {@code AMRMTOKEN_SECRET_MANAGER_NODE} key is absent.
 *
 * @param rmState state object whose {@code amrmTokenSecretManagerState} is set
 * @throws Exception if Redis access or protobuf parsing fails
 */
private void loadAMRMTokenSecretManagerState(RMStates rmState) throws Exception {
    if (!redisUtil.existKey(AMRMTOKEN_SECRET_MANAGER_NODE.getBytes())) {
        return;
    }
    // The same name is used both as the Redis key and as the field inside it.
    byte[] serialized = redisUtil.loadDataFromMapByKey(
        AMRMTOKEN_SECRET_MANAGER_NODE.getBytes(),
        AMRMTOKEN_SECRET_MANAGER_NODE.getBytes());
    AMRMTokenSecretManagerStateProto proto =
        AMRMTokenSecretManagerStateProto.parseFrom(serialized);
    AMRMTokenSecretManagerStatePBImpl stored = new AMRMTokenSecretManagerStatePBImpl(proto);
    rmState.amrmTokenSecretManagerState = AMRMTokenSecretManagerState.newInstance(
        stored.getCurrentMasterKey(), stored.getNextMasterKey());
}

/**
 * Restores delegation-token secret manager state from the
 * {@code RM_DT_SECRET_MANAGER_ROOT} map into {@code rmState}. Does nothing
 * when that key is absent.
 *
 * <p>Each field is dispatched on its name prefix: a sequence-number field
 * sets {@code dtSequenceNumber}, a delegation-key field is added to
 * {@code masterKeyState}, and a delegation-token field (identifier followed
 * by a {@code long} renew date) is put into {@code delegationTokenState}.
 * Fields with any other prefix are ignored.
 *
 * @param rmState state object to populate
 * @throws Exception if Redis access or deserialization fails
 */
private void loadRMDTSecretManagerState(RMStates rmState) throws Exception {
    byte[] rootKey = RM_DT_SECRET_MANAGER_ROOT.getBytes();
    if (!redisUtil.existKey(rootKey)) {
        return;
    }
    // Field names and values are raw bytes; the type arguments are required
    // for the getKey()/getValue() uses below to compile.
    Map<byte[], byte[]> map = redisUtil.loadAllDataFromMap(rootKey);
    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
        String childNodeName = new String(entry.getKey());
        if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
            // The number is the second '_'-separated segment of the field name.
            rmState.dtSequenceNumber = Integer.parseInt(childNodeName.split("_")[1]);
            continue;
        }
        byte[] childData = entry.getValue();
        DataInputStream fsIn = new DataInputStream(new ByteArrayInputStream(childData));
        if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
            DelegationKey key = new DelegationKey();
            key.readFields(fsIn);
            rmState.masterKeyState.add(key);
        } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
            RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
            identifier.readFields(fsIn);
            // The renew date is written directly after the identifier.
            long renewDate = fsIn.readLong();
            rmState.delegationTokenState.put(identifier, renewDate);
        }
    }
}

/**
 * Restores application and application-attempt state from the
 * {@code RM_APP_ROOT} map into {@code rmState}. Does nothing when that key is
 * absent.
 *
 * <p>Applications are registered in a first pass; attempts are collected and
 * attached to their application afterwards, because the map gives no
 * guarantee that an application is seen before its attempts.
 *
 * @param rmState state object to populate
 * @throws Exception if Redis access or protobuf parsing fails
 */
private void loadRMAppState(RMStates rmState) throws Exception {
    byte[] rootKey = RM_APP_ROOT.getBytes();
    if (!redisUtil.existKey(rootKey)) {
        return;
    }
    List<ApplicationAttemptStateData> attempts =
        new ArrayList<ApplicationAttemptStateData>();
    // Field names and values are raw bytes; the type arguments are required
    // for the getKey()/getValue() uses below to compile.
    Map<byte[], byte[]> map = redisUtil.loadAllDataFromMap(rootKey);
    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
        String childNodeName = new String(entry.getKey());
        byte[] childData = entry.getValue();
        if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
            ApplicationStateDataPBImpl appState =
                new ApplicationStateDataPBImpl(ApplicationStateDataProto.parseFrom(childData));
            ApplicationId appId = appState.getApplicationSubmissionContext().getApplicationId();
            rmState.appState.put(appId, appState);
        } else if (childNodeName.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
            ApplicationAttemptStateDataPBImpl attemptState =
                new ApplicationAttemptStateDataPBImpl(
                    ApplicationAttemptStateDataProto.parseFrom(childData));
            attempts.add(attemptState);
        }
    }
    for (ApplicationAttemptStateData attemptState : attempts) {
        ApplicationId appId = attemptState.getAttemptId().getApplicationId();
        ApplicationStateData appState = rmState.appState.get(appId);
        if (appState == null) {
            // An attempt field can outlive its application field (for example
            // when only the application field was deleted). An assert would
            // turn into a NullPointerException with assertions disabled, so
            // skip the orphan and keep recovery going.
            LOG.warn("Skipping attempt " + attemptState.getAttemptId()
                + " because application " + appId + " has no stored state");
            continue;
        }
        appState.attempts.put(attemptState.getAttemptId(), attemptState);
    }
}

/**

* Blocking API

* Derived classes must implement this method to store the state of an

* application.

*

* @param appId

* @param appStateData

@Override

protected synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateData) throws Exception {

byte[] data = appStateData.getProto().toByteArray();

redisUtil.storeDataToMap(RM_APP_ROOT.getBytes(), appId.toString().getBytes(), data);

}

/**
 * Writes the given application state into the {@code RM_APP_ROOT} map under
 * the field named by the string form of {@code appId}.
 *
 * @param appId id of the application; its string form is the field name
 * @param appStateData state to serialize and store
 * @throws Exception if the write to Redis fails
 */
@Override
protected synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateData) throws Exception {
    final byte[] serialized = appStateData.getProto().toByteArray();
    final byte[] mapKey = RM_APP_ROOT.getBytes();
    final byte[] field = appId.toString().getBytes();
    redisUtil.storeDataToMap(mapKey, field, serialized);
}

/**

* Blocking API

* Derived classes must implement this method to store the state of an

* application attempt

*

* @param attemptId

* @param attemptStateData

@Override

protected synchronized void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateData attemptStateData) throws Exception {

byte[] data = attemptStateData.getProto().toByteArray();

redisUtil.storeDataToMap(RM_APP_ROOT.getBytes(), attemptId.toString().getBytes(), data);

}

/**
 * Writes the given attempt state into the {@code RM_APP_ROOT} map under the
 * field named by the string form of {@code attemptId}.
 *
 * @param attemptId id of the attempt; its string form is the field name
 * @param attemptStateData state to serialize and store
 * @throws Exception if the write to Redis fails
 */
@Override
protected synchronized void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateData attemptStateData) throws Exception {
    final byte[] serialized = attemptStateData.getProto().toByteArray();
    final byte[] mapKey = RM_APP_ROOT.getBytes();
    final byte[] field = attemptId.toString().getBytes();
    redisUtil.storeDataToMap(mapKey, field, serialized);
}

/**

* Blocking API

* Derived classes must implement this method to remove the state of an

* application and its attempts

*

* @param appState

@Override

protected synchronized void removeApplicationStateInternal(ApplicationStateData appState) throws Exception {

ApplicationId appId = appState.getApplicationSubmissionContext().getApplicationId();

redisUtil.removeDataFromMapBykey(RM_APP_ROOT.getBytes(), appId.toString().getBytes());

}

/**

* Blocking API

* Derived classes must implement this method to store the state of

* RMDelegationToken and sequence number

*

* @param rmDTIdentifier

* @param renewDate

@Override

protected synchronized void storeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception {

storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate);

}

/**

* Blocking API

* Derived classes must implement this method to remove the state of RMDelegationToken

*

* @param rmDTIdentifier

@Override

protected synchronized void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {

String keyname = DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber();

redisUtil.removeDataFromMapBykey(RM_DT_SECRET_MANAGER_ROOT.getBytes(), keyname.getBytes());

}

/**

* Blocking API

* Derived classes must implement this method to update the state of

* RMDelegationToken and sequence number

*

* @param rmDTIdentifier

* @param renewDate

@Override

protected void updateRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception {

storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate);

}

/**
 * Serializes a delegation token (identifier followed by its renew date as a
 * {@code long}) and writes it into the {@code RM_DT_SECRET_MANAGER_ROOT} map
 * under {@code DELEGATION_TOKEN_PREFIX} + sequence number.
 *
 * @param identifier identifier of the token to write
 * @param renewDate renew date appended after the identifier; must not be
 *        {@code null} because it is unboxed
 * @throws Exception if serialization or the write to Redis fails
 */
private void storeOrUpdateRMDelegationTokenState(RMDelegationTokenIdentifier identifier, Long renewDate) throws Exception {
    String keyName = DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber();
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    DataOutputStream fsOut = new DataOutputStream(os);
    identifier.write(fsOut);
    fsOut.writeLong(renewDate);
    redisUtil.storeDataToMap(RM_DT_SECRET_MANAGER_ROOT.getBytes(), keyName.getBytes(), os.toByteArray());
    // NOTE(review): loadRMDTSecretManagerState looks for a field whose name
    // starts with DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX, but no such field
    // is ever written. The previous code built that name into an unused
    // local and discarded it. Persisting it also requires retiring the
    // previous marker so only one exists at load time - confirm the
    // intended design before adding the write.
}

/**

* Blocking API

* Derived classes must implement this method to store the state of

* DelegationToken Master Key

*

* @param masterKey

@Override

protected synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {

String keyname = DELEGATION_KEY_PREFIX + masterKey.getKeyId();

ByteArrayOutputStream os = new ByteArrayOutputStream();

DataOutputStream fsOut = new DataOutputStream(os);

masterKey.write(fsOut);

redisUtil.storeDataToMap(RM_DT_SECRET_MANAGER_ROOT.getBytes(), keyname.getBytes(), os.toByteArray());

}

/**

* Blocking API

* Derived classes must implement this method to remove the state of

* DelegationToken Master Key

*

* @param masterKey

@Override

protected synchronized void removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {

String keyname = DELEGATION_KEY_PREFIX + masterKey.getKeyId();

redisUtil.removeDataFromMapBykey(RM_DT_SECRET_MANAGER_ROOT.getBytes(), keyname.getBytes());

}

/**

* Blocking API Derived classes must implement this method to store or update

* the state of AMRMToken Master Key

*

* @param amrmTokenSecretManagerState

* @param isUpdate

@Override

public synchronized void storeOrUpdateAMRMTokenSecretManagerState(AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) {

AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);

byte[] stateData = data.getProto().toByteArray();

redisUtil.storeDataToMap(AMRMTOKEN_SECRET_MANAGER_NODE.getBytes(), AMRMTOKEN_SECRET_MANAGER_NODE.getBytes(), stateData);

}

/**

* Derived classes must implement this method to delete the state store

*

* @throws Exception

@Override

public void deleteStore() throws Exception {

List keys = new ArrayList();

keys.add(RM_DT_SECRET_MANAGER_ROOT.getBytes());

keys.add(RM_APP_ROOT.getBytes());

keys.add(RM_VERSION_NODE.getBytes());

keys.add(AMRMTOKEN_SECRET_MANAGER_ROOT.getBytes());

keys.add(EPOCH_NODE.getBytes());

redisUtil.delKeys(keys);

}

发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();