1. YARN简介

YARN(Yet Another Resource Negotiator)是Hadoop 2.0中的一个分支(与HDFS和MapReduce两个分支并列),它是一个资源管理系统,基本设计思想是将MRv1(MapReduce Version 1)中JobTracker的两个主要功能,即资源管理和作业控制(包括作业监控、容错等),分拆成两独立的进程,如图1所示,资源管理进程与具体应用程序无关,负责整个集群的资源(内存、CPU、磁盘等)管理和分配,而作业控制进程则是直接与应用程序相关的模块,且每个作业控制进程只负责管理一个作业。这样,通过将原有JobTracker中与应用程序相关和无关的模块分开,不仅克服了MapReduce中JobTracker扩展性差和单点故障会导致整个集群不可用等问题,也使得Hadoop支持更多的应用程序种类(不仅限于MapReduce一种)。

同MRv1一样,YARN总体上仍然是master/slaves结构,在整个资源管理框架中,ResourceManager为master,NodeManager为slave,ResourceManager负责对整个集群中的资源进行统一管理和调度,而NodeManager则负责本节点上的资源管理和任务运行。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的组件ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger启动可以占用一定资源的任务,由于不同的ApplicationMaster被分布到不同的节点上,因此它们之间不会相互影响。

2. Tez产生背景与定位

在开源界,当前最广泛使用的计算模型是MapReduce,该模型将计算过程抽象成Map和Reduce两个阶段,并通过shuffle机制将两个阶段连接起来。但在一些应用场景中,为了套用MapReduce模型解决问题,不得不将问题分解成若干个有依赖关系的子问题,每个子问题对应一个MapReduce作业,最终所有这些作业形成一个有向图(DAG,Directed Acyclic Graph),在该DAG中,由于每个节点是一个MapReduce作业,因此它们均会从HDFS上读一次数据和写一次数据(默认写三份),即使中间节点产生的数据仅是临时数据。很显然,这种表达依赖关系作业的方式是低效的,会产生大量不必要的磁盘和网络IO。

       为了更高效地运行存在依赖关系的作业(比如Pig和Hive产生的MapReduce作业),减少磁盘和网络IO,Hortonworks开发并开源了DAG计算框架Tez[1]

在实际大数据处理场景中,很多问题需转化成DAG模型解决,典型的有两类,分别是:

  • 用户编写的应用程序:很多场景下,用户编写的多个MapReduce应用程序之间存在依赖关系或者为了使用MapReduce解决一个问题,不得不将问题转化成一系列存在依赖关系的MapReduce作业,而为了表达这些作业的依赖关系,用户通常借助于像Oozie[2]或者Cascading[3]这样的流式作业管理工具。

举例说明】  在搜索引擎领域中,常常需要统计最近最热门的K个查询词,这就是典型的“Top K”问题,也就是从海量查询中统计出现频率最高的前K个。如果采用MapReduce模型解决该问题,则可分解成两个MapReduce作业,分别完成统计词频和找出词频最高的前K个查询词的功能,这两个作业存在依赖关系,第二个作业需要依赖前一个作业的输出结果。第一个作业是典型的WordCount问题。对于第二个作业,首先map函数输出前K个频率最高的词,然后reduce函数进汇总map任务的计算结果,并输出频率最高的前K个查询词。

为了采用MapReduce计算模型解决“Top K”问题,我们不得不将整个计算过程分解成两个MapReduce作业,而是实际上,如果有一种支持MAPàREDUCEàREDUCE的计算框架,则会更加灵活高效,而Tez正式这样的计算框架,具体如图2所示。

  • 类似Pig和Hive的系统:Pig和Hive是构建在MapReduce之上的系统,它们允许用户采用更易于编写的结构化语言或者脚本语言进行大数据处理,而用户编写的结构化语言或者脚本语言往往会转化成多个存在依赖关系的MapReduce作业,这种运行方式非常的低效。

举例说明】 以下Hive语句将被转化成了四个有依赖关系的MR作业,它们的运行过程如图3所示,而使用Tez则可大大简化计算过程。

SELECT a.state, COUNT(*), AVERAGE(c.price)
  FROM a
  JOIN b ON(a.id = b.id)
  JOIN c ON(a.itemId = c.itemId)
  GROUP BY a.state

通过上面的例子可以看出,Tez可以将多个有依赖的作业转换为一个作业(这样只需写一次HDFS,且中间节点较少),从而大大提升DAG作业的性能。

3. Tez实现原理

Tez直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等,这样,这些分解后的元操作可以灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业。总结起来,Tez有以下特点:

  • 运行在YARN之上;
  • 与MapReduce兼容,继承了MapReduce的各种优点(比如良好的扩展性与容错性);
  • 适用于DAG应用场景。

