Kafka 安装与 Go 实战

1. 安装与部署

Kafka 3.x 支持 KRaft 模式,无需依赖 ZooKeeper。以下配置基于 Docker Compose:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
services:
kafka:
image: apache/kafka:3.9.0
container_name: kafka
network_mode: bridge
ports:
- "9092:9092"
extra_hosts:
- "host.docker.internal:host-gateway"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: EXTERNAL://10.228.1.232:9092,CONTROLLER://localhost:9093
KAFKA_INTER_BROKER_LISTENER_NAME: EXTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
volumes:
- kafka_data:/var/lib/kafka/data
healthcheck:
test: [
"CMD-SHELL",
"/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list > /dev/null 2>&1"
]
interval: 5s
timeout: 5s
retries: 12
start_period: 20s

kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
network_mode: bridge
ports:
- "18080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: 10.228.1.232:9092
extra_hosts:
- "host.docker.internal:host-gateway"
depends_on:
- kafka

volumes:
kafka_data:

1.1 启动服务

1
docker-compose up -d kafka kafka-ui

1.2 创建 Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 创建持久化事件 Topic
docker exec -it kafka bash -lc \
"/opt/kafka/bin/kafka-topics.sh --create \
--topic agent-events-persistent \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1"

# 创建临时事件 Topic
docker exec -it kafka bash -lc \
"/opt/kafka/bin/kafka-topics.sh --create \
--topic agent-events-ephemeral \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1"

1.3 命令行操作

操作命令
创建 Topickafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
生产消息kafka-console-producer --broker-list localhost:9092 --topic test
消费消息kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
消费组模式kafka-console-consumer --bootstrap-server localhost:9092 --topic test --group test-group --from-beginning

2. 网络配置详解

2.1 Listeners 与 Advertised Listeners

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#4F46E5', 'primaryTextColor': '#000', 'primaryBorderColor': '#3730A3', 'lineColor': '#6366F1', 'secondaryColor': '#10B981', 'tertiaryColor': '#F59E0B'}}}%%
flowchart LR
    subgraph Container["Kafka 容器"]
        L["LISTENERS 0.0.0.0:9092"]
    end

    subgraph Host["宿主机"]
        P["端口映射 9092:9092"]
    end

    subgraph Client["客户端"]
        C["连接请求"]
    end

    C -->|"1. 首次连接"| P
    P --> L
    L -->|"2. 返回 ADVERTISED 10.228.1.232:9092"| C
    C -->|"3. 后续通信"| P

    classDef primary fill:#4F46E5,stroke:#3730A3,color:#fff
    classDef success fill:#10B981,stroke:#059669,color:#fff
    classDef info fill:#06B6D4,stroke:#0891B2,color:#fff

    class L primary
    class P success
    class C info

关键配置

1
2
KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: EXTERNAL://10.228.1.232:9092,CONTROLLER://localhost:9093

两者区别

配置项作用示例值
LISTENERSKafka 服务端绑定的监听地址(” 开门地址 “)0.0.0.0:9092
ADVERTISED_LISTENERS返回给客户端的可达地址(” 对外名片 “)10.228.1.232:9092

配置错误时,常见现象是 “ 能建立连接,但收发消息失败 “。

2.2 KRaft 模式下的 CONTROLLER

KRaft 模式要求配置 CONTROLLER 监听器用于控制器间通信。由于 CONTROLLER 仅在集群内部使用,不对外暴露:

  • LISTENERS 设为 0.0.0.0:9093(监听所有网卡)
  • ADVERTISED_LISTENERS 设为 localhost:9093(容器内可达即可)

2.3 Docker 网络问题

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#4F46E5', 'primaryTextColor': '#000', 'primaryBorderColor': '#3730A3', 'lineColor': '#6366F1', 'secondaryColor': '#10B981', 'tertiaryColor': '#F59E0B'}}}%%
flowchart TD
    subgraph Compose["Compose 网络 (project_default)"]
        K1["Kafka 容器"]
        A1["App 容器"]
    end

    subgraph Bridge["默认 bridge 网络"]
        A2["其他容器"]
    end

    subgraph Host["宿主机"]
        H["Host IP:9092"]
    end

    A1 -->|"✅ 直接通信"| K1
    A2 -->|"❌ 网络隔离"| K1
    A2 -->|"✅ 端口映射"| H
    H --> K1

    classDef primary fill:#4F46E5,stroke:#3730A3,color:#fff
    classDef warning fill:#F59E0B,stroke:#D97706,color:#000
    classDef success fill:#10B981,stroke:#059669,color:#fff

    class K1 primary
    class A1 success
    class A2 warning

要点

  • 同一 Compose 文件中的容器共享 project_default 网络,可直接通信
  • docker run 默认加入 bridge 网络,与 Compose 网络隔离
  • 跨网络访问需通过宿主机端口映射
  • Linux 容器内使用 host.docker.internal 需显式添加:--add-host=host.docker.internal:host-gateway

3. Go 语言集成

Go 生态有多个 Kafka 客户端库:

特点
sarama纯 Go 实现,Star 最多,社区活跃
confluent-kafka-go基于 librdkafka,性能最佳
kafka-go纯 Go 实现,API 简洁 (推荐这个)

以下示例使用 sarama:

3.1 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"log"

"github.com/IBM/sarama"
)

func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
log.Fatalf("创建生产者失败: %v", err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
Topic: "web_log",
Value: sarama.StringEncoder("this is a test log"),
}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatalf("发送消息失败: %v", err)
}
fmt.Printf("消息已发送: partition=%d, offset=%d\n", partition, offset)
}

配置说明

配置项作用
RequiredAcks = WaitForAll等待所有 ISR 副本确认
Partitioner = NewRandomPartitioner随机分配分区
Return.Successes = true同步模式必需

3.2 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/IBM/sarama"
)

func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalf("创建消费者失败: %v", err)
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("web_log", 0, sarama.OffsetNewest)
if err != nil {
log.Fatalf("创建分区消费者失败: %v", err)
}
defer partitionConsumer.Close()

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

for {
select {
case msg := <-partitionConsumer.Messages():
fmt.Printf("收到消息: partition=%d, offset=%d, value=%s\n",
msg.Partition, msg.Offset, string(msg.Value))
case <-signals:
fmt.Println("收到退出信号")
return
}
}
}

Offset 策略

含义
OffsetNewest从最新消息开始消费
OffsetOldest从最早消息开始消费

4. 参考资料