1. CoronaJobTracker功能

CoronaJobTracker实际上是一个单Job版本的JobTracker,它是在MRv1的JobTracker基础上修改而来的,它只负责管理一个Job的生命周期,包括该Job的创建、并行化、任务失败时重启、任务运行慢时为其额外启动一个备份任务等。相比于JobTracker,CoronaJobTracker由于不再负责资源管理和任务调度(这些由ClusterManager负责),它更加轻量级,维护的信息更少,且它会被随机调度到某个CoronaTaskTracker上执行,因此,它不存在JobTracker所具有的那些缺点,比如维护信息过多影响系统扩展性,一旦失败则整个系统不可用等。

MRv1的架构图如下所示:

Corona的架构图如下所示:

2. CoronaJobTracker实现方法

CoronaJobTracker是在MRv1的JobTracker基础上修改而来的,阅读代码可发现,它仍保持JobTracker的基本代码框架,但代码更加简洁,毕竟它只需要维护一个作业的运行时信息。此外,作业的信息维护是由CoronaJobInProgress完成的,它是在MRv1的JobInProgress类基本上修改而来的,它的所有代码与facebook版本的MRv1中的JobInProgress基本一致,只是将作业的任务获取函数进行了修改(即函数ObtainNewXXX()),改后的函数只需要返回某个TaskInProgress(这个类未经过修改,直接重用)的Task Attempt即可。

       CoronaJobTracker最大的修改是RPC部分。Corona将MRv1中单向通信模型改为了双向通信模型,以便实现push-based模型以减小作业延时。为此,需要为每对通信双方额外添加一个通信协议,进而使每个服务既是RPC client,又是RPC server。CoronaJobTracker与ClusterManager之间采用了thirft通信协议,具体定义在src\contrib\corona\interface\ClusterManager.thrift中:

(1)SessionDriverService   ClusterManager通过该协议为CoronaJobTracker分配资源、回收资源和处理死节点。

(2)ClusterManagerService       CoronaJobTracker通过该协议向ClusterManager申请资源、释放资源等。

CoronaJobTracker与CoronaTaskTracker之间仍采用原始的Hadoop RPC实现,具体如下:

(1)     InterTrackerProtocol     与MRv1一样,CoronaTaskTracker通过该协议向CoronaJobTracker汇报各个任务的运行状态。

(2)     CoronaTaskTrackerProtocol  CoronaJobTracker通过该协议向CoronaTaskTracker下达各种命令,比如启动新任务,杀死任务等。(ClusterManager通过该协议让CoronaTaskTracker启动某个CoronaJobTracker。)

3. CoronaJobTracker架构

CoronaJobTracker主要组成模块/线程如下如所示,下面分别对这几个模块进行介绍。

  • ResourceUpdater

CoronaJobTracker中的一个独立线程,每隔1s重复以下动作:更新Speculative任务的数目,处死节点上的任务(重新为之申请资源),向ClusterManager汇报最新的资源需求和待释放的资源。

  • interTrackerServer

实现InterTrackerProtocol协议的RPC server,它接收来自CoronaTaskTracker的心跳信息,并根据需要更新任务的运行状态。

  • ExpireTasks

CoronaJobTracker中的一个独立线程,它包含两个功能:跟踪新分配的任务是否成功启动,如果一定时间内未成功启动则需重新启动它;跟踪每个任务的运行状态,如果一个任务在一定时间内未汇报进度,则将其杀死。

  • ResourceTracker

调整作业所需的资源。在作业启动时,将map和reduce任务所需的资源转化为标准形式,并在任务失败时为其重新计算资源,此外,它还会调整speculative任务所需的资源。在Corona中,资源标准表示形式如下:

< id, hosts, specs, type, excludeHosts>

其中 ,hosts为期望资源所在的节点(数据本地性),specs为资源描述,当前会传递4中资源,分别是内存、网络、CPU和磁盘,但实际上仅使用了内存、磁盘和CPU三种资源,type为资源类型,当前有3种类型,分别是MAP(用于运行Map Task)、REDUCE(用于运行Reduce Task)和JOBTRACKER(用于运行CoronaJobTracker),excludeHosts为hosts黑名单,即分配的资源不能来自这些节点,可能是因为作业在该节点上失败过或者其他原因。默认情况下,每个Map Task所需的资源配置为:<1 CPU, 10MBps network, 1024MB Memory, 10GB Disk>,每个Reduce Task所需的资源配置为:<1 CPU, 50MBps network, 1024MB Memory, 10GB Disk>,每个Corona JobTracker所需的资源配置与Reduce Task相同。

  • AssignTasksThread

将申请到的资源分配给各个Task,并与对应的CoronaTaskTracker启动,要求它启动Task。该线程会调用CoronaTaskLauncher,让它与CoronaTaskTracker通信启动Task。

  • CoronaJobHistory

对作业的各种行为(如启动作业,启动任务,任务完成等)做日志,并在需要的时候解析这些日志。

  • CoronaJobInProgress

管理作业的运行周期,包括作业的状态,各个任务的状态、进度等信息。与MRv1中的JobInProgress功能一致。

  • CoronaTaskLauncher

与CoronaTaskTracker通信,以要求它启动新任务。为了防止单个有问题的CoronaTaskTracker阻塞了后面新任务的启动,CoronaTaskLauncher采用多个发送线程与CoronaTaskTracker通信。

原创文章,转载请注明: 转载自董的博客

本文链接地址: Corona深入剖析系列—CoronaJobTracker实现

微信公众号:hadoop-123,专注于大数据技术分享,欢迎加入!

说点什么

avatar
  Subscribe  
提醒