1. 概要

本文主要介绍了Hadoop Streaming的一些高级编程技巧,包括,怎样在mapredue作业中定制输出输出格式?怎样向mapreduce作业中传递参数?怎么在mapreduce作业中加载词典?怎样利用Hadoop Streamng处理二进制格式的数据等。

关于Hadoop Streaming的基本编程方法,可参考:Hadoop Streaming编程

2. 在mapreduce作业中定制输入输出格式

Hadoop 0.21.0之前的版本中的Hadoop Streaming工具只支持文本格式的数据,而从Hadoop 0.21.0开始,也支持二进制格式的数据。这里介绍文本文件的输入输出格式定制,关于二进制数据的格式,可参考第5节。

Hadoop Streaming提交作业的格式为:

Usage: $HADOOP_HOME/bin/hadoop jar \   $HADOOP_HOME/hadoop-streaming.jar [options]

其中,-D选项中的一些配置属性可定义输入输出格式,具体如下(注意,对于文本而言,每一行中存在一个key/value对,这里只能定制key和value之间的分割符,而行与行之间的分隔符不可定制,只能是\n):

(1)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数据的分隔符,默认均为\t。

(2)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目,如

每一行形式为,Key1\tkey2\tkey3\tvalue,采用默认的分隔符,且stream.num.map.output.key.fields设为2,则Key1\tkey2表示key,key3\tvalue表示value。

(3)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。

(4)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目

3. 向mapreduce作业传递参数

提交作业时,使用-cmdenv选项以环境变量的形式将你的参数传递给mapper/reducer,如:

 $HADOOP_HOME/bin/hadoop jar \
   contrib/streaming/hadoop-0.20.2-streaming.jar \
   -input input \
   -ouput output \
   -cmdenv grade=1 \
   …….

然后编写mapper或reducer时,使用main函数的第三个参数捕获你传入的环境变量,如:

int main(int argc, char*argv[], char*env[]){
int i, grade;
for(i = 0; env[i] != NULL; i++)
  if(strncmp(env[i], “grade=”, 6) == 0)
    grade=atoi(env[i]+6);
   …...
}

4. 在mapreduce作业中加载词典

提交作业时,使用-file选项,如:

 $HADOOP_HOME/bin/hadoop jar \
   contrib/streaming/hadoop-0.20.2-streaming.jar \
   -input input \
   -ouput output \
   -file dict.txt \
   …….

然后编写mapper或reducer时,像本地文件一样打开并使用dic.txt文件,如:

int main(int argc, char*argv[], char*env[]){
  FILE *fp;
  char buffer[1024];
  fp = fopen("dict.txt","r");
  if(!fp) return 1;
  while(fgets(buffer, 1024, fp)!=NULL) {
    ……
  }
  ……
}

如果要加载非常大的词典或配置文件,Hadoop Streaming还提供了另外一个选项-files,该选项后面跟的是HDFS上的一个文件(将你的配置文件放到HDFS上,再大也可以!!!),你可以在程序中像打开本地文件一样打开该文件,此外,你也可以使用#符号在本地建一个系统链接,如:

$HADOOP_HOME/bin/hadoop jar \
  contrib/streaming/hadoop-0.20.2-streaming.jar \
  -file  hdfs://host:fs_port/user/dict.txt#dict_link \
  …….

在代码中这样做:

如:

int main(int argc, char*argv[], char*env[]){
  FILE *fp;
  char buffer[1024];
  fp = fopen("dict_link ","r"); //or fp = fopen("dict.txt ","r");
  if(!fp) return 1;
  while(fgets(buffer, 1024, fp)!=NULL) {
  ……
  }
  ……
}

5. 处理二进制格式的数据

从Hadoop 0.21.0开始,streaming支持二进制文件(具体可参考:HADOOP-1722),用户提交作业时,使用-io选项指明二进制文件格式。0.21.0版本中增加了两种二进制文件格式,分别为:

(1) rawbytes:key和value均用【4个字节的长度+原始字节】表示

