HadoopStreaming编程
zebra (汇百家之长,成一人之声)
1.背景
Hadoop MapperReducer 默认提供Java,C++编程接口编程接口,是不是意味着其他编程语言就无法使用Hadoop MapperReducer了呢?不是的!Hadoop Streaming是一个Hadoop自带的编程接口,允许任何程序语言实现的程序在MR中使用,这样熟悉其他编程语言(如php,python,shell)的同学就可以顺利使用hadoop进行离线计算任务了。 2.工作原理
Hadoop Streaming要求用户编写的Mapper/Reducer从标准输入(stdin)中读取数据,将结果写入到标准输出(stdout)中,如下图:
mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。
这非常类似于Linux的管道机制。正因此,我们在linux本地可以通过如下命令进行job进行测试。 cat | | sort |
那么在程序里面如何获取标准输入输出呢,不同语言大同小异,例如php #获取标准输入 $in = fopen("php://stdin", "r"); #标准输出 print "$value 1 "; #或者 $out = fopen("php://stdout", "w"); fwrite($out, $value);3.world count
该案例我们将会展现world count样例,用于统计文本中每个单词出现个数。 3.1 文件准备
文件1. word.txt一篇英文文章,随便网上copy一篇吧;
文件2. 使用不同编程语言编写的job可执行文件,下文分别示例一个php和一个python编写的;
文件3.对应编程语言的解释器可执行文件,这里需要特别说明下,远程hadoop肯定是支持java,shell的,但是如果你用python3,php编写job的话,大概率是不支持的,这个时候需要在提交任务的时候一起提交该语言解释器。3.2 待处理的文件上传到hdfshdfs dfs -mkdir /work/ hdfs dfs -put word.txt /work/input.txt hdfs dfs -ls /work/3.4.php版本
WcMr.php 的mrjob,使用php实现,为了让代码写起来更舒服。请继承我下面写的基类代码,放心用吧这都是测试过的! 3.4.2 job代码#!/usr/bin/php <?php require_once "./MapReduce.php"; / * 实现一个英文单词计数的mrjob */ class WcMr extends MapReduce { / * over write mapper * @param $line */ protected function mapper($line) { $words = preg_split("/W/", $line, 0, PREG_SPLIT_NO_EMPTY); foreach ($words as $word) { $this->write($word, 1); } } / * @param $key * @param array $values */ protected function reducer($key, $values) { $total = 0; foreach ($values as $value) { $total = $total + intval($value); } $this->write($key, $total); } } try { $mapper=new WcMr(); $mapper->run($argv); } catch (Exception $exc) { throw $exc; }3.4.2基类代码<?php /** * mrJob标准抽象 */ class MapReduce { private $stdIn = null; private $stdout = null; protected $argv = array(); public function __construct() { $this->stdIn = fopen("php://stdin", "r"); $this->stdout = fopen("php://stdout", "w"); } /** * @param array $argv */ private function runMapper($argv) { $this->argv = $argv; while (!feof($this->stdIn)) { $line = trim(fgets($this->stdIn)); if ($line == "") { continue; } $this->mapper($line); } } /** * mapper需要子类去实现 * @param $line */ protected function mapper($line) { } private function _yield($currentKey, $currentValue, &$nextKey, &$nextValue) { yield $currentValue; while (!feof($this->stdIn)) { $line = fgets($this->stdIn); list($key, $value) = explode(" ", trim($line), 2); if (trim($line) == "" || empty($key)) { continue; } if ($currentKey == $key) { yield $value; } else { $nextKey = $key; $nextValue = $value; break; } } } private function runReducer($argv) { $this->argv = $argv; $nextKey = null; $nextValue = null; while (!feof($this->stdIn)) { if (empty($nextKey)) { $line = fgets($this->stdIn); list($currentKey, $currentValue) = explode(" ", trim($line), 2); if (trim($line) == "" || empty($currentKey)) { continue; } } else { $currentKey = $nextKey; $currentValue = $nextValue; } $values = $this->_yield($currentKey, $currentValue, $nextKey, $nextValue); $this->reducer($currentKey, $values); if ($values->valid()) { foreach ($values as $value) { ;//把这个key下面没有遍历完毕的全部遍历完毕 } } } } /** * @param $key * @param $values */ protected function reducer($key, $values) { } /** * 文本输出方法 * @param $key * @param $value */ protected function write($key, $value = "") { if (!is_string($value)) { $value = json_encode($value, JSON_UNESCAPED_UNICODE); } else { $value = strval($value); } if (!empty($key)) { $key = $key . " "; } fwrite($this->stdout, $key . $value . PHP_EOL); } /** * @param $argv * @throws Exception */ public function run($argv) { if (empty($argv)) { throw new Exception("$argv is empty $job->run($argv);"); } if (!in_array($argv[1], array("--mapper", "--reducer"))) { throw new Exception("you should set option [--mapper | --reducer]"); } if ($argv[1] == "--mapper") { $this->runMapper($argv); } else if ($argv[1] == "--reducer") { $this->runReducer($argv); } } /** * */ public function __destruct() { fclose($this->stdIn); fclose($this->stdout); } }3.4.3 本地调试cat inpput.txt | php WcMr.php --mapper | sort | php WcMr.php --reducer3.4.4 提交集群执行
注意 不要漏掉php语言解释器,我这里路径在/home/work/php/bin/php,关于hadoop-streaming的一些参数见后面章节。${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar -files "/home/work/php/bin/php,/home/work/MapReduce.php,/home/work/WcMr.php" -input "/work/inpput.txt" -output "/work/output" -mapper "./php WcMr.php --mapper" -reducer "./php WcMr.php --reducer" #如果没有reducer如下设置 #-reducer NONE3.5python版本3.5.1 job代码# 如果使用的是python2话在顶层加入以下代码 # from __future__ import print_function # from __future__ import pision # from __future__ import absolute_import from map_reduce import MapReduce class WorldCount(MapReduce): """ WorldCount demo """ def mapper(self, line): words = line.strip().split(" ") for word in words: self.write(word, 1) def reducer(self, key, iterator): total = 0 for value in iterator: total += int(value[1]) self.write(key, total) if __name__ == "__main__": try: WorldCount().run(sys.argv) except Exception as exp: print(exp) exit(1) 3.5.2 基类代码import sys from operator import itemgetter from itertools import groupby import json class MapReduce(object): """ MapReduce """ def __run_mapper(self): for line in sys.stdin: line = line.strip() if line != "": self.mapper(line) def __read_reducer_output(self, files, separator=" "): for line in files: yield line.strip().split(separator, 1) def __run_reducer(self): data = self.__read_reducer_output(sys.stdin) for key, iterator in groupby(data, itemgetter(0)): self.reducer(key, iterator) try: for _item in iterator: pass except StopIteration as exp: continue def mapper(self, line): raise NotImplementedError def reducer(self, key, iterator): raise NotImplementedError def run(self, argv): if len(argv) < 2 or (argv[1] not in ["--mapper", "--reducer"]): raise Exception("you should set option [--mapper | --reducer]") if argv[1] == "--mapper": self.__run_mapper() elif argv[1] == "--reducer": self.__run_reducer() def write(self, key=None, value=None): if not isinstance(value, str): value = json.dumps(value) if key == None: print("{0}".format(value)) else: print("{0} {1}".format(key, value))3.5.3 本地调试cat inpput.txt | python3 wcmr.py --mapper | sort | python3 wcmr.py --reducer3.5.4 提交集群执行
注意 不要漏掉python3语言解释器,我这里路径在/miniconda3/bin/python3,关于hadoop-streaming的一些参数见后面章节。当然如果你用python2的话,就不用上传解释器了,linux机器都有的。${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar -files "/miniconda3/bin/python3,/home/work/map_reduce.py,/home/work/wc.py" -input "/work/inpput.txt" -output "/work/output" -mapper "./python3 WcMr.php --mapper" -reducer "./python3 WcMr.php --reducer" #如果没有reducer如下设置 #-reducer NONE3.6 查看计算结果
直接从远程吧结算结果下载下来。 hdfs dfs -get /work/output .5.命令参数
streaming使用样例: $ ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -input <输入目录> -inputformat <输入格式 JavaClassName> -output <输出目录> -outputformat <输出格式 JavaClassName> -mapper -reducer -combiner -partitioner -cmdenv # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个 -file <依赖的文件> # 配置文件,字典等依赖,-file是一个deprecated的配置,可以使用-files。 -D # 作业的属性配置streaming命令参数列表详细解释:
参数名
说明
-input
指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入
-output
指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次
-combiner
指定combiner Java类
-mapper
mapper可执行程序或Java类
-reducer
reducer可执行程序或Java类
-flies Optional
分发本地文件
-cacheFile Optional
分发HDFS文件
-cacheArchive Optional
分发HDFS压缩文件
-numReduceTasks Optional
reduce任务个数
-inputformat Optional
InputFormat Java类
-outputformat Optional
OutputFormat Java类
-inputreader Optional
InputReader配置
-cmdenv NAME=VALUE
给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值
-mapdebug Optional
mapper失败时运行的debug程序
-reducedebug Optional
reducer失败时运行的debug程序
-verbose Optional
详细输出模式
-jobconf | -D NAME=VALUE Optional
作业配置参数,NAME是参数名,可以指定的参数参考hadoop-default.xml,VALUE是参数值,例如用-jobconf mapred.job.name="jobName" 设置作业名 常见的(-jobconf)作业配置参数解释:
参数名
说明
mapred.job.name
作业名
mapred.job.priority
作业优先级,VERY_HIGH
mapred.job.map.capacity
最多同时运行map任务数
mapred.job.reduce.capacity
最多同时运行reduce任务数
hadoop.job.ugi
作业执行权限
mapred.map.tasks | mapreduce.job.maps
map任务个数
mapred.reduce.tasks | mapreduce.job.reduces
reduce任务个数
mapred.job.groups
作业可运行的计算节点分组
mapred.task.timeout
任务没有响应(输入输出)的最大时间
mapred.compress.map.output
map的输出是否压缩
mapred.map.output.compression.codec
map的输出压缩方式
mapred.output.compress
reduce的输出是否压缩
mapred.output.compression.codec
reduce的输出压缩方式
stream.map.output.field.separator
map输出分隔符,默认是
stream.map.input.field.separator
Map输入数据的分隔符,默认是
stream.map.input.field.separator
Reduce输入数据的分隔符
stream.reduce.output.field.separator
Reduce输出数据的分隔符
特斯拉深夜自动播放鬼故事,这难道就是灵魂战车?这篇新闻倒是有点离奇,前些日子,一位特斯拉Model3车主郑女士在晚上开车的时候,车子刚刚开进地库里,车里边突然出现闹鬼情况,车里的音响发出我死得好冤枉啊!的凄凉女声!这大半夜的,
取一个好听的名字有多重要宝妈必看奇妙知识季对于父母和朋友来说,在孩子出生后,命名他们已经成为首要任务,一些父母在怀孕时开始计划。钱先生和他的妻子已经结婚四五年了,但没有孩子。为了怀孕,这对夫妇想了很多方法,但仍然
我,70后四川人,9年自驾游40个国家,爱尔兰妻子喜欢中国文化我,70后四川人,9年自驾游40个国家,爱尔兰妻子喜欢中国文化讲述老姚整理肖寒先生图片来源于网络,侵删我是老姚,是一名70后,四川成都人。我经历过两段婚姻,第一段婚姻是前妻出轨,最
为什么要全网显示ip?因为中文互联网越来越国际化最近全网陆陆续续公开了网络IP地址。有很多博主翻车了,我就说我自己看见的一个。之前,在短视频平台上出现了很多纽约博主,内容是天天拍各种纽约城市街景。然后告诉你,纽约生活多么多姿多彩
怎么取名字,自古至今有讲究,这些风俗习惯要明白名字伴随终身,古代把取名字看做是一件大事,所以也就派生出了许多的讲究。名,说文解字认为名,自命也,从口,从,者冥也,冥不相见,故以口自名。冥,既命。中国远古人认为万事万物都是有生命
退役球星现状43岁阿泰斯特身材依然健壮,社交媒体和妻子秀恩爱在NBA中总是存在许多硬汉所谓的硬往往是指这类球员在某一个领域拥有十分出色的发挥,但要说硬汉中的硬汉,我想阿泰斯特一定榜上有名,如今阿泰斯特已经退役多年,年龄43岁的他身材保持其实
何润东一家做公益,柜姐妻子气质不输超模,母亲穿着富贵更抢镜不管时尚小旋风怎么刮,总有一部分姐妹坚定简约风路线不动摇。大家可别以为简约风造型随便搭搭就ok,高端水平的简约风look所呈现出的时尚效果可不比一些前卫style要差,能将颜色与元
教程丨少儿美术课程西瓜教学目的1了解西瓜的百科知识。2通过观察西瓜图片,了解西瓜的颜色和造型特征。3学习运用拓印肌理来表现西瓜的瓜瓤质感。4通过撕纸粘贴的方式,感受绘画带来的的乐趣。教学工具圆形白色卡纸
家有学生,建议吃这5种高锌食物,被称智力之源,常吃学习棒孩子是一个家庭的新希望,当他们正处于发育时期,每天学习用脑的时间比较多,妈妈们要经常给孩子多吃一些可以增强记忆力的食物。接下来跟大家分享家里有孩子,建议吃5种高锌食物,被称智力之源
北方边城绥芬河我住在北方边境城市绥芬河,下学来到这里一晃而过二十年了,已经是我的第二故乡了。记得刚来时第一份工作是在龙须沟市场往各个饭店和快餐点送菜,得益于这个工作我转遍了城市的大街小巷。锦州一
山西旅游宣传口号问题5。1假期马上就要过去了,虽然今年因为疫情很多人都出不了门,但旅游业也是我们一直力推的绿色产业,要推动旅游产业发展,我觉得宣传口号很重要,这是给游客的第一印象。唐代建筑黄河山西的旅