Escrevendo o Código do ApplicationMaster
Da mesma forma, use o editor vim para abrir o arquivo ApplicationMaster.java para escrever o código:
vim /home/hadoop/yarn_app/ApplicationMaster.java
package com.labex.yarn.app;
public class ApplicationMaster {
public static void main(String[] args){
//TODO:Edite o código aqui.
}
}
A explicação do código ainda está na forma de um segmento. Todo o código mencionado abaixo deve ser escrito na classe ApplicationMaster (ou seja, o bloco de código onde o comentário //TODO:Edite o código aqui. está localizado).
AM é o proprietário real do job, que é iniciado pelo RM e fornece todas as informações e recursos necessários por meio do Cliente para supervisionar e concluir a tarefa.
Como o AM é iniciado em um único contêiner, é provável que o contêiner compartilhe o mesmo host físico com outros contêineres. Dadas as características de multi-tenancy da plataforma de computação em nuvem e outros problemas, é possível não conseguir saber as portas pré-configuradas para ouvir no início.
Portanto, quando o AM inicia, ele pode receber vários parâmetros por meio do ambiente. Esses parâmetros incluem o ContainerId do contêiner AM, o tempo de envio da aplicação e detalhes sobre o host do NodeManager que está executando o AM.
Todas as interações com o RM exigem que uma aplicação seja agendada. Se este processo falhar, cada aplicação pode tentar novamente. Você pode obter o ApplicationAttemptId do ID do contêiner do AM. Existem APIs relacionadas que podem converter valores obtidos do ambiente em objetos.
Escreva o seguinte código:
Map<String, String> envs = System.getenv();
String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
If (containerIdString == null) {
// O ID do contêiner deve ser definido na variável de ambiente do framework
Throw new IllegalArgumentException(
"ID do contêiner não definido no ambiente");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
Depois que o AM for totalmente inicializado, podemos iniciar dois clientes: um cliente para o ResourceManager e outro para o NodeManager. Usamos um manipulador de eventos personalizado para configurá-lo, e os detalhes são discutidos posteriormente:
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
O AM deve enviar heartbeats para o RM periodicamente para que este último saiba que o AM ainda está em execução. O intervalo de expiração no RM é definido por YarnConfiguration e seu valor padrão é definido pelo item de configuração YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS no arquivo de configuração. O AM precisa se registrar no ResourceManager para começar a enviar heartbeats:
// Registre-se no RM e comece a enviar heartbeats para o RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
As informações de resposta do processo de registro podem incluir a capacidade máxima de recursos do cluster. Podemos usar essas informações para verificar a solicitação da aplicação:
// Salve temporariamente informações sobre as capacidades de recursos do cluster no RM
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Capacidade máxima de memória de recursos neste cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Capacidade máxima de vcores de recursos neste cluster " + maxVCores);
// Use o limite máximo de memória para restringir o valor da solicitação de capacidade de memória do contêiner
if (containerMemory > maxMem) {
LOG.info("Memória do contêiner especificada acima do limite máximo do cluster."
+ " Usando o valor máximo." + ", especificado=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Núcleos virtuais do contêiner especificados acima do limite máximo do cluster."
+ " Usando o valor máximo." + ", especificado=" + containerVirtualCores + ", max="
+ maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info("Recebido " + previousAMRunningContainers.size()
+ " contêineres em execução do AM anterior no registro do AM.");
Dependendo dos requisitos da tarefa, o AM pode agendar um conjunto de contêineres para executar tarefas. Usamos esses requisitos para calcular quantos contêineres precisamos e solicitar um número correspondente de contêineres:
int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size();
for (int i = 0; i < numTotalContainersToRequest; ++i) {
//Defina o objeto de solicitação para o contêiner de solicitação RM
ContainerRequest containerAsk = setupContainerAskForRM();
//Envie a solicitação do contêiner para o RM
amRMClient.addContainerRequest(containerAsk);
// Este loop significa sondar o RM em busca de contêineres após obter cotas totalmente alocadas
}
O loop acima continuará em execução até que todos os contêineres tenham sido iniciados e o script shell tenha sido executado (independentemente de ter sucesso ou falhar).
Em setupContainerAskForRM(), você precisa definir o seguinte:
- Capacidades de recursos: Atualmente, o YARN suporta requisitos de recursos baseados em memória, portanto, a solicitação deve definir quanta memória é necessária. Este valor é definido em megabytes e deve ser menor que o múltiplo exato das capacidades máximas e mínimas do cluster. Este recurso de memória corresponde ao limite de memória física imposto ao contêiner de tarefas. As capacidades de recursos também incluem recursos baseados em computação (vCore).
- Prioridade: Ao solicitar um conjunto de contêineres, o AM pode definir diferentes prioridades para as coleções. Por exemplo, o MapReduce AM pode atribuir uma prioridade mais alta aos contêineres exigidos pela tarefa Map, enquanto o contêiner da tarefa Reduce tem uma prioridade mais baixa:
Private ContainerRequest setupContainerAskForRM() {
/ / Defina a prioridade da solicitação
Priority pri = Priority.newInstance(requestPriority);
/ / Defina a solicitação para o tipo de recurso, incluindo memória e CPU
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null, pri);
LOG.info("Alocação de contêiner solicitada: " + request.toString());
Return request;
}
Depois que o AM enviou uma solicitação de alocação de contêiner, o contêiner é iniciado de forma assíncrona pelo manipulador de eventos do cliente AMRMClientAsync. Os programas que lidam com essa lógica devem implementar a interface AMRMClientAsync.CallbackHandler.
(1) Quando despachado para um contêiner, o manipulador precisa iniciar uma thread. A thread executa o código relevante para iniciar o contêiner. Aqui, usamos LaunchContainerRunnable para demonstração. Discutiremos esta classe mais tarde:
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Recebeu resposta do RM para alocação de contêiner, allocatedCnt=" + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// Inicie e execute o contêiner com threads diferentes, o que impede que a thread principal bloqueie quando todos os contêineres não podem ter recursos alocados
launchThreads.add(launchThread);
launchThread.start();
}
}
(2) Ao enviar um heartbeat, o manipulador de eventos deve relatar o progresso da aplicação:
@Override
public float getProgress() {
// Defina as informações de progresso e relate-as ao RM na próxima vez que você enviar um heartbeat
float progress = (float) numCompletedContainers.get() / numTotalContainers;
Return progress;
}
A thread de inicialização do contêiner realmente inicia o contêiner no NM. Depois de atribuir um contêiner ao AM, ele precisa seguir um processo semelhante ao que o Cliente segue ao configurar o ContainerLaunchContext para a tarefa final a ser executada no contêiner alocado. Depois de definir o ContainerLaunchContext, o AM pode iniciá-lo via NMClientAsync:
// Defina os comandos necessários para executar no contêiner alocado
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// Defina o comando executável
vargs.add(shellCommand);
// Defina o caminho do script shell
if (!scriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
: ExecShellStringPath);
}
// Defina os parâmetros para os comandos shell
vargs.add(shellArgs);
// Adicione parâmetros de redirecionamento de log
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// Obtenha o comando final
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Defina ContainerLaunchContext para definir recursos locais, variáveis de ambiente, comandos e tokens para o construtor.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
O objeto NMClientAsync e seu manipulador de eventos são responsáveis por lidar com eventos de contêiner. Ele inclui iniciar, parar, atualizações de status e erros para o contêiner.
Depois que o ApplicationMaster determinar que está concluído, ele precisa se desregistrar com o Cliente do AM-RM e, em seguida, parar o Cliente:
try {
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Falha ao cancelar o registro da aplicação", ex);
} catch (IOException e) {
LOG.error("Falha ao cancelar o registro da aplicação", e);
}
amRMClient.stop();
O conteúdo acima é o código principal do ApplicationMaster. Após a edição, salve o conteúdo e saia do editor vim.