Created
December 31, 2017 16:53
-
-
Save savaki/f5dadb594d6cf72e1b54265e4e75e9e3 to your computer and use it in GitHub Desktop.
Revisions
-
savaki created this gist
Dec 31, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,98 @@ package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io/ioutil" "log" "strings" "time" "github.com/segmentio/kafka-go" "github.com/segmentio/ksuid" ) var ( brokers string topic string records int certFile string caFile string keyFile string ) func init() { flag.StringVar(&brokers, "brokers", "localhost:9092", "broker addresses, comma-separated") flag.StringVar(&topic, "topic", "topic", "topic to produce to") flag.IntVar(&records, "records", 250000, "number of records to read from kafka") flag.StringVar(&certFile, "cert", "_cert.pem", "tls cert") flag.StringVar(&caFile, "ca", "_ca.pem", "tls ca") flag.StringVar(&keyFile, "key", "_key.pem", "tls key") flag.Parse() } func check(err error) { if err != nil { log.Fatalln(err) } } func tlsConfig() *tls.Config { certPEM, err := ioutil.ReadFile(certFile) check(err) caPEM, err := ioutil.ReadFile(caFile) check(err) keyPEM, err := ioutil.ReadFile(keyFile) check(err) if certPEM == nil || caPEM == nil || keyPEM == nil { panic("tls configuration not available") } cert, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM)) check(err) caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM([]byte(caPEM)) return &tls.Config{ Certificates: []tls.Certificate{cert}, RootCAs: caCertPool, InsecureSkipVerify: true, } } func main() { groupID := ksuid.New().String() r := kafka.NewReader(kafka.ReaderConfig{ Brokers: strings.Split(brokers, ","), Topic: topic, GroupID: groupID, Dialer: &kafka.Dialer{TLS: tlsConfig()}, }) defer r.Close() ctx := context.Background() var start time.Time var count int for { _, err := r.ReadMessage(ctx) check(err) count++ if count == 1 { start = time.Now() } if count == records { break } } elapsed := time.Now().Sub(start) fmt.Printf("kafka-go: %v records, %v\n", records, elapsed) }