Escribiendo el código de ApplicationMaster
De manera similar, utiliza el editor vim
para abrir el archivo ApplicationMaster.java
y escribir el código:
vim /home/hadoop/yarn_app/ApplicationMaster.java
package com.labex.yarn.app;
public class ApplicationMaster {
public static void main(String[] args){
//TODO:Edit code here.
}
}
La explicación del código sigue siendo en forma de segmento. Todo el código mencionado a continuación debe escribirse en la clase ApplicationMaster
(es decir, el bloque de código donde se encuentra el comentario //TODO:Edit code here.
).
AM es el verdadero propietario de la tarea, que es iniciado por RM y proporciona toda la información y los recursos necesarios a través del Cliente para supervisar y terminar la tarea.
Dado que AM se inicia en un solo contenedor, es probable que el contenedor comparta el mismo host físico con otros contenedores. Dadas las características de multiarrendamiento de la plataforma de computación en la nube y otros problemas, es posible no poder conocer los puertos preconfigurados para escuchar al principio.
Entonces, cuando AM se inicia, se le pueden dar varios parámetros a través del entorno. Estos parámetros incluyen el ContainerId
del contenedor de AM, la hora de presentación de la aplicación y detalles sobre el host del NodeManager
que está ejecutando AM.
Todas las interacciones con RM requieren que una aplicación se asigne. Si este proceso falla, cada aplicación puede intentarlo nuevamente. Puedes obtener ApplicationAttemptId
a partir del ID del contenedor de AM. Hay APIs relacionadas que pueden convertir los valores obtenidos del entorno en objetos.
Escribe el siguiente código:
Map<String, String> envs = System.getenv();
String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
If (containerIdString == null) {
// El ID del contenedor debe estar establecido en la variable de entorno del marco
Throw new IllegalArgumentException(
"Container ID not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
Después de que AM se haya inicializado completamente, podemos iniciar dos clientes: uno al ResourceManager y el otro al NodeManager. Usamos un controlador de eventos personalizado para configurarlo, y los detalles se discuten más adelante:
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
AM debe enviar latidos cardíacos a RM periódicamente para que este último sepa que AM sigue en ejecución. El intervalo de expiración en RM está definido por YarnConfiguration
y su valor predeterminado está definido por el elemento de configuración YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
en el archivo de configuración. AM necesita registrarse en el ResourceManager
para comenzar a enviar latidos cardíacos:
// Regístrese en RM y empiece a enviar latidos cardíacos a RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
La información de respuesta del proceso de registro puede incluir la capacidad máxima de recursos del clúster. Podemos usar esta información para verificar la solicitud de la aplicación:
// Guarde temporalmente la información sobre las capacidades de recursos del clúster en RM
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
// Use el límite de memoria máximo para constreñir el valor de solicitud de capacidad de memoria del contenedor
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerVirtualCores + ", max="
+ maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
Dependiendo de los requisitos de la tarea, AM puede programar un conjunto de contenedores para ejecutar tareas. Usamos estos requisitos para calcular cuántos contenedores necesitamos y solicitar un número correspondiente de contenedores:
int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size();
for (int i = 0; i < numTotalContainersToRequest; ++i) {
// Establece el objeto de solicitud para el contenedor de solicitud de RM
ContainerRequest containerAsk = setupContainerAskForRM();
// Envía la solicitud de contenedor a RM
amRMClient.addContainerRequest(containerAsk);
// Este bucle significa sondear RM para obtener contenedores después de obtener las cuotas completamente asignadas
}
El bucle anterior seguirá ejecutándose hasta que todos los contenedores se hayan iniciado y el script de shell se haya ejecutado (ya sea con éxito o con error).
En setupContainerAskForRM()
, debes establecer lo siguiente:
- Capacidades de recursos: Actualmente, YARN admite requisitos de recursos basados en memoria, por lo que la solicitud debe definir cuánta memoria se necesita. Este valor se define en megabytes y debe ser menor que el múltiplo exacto de las capacidades máxima y mínima del clúster. Esta capacidad de memoria corresponde al límite de memoria física impuesto al contenedor de tarea. Las capacidades de recursos también incluyen recursos basados en cómputo (vCore).
- Prioridad: Al solicitar un conjunto de contenedores, AM puede definir diferentes prioridades para las colecciones. Por ejemplo, el AM de MapReduce puede asignar una prioridad más alta a los contenedores requeridos por la tarea de Map, mientras que el contenedor de la tarea de Reduce tiene una prioridad más baja:
Private ContainerRequest setupContainerAskForRM() {
/ / Establece la prioridad de la solicitud
Priority pri = Priority.newInstance(requestPriority);
/ / Establece la solicitud para el tipo de recurso, incluyendo memoria y CPU
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null, pri);
LOG.info("Requested container allocation: " + request.toString());
Return request;
}
Después de que AM haya enviado una solicitud de asignación de contenedores, el contenedor se inicia de manera asincrónica por el controlador de eventos del cliente AMRMClientAsync
. Los programas que manejan esta lógica deben implementar la interfaz AMRMClientAsync.CallbackHandler
.
(1) Cuando se asigna a un contenedor, el controlador debe iniciar un hilo. El hilo ejecuta el código correspondiente para iniciar el contenedor. Aquí usamos LaunchContainerRunnable
para demostración. Vamos a discutir esta clase más adelante:
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container allocation, allocatedCnt=" + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// Inicia y ejecuta el contenedor con diferentes hilos, lo que evita que el hilo principal se bloquee cuando todos los contenedores no pueden asignar recursos
launchThreads.add(launchThread);
launchThread.start();
}
}
(2) Cuando se envía un latido cardíaco, el controlador de eventos debe informar el progreso de la aplicación:
@Override
public float getProgress() {
// Establece la información de progreso y la informa a RM la próxima vez que envíe un latido cardíaco
float progress = (float) numCompletedContainers.get() / numTotalContainers;
Return progress;
}
El hilo de inicio del contenedor realmente inicia el contenedor en NM. Después de asignar un contenedor a AM, debe seguir un proceso similar al que sigue el Cliente cuando se configura el ContainerLaunchContext
para la última tarea que se ejecutará en el contenedor asignado. Después de definir el ContainerLaunchContext
, AM puede iniciarlo a través de NMClientAsync
:
// Establece los comandos necesarios para ejecutar en el contenedor asignado
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// Establece el comando ejecutable
vargs.add(shellCommand);
// Establece la ruta del script de shell
if (!scriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS? ExecBatScripStringtPath
: ExecShellStringPath);
}
// Establece parámetros para los comandos de shell
vargs.add(shellArgs);
// Agrega parámetros de redirección de registro
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// Obtiene el comando final
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Establece ContainerLaunchContext para establecer recursos locales, variables de entorno, comandos y tokens para el constructor.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
El objeto NMClientAsync
y su controlador de eventos son responsables de manejar los eventos del contenedor. Incluye inicio, detención, actualizaciones de estado y errores para el contenedor.
Después de que el ApplicationMaster
haya determinado que ha terminado, debe desregistrarse con el Cliente
de AM-RM y luego detener el Cliente
:
try {
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
}
amRMClient.stop();
El contenido anterior es el código principal de ApplicationMaster
. Después de editar, guarda el contenido y sale del editor vim
.