范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

HadoopStreaming编程

  zebra (汇百家之长,成一人之声)
  1.背景
  Hadoop MapperReducer 默认提供Java,C++编程接口编程接口,是不是意味着其他编程语言就无法使用Hadoop MapperReducer了呢?不是的!Hadoop Streaming是一个Hadoop自带的编程接口,允许任何程序语言实现的程序在MR中使用,这样熟悉其他编程语言(如php,python,shell)的同学就可以顺利使用hadoop进行离线计算任务了。 2.工作原理
  Hadoop Streaming要求用户编写的Mapper/Reducer从标准输入(stdin)中读取数据,将结果写入到标准输出(stdout)中,如下图:
  mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。
  这非常类似于Linux的管道机制。正因此,我们在linux本地可以通过如下命令进行job进行测试。 cat  |  | sort | 
  那么在程序里面如何获取标准输入输出呢,不同语言大同小异,例如php #获取标准输入 $in = fopen("php://stdin", "r");  #标准输出 print "$value	1 "; #或者 $out = fopen("php://stdout", "w"); fwrite($out, $value);3.world count
  该案例我们将会展现world count样例,用于统计文本中每个单词出现个数。 3.1 文件准备
  文件1. word.txt一篇英文文章,随便网上copy一篇吧;
  文件2. 使用不同编程语言编写的job可执行文件,下文分别示例一个php和一个python编写的;
  文件3.对应编程语言的解释器可执行文件,这里需要特别说明下,远程hadoop肯定是支持java,shell的,但是如果你用python3,php编写job的话,大概率是不支持的,这个时候需要在提交任务的时候一起提交该语言解释器。3.2 待处理的文件上传到hdfshdfs dfs -mkdir /work/ hdfs dfs -put word.txt /work/input.txt hdfs dfs -ls /work/3.4.php版本
  WcMr.php 的mrjob,使用php实现,为了让代码写起来更舒服。请继承我下面写的基类代码,放心用吧这都是测试过的! 3.4.2 job代码#!/usr/bin/php <?php require_once "./MapReduce.php"; /  * 实现一个英文单词计数的mrjob  */ class WcMr extends MapReduce {     /      * over write mapper      * @param $line      */     protected  function  mapper($line) {         $words = preg_split("/W/", $line, 0, PREG_SPLIT_NO_EMPTY);         foreach ($words as $word) {             $this->write($word, 1);         }     }     		/      * @param $key      * @param array $values      */     protected function  reducer($key, $values) {         $total = 0;         foreach ($values as $value) {             $total = $total + intval($value);         }         $this->write($key, $total);     } }    try {     $mapper=new WcMr();     $mapper->run($argv); } catch (Exception $exc) {     throw  $exc; }3.4.2基类代码<?php  /**  * mrJob标准抽象  */ class MapReduce {      private $stdIn = null;     private $stdout = null;     protected $argv = array();      public function __construct() {         $this->stdIn = fopen("php://stdin", "r");         $this->stdout = fopen("php://stdout", "w");     }      /**      * @param array $argv      */     private function runMapper($argv) {         $this->argv = $argv;         while (!feof($this->stdIn)) {             $line = trim(fgets($this->stdIn));             if ($line == "") {                 continue;             }             $this->mapper($line);         }     }       /**      * mapper需要子类去实现      * @param $line      */     protected function mapper($line) {     }       private function _yield($currentKey, $currentValue, &$nextKey, &$nextValue) {          yield $currentValue;         while (!feof($this->stdIn)) {             $line = fgets($this->stdIn);             list($key, $value) = explode("	", trim($line), 2);             if (trim($line) == "" || empty($key)) {                 continue;             }             if ($currentKey == $key) {                 yield $value;             } else {                 $nextKey = $key;                 $nextValue = $value;                 break;             }         }     }       private function runReducer($argv) {         $this->argv = $argv;         $nextKey = null;         $nextValue = null;         while (!feof($this->stdIn)) {             if (empty($nextKey)) {                 $line = fgets($this->stdIn);                 list($currentKey, $currentValue) = explode("	", trim($line), 2);                 if (trim($line) == "" || empty($currentKey)) {                     continue;                 }              } else {                 $currentKey = $nextKey;                 $currentValue = $nextValue;              }             $values = $this->_yield($currentKey, $currentValue, $nextKey, $nextValue);             $this->reducer($currentKey, $values);             if ($values->valid()) {                 foreach ($values as $value) {                     ;//把这个key下面没有遍历完毕的全部遍历完毕                 }             }         }      }       /**      * @param $key      * @param $values      */     protected function reducer($key, $values) {     }       /**      * 文本输出方法      * @param $key      * @param $value      */     protected function write($key, $value = "") {          if (!is_string($value)) {             $value = json_encode($value, JSON_UNESCAPED_UNICODE);         } else {             $value = strval($value);         }         if (!empty($key)) {             $key = $key . "	";         }         fwrite($this->stdout, $key . $value . PHP_EOL);     }       /**      * @param $argv      * @throws Exception      */     public function run($argv) {          if (empty($argv)) {             throw new Exception("$argv is empty $job->run($argv);");         }          if (!in_array($argv[1], array("--mapper", "--reducer"))) {             throw new Exception("you should set option [--mapper | --reducer]");         }         if ($argv[1] == "--mapper") {             $this->runMapper($argv);         } else if ($argv[1] == "--reducer") {             $this->runReducer($argv);         }     }      /**      *      */     public function __destruct() {         fclose($this->stdIn);         fclose($this->stdout);     } }3.4.3 本地调试cat inpput.txt | php WcMr.php --mapper | sort | php WcMr.php --reducer3.4.4 提交集群执行
  注意 不要漏掉php语言解释器,我这里路径在/home/work/php/bin/php,关于hadoop-streaming的一些参数见后面章节。${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar  -files "/home/work/php/bin/php,/home/work/MapReduce.php,/home/work/WcMr.php"  -input  "/work/inpput.txt"  -output "/work/output"  -mapper "./php WcMr.php --mapper"  -reducer "./php WcMr.php --reducer" #如果没有reducer如下设置 #-reducer NONE3.5python版本3.5.1 job代码# 如果使用的是python2话在顶层加入以下代码 # from __future__ import print_function # from __future__ import pision # from __future__ import absolute_import  from map_reduce import MapReduce  class WorldCount(MapReduce):     """     WorldCount demo     """      def mapper(self, line):         words = line.strip().split(" ")         for word in words:             self.write(word, 1)      def reducer(self, key, iterator):         total = 0         for value in iterator:             total += int(value[1])          self.write(key, total)   if __name__ == "__main__":      try:         WorldCount().run(sys.argv)     except Exception as exp:         print(exp)         exit(1) 3.5.2 基类代码import sys from operator import itemgetter from itertools import groupby import json   class MapReduce(object):     """     MapReduce     """      def __run_mapper(self):         for line in sys.stdin:             line = line.strip()             if line != "":                 self.mapper(line)      def __read_reducer_output(self, files, separator="	"):         for line in files:             yield line.strip().split(separator, 1)      def __run_reducer(self):         data = self.__read_reducer_output(sys.stdin)         for key, iterator in groupby(data, itemgetter(0)):             self.reducer(key, iterator)             try:                 for _item in iterator:                     pass             except StopIteration as exp:                 continue      def mapper(self, line):         raise NotImplementedError      def reducer(self, key, iterator):         raise NotImplementedError      def run(self, argv):         if len(argv) < 2 or (argv[1] not in ["--mapper", "--reducer"]):             raise Exception("you should set option [--mapper | --reducer]")         if argv[1] == "--mapper":             self.__run_mapper()         elif argv[1] == "--reducer":             self.__run_reducer()      def write(self, key=None, value=None):          if not isinstance(value, str):             value = json.dumps(value)         if key == None:             print("{0}".format(value))         else:             print("{0}	{1}".format(key, value))3.5.3 本地调试cat inpput.txt | python3 wcmr.py --mapper | sort | python3 wcmr.py --reducer3.5.4 提交集群执行
  注意 不要漏掉python3语言解释器,我这里路径在/miniconda3/bin/python3,关于hadoop-streaming的一些参数见后面章节。当然如果你用python2的话,就不用上传解释器了,linux机器都有的。${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar  -files "/miniconda3/bin/python3,/home/work/map_reduce.py,/home/work/wc.py"  -input  "/work/inpput.txt"  -output "/work/output"  -mapper "./python3 WcMr.php --mapper"  -reducer "./python3 WcMr.php --reducer" #如果没有reducer如下设置 #-reducer NONE3.6 查看计算结果
  直接从远程吧结算结果下载下来。 hdfs dfs -get /work/output .5.命令参数
  streaming使用样例: $ ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar      -input <输入目录>      -inputformat <输入格式 JavaClassName>      -output <输出目录>      -outputformat <输出格式 JavaClassName>      -mapper       -reducer       -combiner       -partitioner       -cmdenv   # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个     -file <依赖的文件>  # 配置文件,字典等依赖,-file是一个deprecated的配置,可以使用-files。     -D   # 作业的属性配置streaming命令参数列表详细解释:
  参数名
  说明
  -input
  指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入
  -output
  指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次
  -combiner
  指定combiner Java类
  -mapper
  mapper可执行程序或Java类
  -reducer
  reducer可执行程序或Java类
  -flies Optional
  分发本地文件
  -cacheFile Optional
  分发HDFS文件
  -cacheArchive Optional
  分发HDFS压缩文件
  -numReduceTasks Optional
  reduce任务个数
  -inputformat Optional
  InputFormat Java类
  -outputformat Optional
  OutputFormat Java类
  -inputreader Optional
  InputReader配置
  -cmdenv NAME=VALUE
  给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值
  -mapdebug Optional
  mapper失败时运行的debug程序
  -reducedebug Optional
  reducer失败时运行的debug程序
  -verbose Optional
  详细输出模式
  -jobconf | -D NAME=VALUE Optional
  作业配置参数,NAME是参数名,可以指定的参数参考hadoop-default.xml,VALUE是参数值,例如用-jobconf mapred.job.name="jobName" 设置作业名 常见的(-jobconf)作业配置参数解释:
  参数名
  说明
  mapred.job.name
  作业名
  mapred.job.priority
  作业优先级,VERY_HIGH
  mapred.job.map.capacity
  最多同时运行map任务数
  mapred.job.reduce.capacity
  最多同时运行reduce任务数
  hadoop.job.ugi
  作业执行权限
  mapred.map.tasks | mapreduce.job.maps
  map任务个数
  mapred.reduce.tasks | mapreduce.job.reduces
  reduce任务个数
  mapred.job.groups
  作业可运行的计算节点分组
  mapred.task.timeout
  任务没有响应(输入输出)的最大时间
  mapred.compress.map.output
  map的输出是否压缩
  mapred.map.output.compression.codec
  map的输出压缩方式
  mapred.output.compress
  reduce的输出是否压缩
  mapred.output.compression.codec
  reduce的输出压缩方式
  stream.map.output.field.separator
  map输出分隔符,默认是
  stream.map.input.field.separator
  Map输入数据的分隔符,默认是
  stream.map.input.field.separator
  Reduce输入数据的分隔符
  stream.reduce.output.field.separator
  Reduce输出数据的分隔符

特斯拉深夜自动播放鬼故事,这难道就是灵魂战车?这篇新闻倒是有点离奇,前些日子,一位特斯拉Model3车主郑女士在晚上开车的时候,车子刚刚开进地库里,车里边突然出现闹鬼情况,车里的音响发出我死得好冤枉啊!的凄凉女声!这大半夜的,取一个好听的名字有多重要宝妈必看奇妙知识季对于父母和朋友来说,在孩子出生后,命名他们已经成为首要任务,一些父母在怀孕时开始计划。钱先生和他的妻子已经结婚四五年了,但没有孩子。为了怀孕,这对夫妇想了很多方法,但仍然我,70后四川人,9年自驾游40个国家,爱尔兰妻子喜欢中国文化我,70后四川人,9年自驾游40个国家,爱尔兰妻子喜欢中国文化讲述老姚整理肖寒先生图片来源于网络,侵删我是老姚,是一名70后,四川成都人。我经历过两段婚姻,第一段婚姻是前妻出轨,最为什么要全网显示ip?因为中文互联网越来越国际化最近全网陆陆续续公开了网络IP地址。有很多博主翻车了,我就说我自己看见的一个。之前,在短视频平台上出现了很多纽约博主,内容是天天拍各种纽约城市街景。然后告诉你,纽约生活多么多姿多彩怎么取名字,自古至今有讲究,这些风俗习惯要明白名字伴随终身,古代把取名字看做是一件大事,所以也就派生出了许多的讲究。名,说文解字认为名,自命也,从口,从,者冥也,冥不相见,故以口自名。冥,既命。中国远古人认为万事万物都是有生命退役球星现状43岁阿泰斯特身材依然健壮,社交媒体和妻子秀恩爱在NBA中总是存在许多硬汉所谓的硬往往是指这类球员在某一个领域拥有十分出色的发挥,但要说硬汉中的硬汉,我想阿泰斯特一定榜上有名,如今阿泰斯特已经退役多年,年龄43岁的他身材保持其实何润东一家做公益,柜姐妻子气质不输超模,母亲穿着富贵更抢镜不管时尚小旋风怎么刮,总有一部分姐妹坚定简约风路线不动摇。大家可别以为简约风造型随便搭搭就ok,高端水平的简约风look所呈现出的时尚效果可不比一些前卫style要差,能将颜色与元教程丨少儿美术课程西瓜教学目的1了解西瓜的百科知识。2通过观察西瓜图片,了解西瓜的颜色和造型特征。3学习运用拓印肌理来表现西瓜的瓜瓤质感。4通过撕纸粘贴的方式,感受绘画带来的的乐趣。教学工具圆形白色卡纸家有学生,建议吃这5种高锌食物,被称智力之源,常吃学习棒孩子是一个家庭的新希望,当他们正处于发育时期,每天学习用脑的时间比较多,妈妈们要经常给孩子多吃一些可以增强记忆力的食物。接下来跟大家分享家里有孩子,建议吃5种高锌食物,被称智力之源北方边城绥芬河我住在北方边境城市绥芬河,下学来到这里一晃而过二十年了,已经是我的第二故乡了。记得刚来时第一份工作是在龙须沟市场往各个饭店和快餐点送菜,得益于这个工作我转遍了城市的大街小巷。锦州一山西旅游宣传口号问题5。1假期马上就要过去了,虽然今年因为疫情很多人都出不了门,但旅游业也是我们一直力推的绿色产业,要推动旅游产业发展,我觉得宣传口号很重要,这是给游客的第一印象。唐代建筑黄河山西的旅
PGC大赛MCG战队连续拿下首鸡,PeRo战队圈运最好,结果最烂对于绝地求生这款游戏,相信绝大多数FPS游戏爱好者们肯定都再熟悉不过了,绝地求生作为电竞圈首款以军事竞赛为背景的大逃杀类型的吃鸡游戏,在上线之后就凭借着其独特的游戏机制深受众多FP传说之下决定善良还是邪恶的,并非和平线和杀戮线传说之下虽然只是一款看似简简单单平平无奇的RPG类游戏,复古的像素画风甚至谈不上精美,但这款游戏最吸引人的亮点却恰恰就藏在这并不精美的画风之下。似乎自从我最初接触传说之下这款游戏开10岁女孩身高168CM,医生3种食物是身高增长剂,常给孩子吃每一个父母都希望自己的孩子能够长个大高个,这样也可以为孩子的颜值增分不少,虽然遗传因素会决定身高,但是后天的营养补充以及锻炼对身高的影响也是非常重要的。案例分析10岁的英子,今年是4个月宝宝哭闹不停,原因竟是脊椎受伤,奶奶抱姿有问题?轩轩4个月时,因为常常哭闹不止,去医院被查出脊椎侧弯,医生说明原因后,轩轩妈愤怒不已。宝宝都喜欢被拥抱,因为会让他们有温暖安全的感觉。但是,抱不对,孩子也很可能受到伤害。轩轩的妈妈良医18岁女子怀八胞胎?一胎到底能生几个孩子从优生优育的角度来说,单胎活产才是最优解。记者周洁网文世界里,一胎六七宝似乎挺常见的,不过在现实生活中,如果一胎怀上了八个孩子,那就是上新闻的程度了。近日,河南女子怀八胞胎的话题持不想孩子养成讨好型人格,家长要少用这3句话教训娃爱孩子,母鸡都会。但是教育好孩子,却是一门艺术。所以关于教育的话题从古至今一直都是人们所关注的焦点,而不同的父母,因为使用的教育方法不一样,所以养出来的娃性格方面也有很大的差异。有千万别去!去过的人都不想再离开活佛济公故里,神山秀水天台天台(ti),以境内天台山得名,以佛宗道源山水神秀著称,东连宁海三门,西接磐安,南邻仙居临海,北界新昌。天台素有大八景小八景,有名有姓三十处,无名无姓数不吃红薯,到底是升血糖还是降血糖?可能有些人想错了,不妨了解下导语在寒冷的冬天,吃上一个热乎乎的烤红薯既暖胃又暖心,天气冷了以后,大街小巷都能够看到烤红薯的身影,路过闻到一股浓香的红薯气味,让人垂延欲滴。红薯香甜可口,大家都很爱吃,它不仅可以新官上任三把火!国足面临大换血,曝新帅上任后李铁团队仅留一人国足在十二强赛中的屡屡失利终于让中国的足球迷们对国足的主教练李铁忍无可忍,在国民的强烈要求下,受到了巨大舆论压力的李铁不得不辞职离开了国足。而在李铁离职过后,谁来接任李铁的位置成为女排联赛全明星赛阵容公布,不见外援身影惹来球迷争议随着新赛季女排联赛第一阶段比赛结束,各队进入10天的休整期。在休赛日期间,中国排协将于12月11日12日举办全明星赛。日前,中国排协公布了全明星赛名单,上海女排有3人入选,分别是杨人类首次抵达雅浦海沟8919米深渊底部上海交大四人深海科考团队采回200多个珍贵深渊样本人民网上海12月6日电(葛俊俊)12月5日,由上海交大四人深海科考团队搭乘探索一号科考船,结束了在西太平洋海域的深渊科考任务,顺利返航抵达三亚。本航次中,上海交大深部生命国际研究中