接下来重点介绍一下第三个特点,Apache当前有顶级项目Oozie用于DAG作业设计,但Oozie是比较高层(作业层面)的,它只是提供了一种多类型作业(比如MR程序、Hive、Pig等)依赖关系表达方式,并按照这种依赖关系提交这些作业,而Tez则不同,它在更底层提供了DAG编程接口,用户编写程序时直接采用这些接口进行程序设计,这种更底层的编程方式会带来更高的效率。

总起来说,Tez主要又两部分组成:数据处理引擎和DAGAppMaster,其中数据处理引擎提供了一套编程接口和数据计算操作符,而DAGAppMaster则是一个YARN ApplicationMaster,它使得Tez应用程序可运行在YARN上。

3.1 Tez数据处理引擎

Tez对外提供了6种可编程组件,分别是:

  • Input:对输入数据源的抽象,类似于MapReduce模型中的InputFormat,它解析输入数据格式,并输出一个个key/value。
  • Output:对输出数据源的抽象,类似于MapReduce模型中的OutputFormat,它按照一定的格式将用户程序产生的Key/value写入文件系统。
  • Paritioner:对数据进行分片,类似于MapReduce模型中的Partitioner。
  • Processor:对计算单元的抽象,它从Input中获取数据,经过用户定义的计算逻辑处理后,通过Output输出到文件系统中。
  • Task:对任务的抽象,每个Task由Input、Ouput和Processor三个组件构成。
  • Maser:管理各个Task的依赖关系,并按顺依赖关系执行它们。

Tez数据处理引擎是以上6种可编程组件的驱动器,它实现了一些常见的算法和组件,以提高用户编程效率。Tez数据处理引擎的基础是两种组件Sort(排序)和Shuffle(混洗),这两个算子直接来自MRv1中的数据处理引擎MapTask和ReduceTask。

为了用户使用方便,Tez提供了多种Input、Output、Task和Sort的实现,具体如下:

  • Input实现:LocalMergedInput(多个文件本地合并后作为输入),ShuffledMergedInput(远程拷贝数据且合并后作为输入);
  • Output实现:InMemorySortedOutput(内存排序后输出),LocalOnFileSorterOutput(本地磁盘排序后输出),OnFileSortedOutput(磁盘排序后输出);
  • Task实现:RunTimeTask(对Input、Ouput和Processor的封装);
  • Sort实现:DefaultSorter(对数据进行外部排序),InMemoryShuffleSorter(远程拷贝数据并对数据进行内存排序)。

3.2 DAGAppMaster实现

(1)DAG编程模型

Tez通过有向图模型组织应用程序中的Task,这些Task按照依赖关系可形成一个DAG,具体如图3所示,Tez采用图数据结构描述DAG组成,一个DAG由若干个顶点(Vertex)和连接这些顶点的边(Edge)组成,每个顶点对应一段作用在一个数据集上的处理逻辑,可同时启动多个任务,而边则定义了两个顶点之间的数据通信方式。如图4所示,MapReduce作业是一个非常简单的DAG,Map和Reduce相当于两个顶点,它们均可以同时启动多个任务处理数据,而Map和Reduce之间的Shuffle通信模型相当于连接两个顶点的边:

(2)Tez ApplicationMaster实现

为了能够让Tez运行在YARN之上,需开发一个YARN客户端和一个ApplicationMaster,本节重点介绍Tez 的YARN ApplicationMaster ——DAGAppMaster。

DAGAppMaster直接源自MapReduce的ApplicationMaster——MRAppMaster,重用了它的大部分实现机制和代码,总结起来,它主要完成以下几个功能:

  • 数据切分和作业分解。DAGAppMaster负责对输入数据集切分,并创建一系列任务处理这些数据;
  • 任务调度。将作业分解成一系列存在依赖关系的任务,并按照依赖关系调度和执行它们;
  • 与ResourceManager通信,为DAG作业申请资源;
  • 与NodeManager通信,启动DAG作业中的任务;
  • 监控DAG作业的运行过程,确保它快速运行结束。当一个任务运行失败时,会重新为它申请资源并启动它;当一个任务运行较慢时,会为它启动一个备份任务。

每个DAGAppMaster负责管理一个DAG作业,前面已经讲到,一个DAG由若干个顶点(Vertex)和连接这些顶点的边(Edge)组成,每个顶点对应一段作用在一个数据集上的处理逻辑。每个顶点可配置一定数目的并发度,并发度实际上就是任务个数。DAGAppMaster优先为那些不依赖任何顶点的顶点任务申请资源,一旦一个顶点中所有任务运行完成,则认为该顶点运行结束,则将该顶点从DAG中移除,再寻找新的不依赖任何顶点的顶点,并为它们申请资源,…,这样持续下去,直到所有顶点运行完成,这样,整个DAG作业便运行完成了。

