1、环境版本- 版本:kafka2.11,zookeeper3.4
注意:这里zookeeper3.4也是基于集群模式部署。
2、解压重命名- tar -zxvf kafka_2.11-0.11.0.0.tgz
- mv kafka_2.11-0.11.0.0 kafka2.11
创建日志目录
- [root@en-master kafka2.11]# mkdir logs
注意:以上操作需要同步到集群下其他服务上。
3、添加环境变量- vim /etc/profile
- export KAFKA_HOME=/opt/kafka2.11
- export PATH=$PATH:$KAFKA_HOME/bin
- source /etc/profile
4、修改核心配置- [root@en-master /opt/kafka2.11/config]# vim server.properties
- -- 核心修改如下
- # 唯一编号
- broker.id=0
- # 开启topic删除
- delete.topic.enable=true
- # 日志地址
- log.dirs=/opt/kafka2.11/logs
- # zk集群
- zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
注意:broker.id安装集群服务个数编排即可,集群下不能重复。
5、启动kafka集群- # 启动命令
- [root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties
- # 停止命令
- [root@node02 kafka2.11]# bin/kafka-server-stop.sh
- # 进程查看
- [root@node02 kafka2.11]# jps
注意:这里默认启动了zookeeper集群服务,并且集群下的kafka分别启动。
6、基础管理命令创建topic
- bin/kafka-topics.sh --zookeeper zk01:2181
- --create --replication-factor 3 --partitions 1 --topic one-topic
参数说明:
- replication-factor 定义副本个数
- partitions 定义分区个数
- topic:定义topic名称
查看topic列表
- bin/kafka-topics.sh --zookeeper zk01:2181 --list
修改topic分区
- bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5
查看topic
- bin/kafka-topics.sh --zookeeper zk01:2181
- --describe --topic one-topic
发送消息
- bin/kafka-console-producer.sh
- --broker-list 192.168.72.133:9092 --topic one-topic
消费消息
- bin/kafka-console-consumer.sh
- --bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic
删除topic
- bin/kafka-topics.sh --zookeeper zk01:2181
- --delete --topic first
7、Zk集群用处Kafka集群中有一个broker会被选举为Controller,Controller依赖Zookeeper环境,管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。