范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

HadoopStreaming编程

  zebra (汇百家之长,成一人之声)
  1.背景
  Hadoop MapperReducer 默认提供Java,C++编程接口编程接口,是不是意味着其他编程语言就无法使用Hadoop MapperReducer了呢?不是的!Hadoop Streaming是一个Hadoop自带的编程接口,允许任何程序语言实现的程序在MR中使用,这样熟悉其他编程语言(如php,python,shell)的同学就可以顺利使用hadoop进行离线计算任务了。 2.工作原理
  Hadoop Streaming要求用户编写的Mapper/Reducer从标准输入(stdin)中读取数据,将结果写入到标准输出(stdout)中,如下图:
  mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。
  这非常类似于Linux的管道机制。正因此,我们在linux本地可以通过如下命令进行job进行测试。 cat  |  | sort | 
  那么在程序里面如何获取标准输入输出呢,不同语言大同小异,例如php #获取标准输入 $in = fopen("php://stdin", "r");  #标准输出 print "$value	1 "; #或者 $out = fopen("php://stdout", "w"); fwrite($out, $value);3.world count
  该案例我们将会展现world count样例,用于统计文本中每个单词出现个数。 3.1 文件准备
  文件1. word.txt一篇英文文章,随便网上copy一篇吧;
  文件2. 使用不同编程语言编写的job可执行文件,下文分别示例一个php和一个python编写的;
  文件3.对应编程语言的解释器可执行文件,这里需要特别说明下,远程hadoop肯定是支持java,shell的,但是如果你用python3,php编写job的话,大概率是不支持的,这个时候需要在提交任务的时候一起提交该语言解释器。3.2 待处理的文件上传到hdfshdfs dfs -mkdir /work/ hdfs dfs -put word.txt /work/input.txt hdfs dfs -ls /work/3.4.php版本
  WcMr.php 的mrjob,使用php实现,为了让代码写起来更舒服。请继承我下面写的基类代码,放心用吧这都是测试过的! 3.4.2 job代码#!/usr/bin/php <?php require_once "./MapReduce.php"; /  * 实现一个英文单词计数的mrjob  */ class WcMr extends MapReduce {     /      * over write mapper      * @param $line      */     protected  function  mapper($line) {         $words = preg_split("/W/", $line, 0, PREG_SPLIT_NO_EMPTY);         foreach ($words as $word) {             $this->write($word, 1);         }     }     		/      * @param $key      * @param array $values      */     protected function  reducer($key, $values) {         $total = 0;         foreach ($values as $value) {             $total = $total + intval($value);         }         $this->write($key, $total);     } }    try {     $mapper=new WcMr();     $mapper->run($argv); } catch (Exception $exc) {     throw  $exc; }3.4.2基类代码<?php  /**  * mrJob标准抽象  */ class MapReduce {      private $stdIn = null;     private $stdout = null;     protected $argv = array();      public function __construct() {         $this->stdIn = fopen("php://stdin", "r");         $this->stdout = fopen("php://stdout", "w");     }      /**      * @param array $argv      */     private function runMapper($argv) {         $this->argv = $argv;         while (!feof($this->stdIn)) {             $line = trim(fgets($this->stdIn));             if ($line == "") {                 continue;             }             $this->mapper($line);         }     }       /**      * mapper需要子类去实现      * @param $line      */     protected function mapper($line) {     }       private function _yield($currentKey, $currentValue, &$nextKey, &$nextValue) {          yield $currentValue;         while (!feof($this->stdIn)) {             $line = fgets($this->stdIn);             list($key, $value) = explode("	", trim($line), 2);             if (trim($line) == "" || empty($key)) {                 continue;             }             if ($currentKey == $key) {                 yield $value;             } else {                 $nextKey = $key;                 $nextValue = $value;                 break;             }         }     }       private function runReducer($argv) {         $this->argv = $argv;         $nextKey = null;         $nextValue = null;         while (!feof($this->stdIn)) {             if (empty($nextKey)) {                 $line = fgets($this->stdIn);                 list($currentKey, $currentValue) = explode("	", trim($line), 2);                 if (trim($line) == "" || empty($currentKey)) {                     continue;                 }              } else {                 $currentKey = $nextKey;                 $currentValue = $nextValue;              }             $values = $this->_yield($currentKey, $currentValue, $nextKey, $nextValue);             $this->reducer($currentKey, $values);             if ($values->valid()) {                 foreach ($values as $value) {                     ;//把这个key下面没有遍历完毕的全部遍历完毕                 }             }         }      }       /**      * @param $key      * @param $values      */     protected function reducer($key, $values) {     }       /**      * 文本输出方法      * @param $key      * @param $value      */     protected function write($key, $value = "") {          if (!is_string($value)) {             $value = json_encode($value, JSON_UNESCAPED_UNICODE);         } else {             $value = strval($value);         }         if (!empty($key)) {             $key = $key . "	";         }         fwrite($this->stdout, $key . $value . PHP_EOL);     }       /**      * @param $argv      * @throws Exception      */     public function run($argv) {          if (empty($argv)) {             throw new Exception("$argv is empty $job->run($argv);");         }          if (!in_array($argv[1], array("--mapper", "--reducer"))) {             throw new Exception("you should set option [--mapper | --reducer]");         }         if ($argv[1] == "--mapper") {             $this->runMapper($argv);         } else if ($argv[1] == "--reducer") {             $this->runReducer($argv);         }     }      /**      *      */     public function __destruct() {         fclose($this->stdIn);         fclose($this->stdout);     } }3.4.3 本地调试cat inpput.txt | php WcMr.php --mapper | sort | php WcMr.php --reducer3.4.4 提交集群执行
  注意 不要漏掉php语言解释器,我这里路径在/home/work/php/bin/php,关于hadoop-streaming的一些参数见后面章节。${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar  -files "/home/work/php/bin/php,/home/work/MapReduce.php,/home/work/WcMr.php"  -input  "/work/inpput.txt"  -output "/work/output"  -mapper "./php WcMr.php --mapper"  -reducer "./php WcMr.php --reducer" #如果没有reducer如下设置 #-reducer NONE3.5python版本3.5.1 job代码# 如果使用的是python2话在顶层加入以下代码 # from __future__ import print_function # from __future__ import pision # from __future__ import absolute_import  from map_reduce import MapReduce  class WorldCount(MapReduce):     """     WorldCount demo     """      def mapper(self, line):         words = line.strip().split(" ")         for word in words:             self.write(word, 1)      def reducer(self, key, iterator):         total = 0         for value in iterator:             total += int(value[1])          self.write(key, total)   if __name__ == "__main__":      try:         WorldCount().run(sys.argv)     except Exception as exp:         print(exp)         exit(1) 3.5.2 基类代码import sys from operator import itemgetter from itertools import groupby import json   class MapReduce(object):     """     MapReduce     """      def __run_mapper(self):         for line in sys.stdin:             line = line.strip()             if line != "":                 self.mapper(line)      def __read_reducer_output(self, files, separator="	"):         for line in files:             yield line.strip().split(separator, 1)      def __run_reducer(self):         data = self.__read_reducer_output(sys.stdin)         for key, iterator in groupby(data, itemgetter(0)):             self.reducer(key, iterator)             try:                 for _item in iterator:                     pass             except StopIteration as exp:                 continue      def mapper(self, line):         raise NotImplementedError      def reducer(self, key, iterator):         raise NotImplementedError      def run(self, argv):         if len(argv) < 2 or (argv[1] not in ["--mapper", "--reducer"]):             raise Exception("you should set option [--mapper | --reducer]")         if argv[1] == "--mapper":             self.__run_mapper()         elif argv[1] == "--reducer":             self.__run_reducer()      def write(self, key=None, value=None):          if not isinstance(value, str):             value = json.dumps(value)         if key == None:             print("{0}".format(value))         else:             print("{0}	{1}".format(key, value))3.5.3 本地调试cat inpput.txt | python3 wcmr.py --mapper | sort | python3 wcmr.py --reducer3.5.4 提交集群执行
  注意 不要漏掉python3语言解释器,我这里路径在/miniconda3/bin/python3,关于hadoop-streaming的一些参数见后面章节。当然如果你用python2的话,就不用上传解释器了,linux机器都有的。${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar  -files "/miniconda3/bin/python3,/home/work/map_reduce.py,/home/work/wc.py"  -input  "/work/inpput.txt"  -output "/work/output"  -mapper "./python3 WcMr.php --mapper"  -reducer "./python3 WcMr.php --reducer" #如果没有reducer如下设置 #-reducer NONE3.6 查看计算结果
  直接从远程吧结算结果下载下来。 hdfs dfs -get /work/output .5.命令参数
  streaming使用样例: $ ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar      -input <输入目录>      -inputformat <输入格式 JavaClassName>      -output <输出目录>      -outputformat <输出格式 JavaClassName>      -mapper       -reducer       -combiner       -partitioner       -cmdenv   # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个     -file <依赖的文件>  # 配置文件,字典等依赖,-file是一个deprecated的配置,可以使用-files。     -D   # 作业的属性配置streaming命令参数列表详细解释:
  参数名
  说明
  -input
  指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入
  -output
  指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次
  -combiner
  指定combiner Java类
  -mapper
  mapper可执行程序或Java类
  -reducer
  reducer可执行程序或Java类
  -flies Optional
  分发本地文件
  -cacheFile Optional
  分发HDFS文件
  -cacheArchive Optional
  分发HDFS压缩文件
  -numReduceTasks Optional
  reduce任务个数
  -inputformat Optional
  InputFormat Java类
  -outputformat Optional
  OutputFormat Java类
  -inputreader Optional
  InputReader配置
  -cmdenv NAME=VALUE
  给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值
  -mapdebug Optional
  mapper失败时运行的debug程序
  -reducedebug Optional
  reducer失败时运行的debug程序
  -verbose Optional
  详细输出模式
  -jobconf | -D NAME=VALUE Optional
  作业配置参数,NAME是参数名,可以指定的参数参考hadoop-default.xml,VALUE是参数值,例如用-jobconf mapred.job.name="jobName" 设置作业名 常见的(-jobconf)作业配置参数解释:
  参数名
  说明
  mapred.job.name
  作业名
  mapred.job.priority
  作业优先级,VERY_HIGH
  mapred.job.map.capacity
  最多同时运行map任务数
  mapred.job.reduce.capacity
  最多同时运行reduce任务数
  hadoop.job.ugi
  作业执行权限
  mapred.map.tasks | mapreduce.job.maps
  map任务个数
  mapred.reduce.tasks | mapreduce.job.reduces
  reduce任务个数
  mapred.job.groups
  作业可运行的计算节点分组
  mapred.task.timeout
  任务没有响应(输入输出)的最大时间
  mapred.compress.map.output
  map的输出是否压缩
  mapred.map.output.compression.codec
  map的输出压缩方式
  mapred.output.compress
  reduce的输出是否压缩
  mapred.output.compression.codec
  reduce的输出压缩方式
  stream.map.output.field.separator
  map输出分隔符,默认是
  stream.map.input.field.separator
  Map输入数据的分隔符,默认是
  stream.map.input.field.separator
  Reduce输入数据的分隔符
  stream.reduce.output.field.separator
  Reduce输出数据的分隔符

监管接连出手围剿苹果税苹果(AAPL。US)尚能饭否?近年来,如何监管大型科技平台正在成为各国政府实施公共治理的重要命题。对于苹果这家市值接近3万亿美元的科技巨头来说,一直都面临着全球范围内监管机构的批评乃至调查和诉讼。其中,引发最多成年人如何选择适合自已的助听器?(郑医生回答)对于助听器的选择,需要根据每个人的听力的状况。在能满足正常听到声音的情况下,尽量配小一些,比方说塞在耳朵里面的。但有些人听力特别差,需要的功率比较大,就要用耳背式的挂EpicGames宣布与乐高达成合作,将推出适合儿童的元宇宙项目记者崔鹏北京时间4月8日消息,知名游戏公司EpicGames与乐高集团宣布签署了一项全新合作,两家公司将在虚拟世界中创造一个对儿童更加友好的空间。官方公告称,两家公司将打造一个身临Rust头铁之路生命周期与引用有效性2022年04月06日日更94100天上一次我们学了Rust中的trait定义及使用。今天这次学习生命周期,这个概念相当庞大,这一节仍然是部分概念,后期还有高级的概念。01hr声明十天内,谷歌在法国连输两场反垄断官司!罚金超1。5亿欧元不到十天时间内,谷歌在法国连续遭遇两场反垄断案件的败诉。据外媒报道,4月7日,针对谷歌利用搜索市场支配地位偏袒自身广告业务的案件,巴黎上诉法院驳回了谷歌的上诉,要求谷歌按照罚单缴纳西门子与法国量子公司合作,推进量子计算多物理场模拟应用近日,基于中性原子的法国量子计算公司Pasqal宣布与西门子数字工业软件公司(SiemensDigitalIndustriesSoftware)开展一项多年研究合作,以推进量子计算机器人??元宇宙?我们正处在历史的选择前夜!!!马斯克最新表示,特斯拉最早将于明年(即2023年)开始生产一款名为擎天柱(Optimus)的人形机器人。而在去年,扎克伯格将Facebook更名为Me重磅谁能拯救元宇宙带来的伤害元宇宙,是自私自利的欧美资本家的思维创造出来的未来互联网科技,他的初想是建立一个以欧美财团政府为主的世界统治手段,以地球人类的生活为入手,最终的结果是数字数据资产化,加大加深,普通赵立坚再次回应美公司中国黑客攻击报告另有意图中国青年报客户端北京4月8日电(中青报中青网记者胡文利)关于美国网络安全公司声称中国黑客攻击印度电力部门一事,中国外交部发言人赵立坚继昨天之后,今天在例行记者会上再次作出回应。他表中国移动发现老板号古董号?这2个金贵号段,你用过吗引言随着互联网技术的深入发展,人们在生活中对手机的依赖性越来越强,而且手机是需要网络的,如果失去了网络和手机上的软件,手机与板砖并没有什么区别。而国内的移动网络主要由三大运营商提供机构3月全球手游发行商收入榜TOP3再次被中国厂商包揽SensorTower商店情报平台显示,2022年3月中国手游发行商在全球AppStore和GooglePlay的收入排名公布,共38个中国厂商入围全球手游发行商收入榜TOP100
谢谢你,让我从你的世界路过你是不是从心底爱着一个人?无论他是好的坏的,让你开心的难过的。印象里最深的一句话你有多恨一个人,就有多爱一个人。让你念念不忘的人你是有多爱?多恨?多无可奈何?爱情是两个人的事情,只用灵魂与你交换时间用灵魂与你交换时间时间像是一个魔术师给了我惊喜与悲伤,就如这次在我毫无准备的情况下把你带到我的面前。从此我也与你签下了这次的交换书灵魂与时间的交换书。灵魂的内心深处告诉我你不是我最中国男性平均身高,全世界增幅第一,突然冲上热搜第一前段时间,中国成功在东京奥运会上夺得多项赛事的冠军,累计获得38枚金牌。随后,中国运动健儿又在残奥会上取得96枚金牌的好成绩,再一次让世界各国看到了中国的体育实力。不久之后,中国即苏宁全民焕新节悟空榜羽绒服T袖价,ampampquot双11ampampquot末班车别错过!有人说三月不购物,一年的幸福指数都会降低。而就在三月,苏宁推出了315全民焕新节,其中苏宁百货打出春夏焕新的主题,不仅推出了超多的新品,还有超大的折扣贴心的服务,让消费者乐购其中。史上最严KPI?看苏宁小编818如何轻松完成一个亿目标KPI,上班族们最不想听到的词,毕竟和考核待遇以及升迁有关,一旦完不成就意味着这个月白干了一半时间。有一些领导比较苛刻,可能会给手下的员工定一个基本难以完成的KPI,比如昨天苏宁易画质和音效把握好双十一轻松买到万元内的80英寸以上大屏电视国人都比较喜欢大产品,就拿汽车行业来说,奔驰宝马奥迪等品牌旗下的汽车国产后都有加长版车型,在国内受到诸多消费者追捧。而电视行业也是如此,我们也都喜欢大屏电视,因为看着大气,看着爽,提升生活品质来苏宁嗨购五一厨电美容手机一网打尽五一长假马上就要到了,相信很多人已经拟定好计划了,有些人可能会宅在家中,有些人可能去旅游,有些人可能回家陪父母为了满足大家不同的计划,苏宁易购特意启动了嗨购五一,目前已经进入爆发阶高性价比手机大家都知道买手机最主要的是性价比高,用最少的价钱买到最好的性能配置,但那些大老牌子为了赚平民百姓的钱售价实在贵得太离谱了,性价比非常低。那现在我来为大家推荐几款高性价比手机1。在1最不值得买的三款手机第一款华为畅享20Pro,售价1999,在这个价位已经可以买到有晓龙870处理器的红米k40,但在华为畅享20Pro,只给你天机800的处理器,6。5英寸的LCD屏,屏幕已经没有任一个亿小目标!苏宁818这领导比一天要求做50张图还狠犹记得年初的时候,飞越旅行的创始人王小琴女士要求属下一天做出100张图片来,不仅被员工吐槽,更是被全网抨击,认为这是不把员工当人看。现在更狠的来了,正值苏宁易购818,苏宁高管俆金苏宁悟空榜白酒榜,茅台股价破2300,商品加购却被郎酒反超自2020年下半年开始,茅台就出尽了风头,股价屡创新高。今年2月份刚开始,茅台股价就突破了2300元,再次被推上了热搜。事实上,茅台作为白酒板块的龙头股,股价创新高并不算意外,加上