如下图所示,对于DAG中的一个顶点(Vertex),它由一定数目的任务(Task)组成,这些任务分别处理输入数据集中的一份数据,一旦所有任务运行完成,则意味着该顶点运行完成;对于任何一个任务,它可能对应多个运行实例TaskAttempt,比如任务刚开始运行时,会创建一个运行实例,如果该实例运行失败,则会再启动一个实例重新运行;如果该实例运行速度过慢,则会为它再启动一个相同实例同时处理一份数据,这些机制均借鉴了MRv1中的设计。

4. Tez优化机制

(1)当前YARN存在的问题

  • 每个作业启用一个ApplicationMaster

MRv1中所有应用程序共用一个追踪器JobTracker,当JobTracker出现故障时,整个系统将不可用,且所有应用程序将运行失败。与MRv1不同,YARN中每个应用程序启用一个独立的应用程序追踪器ApplicationMaster,解决了MRv1中单点故障和扩展瓶颈问题。但这种方式将引入一个新的问题:应用程序延迟较大,即每个应用程序首先要申请资源启动一个ApplicationMaster后,才可以启动任务,也就是说,与MRv1中的应用程序运行过程相比,YARN应用程序将耗费更多的计算资源和产生更长的运行延迟,这不利于运行小作业和DAG作业,尤其是DAG作业(如Hive SQL和Pig产生的DAG作业),将需要更多的计算资源。

  • 资源无法重用

在MRv1中,用户可为自己的作业设置是否启用JVM重用功能,如果启用该功能,则同一个JVM可运行多个任务,从而降低作业延迟提高作业效率。在YARN MRAppMaster(MapReduce应用程序的ApplicationMaster)中,MRAppMaster总是为未运行的任务申请新的资源,也就是说,任务运行完成后便会释放对应的资源,并为接下来运行的任务重新申请资源,而不会向MRv1那样重用资源(JVM)。

(2)Tez引入的优化技术

为了克服当前YARN存在的问题,Apache Tez提出了一系列优化技术,其中值得一说的是ApplicationMaster缓冲池、预先启动Container、Container重用三项优化技术。

  • ApplicationMaster缓冲池

在Apache Tez中 ,用户并不是直接将作业提交到ResouceManager上,而是提交到一个叫AMPoolServer的服务上。该服务启动后,会预启动若干个ApplicationMaster,形成一个ApplicationMaster缓冲池,这样,当用户提交作业时,直接将作业提交到某个已经启动的ApplicationMaster上即可。这样做的好处是,避免了每个作业用时动态启动一个独立的ApplicationMaster。

  • 预先启动Container

ApplicationMaster缓冲池中的每个ApplicationMaster启动时可以预先启动若干个Container,以提高作业运行效率。

  • Container重用

一个任务运行完成后,ApplicationMaster不会马上注销它使用的Container,而是将它重新分配给其他未运行的任务,从而达到资源重用的目的。

5. Tez应用场景

  • 直接编写应用程序

同使用MapReduce计算框架编程应用程序一样,用户也可以直接使用Tez编写DAG类型的应用程序。此外,Tez还提供了一个MR到DAG转换的工具,通过使用该工具,用户很容易将多个有依赖关系的MapReduce作业合并成一个DAG作业,这将大大减少磁盘IO次数,从而提高程序运行效率。

  • 优化Pig、Hive等引擎

Hortonworks正在尝试将Tez应用到Hive引擎中[4](该系统被称为Stinger),从而依靠Tez数据处理引擎的更灵活的表达方式为Hive带来性能提升,Hortonworks官方博客的测试结果表明,Stinger性能比Hive提升几十倍,从内部实现看,导致性能提升的因素很多,包括高效的数据存储格式,更加智能的查询计划优化器等,但从Tez引擎角度看,它为Stinger带来的好处入如下:

  • 避免查询语句转换成过多的MapReduce作业后产生大量不必要的网络和磁盘IO

当一个查询语句中存在多个join、sort或者group by操作时,Hive会将该语句转化成多个存在依赖关系的MapReduce作业,且每个作业均需将输出结果写到HDFS上,再由下一个作业读取作进一步处理,而引入Tez后,整个查询语句将被转化成一个MapReduce作业,整个计算过程只需写一次HDFS,从而大大减少磁盘和网络IO。下面给出一个Hive SQL示例予以说明:

SELECT a.state, COUNT(*)
  FROM a JOIN b ON(a.id = b.id)
  GROUP BY a.state

该SQL包含一个两表连接操作和一个分组操作,因此,Hive会将它转化成两个MapReduce作业:第一个作业进行两表连接,并将结果写到HDFS上,第二个作业从HDFS上读取第一个作业的输出结果,进行分组后,将数据再次写到HDFS上。而使用Tez后,该SQL只被转化成一个DAG作业,可减少中间写HDFS和读HDFS的IO开销,具体如图6所示。

  • 更加智能的任务处理引擎

