Skip to content

Instantly share code, notes, and snippets.

@hanguofeng
Last active August 29, 2015 14:25
Show Gist options
  • Select an option

  • Save hanguofeng/d88f2c227e9cfcc8eeec to your computer and use it in GitHub Desktop.

Select an option

Save hanguofeng/d88f2c227e9cfcc8eeec to your computer and use it in GitHub Desktop.
kafka-offset-printer.go
package main
import (
"flag"
"log"
"os"
"time"
"github.com/wvanbergen/kazoo-go"
"gopkg.in/Shopify/sarama.v1"
)
// Package-level state shared by the functions in this file.
var (
// err receives the error result of the kazoo client construction in
// initClients; other functions declare their own local err.
err error
// kazooClient is the kazoo (Zookeeper) client created by initClients.
kazooClient *kazoo.Kazoo
// kafkaClient is the sarama client created by initClients from the broker
// list that kazooClient reports.
kafkaClient sarama.Client
// zookeeper holds the value of the -zookeeper flag and is passed to kazoo
// as the connection string.
zookeeper string
)
// init registers the -zookeeper command-line flag, which is stored in the
// package-level zookeeper variable once flag.Parse has run.
func init() {
	const (
		flagName     = "zookeeper"
		defaultValue = "127.0.0.1:2181"
		usage        = "zookeeper list"
	)
	flag.StringVar(&zookeeper, flagName, defaultValue, usage)
}
// main connects to Zookeeper and Kafka, logs one offset line per
// topic/partition, then one committed-offset line per consumer
// group/topic/partition, and finally closes both clients.
func main() {
	initClients()

	// Deferred calls run in reverse order, so the Kafka client is closed
	// before the Zookeeper client. These are skipped when a helper
	// terminates the process through os.Exit.
	defer func() {
		if err := kazooClient.Close(); err != nil {
			log.Printf("err: closing zookeeper client: %v", err)
		}
	}()
	defer func() {
		if err := kafkaClient.Close(); err != nil {
			log.Printf("err: closing kafka client: %v", err)
		}
	}()

	procLogsize()
	procConsumerGroupOffset()
}
// initClients parses the command-line flags and populates the package-level
// kazooClient and kafkaClient.
//
// The Kafka broker list is taken from what the kazoo client reports, so only
// the -zookeeper connection string has to be supplied by the user. Any
// failure is logged with the step that failed and the process exits with
// status -1.
func initClients() {
	flag.Parse()

	zk, err := kazoo.NewKazooFromConnectionString(zookeeper, kazoo.NewConfig())
	if err != nil {
		// %v prints the error message; %#v would print the Go-syntax
		// representation of the error value instead.
		log.Printf("err: connecting to zookeeper %q: %v", zookeeper, err)
		os.Exit(-1)
	}
	kazooClient = zk

	brokerList, err := zk.BrokerList()
	if err != nil {
		log.Printf("err: fetching broker list from zookeeper: %v", err)
		os.Exit(-1)
	}

	client, err := sarama.NewClient(brokerList, sarama.NewConfig())
	if err != nil {
		log.Printf("err: connecting to kafka brokers %v: %v", brokerList, err)
		os.Exit(-1)
	}
	kafkaClient = client
}
// procLogsize logs one "[logsize]" line per topic/partition known to
// kafkaClient, carrying the current Unix time and the offset the client
// returns for sarama.OffsetNewest.
//
// Any lookup failure is logged with the topic/partition it concerns and the
// process exits with status -1.
func procLogsize() {
	topics, err := kafkaClient.Topics()
	if err != nil {
		// %v prints the error message; %#v would print the Go-syntax
		// representation of the error value instead.
		log.Printf("err: listing kafka topics: %v", err)
		os.Exit(-1)
	}

	for _, topic := range topics {
		partitions, err := kafkaClient.Partitions(topic)
		if err != nil {
			log.Printf("err: listing partitions of topic %q: %v", topic, err)
			os.Exit(-1)
		}

		for _, partition := range partitions {
			offset, err := kafkaClient.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				log.Printf("err: fetching newest offset of %s/%d: %v", topic, partition, err)
				os.Exit(-1)
			}
			log.Printf("[logsize][time:%d][topic:%s][partition:%d],[offset:%d]", time.Now().Unix(), topic, partition, offset)
		}
	}
}
// procConsumerGroupOffset logs one "[groupoffset]" line for every combination
// of consumer group (as listed by kazooClient) and topic/partition (as listed
// by kafkaClient), carrying the current Unix time and the offset returned by
// the group's FetchOffset.
//
// Any lookup failure is logged with the group/topic/partition it concerns and
// the process exits with status -1.
func procConsumerGroupOffset() {
	groups, err := kazooClient.Consumergroups()
	if err != nil {
		// %v prints the error message; %#v would print the Go-syntax
		// representation of the error value instead.
		log.Printf("err: listing consumer groups: %v", err)
		os.Exit(-1)
	}

	topics, err := kafkaClient.Topics()
	if err != nil {
		log.Printf("err: listing kafka topics: %v", err)
		os.Exit(-1)
	}

	for _, group := range groups {
		for _, topic := range topics {
			partitions, err := kafkaClient.Partitions(topic)
			if err != nil {
				log.Printf("err: listing partitions of topic %q: %v", topic, err)
				os.Exit(-1)
			}

			for _, partition := range partitions {
				offset, err := group.FetchOffset(topic, partition)
				if err != nil {
					log.Printf("err: fetching offset of group %q for %s/%d: %v", group.Name, topic, partition, err)
					os.Exit(-1)
				}
				log.Printf("[groupoffset][time:%d][group:%s][topic:%s][partition:%d],[offset:%d]", time.Now().Unix(), group.Name, topic, partition, offset)
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment