当前位置: 首页>>Frameworks (Spark) On YARN>> 阅读正文

Apache Spark学习:利用Scala语言开发Spark应用程序

Category: Frameworks (Spark) On YARN View: 44,995 Author: Dong
, ,

  • 评论 (12)
  • 引用通告 (1)
发表评论 发起引用

  • 1楼srcCode 回复

    Post: 2014-01-06 02:53

    董老师,你好。我想问一下在Spark的RDD上做collect以后,collect的结果是放在内存中还是磁盘上。以前用hadoop实现KMeans时,每次迭代都要把聚类结果写到磁盘上,若用Spark的话是不是可以直接在内存中读上次迭代的结果,比如说读collect的结果。

    [回复]

    Dong 回复:

    collect结果会直接返回给driver,你可以认为是在内存里。

    [回复]

  • 2楼打盹 回复

    Post: 2014-03-14 05:03

    您好,关注您的博客好久了,从中学到很多,感谢!
    最近刚开始看spark相关的内容,有个问题在这想请教一下:在从hdfs读入源文件以后,将2个rdd的内容进行join生成一个新的rdd,然后对新的rdd进行处理;新的rdd假如有4条记录,如果可以同时执行4个task的话,我希望的现象是4个task每个处理1条记录,但是实际的情况好像不是这样的,有的task很快就结束了,而有的task要执行很久(每条记录的数据量是很接近的)。我想知道这是为什么呢?这个应该从yarn调度这部分找原因么?
    希望您能解答一下,谢谢!

    [回复]

    打盹 回复:

    补充:从执行时间上来看,更像是有的task没有处理任何记录,只是启动之后就结束了;而执行很久的task处理了多条记录。

    [回复]

  • 3楼Jerry 回复

    Post: 2014-03-17 09:29

    董老师,Spark的cache()的作用是什么?文档上说是加载进内存加速重用(MEMORY_ONLY)。Spark本来就是内存型计算,如果不cache是否只能在该RDD上计算一次,而不能计算多次?

    [回复]

    Dong 回复:

    如果不进行“物化”(将中间结果保存下来,不管是内存还是磁盘),则每次需要重算,cache()的作用就是为了让一些结果不再重算。

    [回复]

  • 4楼Jerry 回复

    Post: 2014-03-18 09:27

    董老师,问一个Spark Streaming的问题:假设设置Duration为10s,如果处理时间过长(如5s、10s等情况下),会不会影响数据接收和后续处理?还是下一Duration的数据交由另一个Task来处理了?

    [回复]

  • 5楼Iven Tseng 回复

    Post: 2014-03-21 09:32

    董老师,我是安装spark0.9.0+hadoop 2.2.0.根据你的代码进行了spark yarn模式测试 ,只有Wordcount成功,以下是三个错误的情况,扰烦您能帮看下,谢谢
    Wordcount2:报以下错误:
    xception in thread “Thread-3″ java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
    Caused by: java.io.IOException: No input paths specified in job
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:198)
    at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
    at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:354)
    at WordCount2$.main(WordCount2.scala:27)
    at WordCount2.main(WordCount2.scala)

    TopK的我的一直报:)
    nknown/unsupported param List(
    Usage: org.apache.spark.deploy.yarn.Client [options]
    Options:….
    您之前在eclipse spark集成环境中说的问题应该在spark0.9中修复了,但现在我还是遇到org.apache.spark.deploy.yarn.Client未找到 命令

    Join中的错误如下:
    Exception in thread “main” java.lang.NullPointerException
    at org.apache.spark.deploy.yarn.Client$$anonfun$logClusterResourceDetails$2.apply(Client.scala:151)
    at org.apache.spark.deploy.yarn.Client$$anonfun$logClusterResourceDetails$2.apply(Client.scala:150)
    at org.apache.spark.Logging$class.logInfo(Logging.scala:49)
    at org.apache.spark.deploy.yarn.Client.logInfo(Client.scala:53)
    at org.apache.spark.deploy.yarn.Client.logClusterResourceDetails(Client.scala:149)
    at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:79)
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:115)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:492)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)

    [回复]

  • 6楼徐徐的 回复

    Post: 2014-03-25 03:17

    董老师,请问下该怎么理解spark中“slice”和“partition”的概念,他们和“task”有什么关系?
    原来我是把slice想成MR中的InputSplit,最后发现又不完全对。之前看到好多资料,主要是以下几点让我迷惑:
    1.Spark run one task for each slice;
    2….Create one slice for each block of file;
    3.When you persist an RDD, each node stores any slices of it that it computes in memory ;
    4.If any partition of an RDD is lost …;
    5….,spark creates a task to process each partition of the dataset…
    Ps:1234来自 http://spark.apache.org/docs/0.9.0/scala-programming-guide.html
    5来自 http://www.cs.berkeley.edu/~matei/papers/2010/hotcloud_spark.pdf

    [回复]

    Dong 回复:

    slice和parition是spark中的通用概念,表示一个原始数据块,可以是MapReduce中的input split,也可以是其他系统中的其他叫法,但在spark中,都称为slice或者partition。input split是MR中的数据块表示方式,这是不同体系中的叫法。task是计算单元,由输入数据、处理逻辑和输出数据组成,slice或者parition是数据表示方式,仅仅是task中的“输入数据”或者“输出数据”。

    [回复]

  • 7楼wangyh 回复

    Post: 2014-04-18 02:37

    董老师,您好!想请教您一个问题。
    想在spark中运行scala程序,object SparkPageRank {
    def main(args: Array[String]) {
    if (args.length < 3) {
    System.err.println("Usage: PageRank “)
    System.exit(1)
    }
    这种带参数的程序该怎么执行呢?
    sbt/sbt run 之后会提示输入要执行的类的数组代号,如果后面跟参数就不合适。请问这样的类该怎么执行呢

    [回复]

    Dong 回复:

    不要用sbt执行,直接打包后,仿照spark中run example例子运行。 实际上就是打成独立jar包,运行jar包即可,跟hadoop运行作业类似,可以参考我的这篇文章,给出了代码和运行脚本:http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/

    [回复]

    Dong 回复:

    仿照这个例子:https://github.com/sryza/simplesparkapp

    [回复]

  • 8楼至尊宝 回复

    Post: 2014-05-12 13:12

    董老师,您好!很幸运,我在找了网上有限的spark资源后,看到了您的博客,非常受用,感谢您!我是一个初学者,按照网上的许多安装攻略在公司环境上捣鼓了一天后还是没安装成功,所以想问下您,在您文章中提到的“你可以从这里下载代码、编译好的jar包和运行脚本”,这些jar包只要导入Eclipse中就可以在只安装了JDK但没有安装SCALA和SPARK的环境中开发了么?希望您指导,另外这些资料可不可以重新开启下载,或者发一份给我,jj8783803@sina.com这是我的邮箱,万分感谢!

    [回复]

  • 9楼明明 回复

    Post: 2014-07-25 09:35

    董老师 您好,
    这两天开始学spark。看了您很多blog,受益匪浅。
    有个问题想问您。
    您觉得如果用spark做开发,用python 好还是scala好?原因呢?

    [回复]

  • 10楼wlu 回复

    Post: 2014-08-05 03:30

    我觉得没必要用keyBy,join本来就是Pair类的操作。

    [回复]

  • 11楼candy 回复

    Post: 2015-04-17 11:26

    董老师,您好,
    我在yarn模式下运行spark streaming,将结果用saveTextFile存下来,但为什么输出目录的文件都为空呢?

    [回复]

  • 12楼codeflitting 回复

    Post: 2015-07-03 04:11

    董老师 你好

    我写了个简单地程序,时而跑的通时而跑不通,报错信息如下:
    15/07/03 12:05:58 WARN TaskSetManager: Lost task 74.0 in stage 0.0 (TID 4857, 23.slave.adh): java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:62)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
    … 12 more
    Caused by: java.lang.IllegalArgumentException
    at org.apache.spark.io.SnappyCompressionCodec.(CompressionCodec.scala:152)
    … 21 more

    这种是什么问题呢?

    [回复]

发表评论