在MapReduce模型中,为了提高容错性,Map Task产生的结果必须写一次磁盘,而Tez是可选的,可根据任务产生的数据量决定存将其存放到内存中还是写磁盘,这可大大减少中间临时数据的读写IO,能显著提高实时任务和交互式任务的执行效率,具体如图7所示。


6. 与其他系统比较

  • 与类Oozie系统比较

Oozie[5]是一个工作流调度系统(其它类似系统还有Cascading[6]、Azkaban[7]等),它能够按照用户事先定义好的作业依赖关系调度这些作业,并提供了灵活的作业级别的依赖关系表达方式和容错机制,但与Tez这种低级的DAG执行引擎不同,Oozie只是一种作业依赖关系表达和调度框架,它在逻辑上并没有将有依赖关系的作业合并成一个作业来优化IO读写,换句话说,Oozie只是方便用户组织和表达存在依赖关系的MapReduce作业而设计的,并不能用于优化DAG作业的执行效率。

  • 与MapReduce比较

MapReduce只是一种简单的数据处理模型,它将数据处理过程简化为Map和Reduce两个阶段,这限制了MapReduce的计算表达能力,而Tez则不同,它可以包含任意多个数据处理阶段,并提供了一种更加灵活高效的编程模型,可以完成MapReduce难以高效表达的计算场景,典型的场景是数据挖掘和自然语言处理经常用到的DAG计算。Tez可作为MapReduce之下的数据处理引擎,即用户仍采用现有的MapReduce API编写程序,但使用Tez将之组装成一个DAG作业。

  • 与Spark比较

Tez和Spark均为了克服MapReduce在内存计算、DAG计算方面性能低下而提出的,它们存在非常多的共性,主要如下:

  • 提供了多种算子(比如Map、Shuffle等)供用户使用;
  • 通过减少不必要的磁盘和网络IO提高DAG作业的性能;
  • 可运行在YARN上;
  • 充分利用内存提高小作业执行效率。

当然,它们也存在很多不同,主要有:

  • 支持的算子种类。 Spark提供的算子种类要远远多于Tez,比如数据传输模型方面,Spark支持多对多(Shuffle),一对多和多对一等算子,而Tez目前仅支持多对多(Shuffle)一种,但Tez正在尝试丰富它的算子库,相信在不久的将来,两者在这方面不存在明显差距。
  • 与MapReduce兼容性。Tez是直接从Hadoop MapReduce计算框架演化而来的,不仅继承了它的良好扩展性,容错性等优点,而且与MapReduce编程接口完全监控,用户可以直接将自己的MapReduce程序运行在Tez上,也可以很容易地将已有的MapReduce DAG作业转换成Tez作业。Spark则是完全重新开发的计算模型,与现有的MapReduce编程接口无法兼容。
  • 实践验证。 Tez是直接在Hadoop MapReduce源代码基础上修改而来的,充分利用了MapReduce自身强大的扩展性和容错性等优点,且这些特性已经在实践中得到验证,但Spark的扩展性和容错性有待验证。
  • 开发语言。 Spark采用了Scala作为开发语言,充分利用了Scala的优点,这使得Spark代码非常精简,但新语言的引入增加了维护成本;Tez则不同,它仍采用Java作为开发语言,与Hadoop整个生态系统一脉相承。

7. 总结

Tez目前还处于开发阶段,尚未发布alpha版,从源代码可看出,很多地方仍保留了Hadoop MapReduce原有的注释,且代码中存在大量TODO标识尚未实现,但从它的jira[8]系统上可以看出,它的更新速度非常快,可以预想,一旦Tez成熟之后,用户可以很容易得使用Tez编写DAG应用程序,同时,经过Tez优化的Hive(Stinger)的运算速度也会更快,加上Hive正在开发的cost-based查询优化器,Hive将有能力在实时计算、交互式计算和离线计算等领域发挥更大更强的作用。此外,Tez和Spark、Stinger和Shark(运行在Spark之上的SQL引擎)将直接存在竞争关系,这将用户带来更多选择。

8. 参考资料

  • Hortonworks官方博客:http://hortonworks.com/blog/ .
  • Apache Tez Jira:https://issues.apache.org/jira/browse/TEZ .
  • Tez Code:https://github.com/apache/incubator-tez .
  • “The Stinger Initiative – Making Apache Hive 100 Times Faster”, Alan Gates, Arun Murhty, Owen O’Malley, April 2013 HUG.
  • “Apache Tez: Accelerating Hadoop Query Processing”, Bikas Saha, Arun Murhty, Hadoop Summit June 26-27, 2013.

原创文章,转载请注明: 转载自董的博客

本文链接地址: Tez:运行在YARN上的DAG计算框架

微信公众号:hadoop-123,专注于大数据技术分享,欢迎加入!

说点什么

avatar
  Subscribe  
提醒