1. 安装与部署 Kafka 3.x 支持 KRaft 模式,无需依赖 ZooKeeper。以下配置基于 Docker Compose:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 services: kafka: image: apache/kafka:3.9.0 container_name: kafka network_mode: bridge ports: - "9092:9092" extra_hosts: - "host.docker.internal:host-gateway" environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 KAFKA_ADVERTISED_LISTENERS: EXTERNAL://10.228.1.232:9092,CONTROLLER://localhost:9093 KAFKA_INTER_BROKER_LISTENER_NAME: EXTERNAL KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_CONTROLLER_QUORUM_VOTERS: 1 @localhost:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_LOG_DIRS: /var/lib/kafka/data CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' volumes: - kafka_data:/var/lib/kafka/data healthcheck: test: [ "CMD-SHELL" , "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list > /dev/null 2>&1" ] interval: 5s timeout: 5s retries: 12 start_period: 20s kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui network_mode: bridge ports: - "18080:8080" environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: 10.228 .1 .232 :9092 extra_hosts: - "host.docker.internal:host-gateway" depends_on: - kafka volumes: kafka_data:
1.1 启动服务 1 docker-compose up -d kafka kafka-ui
1.2 创建 Topic 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 docker exec -it kafka bash -lc \ "/opt/kafka/bin/kafka-topics.sh --create \ --topic agent-events-persistent \ --bootstrap-server localhost:9092 \ --partitions 3 \ --replication-factor 1" docker exec -it kafka bash -lc \ "/opt/kafka/bin/kafka-topics.sh --create \ --topic agent-events-ephemeral \ --bootstrap-server localhost:9092 \ --partitions 3 \ --replication-factor 1"
1.3 命令行操作 操作 命令 创建 Topic kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1生产消息 kafka-console-producer --broker-list localhost:9092 --topic test消费消息 kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning消费组模式 kafka-console-consumer --bootstrap-server localhost:9092 --topic test --group test-group --from-beginning
2. 网络配置详解 2.1 Listeners 与 Advertised Listeners %%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#4F46E5', 'primaryTextColor': '#000', 'primaryBorderColor': '#3730A3', 'lineColor': '#6366F1', 'secondaryColor': '#10B981', 'tertiaryColor': '#F59E0B'}}}%%
flowchart LR
subgraph Container["Kafka 容器"]
L["LISTENERS 0.0.0.0:9092"]
end
subgraph Host["宿主机"]
P["端口映射 9092:9092"]
end
subgraph Client["客户端"]
C["连接请求"]
end
C -->|"1. 首次连接"| P
P --> L
L -->|"2. 返回 ADVERTISED 10.228.1.232:9092"| C
C -->|"3. 后续通信"| P
classDef primary fill:#4F46E5,stroke:#3730A3,color:#fff
classDef success fill:#10B981,stroke:#059669,color:#fff
classDef info fill:#06B6D4,stroke:#0891B2,color:#fff
class L primary
class P success
class C info关键配置 :
1 2 KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 KAFKA_ADVERTISED_LISTENERS: EXTERNAL://10.228.1.232:9092,CONTROLLER://localhost:9093
两者区别 :
配置项 作用 示例值 LISTENERSKafka 服务端绑定的监听地址(” 开门地址 “) 0.0.0.0:9092ADVERTISED_LISTENERS返回给客户端的可达地址(” 对外名片 “) 10.228.1.232:9092
配置错误时,常见现象是 “ 能建立连接,但收发消息失败 “。
2.2 KRaft 模式下的 CONTROLLER KRaft 模式要求配置 CONTROLLER 监听器用于控制器间通信。由于 CONTROLLER 仅在集群内部使用,不对外暴露:
LISTENERS 设为 0.0.0.0:9093(监听所有网卡)ADVERTISED_LISTENERS 设为 localhost:9093(容器内可达即可)2.3 Docker 网络问题 %%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#4F46E5', 'primaryTextColor': '#000', 'primaryBorderColor': '#3730A3', 'lineColor': '#6366F1', 'secondaryColor': '#10B981', 'tertiaryColor': '#F59E0B'}}}%%
flowchart TD
subgraph Compose["Compose 网络 (project_default)"]
K1["Kafka 容器"]
A1["App 容器"]
end
subgraph Bridge["默认 bridge 网络"]
A2["其他容器"]
end
subgraph Host["宿主机"]
H["Host IP:9092"]
end
A1 -->|"✅ 直接通信"| K1
A2 -->|"❌ 网络隔离"| K1
A2 -->|"✅ 端口映射"| H
H --> K1
classDef primary fill:#4F46E5,stroke:#3730A3,color:#fff
classDef warning fill:#F59E0B,stroke:#D97706,color:#000
classDef success fill:#10B981,stroke:#059669,color:#fff
class K1 primary
class A1 success
class A2 warning要点 :
同一 Compose 文件中的容器共享 project_default 网络,可直接通信 docker run 默认加入 bridge 网络,与 Compose 网络隔离跨网络访问需通过宿主机端口映射 Linux 容器内使用 host.docker.internal 需显式添加:--add-host=host.docker.internal:host-gateway 3. Go 语言集成 Go 生态有多个 Kafka 客户端库:
以下示例使用 sarama:
3.1 生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package mainimport ( "fmt" "log" "github.com/IBM/sarama" ) func main () { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string {"127.0.0.1:9092" }, config) if err != nil { log.Fatalf("创建生产者失败: %v" , err) } defer producer.Close() msg := &sarama.ProducerMessage{ Topic: "web_log" , Value: sarama.StringEncoder("this is a test log" ), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("发送消息失败: %v" , err) } fmt.Printf("消息已发送: partition=%d, offset=%d\n" , partition, offset) }
配置说明 :
配置项 作用 RequiredAcks = WaitForAll等待所有 ISR 副本确认 Partitioner = NewRandomPartitioner随机分配分区 Return.Successes = true同步模式必需
3.2 消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package mainimport ( "fmt" "log" "os" "os/signal" "syscall" "github.com/IBM/sarama" ) func main () { consumer, err := sarama.NewConsumer([]string {"localhost:9092" }, nil ) if err != nil { log.Fatalf("创建消费者失败: %v" , err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("web_log" , 0 , sarama.OffsetNewest) if err != nil { log.Fatalf("创建分区消费者失败: %v" , err) } defer partitionConsumer.Close() signals := make (chan os.Signal, 1 ) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("收到消息: partition=%d, offset=%d, value=%s\n" , msg.Partition, msg.Offset, string (msg.Value)) case <-signals: fmt.Println("收到退出信号" ) return } } }
Offset 策略 :
值 含义 OffsetNewest从最新消息开始消费 OffsetOldest从最早消息开始消费
4. 参考资料