ApplicationMaster コードの記述
同様に、vim
エディタを使って ApplicationMaster.java
ファイルを開いてコードを記述します。
vim /home/hadoop/yarn_app/ApplicationMaster.java
package com.labex.yarn.app;
public class ApplicationMaster {
public static void main(String[] args){
//TODO:Edit code here.
}
}
コードの解説は依然としてセグメント形式です。以下に挙げるすべてのコードは、ApplicationMaster
クラス内 (つまり、コメント //TODO:Edit code here.
があるコード ブロック内) に記述する必要があります。
AM は ジョブ の実際の所有者で、RM によって起動され、クライアントを介してすべての必要な情報とリソースを提供して、タスクを監視して完了させます。
AM は単一のコンテナ内で起動されるため、そのコンテナは他のコンテナと同じ物理ホストを共有する可能性があります。クラウド コンピューティング プラットフォームのマルチテナンシー機能やその他の問題のため、最初から事前に設定されたリッスンするポートを知ることができない場合があります。
したがって、AM が起動する際には、環境を通じていくつかのパラメータを与えることができます。これらのパラメータには、AM コンテナの ContainerId
、アプリケーションの提出時刻、および AM を実行している NodeManager
ホストの詳細が含まれます。
RM とのすべての相互作用には、アプリケーションをスケジュールする必要があります。このプロセスが失敗した場合、各アプリケーションは再試行することができます。AM のコンテナ ID から ApplicationAttemptId
を取得することができます。環境から取得した値をオブジェクトに変換する関連 API があります。
以下のコードを記述します。
Map<String, String> envs = System.getenv();
String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
If (containerIdString == null) {
// The container ID should be set in the environment variable of the framework
Throw new IllegalArgumentException(
"Container ID not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
AM が完全に初期化された後、2 つのクライアントを起動することができます。1 つは ResourceManager へのクライアントで、もう 1 つは NodeManager へのクライアントです。カスタム イベント ハンドラを使って設定します。詳細については後で説明します。
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
AM は定期的に RM にハートビートを送信しなければなりません。そうすることで、RM は AM がまだ実行中であることを知ることができます。RM 上の有効期限間隔は YarnConfiguration
によって定義され、そのデフォルト値は構成ファイルの YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
構成項目によって定義されます。AM はハートビートを送信するために ResourceManager
に自身を登録する必要があります。
// Register yourself with RM and start sending heartbeats to RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
登録プロセスの応答情報には、クラスタの最大リソース容量が含まれる場合があります。この情報を使って、アプリケーションの要求をチェックすることができます。
// Temporarily save information about cluster resource capabilities in RM
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
// Use the maximum memory limit to constrain the container's memory capacity request value
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerVirtualCores + ", max="
+ maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
タスクの要件に応じて、AM はタスクを実行するためのコンテナのセットをスケジュールすることができます。これらの要件を使って、必要なコンテナの数を計算し、対応する数のコンテナを要求します。
int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size();
for (int i = 0; i < numTotalContainersToRequest; ++i) {
//Set the request object to the RM request container
ContainerRequest containerAsk = setupContainerAskForRM();
//Send container request to RM
amRMClient.addContainerRequest(containerAsk);
// This loop means polling RM for containers after getting fully allocated quotas
}
上記のループは、すべてのコンテナが起動され、シェル スクリプトが実行されるまで (成功しても失敗しても) 継続して実行されます。
setupContainerAskForRM()
では、以下のことを設定する必要があります。
- リソース能力:現在、YARN はメモリベースのリソース要件をサポートしています。したがって、要求は必要なメモリ量を定義する必要があります。この値はメガバイトで定義され、クラスタの最大と最小の能力の正確な倍数よりも小さくする必要があります。このメモリ リソースは、タスク コンテナに課される物理メモリの制限に対応します。リソース能力には、コンピューテーションベースのリソース (vCore) も含まれます。
- 優先度:コンテナ セットを要求する際、AM はコレクションに対して異なる優先度を定義することができます。たとえば、MapReduce AM は Map タスク に必要なコンテナに対してより高い優先度を割り当てることができます。一方、Reduce タスク のコンテナは、より低い優先度を持ちます。
Private ContainerRequest setupContainerAskForRM() {
/ / Set the priority of the request
Priority pri = Priority.newInstance(requestPriority);
/ / Set the request for the resource type, including memory and CPU
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null, pri);
LOG.info("Requested container allocation: " + request.toString());
Return request;
}
AM がコンテナ割り当て要求を送信した後、コンテナは AMRMClientAsync
クライアントのイベント ハンドラによって非同期で起動されます。このロジックを処理するプログラムは、AMRMClientAsync.CallbackHandler
インターフェイスを実装する必要があります。
(1) コンテナに割り当てられた場合、ハンドラはスレッドを起動する必要があります。そのスレッドは、コンテナを起動するための関連コードを実行します。ここでは、例として LaunchContainerRunnable
を使用します。このクラスについては後で説明します。
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container allocation, allocatedCnt=" + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// Start and run the container with different threads, which prevents the main thread from blocking when all the containers cannot be allocated resources
launchThreads.add(launchThread);
launchThread.start();
}
}
(2) ハートビートを送信する際、イベント ハンドラはアプリケーションの進捗状況を報告する必要があります。
@Override
public float getProgress() {
// Set the progress information and report it to RM the next time you send a heartbeat
float progress = (float) numCompletedContainers.get() / numTotalContainers;
Return progress;
}
コンテナの起動スレッドは、実際に NM 上でコンテナを起動します。コンテナを AM に割り当てた後、割り当てられたコンテナ上で最終タスクを実行するために ContainerLaunchContext
を設定する際に、Client が辿るのと同じようなプロセスに従う必要があります。ContainerLaunchContext
を定義した後、AM は NMClientAsync
を介してそれを起動することができます。
// Set the necessary commands to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// Set the executable command
vargs.add(shellCommand);
// Set the path of the shell script
if (!scriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS? ExecBatScripStringtPath
: ExecShellStringPath);
}
// Set parameters for shell commands
vargs.add(shellArgs);
// Add log redirection parameters
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// Get the final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Set ContainerLaunchContext to set local resources, environment variables, commands and tokens for the constructor.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
NMClientAsync
オブジェクトとそのイベント ハンドラは、コンテナ イベントを処理する責任があります。それには、コンテナの起動、停止、状態更新、およびエラーが含まれます。
ApplicationMaster
が完了したと判断した後、AM-RM の Client
から登録解除し、その後 Client
を停止する必要があります。
try {
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
}
amRMClient.stop();
上記の内容は ApplicationMaster
の主なコードです。編集した後、内容を保存して vim
エディタを終了します。