本节分析一个作业从开始运行到运行结束,所经历的整个过程,期间涉及到的各种事件和状态变化。

在正式讲解作业生命周期之前,先要了解MRAppMaster中作业表示方式,每个作业由若干干Map Task和Reduce Task组成,每个Task进一步由若干个TaskAttempt组成,Job、Task和TaskAttempt的生命周期均由一个状态机表示,具体可参考https://issues.apache.org/jira/browse/MAPREDUCE-279(附件中的图yarn-state-machine.job.png,yarn-state-machine.task.png和yarn-state-machine.task-attempt.png)

作业的创建入口在MRAppMaster类中,如下所示:

public class MRAppMaster extends CompositeService {
  public void start() {
    ...
    job = createJob(getConfig());//创建Job
    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
    jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask
    startJobs();//启动作业,这是后续一切动作的触发之源
    ...
  }
  protected Job createJob(Configuration conf) {
     Job newJob =
       new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
         taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
         completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
         currentUser.getUserName(), appSubmitTime, amInfos, context);
         ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
     dispatcher.register(JobFinishEvent.Type.class,
        createJobFinishEventHandler());     
     return newJob;
 }
}

(1)作业/任务初始化

JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和

ReduceTask,代码如下:

public static class InitTransition 
      implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
  ...
  createMapTasks(job, inputLength, taskSplitMetaInfo);
  createReduceTasks(job);
  ...
}

其中,createMapTasks函数实现如下:

private void createMapTasks(JobImpl job, long inputLength,
                                TaskSplitMetaInfo[] splits) {
      for (int i=0; i < job.numMapTasks; ++i) {
        TaskImpl task =
            new MapTaskImpl(job.jobId, i,
                job.eventHandler, 
                job.remoteJobConfFile, 
                job.conf, splits[i], 
                job.taskAttemptListener, 
                job.committer, job.jobToken, job.fsTokens,
                job.clock, job.completedTasksFromPreviousRun, 
                job.applicationAttemptId.getAttemptId(),
                job.metrics, job.appContext);
        job.addTask(task);
      }
    }

(2)作业启动

public class MRAppMaster extends CompositeService {
  protected void startJobs() {
    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
    dispatcher.getEventHandler().handle(startJobEvent);
  }
}

JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调:

public static class StartTransition
  implements SingleArcTransition<JobImpl, JobEvent> {
   public void transition(JobImpl job, JobEvent event) {
      job.scheduleTasks(job.mapTasks);  
      job.scheduleTasks(job.reduceTasks);
  }
}

这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的任务所经历的状态变化如下(不包含失败或者被杀死情况):

【总结】

本文分析只是起到抛砖引入的作用,读者如果感兴趣,可以自行更深入的研究以下内容:

(1) Job、Task和TaskAttempt状态机设计(分别在JobImpl、TaskImpl和TaskAttemptImpl中)

(2) 在以下几种场景下,以上三个状态机的涉及到的变化:

  • kill job
  • kill task attempt
  • fail task attempt
  • container failed
  • lose node

原创文章,转载请注明: 转载自董的博客

本文链接地址: YARN/MRv2 MRAppMaster深入剖析—作业生命周期

微信公众号:hadoop-123,专注于大数据技术分享,欢迎加入!

说点什么

avatar
  Subscribe  
提醒