Cluster Deployment
Environment Setup
If you don't have a Kafka environment yet, start by downloading it from the official downloads page (https://kafka.apache.org/downloads). This walkthrough uses the latest release (v4.1.1) as an example:
$ wget https://dlcdn.apache.org/kafka/4.1.1/kafka_2.13-4.1.1.tgz
One thing worth stressing: Kafka has Java version requirements, and if your Java environment does not meet them the broker will not start; see the official documentation for details.
For example, v4.1.1 recommends JDK 21: https://kafka.apache.org/documentation/#java.
Once Kafka is downloaded and unpacked, configure the environment variables:
# Java
export JAVA_HOME=/usr/local/lib/java/jdk21
export PATH=$JAVA_HOME/bin:$PATH
# Kafka
export KAFKA_HOME=/usr/local/lib/kafka/kafka_4_1_1
export PATH=$KAFKA_HOME/bin:$PATH
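Assuming the exports above live in your shell profile (the paths here are examples and should match your own installation), a quick sanity check could look like this:
$ source ~/.bashrc            # or wherever the exports were added
$ java -version               # should report JDK 21
$ kafka-topics.sh --version   # should report 4.1.1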
Preparation
Normally a cluster is deployed across multiple machines, with each machine acting as one broker node. Since I only have a single machine, I will simulate a cluster on one host by using a separate configuration file per node.
(In any directory) create several broker folders (at least 3 nodes are recommended for a cluster), each serving as one broker's configuration environment. Inside each one, create a data directory and a broker.properties configuration file. The final layout looks like this:
$ mkdir standalone-cluster
.
├── broker_1
│   ├── broker.properties
│   └── data
├── broker_2
│   ├── broker.properties
│   └── data
└── broker_3
    ├── broker.properties
    └── data
broker.properties does not need to be written from scratch; just copy the configuration file shipped under KAFKA_HOME:
$ cp $KAFKA_HOME/config/broker.properties broker_1/
$ cp $KAFKA_HOME/config/broker.properties broker_2/
$ cp $KAFKA_HOME/config/broker.properties broker_3/
Then edit each broker.properties file.
Broker Configuration
Every broker in the cluster is a node, and node.id must be unique within a cluster; assigning node.id incrementally starting from 1 is recommended.
Starting from the copied original configuration, broker_1 is modified as follows:
# Node roles
# - controller: metadata node
# - broker: data storage node
#
# Because only one machine is available here, every node takes on both roles.
#
# With enough machines, deploying controllers and brokers separately is recommended (especially in production).
# At least 3 controller nodes are recommended,
# and at least 3 broker nodes as well.
process.roles=broker,controller
# Whether topics may be created automatically
# If you want to get off work early, be sure to set this to false in production
auto.create.topics.enable=false
# Node ID
# Each node's ID must be unique within the cluster; incrementing from 1 is recommended
node.id=1
# (Communication) listener ports
#
# - If this node only acts as a broker, a single PLAINTEXT listener is enough
# - If this node only acts as a controller, a single CONTROLLER listener is enough
# - If the node takes on multiple roles, configure one listener per role, and make sure the ports do not clash
#
# Besides the two protocols listed here, protocols such as SSL can also be configured;
# see the documentation on the listener.security.protocol.map setting for details
listeners=PLAINTEXT://:19092,CONTROLLER://:19093
# Externally advertised addresses
#
# The ports are the listener ports configured above
#
# One externally reachable address per listener: as many entries here as listeners above
# Never use localhost here; always use an IP address or a domain name
advertised.listeners=PLAINTEXT://172.21.11.1:19092,CONTROLLER://172.21.11.1:19093
# Registers the current node with the cluster
# Tells the broker where to fetch cluster metadata on its first initialization
#
# Listing all controller nodes here is recommended; strictly speaking any single
# reachable controller node is enough, but listing several guards against failures
controller.quorum.bootstrap.servers=172.21.11.1:19093,172.21.11.1:29093,172.21.11.1:39093
# The actual controller (voter) nodes
# controller.quorum.bootstrap.servers is only used for bootstrapping; this setting defines the voters used while the cluster is running
# Be sure to list every controller node here!
# The format is: nodeID@address
controller.quorum.voters=1@172.21.11.1:19093,2@172.21.11.1:29093,3@172.21.11.1:39093
# Data directory
#
# Prefer an absolute path; avoid relative paths
# With a relative path, data is written relative to wherever the start command is run
log.dirs=/usr/local/lib/kafka/kafka_4_1_1/standalone-cluster/broker_1/data
NOTE
The other broker nodes are configured in exactly the same way; just adjust node.id, listeners, advertised.listeners and log.dirs.
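If you would rather not edit every copy by hand, a sed loop along these lines can derive broker_2 and broker_3 from broker_1 (just a convenience sketch, assuming the directory layout above; compare the results against the full listings below):
$ for i in 2 3; do
    sed -e "s/^node.id=.*/node.id=$i/" \
        -e "/^listeners=/s/1909/${i}909/g" \
        -e "/^advertised.listeners=/s/1909/${i}909/g" \
        -e "s/broker_1/broker_$i/" \
        standalone-cluster/broker_1/broker.properties \
        > standalone-cluster/broker_$i/broker.properties
  done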
broker_2 configuration file:
process.roles=broker,controller
auto.create.topics.enable=false
# Note the node ID
node.id=2
# Note the ports
listeners=PLAINTEXT://:29092,CONTROLLER://:29093
advertised.listeners=PLAINTEXT://172.21.11.1:29092,CONTROLLER://172.21.11.1:29093
controller.quorum.bootstrap.servers=172.21.11.1:19093,172.21.11.1:29093,172.21.11.1:39093
controller.quorum.voters=1@172.21.11.1:19093,2@172.21.11.1:29093,3@172.21.11.1:39093
# Note the data directory
log.dirs=/usr/local/lib/kafka/kafka_4_1_1/standalone-cluster/broker_2/data
broker_3 configuration file:
process.roles=broker,controller
auto.create.topics.enable=false
# Note the node ID
node.id=3
# Note the ports
listeners=PLAINTEXT://:39092,CONTROLLER://:39093
advertised.listeners=PLAINTEXT://172.21.11.1:39092,CONTROLLER://172.21.11.1:39093
controller.quorum.bootstrap.servers=172.21.11.1:19093,172.21.11.1:29093,172.21.11.1:39093
controller.quorum.voters=1@172.21.11.1:19093,2@172.21.11.1:29093,3@172.21.11.1:39093
# Note the data directory
log.dirs=/usr/local/lib/kafka/kafka_4_1_1/standalone-cluster/broker_3/data
Generate the Cluster ID (cluster.id)
With all configuration files in place, you can generate the cluster ID (cluster.id). It is just a UUID and can be produced with any tool, but Kafka recommends bin/kafka-storage.sh. Run the following on any one of the broker nodes:
$ bin/kafka-storage.sh random-uuid
i1KwsyLMSr6-Mfx6deLpkg # cluster id
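Since the same ID will be reused for every broker in the next step, it can be convenient to capture it in a shell variable (purely optional):
$ CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
$ echo "$CLUSTER_ID"    # e.g. i1KwsyLMSr6-Mfx6deLpkg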
Once the cluster ID (cluster.id) is generated, it can be used to initialize the broker metadata.
Initialize Broker Metadata
Broker metadata only needs to be initialized once, before the broker first joins the cluster; later restarts do not require it again.
Use the cluster.id generated above to initialize each broker in turn.
Initialize broker_1:
$ bin/kafka-storage.sh format \
    --cluster-id i1KwsyLMSr6-Mfx6deLpkg \
    --config standalone-cluster/broker_1/broker.properties
If the command succeeds, it prints output similar to the following:
Formatting metadata directory /usr/local/lib/kafka/kafka_4_1_1/standalone-cluster/broker_1/data with metadata.version 4.1-IV1.
and the data directory now contains two files:
$ ls standalone-cluster/broker_1/data/
bootstrap.checkpoint meta.properties
bootstrap.checkpoint is the initial snapshot of the cluster metadata log; it contains the most basic configuration the node must know at startup. It is a binary file and can be inspected with hexdump:
$ hexdump -C bootstrap.checkpoint
00000000 00 00 00 00 00 00 00 00 00 00 00 47 00 00 00 00 |...........G....|
00000010 02 13 77 76 1f 00 20 00 00 00 00 00 00 01 9a 9b |..wv.. .........|
00000020 dd cf 26 00 00 01 9a 9b dd cf 26 ff ff ff ff ff |..&.......&.....|
00000030 ff ff ff ff ff ff ff ff ff 00 00 00 01 2a 00 00 |.............*..|
00000040 00 08 00 00 00 03 16 00 00 00 00 00 00 00 00 00 |................|
00000050 00 00 00 00 00 00 00 00 00 00 01 00 00 00 b9 00 |................|
00000060 00 00 00 02 02 26 7c 9b 00 00 00 00 00 03 00 00 |.....&|.........|
00000070 01 9a 9b dd cf 3d 00 00 01 9a 9b dd cf 3d ff ff |.....=.......=..|
00000080 ff ff ff ff ff ff ff ff ff ff ff ff 00 00 00 04 |................|
00000090 3a 00 00 00 01 2e 01 0c 00 11 6d 65 74 61 64 61 |:.........metada|
000000a0 74 61 2e 76 65 72 73 69 6f 6e 00 1b 00 00 5a 00 |ta.version....Z.|
000000b0 00 02 01 4e 01 0c 00 21 65 6c 69 67 69 62 6c 65 |...N...!eligible|
000000c0 2e 6c 65 61 64 65 72 2e 72 65 70 6c 69 63 61 73 |.leader.replicas|
000000d0 2e 76 65 72 73 69 6f 6e 00 01 00 00 34 00 00 04 |.version....4...|
000000e0 01 28 01 0c 00 0e 67 72 6f 75 70 2e 76 65 72 73 |.(....group.vers|
000000f0 69 6f 6e 00 01 00 00 40 00 00 06 01 34 01 0c 00 |ion....@....4...|
00000100 14 74 72 61 6e 73 61 63 74 69 6f 6e 2e 76 65 72 |.transaction.ver|
00000110 73 69 6f 6e 00 02 00 00 00 00 00 00 00 00 00 05 |sion............|
00000120 00 00 00 3f 00 00 00 00 02 b5 d9 01 d7 00 20 00 |...?.......... .|
00000130 00 00 00 00 00 01 9a 9b dd cf 3e 00 00 01 9a 9b |..........>.....|
00000140 dd cf 3e ff ff ff ff ff ff ff ff ff ff ff ff ff |..>.............|
00000150 ff 00 00 00 01 1a 00 00 00 08 00 00 00 04 06 00 |................|
00000160 00 00 00 |...|
00000163
meta.properties is a plain text file that stores basic information such as the cluster ID and the broker's node ID:
$ cat meta.properties
cluster.id=i1KwsyLMSr6-Mfx6deLpkg
directory.id=mZHZbZnYrI9KAJHHctSxeg
node.id=1
version=1
Run the same command for the other two brokers (cluster.id stays the same; only the config file changes):
# broker_2
$ bin/kafka-storage.sh format \
    --cluster-id i1KwsyLMSr6-Mfx6deLpkg \
    --config standalone-cluster/broker_2/broker.properties
# broker_3
$ bin/kafka-storage.sh format \
    --cluster-id i1KwsyLMSr6-Mfx6deLpkg \
    --config standalone-cluster/broker_3/broker.properties
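To double-check that all three data directories were formatted with the same cluster ID, a quick grep should show identical values:
$ grep cluster.id standalone-cluster/broker_*/data/meta.properties
standalone-cluster/broker_1/data/meta.properties:cluster.id=i1KwsyLMSr6-Mfx6deLpkg
standalone-cluster/broker_2/data/meta.properties:cluster.id=i1KwsyLMSr6-Mfx6deLpkg
standalone-cluster/broker_3/data/meta.properties:cluster.id=i1KwsyLMSr6-Mfx6deLpkg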
Start the Brokers
With all the preparation done, start the broker nodes:
$ bin/kafka-server-start.sh standalone-cluster/broker_1/broker.properties
By default the broker runs in the foreground, and stopping the log output with CTRL-C also stops the broker. To run the broker in the background, add the -daemon flag to start it as a daemon process:
$ bin/kafka-server-start.sh -daemon standalone-cluster/broker_1/broker.properties
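If you want all three nodes running in the background in one go, a small loop works too (a hypothetical convenience; the walkthrough below starts them one at a time in the foreground so the log output is visible, whereas with -daemon the logs typically land under $KAFKA_HOME/logs instead):
$ for i in 1 2 3; do
    bin/kafka-server-start.sh -daemon standalone-cluster/broker_$i/broker.properties
  done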
Sample output:
...
[2025-11-19 19:30:59,102] INFO [RaftManager id=1] Node 2 disconnected. (org.apache.kafka.clients.NetworkClient)
[2025-11-19 19:30:59,102] WARN [RaftManager id=1] Connection to node 2 (/172.21.11.1:29093) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-11-19 19:30:59,187] INFO [MetadataLoader id=1] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2025-11-19 19:30:59,287] INFO [MetadataLoader id=1] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2025-11-19 19:30:59,388] INFO [MetadataLoader id=1] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
With only one broker started so far, it keeps warning that the other nodes cannot be reached. Start broker_2 next:
$ bin/kafka-server-start.sh standalone-cluster/broker_2/broker.properties
...
[2025-11-19 19:32:25,238] INFO [MetadataLoader id=2] InitializeNewPublishers: initializing BrokerRegistrationTracker(id=2) with a snapshot at offset 8 (org.apache.kafka.image.loader.MetadataLoader)
[2025-11-19 19:32:25,240] INFO [ControllerRegistrationManager id=2 incarnation=W17irwObTtK09M02miaOBA] Our registration has been persisted to the metadata log. (kafka.server.ControllerRegistrationManager)
[2025-11-19 19:32:25,241] INFO [BrokerServer id=2] Waiting for the broker to be unfenced (kafka.server.BrokerServer)
[2025-11-19 19:32:25,277] INFO [BrokerLifecycleManager id=2] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)
[2025-11-19 19:32:25,277] INFO [BrokerServer id=2] Finished waiting for the broker to be unfenced (kafka.server.BrokerServer)
[2025-11-19 19:32:25,278] INFO authorizerStart completed for endpoint PLAINTEXT. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)
[2025-11-19 19:32:25,278] INFO [SocketServer listenerType=BROKER, nodeId=2] Enabling request processing. (kafka.network.SocketServer)
[2025-11-19 19:32:25,279] INFO Awaiting socket connections on 0.0.0.0:29092. (kafka.network.DataPlaneAcceptor)
[2025-11-19 19:32:25,280] INFO [BrokerServer id=2] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2025-11-19 19:32:25,280] INFO [BrokerServer id=2] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2025-11-19 19:32:25,281] INFO [BrokerServer id=2] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2025-11-19 19:32:25,281] INFO [BrokerServer id=2] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2025-11-19 19:32:25,281] INFO [BrokerServer id=2] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2025-11-19 19:32:25,281] INFO Kafka version: 4.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2025-11-19 19:32:25,281] INFO Kafka commitId: be816b82d25370ce (org.apache.kafka.common.utils.AppInfoParser)
[2025-11-19 19:32:25,282] INFO Kafka startTimeMs: 1763551945281 (org.apache.kafka.common.utils.AppInfoParser)
[2025-11-19 19:32:25,283] INFO [KafkaRaftServer nodeId=2] Kafka Server started (kafka.server.KafkaRaftServer)
At this point broker_1's log output changes:
...
[2025-11-19 10:40:58,892] INFO [RaftManager id=1] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)
[2025-11-19 10:40:58,892] WARN [RaftManager id=1] Connection to node 3 (localhost/127.0.0.1:39093) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-11-19 10:40:59,893] INFO [RaftManager id=1] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)
[2025-11-19 10:40:59,893] WARN [RaftManager id=1] Connection to node 3 (localhost/127.0.0.1:39093) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-11-19 10:41:02,532] INFO [RaftManager id=1] Updated in-memory voters from VoterSet(voters={1=VoterNode(voterKey=ReplicaKey(id=1, directoryId=eep6_l2c2cfyygUhb3YTtA), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/<unresolved>:19093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 2=VoterNode(voterKey=ReplicaKey(id=2, directoryId=yB30f_T5xHVSMlXaxIvB0Q), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/<unresolved>:29093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 3=VoterNode(voterKey=ReplicaKey(id=3, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/127.0.0.1:39093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) to VoterSet(voters={1=VoterNode(voterKey=ReplicaKey(id=1, directoryId=eep6_l2c2cfyygUhb3YTtA), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/<unresolved>:19093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 2=VoterNode(voterKey=ReplicaKey(id=2, directoryId=yB30f_T5xHVSMlXaxIvB0Q), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/<unresolved>:29093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 3=VoterNode(voterKey=ReplicaKey(id=3, directoryId=L15uNU9Ut5zR_yI7uYz4vQ), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/<unresolved>:39093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1])}) (org.apache.kafka.raft.internals.UpdateVoterHandler)
Continue by starting broker_3:
$ bin/kafka-server-start.sh standalone-cluster/broker_3/broker.properties
broker_1 now logs:
...
[2025-11-19 19:33:48,048] INFO [RaftManager id=1] Updated in-memory voters from VoterSet(voters={1=VoterNode(voterKey=ReplicaKey(id=1, directoryId=mZHZbZnYrI9KAJHHctSxeg), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=172.21.11.1/<unresolved>:19093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 2=VoterNode(voterKey=ReplicaKey(id=2, directoryId=n_pijmdI0n3Pyy6bjfZbGw), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=172.21.11.1/<unresolved>:29093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 3=VoterNode(voterKey=ReplicaKey(id=3, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=/172.21.11.1:39093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) to VoterSet(voters={1=VoterNode(voterKey=ReplicaKey(id=1, directoryId=mZHZbZnYrI9KAJHHctSxeg), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=172.21.11.1/<unresolved>:19093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 2=VoterNode(voterKey=ReplicaKey(id=2, directoryId=n_pijmdI0n3Pyy6bjfZbGw), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=172.21.11.1/<unresolved>:29093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1]), 3=VoterNode(voterKey=ReplicaKey(id=3, directoryId=DSLQTzPiAuGqiqv4TIudoQ), listeners=Endpoints(endpoints={ListenerName(CONTROLLER)=172.21.11.1/<unresolved>:39093}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:1])}) (org.apache.kafka.raft.internals.UpdateVoterHandler)
Once all broker nodes have started successfully, the cluster is up and running!
You can now use jps to inspect the broker processes:
$ jps -lm | grep -v jps
16328 kafka.Kafka standalone-cluster/broker_1/broker.properties
16810 kafka.Kafka standalone-cluster/broker_3/broker.properties
17278 kafka.Kafka standalone-cluster/broker_2/broker.properties
You can also check which ports a process is listening on (using the broker_1 process as an example):
$ lsof -nP -p 16328 | grep LISTEN
java 16328 root 116u IPv6 41093 0t0 TCP *:40263 (LISTEN)
java 16328 root 137u IPv6 39508 0t0 TCP *:19093 (LISTEN)
java 16328 root 168u IPv6 39512 0t0 TCP *:19092 (LISTEN)
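As a final sanity check that the three nodes really form one cluster, you can describe the KRaft quorum and create a small replicated test topic (a sketch; the topic name and counts below are arbitrary examples):
# quorum status: the leader plus followers should cover node IDs 1, 2 and 3
$ bin/kafka-metadata-quorum.sh --bootstrap-server 172.21.11.1:19092 describe --status

# create a topic replicated across all three brokers, then inspect its placement
$ bin/kafka-topics.sh --bootstrap-server 172.21.11.1:19092 \
    --create --topic cluster-smoke-test --partitions 3 --replication-factor 3
$ bin/kafka-topics.sh --bootstrap-server 172.21.11.1:19092 \
    --describe --topic cluster-smoke-test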