(2) typedbytes:key和value均用【1字节类型+4字节长度+原始字节】表示

用户提交作业时,如果用-io指定二进制格式为typedbytes,则map的输入输出,reduce的输入输出均为typedbytes,如果想细粒度的控制这几个输入输出,可采用以下几个选项:

-D stream.map.input=[identifier]
-D stream.map.output=[identifier]
-D stream.reduce.input=[identifier]
-D stream.reduce.output=[identifier]

你如果采用的python语言,下面是从 HADOOP-1722 中得到的一个例子(里面用到了解析typedbytes的python库,见:http://github.com/klbostee/typedbytes ):

mapper脚本如下:

import sys
import typedbytes

input =  typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)
for(key, value) in input:
  for word in value.split():
    output.write((word, 1))

reducer脚本:

import sys
import typedbytes
from itertools import groupby
from operator import itemgetter

input =  typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)
for(key, group) in groupby(input, itemgetter(0)):
  values = map(itemgetter(1), group)
  output.write((key, sum(values)))

6. 自定义counter并增加counter的值

用户采用某种语言编写的mapper或者reducer可采用标准错误输出(stderr)自定义和改变counter值,格式为:reporter:counter:<group>,<counter>,<amount>,如,在C语言编写的mapper/reducer中:

 fprintf(stderr, “reporter:counter:group,counter1,1”); //将组group中的counter1增加1

注:用户定义的自定义counter的最终结果会在桌面或者web界面上显示出来。

如果你想在mapreduce作业执行过程中,打印一些状态信息,同样可使用标准错误输出,格式为:reporter:status:<message>,如,在C语言编写的mapper/reducer中:

 fprintf(stderr, “reporter:status:mapreduce job is started…..”); //在shell桌面上打印“mapreduce job is started…..”

7. 在mapreduce使用Linux Pipes

迄今为止(0.21.0版本之前,包括0.21.0),Hadoop Streaming是不支持Linux Pipes,如:-mapper “cut -f1 | sed s/foo/bar/g”会报”java.io.IOException: Broken pipe”错误。

8. 在mapreduce中获取JobConf的属性值

在0.21.0版本中,streaming作业执行过程中,JobConf中以mapreduce开头的属性(如mapreduce.job.id)会传递给mapper和reducer,关于这些参数,可参考:http://hadoop.apache.org/mapreduce/docs/r0.21.0/mapred_tutorial.html#Configured+Parameters

其中,属性名字中的“.”会变成“_”,如mapreduce.job.id会变为mapreduce_job_id,用户可在mapper/reducer中获取这些属性值直接使用(可能是传递给环境变量参数,即main函数的第三个参数,本文作业还未进行验证)。

9. 一些Hadoop Streaming的开源软件包

(1) 针对Hadoop Streaming常用操作的C++封装包(如自定义和更新counter,输出状态信息等):https://github.com/dgleich/hadoopcxx

(2) C++实现的typedbytes代码库:https://github.com/dgleich/libtypedbytes

(3) python实现的typedbytes代码库: http://github.com/klbostee/typedbytes

(4) Java实现的typedbytes代码库(Hadoop 0.21.0代码中自带)

10. 总结

Hadoop Streaming使得程序员采用各种语言编写mapreduce程序变得可能,它具备程序员所需的大部分功能接口,同时由于这种方法编写mapreduce作业简单快速,越来越多的程序员开始尝试使用Hadoop Steraming。

11. 参考资料

原创文章,转载请注明: 转载自董的博客

本文链接地址: Hadoop Streaming高级编程

微信公众号:hadoop-123,专注于大数据技术分享,欢迎加入!

1
说点什么

avatar
1 Comment threads
0 Thread replies
0 Followers
 
Most reacted comment
Hottest comment thread
0 Comment authors
Recent comment authors
  Subscribe  
最新 最旧 得票最多
提醒
trackback

[…] Streaming高级编程方法,可参考这篇文章: Hadoop Streaming高级编程 […]