0%

kafka安装和golang实战

1. 安装和使用

1.1 mac安装

1
2
3
4
5
6
7
# If brew reports that Java is missing, install Java first.
brew install kafka

# Start ZooKeeper first, then Kafka.
# ZooKeeper is started with -daemon so that it runs in the background: in the
# foreground it never exits, and the command after && would never be reached.
zookeeper-server-start -daemon /usr/local/etc/kafka/zookeeper.properties && kafka-server-start /usr/local/etc/kafka/server.properties

# 此时用 ps 查看进程可以看到是用 java 起来的。

1.2 使用

  • 创建Topic
1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • 生产消息
1
kafka-console-producer --broker-list localhost:9092 --topic test
  • 消费消息
1
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
  • 使用消费组消费
1
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --group test-consumer1 --from-beginning

2. golang 使用 kafka

github 上有多个轮子,选用了star最多的 sarama

2.1 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"

"github.com/Shopify/sarama"
)

// main connects a synchronous sarama producer to the Kafka broker at
// 127.0.0.1:9092, sends one test message to the "web_log" topic and prints the
// partition and offset reported for it.
func main() {
	config := sarama.NewConfig()
	// Wait until the leader and all in-sync followers have acknowledged the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Report successfully delivered messages on the success channel;
	// a SyncProducer requires this to be enabled.
	config.Producer.Return.Successes = true

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		// The producer was never created here, so report a creation failure
		// rather than a close.
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer func() {
		if err := client.Close(); err != nil {
			fmt.Println("close producer failed, err:", err)
		}
	}()

	// Send the message.
	msg := &sarama.ProducerMessage{
		Topic: "web_log",
		Value: sarama.StringEncoder("this is a test log1"),
	}
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

2.2 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"

"github.com/Shopify/sarama"
)

// main attaches to partition 0 of the "web_log" topic on the Kafka broker at
// localhost:9092 and prints every message it receives. The receive loop only
// ends when the Messages channel is closed, so in practice the program runs
// until it is stopped.
func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	// Deferred first so that it runs last, after the partition consumer below
	// has been closed.
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Printf("fail to close consumer, err:%v\n", err)
		}
	}()

	// Starting at OffsetOldest replays the messages already stored in the
	// partition and then keeps delivering newly produced ones, so a single
	// partition consumer covers both cases. To skip the history and receive only
	// messages produced after start-up, pass sarama.OffsetNewest instead.
	// A second ConsumePartition call for the same topic/partition on this
	// consumer is rejected while the first one is still open.
	partition, err := consumer.ConsumePartition("web_log", 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Printf("fail to start partition consumer, err:%v\n", err)
		return
	}
	defer func() {
		if err := partition.Close(); err != nil {
			fmt.Printf("fail to close partition consumer, err:%v\n", err)
		}
	}()

	for msg := range partition.Messages() {
		fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
	}
}

3. 参考资料

给作者打赏,可以加首页微信,咨询作者相关问题!