范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

大数据Flume技术解析

  (一)Flume概述、Flume快速入门1 Flume 概述1.1 Flume 定义
  Flume 是Cloudera 提供的一个高可用的,高可靠的,分布式的 海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。
  Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到 HDFS。 1.2 Flume 基础架构
  1.2.1 Agent
  Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
  Agent   主要由 3 个部分组成, Source   、 Channel   、 Sink   。1.2.2 Source
  Source 是负责接收数据到 Flume Agent 的组件。 Source 组件可以处理各种类型、各种格式的日志数据,包括 avro   、 thrift   、 exec   、 jms   、 spooling directory   、netcat  、 sequence generator   、 syslog  、http  、legacy  。1.2.3 Sink
  Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent 。
  Sink组件目的地包括  hdfs   、 logger   、 avro   、 thrift   、 ipc   、 file   、 HBase   、 solr   、自定义。1.2.4 Channel
  Channel 是位于 Source 和 Sink 之间的缓冲区。因此, Channel 允许 Source 和 Sink 运作在不同的速率上。 Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。
  Flume 自带三种 Channel: Memory Channel   和 File Channel   以及 Kafka Channel   。
  Memory Channel是内存中的队列。 Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕
  机或者重启都会导致数据丢失。
  File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 1.2.5 Event
  传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header   和 Body   两部分组成, Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。
  2 Flume 快速入门2.1 Flume 安装部署2.1.1 安装地址
  (1)Flume 官网地址
  http://flume.apache.org/
  (2)文档查看地址
  http://flume.apache.org/FlumeUserGuide.html
  (3)下载地址
  http://archive.apache.org/dist/flume/ 2.1.2 安装部署
  (1)将 apache-flume-1.7.0-bin.tar.gz 上传到 linux 的 /opt/software   目录下
  (2)解压 apache-flume-1.7.0 bin.tar.gz 到 /opt/module/  目录下[Tom@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/ 1
  (3)修改 apache-flume-1.7.0-bin 的名称为 flume-1.7.0 [Tom@hadoop102 module]$ mv apache-flume-1.7.0-bin flume-1.7.0 1
  (4)将 flume/conf 下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置  flume-env.sh   文件[Tom@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh [Tom@hadoop102 conf]$ vim flume-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_212 1232.2 Flume 入门案例2.2.1 监控端口数据官方案例
  1. 案例需求
  使用 Flume 监听一个端口, 收集该端口数据 ,并打印到控制台。
  2. 需求分析
  3. 实现步骤
  (1)安装 netcat 工具 [Tom@hadoop102 software]$ sudo yum install -y nc 1
  (2)判断44444 端口是否被占用 [Tom@hadoop102 flume-1.7.0]$ sudo netstat -tunlp | grep 44444 1
  (3)创建Flume Agent 配置文件 flume-netcat-logger.conf
  在 flume-1.7.0 目录下创建 job 文件夹并进入 job 文件夹。[Tom@hadoop102 flume]$ mkdir job [Tom@hadoop102 flume]$ cd job/ 12
  在 job 文件夹下创建 Flume Agent 配置文件 netcat-flume-logger.conf  。[Tom@hadoop102 job]$ vim netcat-flume-logger.conf 1
  在  netcat-flume-logger.conf   文件中添加如下内容:# Name the components on this agent    a1:表示agent的名称 a1.sources = r1    # r1:表示a1的Source的名称 a1.sinks = k1    # k1:表示a1的Sink的名称 a1.channels = c1    # c1: 表示a1的Channel的名称  # Describe/configure the source a1.sources.r1.type = netcat    # 表示a1的输入源类型为netcat端口类型 a1.sources.r1.bind = localhost    # 表示a1的监听的主机 a1.sources.r1.port = 44444    # 表示a1的监听的端口号  # Describe the sink a1.sinks.k1.type = logger    # 表示a1的输出目的地是控制台logger类型  # Use a channel which buffers events in memory a1.channels.c1.type = memory    # 表示a1的channel类型是memory内存型 a1.channels.c1.capacity = 1000    # 表示a1的channel总容量为1000个event a1.channels.c1.transactionCapacity = 100    # 表示a1的channel传输时收集到了100条event以后再去提交事务  # Bind the source and sink to the channel a1.sources.r1.channels = c1    # 表示将r1和c1连接起来 a1.sinks.k1.channel = c1    # 表示将k1和c1连接起来 123456789101112131415161718192021
  注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html
  (4)先开启 flume 监听端口
  第一种写法: [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf  -Dflume.root.logger=INFO,console 1
  第二种写法 [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console 1
  参数说明:
  --conf/-c  :表示配置文件存储在conf/目录
  --name/-n  :表示给agent 起名为a1
  --conf-file/-f  :flume 本次启动读取的配置文件是在job 文件夹下的flume-telnet.conf文件。
  -Dflume.root.logger=INFO,console   :-D 表示 flume 运行时动态修改flume.root.logger 参数属性值,并将控制台日志打印级别设置为INFO 级别。日志级别包括:log、info、warn、error。
  (5)使用 netcat 工具向本机的 44444 端口发送内容 [Tom@hadoop102 job]$ nc localhost 44444 hello OK HUST OK 12345
  (6)在Flume 监听页面观察接收数据情况
  2.2.2 实时监控单个追加文件
  1. 案例需求:实时监控 Hive 日志,并上传到 HDFS 中
  2. 需求分析
  3. 实现步骤
  (1)Flume 要想将数据输出到HDFS,须持有Hadoop 相关jar 包。将以下 jar 包
  拷贝到 opt/module/flume-1.7.0/lib  文件夹下。
  (2)创建 file-flume-hdfs.conf  文件[Tom@hadoop102 job]$ vim file-flume-hdfs.conf 1
  注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于Hive 日志在 Linux 系统中,所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件。
  添加如下内容 # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2  # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/hive-3.1.2/logs/hive.log  # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 30 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0  # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100  # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 1234567891011121314151617181920212223242526272829303132333435363738394041
  注意:对于所有与时间相关的转义序列,Event Header 中必须存在以’ timestamp’的 key (除非hdfs.useLocalTimeStamp 设置为 true ,此方法会使用 TimestampInterceptor 自动添加 timestamp)。 a3.sinks.k3.hdfs.useLocalTimeStamp = true  。
  (3)运行 Flume [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/file-flume-hdfs.conf 1
  (4)开启 Hadoop 和 Hive 并操作 Hive 产生日志 [Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh [Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh [Tom@hadoop102 hive-3.1.2]$ bin/hive 123
  (5)在HDFS上查看文件
  2.3.3 实时监控目录下多个新文件
  1. 案例需求:使用 Flume 监听整个目录的文件,并上传至HDFS
  2. 需求分析
  3. 实现步骤
  (1)创建配置文件 dir-flume-hdfs.conf
  创建一个文件 [Tom@hadoop102 job]$ vim dir-flume-hdfs.conf 1
  添加如下内容 # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2  # Describe/configure the source a2.sources.r2.type = spooldir a2.sources.r2.spoolDir = /opt/module/flume-1.7.0/upload #忽略所有以.tmp结尾的文件,不上传 a2.sources.r2.ignorePattern = ([^ ]*.tmp)  # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 30 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0  # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100  # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 12345678910111213141516171819202122232425262728293031323334353637383940414243
  (2)启动监控文件夹命令 [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/dir-flume-hdfs.conf 1
  说明:在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文件,上传完成的文件会以 .COMPLETED  结尾,被监控文件夹每 500 毫秒扫描一次文件变动。
  (3)向upload 文件夹中添加文件
  在 /opt/module/flume-1.7.0  目录下创建 upload 文件夹[Tom@hadoop102 flume]$ mkdir upload 1
  向 upload 文件夹中添加文件 [huxili@hadoop102 upload]$ touch hust.txt [huxili@hadoop102 upload]$ touch hust.tmp [huxili@hadoop102 upload]$ touch hust.log 123
  (4) 查看 HDFS 上的数据
  (5)等待 1s,再次查询 upload 文件夹 [Tom@hadoop102 upload]$ ll -rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.log.COMPLETED -rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.tmp -rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.txt.COMPLETED 12342.2.4 实时监控目录下的多个追加文件
  Exec source  适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source  能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而Taildir Source  既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。
  1. 案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至HDFS。(在实际操作中我们直接打印到控制台,这样更直观)
  2. 需求分析:
  3. 实现步骤
  (1)创建配置文件flume-taildir-hdfs.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1  # Describe/configure the source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/files/file1.txt a1.sources.r1.filegroups.f2 = /opt/module/flume-1.7.0/files/file2.txt a1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position.json  # Describe the sink a1.sinks.k1.type = logger  # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100  # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 12345678910111213141516171819202122
  (2)启动监控文件夹命令 [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-taildir-hdfs.conf 1
  (3)向 files 文件夹中追加内容 [Tom@hadoop102 flume]$ mkdir files 1
  向 upload 文件夹中添加文件 [Tom@hadoop102 files]$ echo hello >> file1.txt  [Tom@hadoop102 files]$ echo hust >> file2.txt  12
  (4)查看数据
  Taildir 说明:
  Taildir Source 维护了一个json 格式的 position File,其会定期的往 position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下: {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"} {"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"} 12
  注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。
  (二)Flume进阶、常见问题1Flume进阶1.1 Flume 事务
  1.2 Flume Agent 内部原理
  重要组件
  (1)ChannelSelector
  ChannelSelector的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating  (复制)和Multiplexing  (多路复用)。
  ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel 。
  (2)SinkProcessor
  SinkProcessor 共有三种类型,分别是 DefaultSinkProcessor  、LoadBalancingSinkProcessor  和 FailoverSinkProcessor  。
  DefaultSinkProcessor 对应的是单个的 Sink,LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以实现故障转移的功能。1.3 Flume 拓扑结构1.3.1 简单串联
  这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量,flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。 1.3.2 复制和多路复用
  Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 channel 中,或者将不同数据分发到不同的 channel 中, sink 可以选择传送到不同的目的地。 1.3.3 负载均衡和故障转移
  Flume 支持使用将多个 sink 逻辑上分到一个 sink 组, sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。 1.3.4 聚合
  这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个flume 采集日志,传送到一个集中收集日志的 flume,再由此flume 上传到hdfs、hive、hbase 等,进行日志分析。 1.4 Flume 企业开发案例1.4.1 复制和多路复用
  1. 案例需求
  使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给Flume-3,Flume-3 负责输出到 Local FileSystem。
  2. 需求分析
  3. 实现步骤
  (1)准备工作
  在 /opt/module/flume-1.7.0/job  目录下创建 group1 文件夹[Tom@hadoop102 job]$ cd group1/ 1
  在 /opt/module/flume-1.7.0/datas/  目录下创建 flume3 文件夹[Tom@hadoop102 datas]$ mkdir flume3 1
  (2)创建flume-file-flume.conf
  配置1 个接收日志文件的source 和两个channel、两个sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir 。
  编辑配置文件 [Tom@hadoop102 group1]$ vim flume-file-flume.conf 1
  添加如下内容 #name a1.sources = r1 a1.channels = c1 c2 a1.sinks = k1 k2  #Source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/hive.log a1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position1.json  # 将数据流复制给所有channel a1.sources.r1.selector.type = replicating  #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100  a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100  # Sink # sink 端的 avro 是一个数据发送者 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141  a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142  #Bind a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2 12345678910111213141516171819202122232425262728293031323334353637
  (3)创建 flume-flume-hdfs .conf
  配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink 。
  编辑配置文件 [Tom@hadoop102 group1]$ vim flume-flume-hdfs.conf 1
  添加如下内容 #name a2.sources = r1 a2.channels = c1 a2.sinks = k1  #Source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141  #Channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100  #Sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/group1/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 30 #设置每个文件的滚动大小 a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount = 0  #Bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1 123456789101112131415161718192021222324252627282930313233343536373839404142
  (4)创建 flume-flume-dir .conf
  配置上级 Flume 输出的 Source,输出是到本地目录的 Sink 。
  编辑配置文件 [Tom@hadoop102 group1]$ vim flume-flume-dir.conf 1
  添加如下内容 #name a3.sources = r1 a3.channels = c1 a3.sinks = k1  #Source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142  #Channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100  #Sink a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/module/flume-1.7.0/data/group1  #Bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1 12345678910111213141516171819202122
  提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。
  (5)执行配置文件
  分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume 。 [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf  [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf  [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf 12345
  (6)启动 Hadoop 并向 hive.log 添加数据 [Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh [Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh  [Tom@hadoop102 data]$ echo hello >> hive.log  [Tom@hadoop102 data]$ echo hust >> hive.log  12345
  (7)检查 HDFS 上数据
  (8)检查  /opt/module/flume-1.7.0/datas/flume3  目录中数据总用量 16 -rw-rw-r--. 1 Tom Tom  6 9月  12 22:02 1631453983368-46 -rw-rw-r--. 1 Tom Tom  5 9月  12 22:02 1631453983368-47 -rw-rw-r--. 1 Tom Tom  0 9月  12 22:03 1631453983368-48 12341.4.2 负载均衡和故障转移
  1. 案例需求
  使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3 ,采用 FailoverSinkProcessor ,实现故障转移的功能。
  2. 需求分析
  3. 实现步骤
  (1)准备工作
  在 /opt/module/flume-1.7.0/job  目录下创建 group2 文件夹[Tom@hadoop102 job]$ cd group2/ 1
  (2)创建 flume-netcat-flume.conf
  配置1 个 netcat source 和1 个channel、1 个sink group(2 个sink),分别输送给 flume-flume-console1 和 flume-flume-console2。
  编辑配置文件 [Tom@hadoop102 group2]$ vim flume-netcat-flume.conf 1
  添加如下内容 #name a1.sources = r1 a1.channels = c1 a1.sinks = k1 k2 a1.sinkgroups = g1  #Source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444  #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100  #Sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141  a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142  #SinkGroup a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000  #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1 123456789101112131415161718192021222324252627282930313233343536
  (3)创建 flume-flume-console1 .conf
  配置上级 Flume 输出的 Source,输出是到本地控制台。
  编辑配置文件 [Tom@hadoop102 group2]$ vim flume-flume-console1.conf 1
  添加如下内容 #name a2.sources = r1 a2.channels = c1 a2.sinks = k1  #Source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141  #Channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100  #Sink a2.sinks.k1.type = logger  #Bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1 123456789101112131415161718192021
  (4)创建 flume-flume-console2 .conf
  配置上级 Flume 输出的 Source,输出是到本地控制台。
  编辑配置文件 [Tom@hadoop102 group2]$ vim flume-flume-console2.conf 1
  添加如下内容 #name a3.sources = r1 a3.channels = c1 a3.sinks = k1  #Source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142  #Channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100  #Sink a3.sinks.k1.type = logger  #Bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1 123456789101112131415161718192021
  (5)执行配置文件
  分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume 。 [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console  [huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-consosole1.conf -Dflume.root.logger=INFO,console  [huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf 12345
  (6)使用 netcat 工具向本机的 44444 端口发送内容 [Tom@hadoop102 flume-1.7.0]$ nc localhost 44444 1
  (7)查看flume-flume-console2 及 flume-flume-console1 的控制台打印日志
  (8)将 flume-flume-console2 kill ,观察 flume-flume-console1 的控制台打印情况。
  使用 jps -ml  查看 Flume 进程[Tom@hadoop102 job]$ jps -ml 5696 org.apache.flume.node.Application -n a3 -f job/group2/flume-flume-console2.conf 5430 org.apache.flume.node.Application -n a1 -f job/group2/flume-netcat-flume.conf 5275 org.apache.flume.node.Application -n a2 -f job/group2/flume-flume-console1.conf 3581 org.apache.hadoop.hdfs.server.datanode.DataNode 5821 sun.tools.jps.Jps -ml 3438 org.apache.hadoop.hdfs.server.namenode.NameNode 12345671.4.3 聚合
  1. 案例需求
  hadoop102 上的 Flume1 监控文件 opt/module/data/group.log
  hadoop103 上的 Flume2 监控某一个端口的数据流,
  Flume1 与 Flume2 将数据发送给 hadoop104 上的 Flume3,Flume3 将最终数据打印到控制台。
  2. 需求分析
  3. 实现步骤
  (1)准备工作
  分发 Flume [Tom@hadoop102 module]$ xsync flume 1
  在hadoop102、hadoop103 以及hadoop104 的 /opt/module/flume-1.7.0/job  目录下创建一个group3 文件夹。[Tom@hadoop102 job]$ mkdir group3 [Tom@hadoop103 job]$ mkdir group3 [Tom@hadoop104 job]$ mkdir group3 123
  (2)创建 flume1-logger-flume.conf
  配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume。
  在 hadoop102 上编辑配置文件 [Tom@hadoop102 group3]$ vim flume1-logger-flume.conf 1
  添加如下内容 #name a2.sources = r1 a2.channels = c1 a2.sinks = k1  #Source a2.sources.r1.type = TAILDIR a2.sources.r1.filegroups = f1 a2.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/flume.log a2.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position2.json  #Channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100  #Sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = hadoop104 a2.sinks.k1.port = 4141  #Bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1 123456789101112131415161718192021222324
  (3)创建 flume2-netcat-flume.conf
  配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:在 hadoop103 上编辑配置文件 [Tom@hadoop103 group3 ]$ vim flume2-netcat-flume.conf 1
  添加如下内容 #name a3.sources = r1 a3.channels = c1 a3.sinks = k1  #Source a3.sources.r1.type = netcat a3.sources.r1.bind = localhost a3.sources.r1.port = 44444  #Channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100  #Sink a3.sinks.k1.type = avro a3.sinks.k1.hostname = hadoop104 a3.sinks.k1.port = 4142  #Bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1 1234567891011121314151617181920212223
  (4)创建 flume3-flume-logger.conf
  配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。
  在 hadoop104 上编辑配置文件 [Tom@hadoop104 group3 ]$ touch flume3-flume-logger.conf [Tom@hadoop104 group3 ]$ vim flume3-flume-logger.conf 12
  添加如下内容 #name a4.sources = r1 r2 a4.channels = c1 a4.sinks = k1  #Source a4.sources.r1.type = avro a4.sources.r1.bind = hadoop104 a4.sources.r1.port = 4141  a4.sources.r2.type = avro a4.sources.r2.bind = hadoop104 a4.sources.r2.port = 4142  #Channel a4.channels.c1.type = memory a4.channels.c1.capacity = 1000 a4.channels.c1.transactionCapacity = 100  #Sink a4.sinks.k1.type = logger  #Bind a4.sources.r1.channels = c1 a4.sources.r2.channels = c1 a4.sinks.k1.channel = c1 1234567891011121314151617181920212223242526
  (5) 执行配置文件
  分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf 。 [Tom@hadoop104 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a4 -f job/group4/flume3-flume-logger.conf -Dflume.root.logger=INFO,console  [Tom@hadoop103 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group4/flume2-netcat-flume.conf  [huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group4/flume1-logger-flume.conf   12345
  (6)在 hadoop102 上向  /opt/module/flume-1.7.0/data/  目录下的 group .log 追加内容[Tom@hadoop102 data]$ echo "hello" >> flume.log  1
  (7)在 hadoop103 上向 44444 端口发送数据 [Tom@hadoop103 flume-1.7.0]$ nc localhost 44444 hust OK 123
  (8)检查 hadoop104 上数据
  1.5 自定义 Interceptor
  1. 案例需求
  使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
  2. 需求分析
  在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构, Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel 中,所以我们需要自定义一个Interceptor,为不同类型的event 的Header 中的key 赋予不同的值。
  在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义 interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)(实际测试时,我们测试字符串是否包含’‘hello’’)。
  3. 实现步骤
  (1)创建一个 maven 项目,并引入以下依赖。                       org.apache.flume             flume-ng-core             1.7.0           1234567
  (2)定义 CustomInterceptor 类并实现 Interceptor 接口。 package com.Tom.interceptor;  import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;  import java.util.ArrayList; import java.util.List; import java.util.Map;  public class TypeInterceptor implements Interceptor {     // 声明一个存放事件的集合     private List addHeaderEvents;      @Override     public void initialize() {         // 初始化         addHeaderEvents = new ArrayList();     }      @Override     // 单个事件拦截     public Event intercept(Event event) {         // 1. 获取事件的头信息         Map headers = event.getHeaders();          // 2. 获取事件中的body信息         String body = new String(event.getBody());          // 3. 根据body中是否有"hello"来决定添加怎样的头信息         if(body.contains("hello")){             // 4. 添加头信息             headers.put("topic", "first");         } else {             headers.put("topic", "second");         }         return event;     }      @Override     // 批量事件拦截     public List intercept(List events) {         // 1. 清空集合         addHeaderEvents.clear();         // 2. 遍历events         for (Event event : events){             // 3. 给每一个事件添加头信息             addHeaderEvents.add(intercept(event));         }         // 4. 返回结果         return addHeaderEvents;     }      @Override     public void close() {      }      public static class Builder implements Interceptor.Builder{         @Override         public Interceptor build() {             return new TypeInterceptor();         }          @Override         public void configure(Context context) {          }     } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  (3)编辑 flume 配置文件
  为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor 。 #Name a2.sources = r1 a2.channels = c1 c2 a2.sinks = k1 k2  #Source a2.sources.r1.type = netcat a2.sources.r1.bind = localhost a2.sources.r1.port = 44444  #Interceptor a2.sources.r1.interceptors = i1 a2.sources.r1.interceptors.i1.type = com.Tom.interceptor.TypeInterceptor$Builder  #Channel Selector a2.sources.r1.selector.type = multiplexing a2.sources.r1.selector.header = topic a2.sources.r1.selector.mapping.first = c1 a2.sources.r1.selector.mapping.second = c2  #Channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100  a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100  #Sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = hadoop103 a2.sinks.k1.port = 4141  a2.sinks.k2.type = avro a2.sinks.k2.hostname = hadoop104 a2.sinks.k2.port = 4142  #Bind a2.sources.r1.channels = c1 c2 a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c2 123456789101112131415161718192021222324252627282930313233343536373839404142
  为 hadoop103 上的 Flume 2 配置一个 avro source 和一个 logger sink 。 #Name a3.sources = r1 a3.sinks = k1 a3.channels = c1  #Source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop103 a3.sources.r1.port = 4141  #Sink a3.sinks.k1.type = logger  #Channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100  #Bind a3.sinks.k1.channel = c1 a3.sources.r1.channels = c1 123456789101112131415161718192021
  为hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink 。 #Name a4.sources = r1 a4.sinks = k1 a4.channels = c1  #Source a4.sources.r1.type = avro a4.sources.r1.bind = hadoop104 a4.sources.r1.port = 4142  #Sink a4.sinks.k1.type = logger a4.channels.c1.type = memory a4.channels.c1.capacity = 1000 a4.channels.c1.transactionCapacity = 100  # Channel a4.sinks.k1.channel = c1 a4.sources.r1.channels = c1 12345678910111213141516171819
  (4)分别在 hadoop103,hadoop104,hadoop102 上启动 flume 进程 (注意启动顺序)。 [Tom@hadoop103 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/interceptor/flume3.conf -Dflume.root.logger=INFO,console  [Tom@hadoop104 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a4 -f job/interceptor/flume4.conf -Dflume.root.logger=INFO,console  [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/interceptor/flume2.conf  12345
  (5)在 hadoop102 使用 netcat 向 localhost:44444 发送字符串。 [Tom@hadoop102 flume-1.7.0]$ nc localhost 44444 helloworld OK world    OK thanks OK hello hust OK 123456789
  (6)观察 hadoop103 和 hadoop104 打印的日志 。
  1.6 自定义 Source
  1. 介绍
  Source 是负责接收数据到 Flume Agent 的 组件。 Source 组件可以处理各种类型、各种格式的日志数据 包括 avro 、 thrift 、 exec 、 jms 、 spooling directory 、 netcat 、 sequence、generator 、 syslog 、 http 、 legacy 。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source 。
  官方也提供了自定义 source 的接口:
  https://flume.apache.org/FlumeDeveloperGuide.html#source
  根据官方说明自定义 MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
  实现相应方法:
  getBackOffSleepIncrement() //暂不用
  getMaxBackOffSleepInterval() //暂不用
  configure(Context context) //初始化 context (读取配置文件内容)
  process() //获取数据封装成 event 并写入 channel ,这个方法将被循环调用。
  使用场景:读取 MySQL 数据或者其他文件系统。
  2. 需求
  使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。
  3. 分析
  4. 编码
  导入 pom 依赖                       org.apache.flume             flume-ng-core             1.7.0               1234567
  编写代码 package source;  import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource;  public class MySource extends AbstractSource implements Configurable, PollableSource {     // 定义全局的前缀和后缀     private String prefix;     private String subfix;      /**      * 1. 接受数据(for循环造数据)      * 2. 封装为事件      * 3. 将事件传给channel      */     @Override     public Status process() throws EventDeliveryException {         Status status = null;         // 1. 接收数据         try {             for (int i = 0; i < 5; ++i){                 // 2. 构建事件对象                 SimpleEvent evevt = new SimpleEvent();                  //3. 给事件设置值                 evevt.setBody((prefix + "--" + i + "--" + subfix).getBytes());                  //4. 将事件传给channel                 getChannelProcessor().processEvent(evevt);                  status = Status.READY;             }         } catch (Exception e) {             e.printStackTrace();             status = Status.BACKOFF;         }          try {             Thread.sleep(2000);         } catch (InterruptedException e) {             e.printStackTrace();         }          // 返回结果         return status;     }      @Override     public long getBackOffSleepIncrement() {         return 0;     }      @Override     public long getMaxBackOffSleepInterval() {         return 0;     }      @Override     public void configure(Context context) {         // 读取配置文件, 给前后缀赋值         prefix = context.getString("prefix");         subfix = context.getString("subfix", "Tom");      } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  5. 测试
  (1)打包。将写好的代码打包,并放到 flume 的 lib 目录(opt/module/flume)下。
  (2)配置文件 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1  # Describe/configure the source a1.sources.r1.type = source.MySource a1.sources.r1.prefix = online a1.sources.r1.subfix = offline  # Describe the sink a1.sinks.k1.type = logger  # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100  # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 123456789101112131415161718192021
  (3)开启任务 [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console 1
  (4)结果展示
  1.7 自定义 Sink
  1. 介绍
  Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent 。
  Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。  事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
  Sink 组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Sink。
  官方也提供了自定义sink 的接口:
  https://flume.apache.org/FlumeDeveloperGuide.html#sink
  根据官方说明自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
  实现相应方法:
  configure(Context context) //初始化context(读取配置文件内容)
  process() //从 Channel 读取获取数据(event),这个方法将被循环调用。
  使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。
  2. 需求
  使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
  流程分析:
  3. 编码 package sink;  import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory;  public class MySink extends AbstractSink implements Configurable {     // 获取Logger对象     private Logger logger = LoggerFactory.getLogger(MySink.class);      // 定义两个属性,前后缀     private String prefix;     private String subfix;      /**      * 1 获取Channel      * 2 从Channel获取事务及数据      * 3 发送数据      */     @Override     public Status process() throws EventDeliveryException {         // 1 定义返回值         Status status = null;          // 2 获取Channel         Channel channel = getChannel();          // 3 从Channel获取事务         Transaction transaction = channel.getTransaction();          // 4 开启事务         transaction.begin();          try {             // 5 从Channel获取数据             Event event = channel.take();              // 6 处理事件             if (event != null){                 String body = new String(event.getBody());                 logger.info(prefix + body  + subfix);                 // logger.error(prefix + body  + subfix);             }              // 7 提交事务             transaction.commit();              // 8 成功提交, 修改状态信息             status = Status.READY;         } catch (ChannelException e) {             e.printStackTrace();              // 9 提交事务失败             transaction.rollback();              // 10 修改状态             status = Status.BACKOFF;          } finally {             // 11 最终, 关闭事务             transaction.close();         }          // 12 返回状态信息         return status;     }      @Override     public void configure(Context context) {          // 读取配置文件, 为前后缀赋值         prefix = context.getString("prefix");         subfix = context.getString("subfix", "Tom");     } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  4. 测试
  (1)打包。将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
  (2)配置文件 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1  # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444  # Describe the sink a1.sinks.k1.type = sink.MySink a1.sinks.k1.prefix = online-- a1.sinks.k1.subfix = --offline  # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100  # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 1234567891011121314151617181920212223
  (3)开启任务 [Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console  [Tom@hadoop102 flume-1.7.0]$ nc localhost 44444 hello OK HUST OK 1234567
  (4)结果展示
  2 常见问题2.1 你是如何实现 Flume 数据传输的监控的?
  使用第三方框架 Ganglia 实时监控Flume。 2.2 Flume 的 Source,Sink,Channel 的作用?你们 Source 是什么类型?
  1. 作用
  (1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
  (2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或File 中。
  (3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 HDFS、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。
  2. 我公司采用的Source 类型为
  (1)监控后台日志:exec
  (2)监控后台产生日志的端口:netcat、exec、spooldir 2.3 Flume 的 Channel Selectors
  Channel Selectors,可以让不同的项目日志通过不同的 Channel 到不同的 Sink 中去。
  官方文档上 Channel Selectors 有两种类型:Replicating Channel Selector (default)和 Multiplexing Channel Selector
  这两种Selector的区别是:Replicating 会将source过来的events发往所有channel,而Multiplexing可以选择该发往哪些Channel。 2.4 Flume 参数调优
  (1)Source
  增加 Source 个数(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source 的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。
  batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高Source 搬运 Event 到 Channel 时的性能。
  (2)Channel
  type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。 type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。
  使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。
  Capacity 参数决定 Channel 可容纳最大的 event 条数。 transactionCapacity 参数决定每
  次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event
  条数。  transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。
  (3)Sink
  增加 Sink 的个数可以增加 Sink 消费 event 的能力。 Sink 也不是越多越好够用就行,过
  多的 Sink 会占用系统资源,造成系统资源不必要的浪费。
  batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可
  以提高 Sink 从 Channel 搬出 event 的性能。 2.5 Flume 的事务机制
  Flume的事务机制(类似数据库的事务机制): Flume 使用两个独立的事务分别负责从 Soucrce 到 Channel ,以及从 Channel 到 Sink 的事件传递。比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。 2.6 Flume 采集数据会丢失吗?
  根据 Flume 的架构原理, Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的, Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memory Channel agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。
  Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应, Sink 会再次发送数据,此时可能会导致数据的重复 。

游戏史上闻所未闻的武侠世界,九阴江湖不愧是一代人的武侠梦相信有不少的武侠爱好者,现在已经成家立业了吧,在十多年前的时候,武侠游戏是如此的火热,几乎在网吧里面清一色都是在玩武侠游戏的人,而在这众多的游戏之中,九阴真经更是成为了无数武侠玩家中国唯一没有平原的省份,GDP是海南省的3倍,中国最美的地方之一在西南云贵高原的东部有一个地方,是世界知名的山地旅游地和旅游大省。明代著名旅行家徐霞客游历于此,都对迷人的风景赞不绝口。它拥有雄伟壮丽的喀斯特地貌和世界罕见的瀑布群,被中国国家地理电池修复三大电池优缺分明,谁是王者?目前大家说的电动车电池主流分三种铅酸三元锂磷酸铁锂。三种材料电池各有优缺优点缺点电池修复技术铅酸能够低温大倍率放电,内阻小,单体能做大容量,工艺成熟危害环境,单体电压低2V,能量密常喝冰水会伤胃?喝热的就没事了?这几件事不注意,肠胃病照样找上门天气炎热,很多人的生活乐趣大概就是饭前饭后来杯冰水,可总有人提醒不要喝冰的,对肠胃不好很多人都觉得冷水不宜喝,而热水则算得上是一副良药,感冒发烧肚子疼都强调多喝。但你有没有想过,冰红薯真的能降血糖吗?多少糖友如果早这样吃,就不会丢命我晕,你咋还吃红薯,你不知道你血糖高吗李奶奶焦急地对老伴说道。吃红薯咋啦,人家说吃红薯可以降血糖呢王爷爷不解道。红薯那么甜,怎么可能降血糖,肯定是升血糖才对,你赶紧别吃了李奶奶再一三伏天竟然这么危险,这四件事千万别做入伏模式已开启,再热也别做这4件事!小心伤肝伤肺伤血管一年中最热的时段来了!而且今年的伏天有40天!说实话,没入伏前已经感觉到巨热,进入伏后,基本处于保命状态。三伏是一年中气温最高从不跟风,我即是风什么是英伦风尚?英国著名设计师LuellaDayrellBartley曾在她的著作LuellasGuidetoEnglishStyle写到Englandisallaboutrule皮肤爱出油,多半是这5个原因,要学会预防明明半个小时前刚洗的脸,竟然开始出油明明早上才洗的头,半天就开始出油皮肤爱出油,真的是一件特别尴尬的事情。尤其对于爱美的女生来说,出油不仅会影响她们的颜值,而且还会影响她们的自信。爽肤水后面加一样护肤品,滋润皮肤不说,还能促进吸收类别的不同精华水精华水介于爽肤水和精华液之间的一种护肤品,既有爽肤水的普适性,同时也添加了精华液成分,满足肌肤的多重护理需求。市面上所谓的高机能水精粹水营养液以及美容液都可以算在这讲真,穿对衣服绝不简单!甜美气质的职场造型,堪称穿搭模板文洛薇Hi,我是洛薇,继续我们的时尚穿搭之旅,变美永远不迷路。甜美女生的早秋职场怎么穿?今天,它是主角。非同寻常的8月底,将迎来两件盛事一是学生开学二是新人入职。作为人生的每一次飞2022夏天又火了一条裙子叫花苞裙,洋气显瘦,配短袖穿就很美夏季,女孩们都挺喜欢穿小裙子的,这个季节流行的裙子很多,尤其是百褶裙,印花裙,碎花裙都让大家爱不释手,这些时尚的裙子搭配起来真的很美很温柔,如果你的身材和颜值比较普通,但又想要通过赵丽颖的9张美照,每帧都是珍藏款1。抱歉,其实我是一个艺术家,凝视像你这样的美女是我的工作。2。我常想着,我和你都看同一个月亮,看同一个太阳,所以我和你的距离其实并没有多远。3。我们像某个星球的小熊,总是不小心打苍兰诀热度破万,王鹤棣爆红,关晓彤陈立农武艺也沾光了看点2022年8月21日,电视剧苍兰诀在爱奇艺的站内热度破万。制片人王一栩发长文庆祝称苍兰诀成为了爆款,热血笨蛋们证明了自己,各种合作纷至沓来,一切困难仿佛从未发生过。在爱奇艺的热42岁章子怡抱娃外出,全程不撒手尽显母爱,宝宝颜值神似妈妈如今的娱乐圈夫妇曾经被看好的都离婚了,没被看好的却似乎过得很如鱼似水,提及章子怡和汪峰这一对,就可以说是许多人比较意外的了,没想到如今一家人幸福的让许多人称羡不已,也打脸了当初那些王鹤棣开大G,父亲却坚持开炸串店,看到棣爸颜值没搞错吧?说起王鹤棣的话,相信大家肯定不陌生,他是娱乐圈特别帅气的男爱豆,不光演技好,唱跳更是不在话下。他与虞书欣合作的苍兰诀上线之后,大家才意识到,原来他不是徒有其名,表演天赋很强,未来星虞书欣王鹤棣主演的苍兰诀,卷起的是一股什么风?影视畅聊季热点观察团媒体人周刊谁也没想到这几天,很多人得了诀症。虞书欣王鹤棣主演的东方奇幻题材的苍兰诀,彻底火了。开播1小时后便被韩国买了版权,走出国门。在8月21日,也就是在开播曼晚为曼联球员评分瓦拉内和利桑德罗马丁内斯收获最高9分直播吧8月23日讯英超第3轮,曼联坐镇主场21战胜利物浦收获了新赛季首胜。曼彻斯特晚报为曼联球员的表现做出了评分,其中多数球员收获好评,主帅滕哈赫获评8分。曼彻斯特晚报赛后评分德赫曝曼联夏窗转会花费或达3亿!最后十日欲买3人,有机会还想签德容曼联今夏已经花费1。25亿英镑签下泰雷尔马拉西亚埃里克森利桑德罗马丁内斯和卡塞米罗4名新援,夏窗只剩最后10天,英媒宣称红魔还有大计划,希望再买3人,最终的引援总投入恐怕达到3亿英曼联VS利物浦首发曝光荷兰失意帝星坐镇中场,B费领衔,C罗冲锋北京时间8月23日凌晨0300,新赛季英超第3轮将上演一场引人关注的重头戏,红魔曼联坐镇主场迎战红军利物浦,球迷称之为双红大战。不过,本赛季的双红大战却因为曼联无底线地战绩下滑变得11,大冷!轰24脚74控球无用,9亿豪门又不赢,利物浦跌至第15文彬少侃球(首发)英超第2轮比赛,利物浦队对阵水晶宫队,这场比赛之前,外界基本上看好利物浦队能取胜,作为身价高达9亿欧左右的豪门球队,利物浦队新赛季首战遭遇了富勒姆队阻击,主场22转会消息巴塞罗那曼联阿贾克斯阿斯顿维拉曼城德佩的阵营和巴萨现在终于可以终止合同了但这取决于尤文图斯,尤文图斯尚未完成与德佩的协议,仍在讨论薪水等问题。谈判将继续直到德佩和尤文图斯达成协议,因为巴萨现在准备终止合同。哈维发布说说游戏里那些小号在0几年的国内,有一款游戏就不得不提,它霸占了全国各大网吧,他就是彼时陈天桥带着盛大推出的传奇游戏,那时候网吧里人山人海,走路都走不动。但是这些人里面有很多都是小号,当年几乎每个人
朋友圈精彩心语文案图片满眼醉意,山河皆你很喜欢的一句话在自己的世界里独善其身,在别人的世界里顺其自然。做一个积极向上的人,读温柔的句子,见阳光的人,眼里全是温柔和你。心中有事,装作若无其事,便是阅历。心中有事,还能若无其两款入门级全画幅微单横向对比尼康Z5和佳能RP各项性能深度分析随着对影像画质要求的不断提升,摄影器材也在不断升级。目前对于爱好摄影并且追求画质的小伙伴,全画幅入门级微单已经成为基本门槛。而目前的全画幅入门级微单,带着镜头套装在万元价位之内的,正在摆脱小众标签的地心历险记探洞是一场始于好奇,揭秘未知的探索之旅一起来玩吧封面新闻记者杨霁月见习记者赵奕摄影报道余与四人拥火以入,入之愈深,其进愈难,而其见愈奇。王安石游褒禅山时的惊叹还回响在时光长廊。余亦悔其随之而不得极夫游之乐也。他将未能见到山洞全貌网传小米汽车高配将采用宁德时代麒麟电池入门版配刀片电池8月17日,据36氪独家报道,小米汽车电池方案目前敲定了两家助力供应商,将由宁德时代和比亚迪旗下的弗迪电池供应。据此前爆料消息,小米汽车首款车规划了高低两个配置,低配车型计划采用4一个小赛道,却让红杉腾讯分别投了30家公司,再过几年它会香吗?作者冬雪编辑Judy来源IT桔子在红杉腾讯高瓴都在投,但这一赛道至今没有诞生一家巨头中,我们以近三年频繁获投的国内CRM公司为基点,探讨当全球最大的SaaS软件服务公司Salesf河南省四方制药集团有限公司用良心和质量打造百年企业河南经济报记者沈彤文图河南省四方制药集团有限公司(以下简称四方制药)原为周口药胶厂,位于素有南皮都之称的周口市,是全国生产胶剂较早的厂家之一。在一百四十六年的发展历程中,四方制药始格力成立餐饮管理公司,要挑战外卖配送市场?格力成立餐饮管理公司,经营范围含外卖递送服务北京13名餐饮从业人员购买假冒健康证,已被警方拘留。详情请看红餐网每日餐讯。格力成立餐饮管理公司经营范围含外卖递送服务中国经济周刊消息,科技公司中报告急,小米集团W2022财年中报显示净利润下降95。03近日,又是半年报集中发布的日子,多家科技公司在营业总收入上有着不同程度的下滑,在净利润上的表现更是让人担忧。据小米集团W发布最新2022财年中报(20220101至20220630有色金属矿采选业上市公司环境榜涉24家,国成矿业居末位关于中国上市公司环境绩效榜和本期有色金属矿采选业榜单自2021年起,澎湃新闻与公众环境研究中心(IPE)联合发布中国上市公司环境绩效动态榜单和行业分榜单。本期分析解读行业为有色金属福建奔驰副总裁尊龙很低调但爱公益公司曾多次召回同一车型引关注运营商财经网郝紫艳文近日,福建奔驰汽车公司召回158辆国产V级和威霆汽车,引起不少关注。本次运营商财经就将目光聚焦于福建奔驰副总裁尊龙,为大家揭秘他的过往经历。与影视明星尊龙不同,开车用手机导航竟然也违规?iPad蓝牙可以用吗?开车用手机导航竟然也违规?iPad蓝牙可以用吗?最近我们收到很多客户来问关于新州开车能否使用手机的问题,这也是前段时间社交媒体上讨论度比较高的问题。新州关于开车使用手机和其他电子设潘玮柏忙工作不忘陪老婆,带宣云买衣服还贴心帮拎东西,两人好甜8月14日,潘玮柏最新动态曝光。有网友发文称在长沙某商场偶遇到了潘玮柏及其妻子宣云。目前,潘玮柏正在湖南参与录制第二季乘风破浪的哥哥,因此他被偶遇到并不为奇,但难得是潘帅在忙工作之再看星光大道4对组合状况玖月奇迹一拍两散,凤凰传奇长红不衰要说走出最多草根明星的舞台,大约就是2004年开始的星光大道了从星光大道开始以来,一个又一个草根歌手登上了舞台。他们展示了让人吃惊的实力和能力。更难的是,这些歌手很多是来自于非专业杨帆最是青春难忘怀,在央视网综中和张蕾一起合唱同桌的你杨帆最是青春难忘怀,在央视网综中和张蕾一起合唱同桌的你聚焦艺人最新动态,传递圈内主流声音。晓今娱全网特供今日头条独家首发,严禁转载本文由晓今娱原创,欢迎关注,带你一起长知识!杨帆和2022微博电影之夜,周冬雨抢镜,沈腾偷轮胎,邓超生图笑不停8月14日,微博电影之夜开启,真是众星云集,造型各异,在精修图下明星都是神采飞扬,精美别致,美轮美奂,到了红毯上,看看明星的生图能否还能抗住高清镜头。一年一度的微博之夜真是大咖云集700位明星到场祝寿,金像奖为他改期,曾志伟在娱乐圈地位有多牛这绝对是娱乐圈最轰动的生日宴,香港TVB怒砸200w为其庆生,全国700多位明星赶来为他贺寿,曾志伟60岁大寿到底有多风光,千万网友忍不住好奇!除了老一辈的任达华成龙谢贤等超级巨星好家伙!耗资2。5亿10位明星参演,猎屠3天票房仅1000万本周国内电影市场迎来了七部新片上线,可惜虽然新片众多,也有断。桥猎屠这种全明星阵容制作成本高的影片,但依然无法撼动沈腾主演的独行月球。断。桥票房昙花一现,后劲不足,独行月球连续18玫瑰之战揭面海王令仪撒腿就跑,顾念取证,叶勤勤试探电视剧玫瑰之战通过重返职场的顾念办理一个个案件来推进剧情,同时作为前夫宋嘉辰的二辩律师,顾念在调查取证杜康案中前夫陷入了多少。玫瑰这些案子中,出现了不少海王,包括赵永霖宋嘉辰高天宇瞧瞧娱乐圈里的明星们孕肚照片(二)一序言摘要瞧瞧娱乐圈里的明星们孕肚照片二瞧瞧娱乐圈里的明星们孕肚照片(二)5王灿,王灿本名王孔达,出生于1973年11月4日,出生在台湾省高雄市凤山区,我国台湾省男演员。6谢娜,谢国民女神倪萍近况曝光,略显消瘦惹人心疼,与网友畅谈缓解焦虑的秘诀近日,国民女神倪萍老师在短视频平台分享了一则关于如何缓解焦虑的视频。视频中倪萍老师的消瘦引起了网友关注。视频中倪萍老师的脸小了一圈,宽宽的双眼皮稍有些松弛了,她穿着黑白格子衬衫搭配曝唐嫣将出演独身女人,男主是何以笙箫默里的老熟人近日,网上有了关于唐嫣的新剧消息,她的粉丝都非常高兴,那作为路人,对于唐嫣的作品也是没有任何抗拒力,从仙剑三就开始关注她,唐嫣所有的剧也基本都看了,不得不说她的观众缘真的很好!唐嫣日本媒体称中国高水准论文超越了美国日本文部科学省科技与学术政策研究所依据美国调查公司ClarivateAnalytics的数据,分析了世界主要国家从2018到2020年三年间发表的论文篇数,并以三年的平均数据作为计