本文共 5003 字,大约阅读时间需要 16 分钟。
目录
RDD(Resilient Distributed Datasets弹性分布式数据集),是spark中最重要的概念,可以简单的把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据分布存储于一批机器中(内存或磁盘中),RDD混合了各种计算模型,使得Spark可以应用于各种大数据处理场景当然,RDD肯定不会这么简单,它的功能还包括容错、集合内的数据可以并行处理等。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。
李耳我有三件宝贝,持有而珍重它。第一件叫慈爱,第二件叫节俭,第三件叫不敢处在众人之先
江湖传说永流传:谷歌技术有"三宝",GFS、MapReduce和大表(BigTable)!
昨天,我在Xebia印度办公室发表了一个关于MapReduce的演说。演说进行得很顺利,听众们都能够理解MapReduce的概念(根据他们的反馈)。我成功地向技术听众们(解释了MapReduce的概念,这让我感到兴奋。在所有辛勤的工作之后,我们在Xebia印度办公室享用了丰盛的晚餐,然后我径直回了家。
回家后,我的妻子(Supriya)问道:“你的会开得怎么样?”我说还不错。 接着她又问我会议是的内容是什么(她不是从事软件或编程领域的工作的)。我告诉她说MapReduce。“Mapduce,那是什么玩意儿?”她问道: “跟地形图有关吗?”我说不,不是的,它和地形图一点关系也没有。“那么,它到底是什么玩意儿?”妻子问道。 “唔…让我们去Dominos(披萨连锁)吧,我会在餐桌上跟你好好解释。” 妻子说:“好的。” 然后我们就去了披萨店。 我们在Domions点餐之后,柜台的小伙子告诉我们说披萨需要15分钟才能准备好。于是,我问妻子:“你真的想要弄懂什么是MapReduce?” 她很坚定的回答说“是的”。 因此我问道: 我: 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试) 妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。 妻子: 但这和MapReduce有什么关系? 我: 你等一下。让我来编一个完整的情节,这样你肯定可以在15分钟内弄懂MapReduce. 妻子: 好吧。 我:现在,假设你想用薄荷、洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢? 妻子: 我会取薄荷叶一撮,洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。 我: 没错,让我们把MapReduce的概念应用到食谱上。Map和Reduce其实是两种操作,我来给你详细讲解下。Map(映射): 把洋葱、番茄、辣椒和大蒜切碎,是各自作用在这些物体上的一个Map操作。所以你给Map一个洋葱,Map就会把洋葱切碎。 同样的,你把辣椒,大蒜和番茄一一地拿给Map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个Map操作。 Map操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在Map操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,Map操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。 Reduce(化简):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将map操作的蔬菜碎聚集在了一起。 妻子: 所以,这就是MapReduce? 我: 你可以说是,也可以说不是。 其实这只是MapReduce的一部分,MapReduce的强大在于分布式计算。 妻子: 分布式计算? 那是什么?请给我解释下吧。 我: 没问题。 我: 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产10000瓶辣椒酱,你会怎么办呢? 妻子: 我会找一个能为我大量提供原料的供应商。 我:是的..就是那样的。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。 妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。 我:没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的Map操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。 这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。 妻子:但是我怎么会制造出不同种类的番茄酱呢? 我:现在你会看到MapReduce遗漏的阶段—搅拌阶段。MapReduce将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以key为基础的map操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。 所以全部的洋葱keys都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记着番茄的研磨器里,并制造出番茄辣椒酱。 披萨终于做好了,她点点头说她已经弄懂什么是MapReduce了。我只希望下次她听到MapReduce时,能更好的理解我到底在做些什么。网上其他人用最简短的语言解释MapReduce:
We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes. 我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。 Now we get together and add our individual counts. That’s reduce. 现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”
<1>将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素, 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区.
data = sc.textFile("/tmp/hive/tmp/1.txt")data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21//使用map算子mapresult = data.map()mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
<2>flatMap,属于Transformation算子,第一步和map一样,最后将所有的输出分区合并成一个
<3>count返回RDD中的元素数量
rdd.count()
<4>reduce,根据映射函数对RDD里的元素进行二元计算结果,返回计算结果
rdd.reduce(lambda x,y:x.a+y.a,x.b+y.b)
<5>collect用于将一个RDD转换成数组
rdd.collect
<6>countByKey(),countByKey用于统计RDD[K,V]中每个K的数量
rdd.countBykey
<7>foreach(),foreach用于遍历RDD,将函数f应用于每一个元素 <8>saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中
rdd.saveAsTextFile("/tmp/hive/itcast/python-bigdata.txt")
<1>将本地的Hamlet.txt上传到hdfs上
hadoop fs -put Hamlet.txt /tmp/hive/itcast/
其他操作
获取hdfs文件到本地hadoop fs -get /tmp/hive/itcast/python.txt ./
列出hdfs文件系统根目录下的目录和文件
hadoop fs -ls /
rm操作
hadoop fs -rm < hdfs file > ...hadoop fs -rm -r < hdfs dir>...每次可以删除多个文件或目录
Spark应用作为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会连接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。
开启spark后,SparkContext简略写法sc,对于sc可以进行文件的读取以及节点之间的协调
<1>读取文件并转换成一个RDD数据集
sc.textFile("itcast-python.txt")
<2>将数据从一个节点发送到其它节点上
#读取RDD类型的数据集,并且返回一个broadcast类型的数据bdata = sc.broadcast(data)
from operator import addtext = sc.textFile("/tmp/hive/itcast/Hamlet.txt")def tokenize(text): return text.split()words = text.flatMap(tokenize)wc = words.map(lambda x: (x,1))counts = wc.reduceByKey(add)counts.saveAsTextFile("/tmp/hive/itcast/hm")