Написание кода ApplicationMaster
Аналогично, используйте редактор vim
, чтобы открыть файл ApplicationMaster.java
и написать код:
vim /home/hadoop/yarn_app/ApplicationMaster.java
package com.labex.yarn.app;
public class ApplicationMaster {
public static void main(String[] args){
//TODO:Edit code here.
}
}
Пояснения к коду также будут представлены в виде сегментов. Все код, упоминаемый ниже, должен быть написан в классе ApplicationMaster
(то есть в блоке кода, где находится комментарий //TODO:Edit code here.
).
AM - это фактический владелец задачи, который запускается RM и предоставляет все необходимые сведения и ресурсы через Клиента для управления и завершения задачи.
Поскольку AM запускается в одном контейнере,很可能该容器与其他容器共享同一物理主机。考虑到云计算平台的多租户特性和其他问题,一开始可能无法知道预先配置的要监听的端口。
因此,当 AM 启动时,可以通过环境为其提供几个参数。这些参数包括 AM 容器的 ContainerId
、应用程序的提交时间以及运行 AM 的 NodeManager
主机的详细信息。
与 RM 的所有交互都需要应用程序进行调度。如果此过程失败,每个应用程序可能会再次尝试。您可以从 AM 的容器 ID 中获取 ApplicationAttemptId
。有相关的 API 可以将从环境中获取的值转换为对象。
Напишите следующий код:
Map<String, String> envs = System.getenv();
String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
If (containerIdString == null) {
// The container ID should be set in the environment variable of the framework
Throw new IllegalArgumentException(
"Container ID not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
После полной инициализации AM мы можем запустить двух клиентов: одного клиента к ResourceManager и другого - к NodeManager. Мы используем пользовательский обработчик событий для настройки этого, и подробности будут обсуждаться позже:
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
AM должен периодически отправлять сердцебиения RM, чтобы последний знал, что AM все еще работает. Интервал истечения срока действия на RM определяется YarnConfiguration
, а его значение по умолчанию определяется конфигурационным параметром YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
в файле конфигурации. AM должен зарегистрироваться в ResourceManager
, чтобы начать отправлять сердцебиения:
// Register yourself with RM and start sending heartbeats to RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
Информация ответа процесса регистрации может включать максимальные ресурсные возможности кластера. Мы можем использовать эту информацию для проверки запроса приложения:
// Temporarily save information about cluster resource capabilities in RM
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
// Use the maximum memory limit to constrain the container's memory capacity request value
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerVirtualCores + ", max="
+ maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
В зависимости от требований к задаче AM может планировать запуск набора контейнеров для выполнения задач. Мы используем эти требования, чтобы вычислить, сколько контейнеров нам нужно, и запросить соответствующее количество контейнеров:
int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size();
for (int i = 0; i < numTotalContainersToRequest; ++i) {
//Set the request object to the RM request container
ContainerRequest containerAsk = setupContainerAskForRM();
//Send container request to RM
amRMClient.addContainerRequest(containerAsk);
// This loop means polling RM for containers after getting fully allocated quotas
}
Вышеприведенный цикл будет выполняться до тех пор, пока все контейнеры не будут запущены и не будет выполнен shell-скрипт (независимо от того, будет ли это успешным или нет).
В setupContainerAskForRM()
вам нужно настроить следующее:
- **Resource capabilities:**Currently, YARN supports memory-based resource requirements, so the request should define how much memory is needed. This value is defined in megabytes and must be less than the exact multiple of the cluster's maximum and minimum capabilities. This memory resource corresponds to the physical memory limit imposed on the task container. Resource capabilities also include computation-based resources (vCore).
- Priority: When requesting a container set, AM can define different priorities for collections. For example, MapReduce AM can assign a higher priority to the containers required by the Map task, while the Reduce task container has a lower priority:
Private ContainerRequest setupContainerAskForRM() {
/ / Set the priority of the request
Priority pri = Priority.newInstance(requestPriority);
/ / Set the request for the resource type, including memory and CPU
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null, pri);
LOG.info("Requested container allocation: " + request.toString());
Return request;
}
После того, как AM отправил запрос на выделение контейнера, контейнер запускается асинхронно обработчиком событий клиента AMRMClientAsync
. Программы, обрабатывающие эту логику, должны реализовать интерфейс AMRMClientAsync.CallbackHandler
.
(1) Когда контейнер разрешен на обработку, обработчик должен запустить поток. Поток выполняет соответствующий код для запуска контейнера. Здесь мы используем LaunchContainerRunnable
для демонстрации. Мы обсудим этот класс позже:
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container allocation, allocatedCnt=" + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// Start and run the container with different threads, which prevents the main thread from blocking when all the containers cannot be allocated resources
launchThreads.add(launchThread);
launchThread.start();
}
}
(2) Когда отправляется сердцебиение, обработчик событий должен сообщать о прогрессе приложения:
@Override
public float getProgress() {
// Set the progress information and report it to RM the next time you send a heartbeat
float progress = (float) numCompletedContainers.get() / numTotalContainers;
Return progress;
}
Поток запуска контейнера фактически запускает контейнер на NM. После назначения контейнера AM ему нужно следовать аналогичному процессу, который следует Клиент при настройке ContainerLaunchContext
для выполнения конечной задачи на выделенном контейнере. После определения ContainerLaunchContext
, AM может запустить его с помощью NMClientAsync
:
// Set the necessary commands to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// Set the executable command
vargs.add(shellCommand);
// Set the path of the shell script
if (!scriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS? ExecBatScripStringtPath
: ExecShellStringPath);
}
// Set parameters for shell commands
vargs.add(shellArgs);
// Add log redirection parameters
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// Get the final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Set ContainerLaunchContext to set local resources, environment variables, commands and tokens for the constructor.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
Объект NMClientAsync
и его обработчик событий отвечают за обработку событий контейнера. Это включает в себя запуск, остановку, обновление статуса и ошибки для контейнера.
После того, как ApplicationMaster
определит, что все сделано, ему нужно отменить регистрацию с Клиентом AM-RM и затем остановить Клиента:
try {
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
}
amRMClient.stop();
Вышеприведенное содержание - это основной код ApplicationMaster
. После редактирования сохраните содержание и выйдите из редактора vim
.