1. Setup

1.1 Kafka setup

(1) Unpack:

```bash
tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
mv kafka_2.11-2.4.1/ kafka
```

(2) Create the logs directory under /opt/module/kafka:

```bash
mkdir logs
```

(3) Edit the configuration file:

```bash
cd config/
vi server.properties
```

```properties
# Globally unique broker ID; must not be duplicated
broker.id=0
# Enable topic deletion; in this version it defaults to true and has been removed from the shipped config file
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Path where Kafka's data logs are stored
log.dirs=/opt/module/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data dir used to recover and clean up data
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; expired segments are deleted
log.retention.hours=168
# ZooKeeper cluster connection address
zookeeper.connect=bigdata:2181
```

(4) Configure environment variables:

```bash
vim /etc/profile.d/my_env.sh
```

```bash
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```

(5) Start Kafka (start ZooKeeper first):

```bash
vim mykafkaservices.sh
```

```bash
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "Input Args Error....."
    exit
fi
for i in bigdata
do
    case $1 in
    start)
        echo "==================START $i KAFKA==================="
        ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
        ;;
    stop)
        echo "==================STOP $i KAFKA==================="
        ssh $i /opt/module/kafka/bin/kafka-server-stop.sh stop
        ;;
    *)
        echo "Input Args Error....."
        exit
        ;;
    esac
done
```

(6) Common commands:

1) Create a topic:

```bash
kafka-topics.sh --zookeeper bigdata:2181 --create --replication-factor 1 --partitions 1 --topic first
```

2) List topics:

```bash
kafka-topics.sh --zookeeper bigdata:2181 --list
```

3) Delete a topic:

```bash
kafka-topics.sh --zookeeper bigdata:2181 --delete --topic first
```

4) Produce messages:

```bash
kafka-console-producer.sh --broker-list bigdata:9092 --topic first
```

5) Consume messages:

```bash
kafka-console-consumer.sh --bootstrap-server bigdata:9092 --topic first
kafka-console-consumer.sh --bootstrap-server bigdata:9092 --from-beginning --topic first
```

6) Describe a topic:

```bash
kafka-topics.sh --zookeeper bigdata:2181 --describe --topic first
```

7) Change the partition count:

```bash
kafka-topics.sh --zookeeper bigdata:2181 --alter --topic first --partitions 6
```

(7) Case test

1) Edit the Flume configuration:

```bash
vim flume-kafka.conf
```

```properties
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/data/flume.log

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

2) Start the agent:

```bash
vim flume-kafka.sh
```

```bash
/opt/module/flume/bin/flume-ng agent --conf /opt/module/flume/conf/ --name a1 --conf-file /opt/module/flume/job/flume-kafka.conf -Dflume.root.logger=INFO,console
```

```bash
sh flume-kafka.sh
```

3) Write data:

```bash
echo hello >> /opt/module/data/flume.log
```

4) Verify the data in Kafka:

```bash
kafka-console-consumer.sh --bootstrap-server bigdata:9092 --topic first
```
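Once the Flume pipeline is feeding the topic, it can also be useful to check consumer progress and lag with Kafka's bundled tools. The sketch below assumes the same bigdata:9092 broker; the group name console-group is a hypothetical example, not something used in the steps above.

```bash
# Consume with an explicit group name (console-group is a hypothetical example)
kafka-console-consumer.sh --bootstrap-server bigdata:9092 --topic first --group console-group

# List all consumer groups known to the broker
kafka-consumer-groups.sh --bootstrap-server bigdata:9092 --list

# Show per-partition current offset, log-end offset, and lag for that group
kafka-consumer-groups.sh --bootstrap-server bigdata:9092 --describe --group console-group
```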
1.2 Kafka monitoring platform (Kafka Eagle) setup

(1) Unpack:

```bash
tar -zxvf kafka-eagle-bin-1.4.5.tar.gz
tar -zxvf kafka-eagle-web-2.0.0-bin.tar.gz -C /opt/module/
mv kafka-eagle-web-2.0.0/ eagle
```

(2) Make the startup script executable:

```bash
cd /opt/module/eagle/bin
chmod 777 ke.sh
```

(3) Edit the configuration file:

```bash
vim conf/system-config.properties
```

```properties
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=bigdata:2181

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
```
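The notes stop at the configuration file. As a hedged sketch of the usual remaining steps (the KE_HOME value, the JMX port, and the default web port are assumptions based on typical Kafka Eagle deployments, not taken from the notes above):

```bash
# Assumption: ke.sh resolves the install directory via KE_HOME
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin

# Assumption: with kafka.eagle.metrics.charts=true the broker should expose JMX,
# e.g. start Kafka with JMX_PORT=9999 exported in its environment

# Assumption: the MySQL database named in kafka.eagle.url (ke) may need to be
# created beforehand if the configured account cannot create it automatically

# Start the monitor; on success ke.sh prints the web UI address
# (commonly http://<host>:8048 with an initial admin account)
ke.sh start
```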