1. kafka
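1.1 Installation
- In kafka_2.13-2.8.0.tgz, the first version number (2.13) is the version of the Scala compiler used to build the Kafka source code; the actual Kafka version is 2.8.0.
```
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
```
- Start
```
# In one terminal
```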
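To make the naming convention concrete, here is a small Go sketch that splits an archive name into its two version numbers. The helper parseKafkaArchive is hypothetical. It only handles the kafka_<scala>-<kafka>.tgz pattern described above.

```go
package main

import (
	"fmt"
	"strings"
)

// parseKafkaArchive (hypothetical helper) splits a binary-release file name
// such as "kafka_2.13-2.8.0.tgz" into the Scala version it was built with
// and the actual Kafka version.
func parseKafkaArchive(name string) (scalaVersion, kafkaVersion string, err error) {
	if !strings.HasPrefix(name, "kafka_") || !strings.HasSuffix(name, ".tgz") {
		return "", "", fmt.Errorf("not a kafka_<scala>-<kafka>.tgz name: %q", name)
	}
	core := strings.TrimSuffix(strings.TrimPrefix(name, "kafka_"), ".tgz")
	// The Scala version contains no '-', so the first '-' separates the two parts.
	parts := strings.SplitN(core, "-", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("not a kafka_<scala>-<kafka>.tgz name: %q", name)
	}
	return parts[0], parts[1], nil
}

func main() {
	scala, kafka, err := parseKafkaArchive("kafka_2.13-2.8.0.tgz")
	if err != nil {
		panic(err)
	}
	fmt.Printf("Scala %s, Kafka %s\n", scala, kafka) // Scala 2.13, Kafka 2.8.0
}
```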
Install Java
Error: /root/kafka_2.13-2.8.0/bin/kafka-run-class.sh: line 330: exec: java: not found
Your local environment must have Java 8+ installed.
```
apt install openjdk-11-jre-headless
```
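Kafka needs Java 8 or newer, but java -version (which prints to stderr) reports versions in two different schemes. Java 8 and older print "1.8.0_292"; Java 9 and newer print "11.0.11". The Go sketch below extracts the major version from such a line so a script can check the requirement. The helper javaMajorVersion is hypothetical, and the sample lines in main are illustrative.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// javaMajorVersion (hypothetical helper) extracts the major Java version from
// a `java -version` line. Java 8 and older report "1.<major>..." (e.g.
// "1.8.0_292"); Java 9 and newer report "<major>..." (e.g. "11.0.11").
func javaMajorVersion(line string) (int, error) {
	start := strings.Index(line, `"`)
	if start < 0 {
		return 0, fmt.Errorf("no quoted version in %q", line)
	}
	end := strings.Index(line[start+1:], `"`)
	if end < 0 {
		return 0, fmt.Errorf("unterminated version in %q", line)
	}
	version := line[start+1 : start+1+end]
	// Split on the separators used in Java version strings.
	fields := strings.FieldsFunc(version, func(r rune) bool {
		return r == '.' || r == '_' || r == '-' || r == '+'
	})
	if len(fields) == 0 {
		return 0, fmt.Errorf("empty version in %q", line)
	}
	major, err := strconv.Atoi(fields[0])
	if err != nil {
		return 0, err
	}
	if major == 1 && len(fields) > 1 {
		// Legacy "1.x" scheme: the real major version is the second field.
		return strconv.Atoi(fields[1])
	}
	return major, nil
}

func main() {
	for _, line := range []string{
		`openjdk version "11.0.11" 2021-04-20`,
		`java version "1.8.0_292"`,
	} {
		major, err := javaMajorVersion(line)
		if err != nil {
			panic(err)
		}
		fmt.Printf("Java %d, meets Java 8+: %v\n", major, major >= 8)
	}
}
```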
1.2 systemctl services
https://gist.github.com/vipmax/9ceeaa02932ba276fa810c923dbcbd4f
vi /etc/systemd/system/kafka-zookeeper.service
```
[Unit]
```
vi /etc/systemd/system/kafka.service
```
[Unit]
```
- Start
```
systemctl daemon-reload
```
1.3 Using Kafka
Create a topic:
```
bin/kafka-topics.sh --create --topic pingback --bootstrap-server localhost:9092

# Delete the topic
bin/kafka-topics.sh --delete --topic pingback --bootstrap-server localhost:9092
```
List topics:
```
# List all topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe a specific topic (partitions, replicas, configs)
bin/kafka-topics.sh --describe --topic pingback --bootstrap-server localhost:9092
```
Inspect consumer groups:
```
# List all consumer groups
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# Show how a specific group is consuming (offsets and lag per partition)
bin/kafka-consumer-groups.sh --describe --group test-consumer-group --bootstrap-server localhost:9092
```
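For each partition, the LAG column printed by --describe is LOG-END-OFFSET minus CURRENT-OFFSET: the number of messages the group still has to consume. The Go sketch below parses that table and recomputes the lag. The column order is assumed to be the one Kafka 2.x prints (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST, CLIENT-ID). The sample output in main is made up, and parseDescribe is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partitionLag is one row of `kafka-consumer-groups.sh --describe`.
type partitionLag struct {
	Topic     string
	Partition int
	Current   int64 // CURRENT-OFFSET: the group's committed offset
	LogEnd    int64 // LOG-END-OFFSET: offset the next produced message will get
}

// Lag is how many messages the group still has to consume.
func (p partitionLag) Lag() int64 { return p.LogEnd - p.Current }

// parseDescribe (hypothetical helper) reads the table printed by --describe.
// Rows whose CURRENT-OFFSET is "-" (nothing committed yet) are skipped.
func parseDescribe(out string) ([]partitionLag, error) {
	var rows []partitionLag
	inTable := false
	for _, line := range strings.Split(out, "\n") {
		f := strings.Fields(line)
		if len(f) == 0 {
			continue
		}
		if f[0] == "GROUP" { // header row
			inTable = true
			continue
		}
		if !inTable || len(f) < 6 || f[3] == "-" {
			continue
		}
		part, err := strconv.Atoi(f[2])
		if err != nil {
			return nil, fmt.Errorf("partition %q: %w", f[2], err)
		}
		cur, err := strconv.ParseInt(f[3], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("current offset %q: %w", f[3], err)
		}
		end, err := strconv.ParseInt(f[4], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("log-end offset %q: %w", f[4], err)
		}
		rows = append(rows, partitionLag{Topic: f[1], Partition: part, Current: cur, LogEnd: end})
	}
	return rows, nil
}

func main() {
	// Made-up sample output, for illustration only.
	sample := `
GROUP               TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test-consumer-group pingback  0          2               5               3    -            -     -
`
	rows, err := parseDescribe(sample)
	if err != nil {
		panic(err)
	}
	for _, r := range rows {
		fmt.Printf("%s[%d] lag=%d\n", r.Topic, r.Partition, r.Lag()) // pingback[0] lag=3
	}
}
```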
Produce:
```
bin/kafka-console-producer.sh --topic pingback --bootstrap-server localhost:9092
#>This is my first event
#>This is my second event
# You can stop the producer client with Ctrl-C at any time.
```
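Each line typed into the console producer becomes one record, with no key by default. The console producer can also read keys when started with --property parse.key=true --property key.separator=: (the default separator is a tab). In that mode it splits each line at the first separator. The Go sketch below shows that split. The helper splitKeyValue is hypothetical and is not Kafka code.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKeyValue (hypothetical helper) mimics the console producer with
// parse.key=true: text before the first separator is the key, the rest
// is the value. A line without the separator is an error.
func splitKeyValue(line, sep string) (key, value string, err error) {
	i := strings.Index(line, sep)
	if i < 0 {
		return "", "", fmt.Errorf("no key separator %q in %q", sep, line)
	}
	return line[:i], line[i+len(sep):], nil
}

func main() {
	key, value, err := splitKeyValue("user42:This is my first event", ":")
	if err != nil {
		panic(err)
	}
	fmt.Printf("key=%q value=%q\n", key, value) // key="user42" value="This is my first event"
}
```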
Consume:
```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pingback

# Consume as a member of test-consumer-group; --from-beginning only applies if the group has no committed offset yet
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pingback --from-beginning --group test-consumer-group
#This is my first event
#This is my second event
# You can stop the consumer client with Ctrl-C at any time.
```
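Reset offsets:
```
# Rewind to the earliest offset still available
bin/kafka-consumer-groups.sh --group test-consumer-group --bootstrap-server localhost:9092 --reset-offsets --to-earliest --all-topics --execute

# Reset partition 0 of pingback to a specific offset
bin/kafka-consumer-groups.sh --group test-consumer-group --bootstrap-server localhost:9092 --reset-offsets --topic pingback:0 --to-offset 123 --execute
```
Offsets can only be reset while the group has no active consumers.
About earliest vs. latest for the consumer's auto.offset.reset (--from-beginning corresponds to earliest): if the group has already committed offsets, the two behave the same, because consumption resumes from the committed offset. If no offset has been committed, latest starts at the end of the log, so existing (old) messages are never read.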
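The rule above can be written down as a small decision function. This is a simplified Go sketch: startOffset is a hypothetical name. It ignores the case where a committed offset is out of range, in which case the real consumer also falls back to auto.offset.reset.

```go
package main

import "fmt"

// startOffset (hypothetical helper) sketches where a group consumer starts
// reading a partition. A committed offset always wins; auto.offset.reset is
// consulted only when the group has never committed. (Simplification: the
// out-of-range committed offset case is ignored.)
func startOffset(committed *int64, reset string, logStart, logEnd int64) (int64, error) {
	if committed != nil {
		return *committed, nil
	}
	switch reset {
	case "earliest":
		return logStart, nil // oldest message still retained
	case "latest":
		return logEnd, nil // only messages produced from now on
	default: // e.g. "none"
		return 0, fmt.Errorf("no committed offset and auto.offset.reset=%q", reset)
	}
}

func main() {
	committed := int64(2)
	for _, reset := range []string{"earliest", "latest"} {
		withCommit, _ := startOffset(&committed, reset, 0, 5)
		noCommit, _ := startOffset(nil, reset, 0, 5)
		fmt.Printf("%-8s committed=2 -> %d, nothing committed -> %d\n", reset, withCommit, noCommit)
	}
}
```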
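Delete data:
```
# Clear the data: temporarily shorten the topic's retention to 1 second
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name pingback --alter --add-config retention.ms=1000
# Once the old data is gone, remove the override to reset retention to its original value
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name pingback --alter --delete-config retention.ms
```
Deletion is not immediate. The broker checks retention every log.retention.check.interval.ms (300000 ms, i.e. 5 minutes, by default), so wait for the old data to be removed before deleting the override.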
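1.4 Configuration
1.4.1 Allowing remote access to Kafka
Open config/server.properties and make the following changes:
Uncomment line 31 so that it reads listeners=PLAINTEXT://:9092
Uncomment line 36 and set advertised.listeners to PLAINTEXT://host_ip:9092, where host_ip is the address remote clients use to reach this broker.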
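listeners is the address the broker binds to. An empty host, as in PLAINTEXT://:9092, means it binds to the default interface. advertised.listeners is the address the broker hands out to clients, which is why host_ip must be reachable from the remote machines. The Go sketch below splits such a listener value into its name, host, and port. The helper parseListener and the address 192.168.1.10 are hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// parseListener (hypothetical helper) splits "NAME://host:port",
// e.g. "PLAINTEXT://:9092". An empty host is returned as "".
func parseListener(l string) (name, host string, port int, err error) {
	i := strings.Index(l, "://")
	if i <= 0 {
		return "", "", 0, fmt.Errorf("missing listener name in %q", l)
	}
	name = l[:i]
	var portStr string
	host, portStr, err = net.SplitHostPort(l[i+len("://"):])
	if err != nil {
		return "", "", 0, err
	}
	port, err = strconv.Atoi(portStr)
	if err != nil || port < 0 || port > 65535 {
		return "", "", 0, fmt.Errorf("bad port in %q", l)
	}
	return name, host, port, nil
}

func main() {
	// 192.168.1.10 stands in for host_ip; it is a made-up address.
	for _, l := range []string{"PLAINTEXT://:9092", "PLAINTEXT://192.168.1.10:9092"} {
		name, host, port, err := parseListener(l)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s host=%q port=%d\n", name, host, port)
	}
}
```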
1.4.2 Changing the log (data) directory
Kafka's data directory, where Kafka stores its data files, is set in ${KAFKA_HOME}/config/server.properties:
```
log.dirs=/data/kafka_data
```
Note: log.dirs can list several directories, separated by commas.
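When log.dirs lists several directories, each partition lives entirely in one of them. For a new partition, the broker picks the directory that currently holds the fewest partitions. The Go sketch below splits a log.dirs value and makes that choice. The helpers are hypothetical, /data2/kafka_data is a made-up second disk, and the tie-breaking rule is this sketch's own choice.

```go
package main

import (
	"fmt"
	"strings"
)

// splitLogDirs (hypothetical helper) turns a log.dirs value into a list of
// directories, trimming spaces around entries and dropping empty ones.
func splitLogDirs(value string) []string {
	var dirs []string
	for _, d := range strings.Split(value, ",") {
		if d = strings.TrimSpace(d); d != "" {
			dirs = append(dirs, d)
		}
	}
	return dirs
}

// nextLogDir picks the directory that currently holds the fewest
// partitions; ties go to the earlier entry (a choice of this sketch).
func nextLogDir(dirs []string, partitions map[string]int) string {
	best := ""
	for _, d := range dirs {
		if best == "" || partitions[d] < partitions[best] {
			best = d
		}
	}
	return best
}

func main() {
	dirs := splitLogDirs("/data/kafka_data, /data2/kafka_data")
	counts := map[string]int{"/data/kafka_data": 3, "/data2/kafka_data": 1}
	fmt.Println(dirs, "->", nextLogDir(dirs, counts))
}
```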
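1.4.3 Deleting data periodically
sudo vi config/server.properties
```
log.retention.minutes=3
```
With this setting, log data is kept for 3 minutes before it becomes eligible for deletion.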
2. filebeat
2.1 Installation
```
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-linux-x86_64.tar.gz
```
2.2 filebeat.yml
```
filebeat.inputs:
```
2.3 systemctl service
vi /etc/systemd/system/filebeat.service
```
[Unit]
```
Start:
```
systemctl start filebeat
```
3. Reading Kafka from Go
```go
package main
```
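Whatever client library is used (the references point to github.com/Shopify/sarama), a consumer group commits the offset of the next message to read, not of the last one processed. That is the CURRENT-OFFSET shown by kafka-consumer-groups.sh --describe, and it is why sarama's ConsumerGroupSession.MarkMessage marks msg.Offset+1. The standard-library-only Go sketch below models that bookkeeping. Message, offsetTracker, and their methods are hypothetical and are not sarama's API.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a consumed record; sarama's
// ConsumerMessage carries the same Topic, Partition and Offset fields.
type Message struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

type topicPartition struct {
	topic     string
	partition int32
}

// offsetTracker remembers, per partition, the offset to commit. Kafka
// commits the next offset to read, so a processed message at offset N is
// recorded as N+1.
type offsetTracker struct {
	next map[topicPartition]int64
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{next: make(map[topicPartition]int64)}
}

// Mark records m as processed; the stored offset never moves backwards.
func (t *offsetTracker) Mark(m Message) {
	k := topicPartition{m.Topic, m.Partition}
	if cur, ok := t.next[k]; !ok || m.Offset+1 > cur {
		t.next[k] = m.Offset + 1
	}
}

// Next returns the offset that would be committed for a partition.
func (t *offsetTracker) Next(topic string, partition int32) (int64, bool) {
	off, ok := t.next[topicPartition{topic, partition}]
	return off, ok
}

func main() {
	tracker := newOffsetTracker()
	for off := int64(0); off < 2; off++ {
		m := Message{Topic: "pingback", Partition: 0, Offset: off}
		fmt.Printf("processed offset %d\n", m.Offset)
		tracker.Mark(m)
	}
	next, _ := tracker.Next("pingback", 0)
	fmt.Println("commit", next) // commit 2
}
```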
4. References
- https://www.liuvv.com/p/f03714cc.html
- https://blog.csdn.net/u010889616/article/details/80640330
- https://birdben.github.io/2016/12/15/Kafka/Kafka%E5%AD%A6%E4%B9%A0%EF%BC%88%E5%9B%9B%EF%BC%89%E6%8C%87%E5%AE%9AKafka%E7%9A%84data%E5%92%8Clogs%E8%B7%AF%E5%BE%84/
- https://blog.csdn.net/qq_41926119/article/details/104510481
- https://github.com/Shopify/sarama