当前位置: 首页>>hadoop 1.0>> 阅读正文

Hadoop Streaming 编程

Category: hadoop 1.0 View: 273,614 Author: Dong
, ,

  • 评论 (22)
  • 引用通告 (3)
发表评论 发起引用

  • 1楼wens 回复

    Post: 2012-05-03 05:50

     char *querys  = index(buffer, ‘ ‘);
                char *query = NULL;
                if(querys == NULL) continue;
                querys += 1; /*  not to include ‘\t’ */

    c语言中定义的querys的作用是什么?后面的代码没有用到啊~~~谢谢

    [回复]

    Dong 回复:

    去掉吧,这个地方本来要进行特殊情况检查的。

    [回复]

  • 2楼alex 回复

    Post: 2012-10-09 04:04

    可以将执行文件放在 hdfs中么?

    [回复]

    Dong 回复:

    可以的,可以事先放到HDFS上。

    [回复]

    alex 回复:

    怎么设置, 我尝试用 -files 和 -file 都不成功
    另外,在hadoop中,什么情况下把文件放在磁盘上比较好? 什么情况下放在hdfs下比较好?
    谢谢

    [回复]

  • 3楼jimmy 回复

    Post: 2013-04-01 03:15

    简单清晰。赞~

    [回复]

  • 4楼BBC 回复

    Post: 2013-08-22 07:42

    为什么我写了一个类似的C程序,可以在每台机器的本地执行cat input.txt |./Mapper |sort |./Reducer,但是在Hadoop streaming上每次都无法完成执行?(我在程序中使用了opencv的函数加载图像数据,不知道是不是这样不行?)

    [回复]

  • 5楼armon 回复

    Post: 2013-09-11 07:03

    董大侠,我使用你的C语言程序跑了一下,job一直不成功啊,help!我是在单机模式下的,每次执行reduce job都是0%->33%->0%->33%…然后最后错误日志如下:
    ERROR streaming.StreamJob: Job not successful. Error: # of failed Reduce Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201309112126_0005_r_000000
    我把脚本添加如下一句话还是一样的错误:
    -jobconf mapred.reduce.tasks=1

    [回复]

    armon 回复:

    不好意思,是我代码写错了,已解决~

    [回复]

    jinking01 回复:

    我也遇到如下问题了:
    每次执行reduce job都是0%->33%->0%->33%…然后最后错误日志如下:
    ERROR streaming.StreamJob: Job not successful. Error: # of failed Reduce Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201309112126_0005_r_000000
    我用的是集群,你说的你代码写错了,到底是哪错了?

    [回复]

    Dong 回复:

    shell终端上看到的错误无法用于诊断,你可以在界面上查看详细信息,或者查看任务日志,里面有具体的错误原因。

    [回复]

    shanlinglc 回复:

    你好,请问这个问题是怎么解决的呢?我用python写的,把worldcount的代码的mapper换了一下,不过确认测试是正确的,但是最后出现了这种情况,不知道你是错在哪儿的,怎么解决的?能分享一下吗?

    [回复]

  • 6楼望海悦人 回复

    Post: 2014-02-25 12:29

    董大侠,我用python语言跑了一下woedcount例子一直不成功,我在本地测试python代码是可行的,放在分布式集群上就不行了 错误如下:
    root@D:/usr/hadoop/hadoop-1.0.4/bin# ./hadoop jar /usr/hadoop/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar -mapper /op/Mapper.py -reducer /op/Reducer.py -input /input/* -output /pyout
    packageJobJar: [/home/tmp/hadoop-root/hadoop-unjar6784931193663934358/] [] /tmp/streamjob6868795349759160255.jar tmpDir=null
    14/02/25 16:34:20 INFO util.NativeCodeLoader: Loaded the native-hadoop library
    14/02/25 16:34:20 WARN snappy.LoadSnappy: Snappy native library not loaded
    14/02/25 16:34:20 INFO mapred.FileInputFormat: Total input paths to process : 1
    14/02/25 16:34:20 INFO streaming.StreamJob: getLocalDirs(): [/home/tmp/hadoop-root/mapred/local]
    14/02/25 16:34:20 INFO streaming.StreamJob: Running job: job_201402251626_0001
    14/02/25 16:34:20 INFO streaming.StreamJob: To kill this job, run:
    14/02/25 16:34:20 INFO streaming.StreamJob: /usr/hadoop/hadoop-1.0.4/libexec/../bin/hadoop job -Dmapred.job.tracker=D:9001 -kill job_201402251626_0001
    14/02/25 16:34:20 INFO streaming.StreamJob: Tracking URL: http://D:50030/jobdetails.jsp?jobid=job_201402251626_0001
    14/02/25 16:34:21 INFO streaming.StreamJob: map 0% reduce 0%
    14/02/25 16:35:02 INFO streaming.StreamJob: map 100% reduce 100%
    14/02/25 16:35:02 INFO streaming.StreamJob: To kill this job, run:
    14/02/25 16:35:02 INFO streaming.StreamJob: /usr/hadoop/hadoop-1.0.4/libexec/../bin/hadoop job -Dmapred.job.tracker=D:9001 -kill job_201402251626_0001
    14/02/25 16:35:02 INFO streaming.StreamJob: Tracking URL: http://D:50030/jobdetails.jsp?jobid=job_201402251626_0001
    14/02/25 16:35:02 ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201402251626_0001_m_000000
    14/02/25 16:35:02 INFO streaming.StreamJob: killJob…
    Streaming Command Failed!
    我真不知道是哪里错了?谢谢解答

    [回复]

  • 7楼Dong 回复

    Post: 2014-02-26 01:27

    改成这样:
    ./hadoop jar /usr/hadoop/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar -mapper Mapper.py -reducer Reducer.py -file /op/Mapper.py -file /op/Reducer.py -input /input/* -output /pyout 我这篇文章最后有说明

    [回复]

  • 8楼gucasboy 回复

    Post: 2014-03-13 04:38

    懂大侠,我想请教一下,使用streaming后,reducer的输入也像java那样,是按key值排序、合并好的吗?
    reducer的输入是什么样的格式?

    [回复]

    Dong 回复:

    reduce输入是按照key排序的,它的输入就是map的输出,默认key和value之间分隔符是\t,当然,你也可以指定其他分隔符。

    [回复]

    gucasboy 回复:

    再请教一下,我在伪分布式情况下执行下列语句:
    bin/hadoop fs -put conf input/1.txt
    bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input input/1.txt -output output -mapper cat -reducer wc
    bin/hadoop fs -cat output/*
    结果是:
    1036 3775 38246
    cat: File does not exist: /user/hadoop/output/_logs
    这是怎么回事?跟wc 1.txt的结果不一样,而且1.txt中的内容不同,结果都是这样的。请指教。谢谢。

    [回复]

    gucasboy 回复:

    是我太二了,没有清空input 和 output里面的内容。

    [回复]

  • 9楼gucasboy 回复

    Post: 2014-03-17 09:03

    你好,请教一下,我想比较一下java和C++、C实现的wordcount的执行效率,有什么方法可以记录map、reduce的执行时间吗?谢谢

    [回复]

    dong 回复:

    有几种方式:一种是只开启一个map和一个reduce,比较作业执行时间, 另一种通过界面可以看到每个任务执行时间,最好的一种是分析作业的jobhistory日志,里面有你需要的所有信息。

    [回复]

  • 10楼gucasboy 回复

    Post: 2014-03-20 04:27

    请教一下,我在单节点伪分布式环境下用hadoop自带的wordcount,做了几个测试,并跟c++写的做了比较,想请教几个问题。
    1. java写的程序中,我如何设定不调用reducer?就是零个reducer,我想看看maper的输出。
    2. 我将hadoop自带的wordcount中的一行代码:job.setReducerClass()屏蔽掉了,输出结果按key的值排列。我屏蔽掉这一行是没调用reducer还是调用了默认的IdentityReducer?这个排序是在哪完成的?
    3. 我看到一篇博客(http://langyu.iteye.com/blog/992916)里面写到“当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。”意思好像是在map端系统自动完成了排序,跟自己写的mapper函数无关,但是我在使用C++写的mapper,不指定combiner,reducer个数为0时,输出是没有排序的;指定reducer个数为1,使用默认的identityReducer,输出个数为1个,结果排序了。这应该说明排序是在reducer端完成的吧?这是怎么回事?
    4. 我用c++的mapper做了测试,指定reducer的个数为0,输出的结果为什么有两个?mapper的输出不是跟mapper的个数一致吗?hadoop中mapper的个数默认是多少?在哪设置reducer的个数?

    一堆问题,希望能指教。非常感谢。

    [回复]

    Dong 回复:

    java程序可以直接通过参数mapred.reduce.tasks设置reduce task数目,如果设为0,则没有reduce,只有map,C++也一样,不过C++有一个专门的参数:-numReduceTasks。 如果把job.setReducerClass()屏蔽掉,就是你不设置reduce class,这时候会调用默认的IdentityReducer。当job既有map也有reduce时,map task输出结果会排序,reduce task调用reduce()之前会全局排序。如果只有map没有reduce,则不会排序,除非有combiner。排序是mapreduce内置的机制,用户无法干涉。如果C++设置reduce task为0,则会map task直接将结束写到HDFS上,默认最少会启动2个map task,每个对应一个文件,所以有两个。reduce task默认有一个。

    [回复]

    gucasboy 回复:

    非常感谢。我自己做了测试,跟您告诉我的一致。不过我发现了有一点不一样,在自带的wordcount例子里面,我通过mapred.reduce.tasks=0屏蔽掉了所以reducer,同时把java代码里面的combiner屏蔽掉了,输入了一个文件,说明只有一个mapper,结果是没有排序的;不屏蔽combiner,输出一个文件,结果也是没有排序的,说明combiner没有调用,这是怎么回事?
    您说默认的有两个map task,但是为什么系统自带的和我用C++写的,map的输出个数不一样呢?

    [回复]

    Dong 回复:

    mapper个数跟你的文件大小有关,如果你的文件小于64MB(如果是2.0,则是128MB),则只会有一个map task。如果有combiner(你在C++中实现了,且使用-combiner指定),则结果是有序的。 另外,map task默认个数是1还是2,这个不同编程方式可能自己做了修改,比如streaming,这个我确认一下,不过一般不会探讨这个,毕竟基本没有作业处理这么少的数据。

    [回复]

  • 11楼gucasboy 回复

    Post: 2014-03-20 11:08

    使用streaming,_logs/history里面是一个jar包和XML文件,这怎么分析执行情况啊?

    [回复]

    Dong 回复:

    除了jar包和cml,还有一个jobhistory文件,这个文件记录了作业运行时详细信息。

    [回复]

  • 12楼rookie 回复

    Post: 2014-03-28 10:46

    董哥您好,请问一下您是否用过 spring-hadoop 框架的stream
    我在eclipse测试的时候基本正常,就是 -D:作业的一些属性的配置(例如stream.map.input.field.separator/stream.map.output.field.separator)无法生效,如果您用过的话或者有时间的话,帮忙看一下,谢谢您

    [回复]

    Dong 回复:

    没用过spring-hadoop 框架,身边也没听说有人在用。

    [回复]

    rookie 回复:

    好的,谢谢您了

    [回复]

  • 13楼小武 回复

    Post: 2014-05-11 05:52

    董哥!请教一个问题:
    mapper函数需要接参数-f如:

    cat apriori.data | ./mapper_2.py -f part-00000 |./reducer_2.py

    怎么在hadoop的Streaming中指定mapper的参数呢?不胜感激!

    [回复]

  • 14楼tieke 回复

    Post: 2014-08-17 07:16

    董老师,你好,我的wordcount在eclipse上跑,是好好的,但是部署到搭建的完全分布式的环境上跑,跑完M-R后,还会再跑M-R,具体报错信息如下,4/08/17 14:55:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
    14/08/17 14:55:42 INFO client.RMProxy: Connecting to ResourceManager at master/134.64.110.236:8032
    14/08/17 14:55:43 INFO input.FileInputFormat: Total input paths to process : 1
    14/08/17 14:55:44 INFO mapreduce.JobSubmitter: number of splits:1
    14/08/17 14:55:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1408258517683_0001
    14/08/17 14:55:45 INFO impl.YarnClientImpl: Submitted application application_1408258517683_0001
    14/08/17 14:55:45 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1408258517683_0001/
    14/08/17 14:55:45 INFO mapreduce.Job: Running job: job_1408258517683_0001
    14/08/17 14:55:55 INFO mapreduce.Job: Job job_1408258517683_0001 running in uber mode : false
    14/08/17 14:55:55 INFO mapreduce.Job: map 0% reduce 0%
    14/08/17 14:56:05 INFO mapreduce.Job: map 100% reduce 0%
    14/08/17 14:56:11 INFO mapreduce.Job: map 100% reduce 100%
    14/08/17 14:56:23 INFO mapreduce.Job: map 0% reduce 0%
    14/08/17 14:56:23 INFO mapreduce.Job: Job job_1408258517683_0001 failed with state FAILED due to: Application application_1408258517683_0001 failed 2 times due to AM Container for appattempt_1408258517683_0001_000002 exited with exitCode: 1 due to: Exception from container-launch: org.apache.hadoop.util.Shell$ExitCodeException:
    org.apache.hadoop.util.Shell$ExitCodeException:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:505)
    at org.apache.hadoop.util.Shell.run(Shell.java:418)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:300)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

    希望你有空可以帮分析一下,什么原因。谢谢

    [回复]

  • 15楼刘亦菲 回复

    Post: 2014-08-29 10:59

    董大师,您讲的streaming模式操作的hdfs上的文件,求教python用什么模式来操作hbase的数据,然后提交job?

    [回复]

    刘亦菲 回复:

    补充一下,希望能出个详细的文档。谢谢!!我现在只做到能用thrift接口连接hbase,但是提交的模式完全不懂。

    [回复]

  • 16楼求指点啊。 回复

    Post: 2014-09-28 03:32

    c++和python哪个执行的效率高啊。怎么提升c++的运行效率呢?

    [回复]

  • 17楼张国涛 回复

    Post: 2014-10-22 04:50

    $ bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input input -output output -mapper /bin/cat -reducer Reduce.py -file Reduce.py

    还是报错
    Caused by: java.io.IOException: Cannot run program “/tmp/hadoop-xxx/mapred/local/taskTracker/xxx/jobcache/job_201410212035_0015/attempt_201410212035_0015_r_000000_3/work/./Reduce.py”: error=2, No such file or directory

    请问这是怎么回事呢??

    [回复]

  • 18楼blackproof 回复

    Post: 2014-11-12 09:52

    有个问题,我想用python写hadoop的partition还有groupcompare 实现二次排序,要怎么写

    [回复]

  • 19楼blackproof 回复

    Post: 2014-11-12 12:23

    还有个问题,我写的python有引用自己的python,也放在-file里面了,可以import报错,不知道多python文件怎么运行

    [回复]

  • 20楼Nathan 回复

    Post: 2014-11-25 07:31

    能否用c#呢?如何实现,是不是一定要在node上安装.net 环境?

    [回复]

  • 21楼lcyvino 回复

    Post: 2015-03-16 13:00

    请问这个问题您解决了吗?
    我输入:hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -mapper Mapper.py -reducer Reducer.py -file Mapper.py -file Reducer.py -input input/example.txt -output output

    出现和您一样的错误,用的也是Python.

    [回复]

  • 22楼kiki 回复

    Post: 2015-06-04 12:19

    请问如果输入目录(目录不可更改)inputdir有几百个,每个目录下面的文件都是输入文件(加起来大概有500G左右),那该如何解决?这个hadoop streaming命令好像不能支持这么多输入目录。

    [回复]

发表评论