kafka集群管理指南(一)
本指南使用的工具为kafka/bin目录下相关脚本。添加/删除topics
可以使用如下命令进行新增topics: > bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
其中,—topic表示主题名称,—partitions表示分区数,—replication-factor表示副本数,—config表示主题配置,会覆盖默认的配置项。
可以使用下述命令删除topic: > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name修改topics配置
可以使用kafka-topics.sh命令进行修改topics。新增partitions
比如你要新增partitions,那么你可以使用如下命令: > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name --partitions 40新增configs > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y删除configs > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x优雅关闭kafka服务器
Kafka 集群将自动检测任何 broker 关闭或故障,并为该机器上的分区选举新的领导者。 无论服务器发生故障还是为了维护或配置更改而有意关闭,都会发生这种情况。 对于后一种情况,Kafka 支持一种更优雅的停止服务器的机制,而不仅仅是杀死它。 当服务器正常停止时,它将利用两个优化:它将所有日志同步到磁盘,以避免在重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此这会加快有意重新启动的速度。它将在关闭之前将服务器作为领导者的任何分区迁移到其他副本。 这将使领导转移更快,并将每个分区不可用的时间最小化到几毫秒。
每当服务器停止而不是硬终止时,同步日志将自动发生,但受控领导迁移需要使用特殊设置: controlled.shutdown.enable=true
请注意,只有在broker上托管的所有分区都具有副本(即复制因子大于 1 并且这些副本中至少有一个处于活动状态)时,受控关闭才会成功。 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。集群建数据复制/数据跨区域复制
Kafka 管理员可以定义跨越单个 Kafka 集群、数据中心或地理区域边界的数据流。 有关详细信息,请参阅异地复制部分。平衡leadership
每当 Broker 停止或崩溃时,该 Broker 分区的领导权就会转移到其他副本。 当 broker 重新启动时,它只会是其所有分区的跟随者,这意味着它不会用于客户端读取和写入。
为了避免这种不平衡,Kafka 有一个首选副本的概念。 如果分区的副本列表是 1,5,9,则节点 1 优先于节点 5 或 9 作为领导者,因为它在副本列表中更早。 默认情况下,Kafka 集群将尝试将领导权恢复到首选副本。 也就是要进行如下配置: auto.leader.rebalance.enable=true
您也可以将其设置为 false,但您需要通过运行以下命令手动将领导恢复到恢复的副本: > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port管理消费者组列出所有消费者组
使用 ConsumerGroupCommand 工具,我们可以列出、描述或删除消费者组。 消费者组可以手动删除,也可以在该组的最后提交的偏移量到期时自动删除。 手动删除仅在组没有任何活动成员时才有效。 例如,要列出所有主题的所有消费者组: > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list test-consumer-group列出消费者组消费位置 > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
有许多额外的"describe"选项可用于提供有关消费者组的更多详细信息:
--members:此选项提供消费者组中所有活动成员的列表。 > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0
--members --vrbose:除了上面"--members"选项报告的信息之外,此选项还提供分配给每个成员的分区。 > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0) consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2) consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1) consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
--offsets:这是默认的描述选项,提供与"--describe"选项相同的输出。
--state:此选项提供有用的组级信息。 > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 4
要手动删除一个或多个消费者组,可以使用"--delete"选项: > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group Deletion of requested consumer groups ("my-group", "my-other-group") was successful.重置消费者组的偏移量
要重置消费者组的偏移量,可以使用"--reset-offsets"选项。 此选项一次支持一个消费者组。 它需要定义以下范围:--all-topics 或--topic。 必须选择一个范围,除非您使用"--from-file"方案。 此外,首先确保消费者实例处于非活动状态。 有关更多详细信息,请参阅 KIP-122。
它有 3 个执行选项:(默认)显示要重置的偏移量。--execute : 执行 --reset-offsets 进程。--export :将结果导出为 CSV 格式。
--reset-offsets 还有以下场景可供选择(必须至少选择一个场景):--to-datetime :将偏移量重置为日期时间的偏移量。 格式:"YYYY-MM-DDTHH:mm:SS.sss"--to-earliest :将偏移量重置为最早的偏移量。--to-latest :将偏移量重置为最新偏移量。--shift-by :重置偏移量将当前偏移量移动"n",其中"n"可以是正数或负数。--from-file :将偏移量重置为 CSV 文件中定义的值。--to-current :将偏移重置为当前偏移。--by-duration :将偏移量重置为从当前时间戳开始的持续时间偏移量。 格式:"PnDTnHnMnS"--to-offset :将偏移量重置为特定偏移量。
请注意,超出范围的偏移将被调整到可用的偏移结束。 例如,如果offset end 为10,offset shift request 为15,那么实际上会选择offset at 10。
例如,要将消费者组的偏移量重置为最新的偏移量:> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest TOPIC PARTITION NEW-OFFSET topic1 0 0
如果您使用旧的high-level消费者并将组元数据存储在 ZooKeeper 中(即 offsets.storage=zookeeper),请传递 --zookeeper 而不是 --bootstrap-server:> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list扩充kafka集群
将服务器添加到 Kafka 集群很容易,只需为它们分配一个唯一的broker ID 并在新服务器上启动 Kafka。 然而,这些新服务器不会自动分配任何数据分区,因此除非将分区移动到它们,否则在创建新主题之前它们不会做任何工作。 因此,通常当您将机器添加到集群时,您会希望将一些现有数据迁移到这些机器上。
迁移数据的过程是手动启动的,但完全自动化。 在幕后,Kafka 将添加新服务器作为它正在迁移的分区的跟随者,并允许它完全复制该分区中的现有数据。 当新服务器完全复制此分区的内容并加入同步副本时,现有副本之一将删除其分区的数据。
分区重新分配工具可用于在broker之间移动分区。 理想的分区分布将确保所有broker的数据负载和分区大小均匀。 分区重新分配工具无法自动研究 Kafka 集群中的数据分布并移动分区以获得均匀的负载分布。 因此,管理员必须弄清楚应该移动哪些主题或分区。
分区重新分配工具可以在 3 种互斥模式下运行:--generate:在这种模式下,给定一个主题列表和一个broker列表,该工具生成一个候选重新分配,以将指定主题的所有分区移动到新的broker。 此选项仅提供一种方便的方法来生成给定主题和目标代理broker的分区重新分配计划。--execute:在这种模式下,该工具根据用户提供的重新分配计划启动分区的重新分配。 (使用 --reassignment-json-file 选项)。 这可以是由管理员手工制作的自定义重新分配计划,也可以使用 --generate 选项提供--verify:在此模式下,该工具会验证上次 --execute 期间列出的所有分区的重新分配状态。 状态可以是成功完成、失败或进行中自动将数据迁移到新机器
分区重新分配工具可用于将某些主题从当前brokers移至新添加的broker。 这在扩展现有集群时通常很有用,因为将整个主题移动到新的一组broker比一次移动一个分区更容易。 当用于执行此操作时,用户应提供待移动的brokers的主题列表和新brokers的目标主题列表。 然后,该工具将给定主题列表的所有分区均匀分布在新的brokers上。 在此过程中,主题的复制因子保持不变。 实际上,输入主题列表的所有分区的副本都从旧brokers移动到新添加的brokers。
例如,以下示例将主题 foo1,foo2 的所有分区移动到新的一组broker 5,6。 在此移动结束时,主题 foo1 和 foo2 的所有分区将仅存在于broker 5,6 上。
由于该工具接受输入格式为 json 文件的主题列表,因此您首先需要确定要移动的主题并创建 json 文件,如下所示: > cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }
json文件准备好后,使用分区重新分配工具生成候选分配:> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
该工具生成一个候选分配,它将所有分区从主题 foo1,foo2 移动到broker 5,6。 但是请注意,此时分区移动还没有开始,它只是告诉您当前的分配和建议的新分配。 如果您想回滚到当前分配,应保存当前分配。 新分配应保存在 json 文件(例如 expand-cluster-reassignment.json)中,以使用 --execute 选项输入到工具中,如下所示: > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
最后,该工具可以使用 --verify 选项来检查分区重新分配的状态。 请注意,相同的 expand-cluster-reassignment.json(与 --execute 选项一起使用)应该与 --verify 选项一起使用:> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully自定义分区分配和迁移
分区重新分配工具还可用于有选择地将分区的副本移动到一组特定的broker。 当以这种方式使用时,假设用户知道重新分配计划并且不需要工具来生成候选重新分配,有效地跳过--generate步骤并直接移动到--execute步骤
例如,以下示例将主题 foo1 的分区 0 移动到代理 5,6,将主题 foo2 的分区 1 移动到代理 2,3:
第一步是在 json 文件中手工制作自定义重新分配计划:> cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
然后,使用带有 --execute 选项的 json 文件开始重新分配过程:> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] }
--verify 选项可与该工具一起使用以检查分区重新分配的状态。 请注意,应将相同的 custom-reassignment.json(与 --execute 选项一起使用)与 --verify 选项一起使用:> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo2,1] completed successfully
连续拨号软件自动呼叫,自动外呼软件连续拨号软件自动呼叫,自动外呼软件自动呼叫软件是通过系统自动拨号,再将记录分配给空闲座席,从而很大程度上提高座席代表的工作效率。随着呼叫中心的发展,自动外呼通常也指预测外呼,系统通
deVereGroupCEO比特币已成为投资主流deVereGroupCEO比特币已成为投资主流10月22日消息,deVereGroup首席执行官兼创始人NigelGreen预测,比特币将重新回到并超越此前创下的历史新高,比特币
BetaShares的新加密ETF上市前15分钟打破了ASX记录BTC跌破61000美元BTC跌破61000美元,现报60986。2美元,日内跌幅达到0。09,行情波动较大,请做好风险控制。BetaShares的新加密ETF上市前15分钟打破了
axb电销线路,axb电销软件axb电销线路,axb电销软件近年来,我国进入了数字征信时代,对信用信息采集整理保存和加工进行了限制性规定,明确了征信机构不得以非法方式采集信息,这一举措,尤其对以电话销售的企业有
axb电销系统容易封号?axb电销系统靠谱吗axb电销系统容易封号?axb电销系统靠谱吗电销软件在最近几年关注度非常高。目前来说,销售人员打电话封号常有发生。而为了解决封号问题,很多企业都在用电销软件。其中,电销企业听过最多
电销外呼系统怎么安装,外呼系统排名电销外呼系统怎么安装,外呼系统排名市面上外呼系统公司很多,但是选择时最主要的是根据需求去匹配,选择最适合业务发展需要的系统。在选择外呼系统公司时,可以通过以下几个方面进行考量。1能
AXB电销外呼系统,axb电销防封系统搭建AXB电销外呼系统,axb电销防封系统搭建目前三大运营商的卡一天打几十个电话可能被封了,封号多了很可能被运营商拉入征信黑名单。因此很电销企业不得不到处寻找防封的电销系统,其中AXB
axb电销系统容易封号?电销axb软件下载电销axb系统搭建代码技术是一方面,最重要是有外呼线路。解决了线路问题,代码程序如果是自己有能力开发的,可以自己写。如果没这方面的技术人才,也可以购买他人的源码,根据自己的要求定制
比亚迪e2热销中优惠高达8000元给生活稳稳幸福感,2021款e2家庭纯电优选。配备高安全长寿命刀片电池严苛测试,关心和保障您的每次出行NEDC401KM超长续航满足日常上下班代步,户外郊游需求,畅行无忧直上绿牌,
海豚售价9。38万元起欢迎莅临赏鉴纯电新物种海豚正式上市!综合补贴后售价9。38万元12。18万元!海豚是海洋车系的首款车型,也是首款采用海洋美学设计理念的车型,首款基于e平台3。0打造的车型。搭载DiLink3。
axb电销系统合法,axb电销系统搭建axb电销系统合法,axb电销系统搭建axb线路的话是现在目前电销企业运用最多的一种线路,因为这个线路相对来说审核机制没有那么严格,除了一些特殊的行业不能使用之外,其他的都是可以用