Writing ApplicationMaster Code
Similarly, use the vim editor to open the ApplicationMaster.java file and write the code:
vim /home/hadoop/yarn_app/ApplicationMaster.java
package com.labex.yarn.app;

public class ApplicationMaster {
    public static void main(String[] args) {
        //TODO:Edit code here.
    }
}
As before, the code is explained in segments. All the code mentioned below should be written in the ApplicationMaster class (that is, in the code block where the comment //TODO:Edit code here. is located).
The AM is the actual owner of the job. It is started by the RM and, through the Client, is given all the information and resources it needs to supervise and complete the job.
Since the AM is started in a container, that container is likely to share a physical host with other containers. Given the multi-tenant nature of the platform, among other issues, the AM cannot assume that any pre-configured port is available for it to listen on.
So, when the AM starts, it is given several parameters through the environment. These parameters include the ContainerId of the AM container, the submission time of the application, and details about the NodeManager host that is running the AM.
All interactions with the RM require an ApplicationAttemptId. An application can have more than one attempt, because a failed attempt may be retried. You can get the ApplicationAttemptId from the container ID of the AM. Helper APIs convert the values obtained from the environment into objects.
Write the following code:
Map<String, String> envs = System.getenv();
String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
    // The framework should set the container ID in the environment
    throw new IllegalArgumentException(
        "Container ID not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
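To see why the attempt ID can be derived from the container ID, it helps to look at the string layout. The following is a minimal sketch in plain Java, not the YARN API: the class ContainerIdSketch and its method are hypothetical, and the layout container_[e<epoch>_]<clusterTimestamp>_<appId>_<attemptId>_<containerId> is an assumption made for illustration. In a real AM, keep using the helper calls shown above.

```java
// Illustrative only: a real AM should rely on the YARN helper APIs.
class ContainerIdSketch {
    /**
     * Derives an attempt ID string from a container ID string, assuming the
     * layout container_[e<epoch>_]<clusterTimestamp>_<appId>_<attemptId>_<containerId>.
     */
    static String toAttemptId(String containerId) {
        if (containerId == null) {
            throw new IllegalArgumentException("Container ID not set in the environment");
        }
        String[] parts = containerId.split("_");
        // Skip the optional epoch field (for example "e17") if it is present.
        int offset = (parts.length == 6 && parts[1].startsWith("e")) ? 2 : 1;
        if (!"container".equals(parts[0]) || parts.length != offset + 4) {
            throw new IllegalArgumentException("Unexpected container ID: " + containerId);
        }
        long clusterTimestamp = Long.parseLong(parts[offset]);
        int appId = Integer.parseInt(parts[offset + 1]);
        int attemptId = Integer.parseInt(parts[offset + 2]);
        // The trailing container number is dropped: it identifies the container, not the attempt.
        return String.format("appattempt_%d_%04d_%06d", clusterTimestamp, appId, attemptId);
    }

    public static void main(String[] args) {
        System.out.println(toAttemptId("container_1410901177871_0001_01_000005"));
    }
}
```

The point of the sketch is that every container ID already carries the application and attempt it belongs to, so no extra environment variable is needed for the attempt ID.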
After the AM has been fully initialized, we can start two clients: one for the ResourceManager and one for the NodeManager. We set them up with custom event handlers, which are discussed later:
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
The AM must send heartbeats to the RM periodically so that the RM knows the AM is still running. The expiry interval on the RM is a configuration setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, and its default value is defined by YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. The AM needs to register itself with the ResourceManager to start sending heartbeats:
// Register yourself with RM and start sending heartbeats to RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
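To make the heartbeat requirement concrete, here is a minimal sketch of expiry tracking in plain Java. It is not how the RM is implemented: the class LivenessSketch, its methods, and the 10-second interval are hypothetical and only illustrate the idea that an AM is considered dead once no heartbeat has arrived within the expiry interval.

```java
// Hypothetical model of heartbeat expiry; time is passed in explicitly
// so that the behaviour is deterministic and easy to follow.
class LivenessSketch {
    private final long expiryIntervalMs;
    private long lastHeartbeatMs;

    LivenessSketch(long expiryIntervalMs, long registeredAtMs) {
        this.expiryIntervalMs = expiryIntervalMs;
        // Registration counts as the first sign of life.
        this.lastHeartbeatMs = registeredAtMs;
    }

    /** Records a heartbeat received at the given time. */
    void heartbeat(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    /** Returns true if no heartbeat arrived within the expiry interval. */
    boolean isExpired(long nowMs) {
        return nowMs - lastHeartbeatMs > expiryIntervalMs;
    }

    public static void main(String[] args) {
        LivenessSketch am = new LivenessSketch(10_000, 0);
        am.heartbeat(1_000);
        System.out.println(am.isExpired(5_000));   // 4 s since the last heartbeat
        System.out.println(am.isExpired(12_000));  // 11 s since the last heartbeat
    }
}
```

In the tutorial code, AMRMClientAsync sends the heartbeats for us at the interval passed to createAMRMClientAsync (1000 ms above), so the AM only has to register.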
The registration response may include the maximum resource capability of the cluster. We can use this information to check the application's request:
// Record the cluster's maximum resource capabilities as reported by the RM
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
// Cap the container's memory request at the cluster maximum
if (containerMemory > maxMem) {
    LOG.info("Container memory specified above max threshold of cluster."
        + " Using max value." + ", specified=" + containerMemory + ", max="
        + maxMem);
    containerMemory = maxMem;
}
// Cap the container's virtual core request at the cluster maximum
if (containerVirtualCores > maxVCores) {
    LOG.info("Container virtual cores specified above max threshold of cluster."
        + " Using max value." + ", specified=" + containerVirtualCores + ", max="
        + maxVCores);
    containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
    response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
    + " previous AM's running containers on AM registration.");
Depending on the task requirements, AM can schedule a set of containers to run tasks. We use these requirements to calculate how many containers we need and request a corresponding number of containers:
int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size();
for (int i = 0; i < numTotalContainersToRequest; ++i) {
    // Build the container request to send to the RM
    ContainerRequest containerAsk = setupContainerAskForRM();
    // Send the container request to the RM
    amRMClient.addContainerRequest(containerAsk);
}
// The AM keeps polling the RM until it has received its full allocation
This loop only submits the requests. The AM then keeps running until all the containers have been launched and the shell script has been executed on them (whether it succeeds or fails).
In setupContainerAskForRM(), you need to set the following:
- **Resource capabilities:** Currently, YARN supports memory-based resource requirements, so the request should define how much memory is needed. This value is defined in megabytes, must not exceed the cluster's maximum capability, and must be an exact multiple of its minimum capability. This memory resource corresponds to the physical memory limit imposed on the task container. Resource capabilities also include computation-based resources (vCores).
- **Priority:** When requesting sets of containers, the AM can define a different priority for each set. For example, the MapReduce AM can assign a higher priority to the containers required by Map tasks and a lower priority to the containers for Reduce tasks:
private ContainerRequest setupContainerAskForRM() {
    // Set the priority of the request
    Priority pri = Priority.newInstance(requestPriority);
    // Set the resource requirements, including memory and CPU
    Resource capability = Resource.newInstance(containerMemory,
        containerVirtualCores);
    ContainerRequest request = new ContainerRequest(capability, null, null, pri);
    LOG.info("Requested container allocation: " + request.toString());
    return request;
}
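The memory constraint described above (a multiple of the minimum capability, capped at the maximum) can be sketched in plain Java. This is only an illustration of the arithmetic: the class ResourceRequestSketch and the sample values are hypothetical, and a real scheduler may apply its own normalization rules.

```java
// Hypothetical helper that illustrates the constraint on memory requests.
class ResourceRequestSketch {
    /**
     * Rounds a memory request (in MB) up to a multiple of minMb and caps it at maxMb.
     * If maxMb is not itself a multiple of minMb, a capped result is not a multiple either.
     */
    static int normalizeMemory(int requestedMb, int minMb, int maxMb) {
        if (minMb <= 0 || maxMb < minMb) {
            throw new IllegalArgumentException("Invalid capability range");
        }
        // Never ask for less than the minimum allocation.
        int atLeastMin = Math.max(requestedMb, minMb);
        // Round up to the next multiple of minMb using integer arithmetic.
        int roundedUp = ((atLeastMin + minMb - 1) / minMb) * minMb;
        // Never ask for more than the cluster can give to a single container.
        return Math.min(roundedUp, maxMb);
    }

    public static void main(String[] args) {
        System.out.println(normalizeMemory(1500, 1024, 8192));
        System.out.println(normalizeMemory(10000, 1024, 8192));
    }
}
```

With a 1024 MB minimum and an 8192 MB maximum, a 1500 MB request is rounded up to two minimum-sized units, and a 10000 MB request is capped at the maximum.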
After the AM has sent a container allocation request, the containers are started asynchronously by the event handler of the AMRMClientAsync client. The handler class should implement the AMRMClientAsync.CallbackHandler interface.
(1) When containers are allocated, the handler needs to start a thread for each of them. The thread runs the code that launches the container. Here we use LaunchContainerRunnable for demonstration. We will discuss this class later:
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
    LOG.info("Got response from RM for container allocation, allocatedCnt=" + allocatedContainers.size());
    numAllocatedContainers.addAndGet(allocatedContainers.size());
    for (Container allocatedContainer : allocatedContainers) {
        LaunchContainerRunnable runnableLaunchContainer =
            new LaunchContainerRunnable(allocatedContainer, containerListener);
        Thread launchThread = new Thread(runnableLaunchContainer);
        // Launch each container on its own thread so that the main thread is not blocked,
        // since the containers may not all be allocated at once
        launchThreads.add(launchThread);
        launchThread.start();
    }
}
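The one-thread-per-container pattern can be tried out without a cluster. The following is a minimal sketch in plain Java: the class LaunchThreadsSketch is hypothetical, and the lambda is only a stand-in for LaunchContainerRunnable (real code would contact the NodeManager there).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the allocation callback: one launch thread per container.
class LaunchThreadsSketch {
    /** Starts one thread per container ID, waits for all of them, and returns how many ran. */
    static int launchAll(List<String> containerIds) {
        AtomicInteger launched = new AtomicInteger();
        List<Thread> launchThreads = new ArrayList<>();
        for (String id : containerIds) {
            // Stand-in for LaunchContainerRunnable; it only counts the launch.
            Thread launchThread = new Thread(() -> launched.incrementAndGet(), "launch-" + id);
            launchThreads.add(launchThread);
            launchThread.start();
        }
        // Keeping the threads in a list lets us wait for them before shutting down.
        for (Thread launchThread : launchThreads) {
            try {
                launchThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for launches", e);
            }
        }
        return launched.get();
    }

    public static void main(String[] args) {
        System.out.println(launchAll(List.of("c1", "c2", "c3")));
    }
}
```

The counter is an AtomicInteger because several launch threads update it concurrently, which is also why the tutorial code uses atomic counters such as numAllocatedContainers.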
(2) When sending a heartbeat, the event handler should report the progress of the application:
@Override
public float getProgress() {
    // Compute the progress to report to the RM with the next heartbeat
    float progress = (float) numCompletedContainers.get() / numTotalContainers;
    return progress;
}
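The cast to float in getProgress() matters, because dividing two int values in Java truncates the result. Here is a minimal sketch in plain Java that shows the difference. The class ProgressSketch is hypothetical, and the guard for a zero total is an addition that the tutorial code does not have.

```java
// Hypothetical helper that mirrors the progress calculation.
class ProgressSketch {
    /** Returns the fraction of completed containers, or 0 if there are none to run. */
    static float progress(int completed, int total) {
        if (total <= 0) {
            // Assumption: report no progress rather than dividing by zero.
            return 0f;
        }
        // Cast first so that the division is done in floating point.
        return (float) completed / total;
    }

    public static void main(String[] args) {
        System.out.println(progress(1, 4)); // floating-point division
        System.out.println(1 / 4);          // integer division truncates
    }
}
```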
The launch thread is what actually starts the container on the NM. After a container has been allocated to the AM, the AM follows a process similar to the one the Client follows: it sets up a ContainerLaunchContext for the final task that will run on the allocated container. After defining the ContainerLaunchContext, the AM can start the container via NMClientAsync:
// Set the necessary commands to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// Set the executable command
vargs.add(shellCommand);
// Set the path of the shell script
if (!scriptPath.isEmpty()) {
    vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
        : ExecShellStringPath);
}
// Set parameters for shell commands
vargs.add(shellArgs);
// Add log redirection parameters
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// Get the final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
    command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Create the ContainerLaunchContext with the local resources, environment variables, commands, and tokens
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
    localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
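To see what the final command string looks like, here is a minimal sketch of the same assembly in plain Java. The class LaunchCommandSketch and the sample arguments are hypothetical. The sketch assumes that "<LOG_DIR>" is the placeholder behind ApplicationConstants.LOG_DIR_EXPANSION_VAR, which is replaced with the container's log directory at launch. Unlike the tutorial code, it skips empty arguments and does not leave a trailing space.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that assembles a container launch command.
class LaunchCommandSketch {
    // Assumption: the value of ApplicationConstants.LOG_DIR_EXPANSION_VAR.
    static final String LOG_DIR = "<LOG_DIR>";

    /** Joins the command, optional script path, and optional arguments, then redirects output. */
    static String build(String shellCommand, String scriptPath, String shellArgs) {
        List<String> parts = new ArrayList<>();
        parts.add(shellCommand);
        if (!scriptPath.isEmpty()) {
            parts.add(scriptPath);
        }
        if (!shellArgs.isEmpty()) {
            parts.add(shellArgs);
        }
        // Redirect stdout and stderr into the container's log directory.
        parts.add("1>" + LOG_DIR + "/stdout");
        parts.add("2>" + LOG_DIR + "/stderr");
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        System.out.println(build("/bin/sh", "script.sh", "-x"));
    }
}
```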
The NMClientAsync object and its event handler are responsible for handling container events, including container start, stop, status updates, and errors.
After the ApplicationMaster has determined that its work is done, it needs to unregister through the AM-RM client and then stop that client:
try {
    amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
    LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
    LOG.error("Failed to unregister application", e);
}
amRMClient.stop();
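The important property of this shutdown sequence is that the client is stopped even when unregistration fails. A minimal sketch of that ordering in plain Java follows. The RmClient interface and the class ShutdownSketch are hypothetical stand-ins, not YARN types, and the sketch uses try/finally where the tutorial code catches the exceptions and then calls stop().

```java
// Hypothetical model of the AM shutdown order: unregister first, always stop.
class ShutdownSketch {
    /** Hypothetical stand-in for the AM-RM client used in this tutorial. */
    interface RmClient {
        void unregister(String status, String message) throws Exception;
        void stop();
    }

    /** Unregisters and then always stops the client. Returns true if unregistration succeeded. */
    static boolean shutdown(RmClient client, String status, String message) {
        try {
            client.unregister(status, message);
            return true;
        } catch (Exception e) {
            System.err.println("Failed to unregister application: " + e);
            return false;
        } finally {
            // Runs on both paths, so the client is never left running.
            client.stop();
        }
    }

    public static void main(String[] args) {
        RmClient client = new RmClient() {
            public void unregister(String status, String message) {
                System.out.println("unregistered: " + status);
            }
            public void stop() {
                System.out.println("stopped");
            }
        };
        shutdown(client, "SUCCEEDED", "done");
    }
}
```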
The content above is the main code of ApplicationMaster. After editing, save the content and exit the vim editor.