package main
import ( "fmt"
"github.com/Shopify/sarama" )
func main() { consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) return }
oldest, _ := consumer.ConsumePartition("web_log", 0, sarama.OffsetOldest) defer oldest.AsyncClose() for msg := range oldest.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } newest, _ := consumer.ConsumePartition("web_log", 0, sarama.OffsetNewest) defer newest.AsyncClose() for msg := range newest.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }
|