Created
November 25, 2020 13:56
-
-
Save karanchuri/02f8fed113b5776c7d1c646c4f584a43 to your computer and use it in GitHub Desktop.
kafka golang demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "fmt" | |
| "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" | |
| ) | |
| func main() { | |
| c, err := kafka.NewConsumer(&kafka.ConfigMap{ | |
| "bootstrap.servers": "0.0.0.0:19091", | |
| "group.id": "myGroup", | |
| "auto.offset.reset": "earliest", | |
| }) | |
| if err != nil { | |
| panic(err) | |
| } | |
| c.SubscribeTopics([]string{"myTopic1"}, nil) | |
| for { | |
| msg, err := c.ReadMessage(-1) | |
| if err == nil { | |
| fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) | |
| } else { | |
| // The client will automatically try to recover from all errors. | |
| fmt.Printf("Consumer error: %v (%v)\n", err, msg) | |
| } | |
| } | |
| c.Close() | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "fmt" | |
| "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" | |
| ) | |
| func main() { | |
| p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "0.0.0.0:19091"}) | |
| if err != nil { | |
| panic(err) | |
| } | |
| defer p.Close() | |
| // Delivery report handler for produced messages | |
| go func() { | |
| for e := range p.Events() { | |
| switch ev := e.(type) { | |
| case *kafka.Message: | |
| if ev.TopicPartition.Error != nil { | |
| fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) | |
| } else { | |
| fmt.Printf("Delivered message to %v\n", ev.TopicPartition) | |
| } | |
| } | |
| } | |
| }() | |
| // Produce messages to topic (asynchronously) | |
| topic := "myTopic1" | |
| for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} { | |
| p.Produce(&kafka.Message{ | |
| TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, | |
| Value: []byte(word), | |
| }, nil) | |
| fmt.Println(word) | |
| } | |
| // Wait for message deliveries before shutting down | |
| p.Flush(15 * 1000) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Local development cluster: a three-node ZooKeeper ensemble (zk1-zk3) and
# three Kafka brokers (kafka-1..kafka-3), all on Confluent Platform 5.4.3.
version: '2'
services:
  # --- ZooKeeper ensemble -------------------------------------------------
  # Each node uses its own client port and shares the same ZOOKEEPER_SERVERS
  # list (host:port:port entries separated by ';').
  zk1:
    image: confluentinc/cp-zookeeper:5.4.3
    ports:
      - "22181:22181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zk1:22888:23888;zk2:32888:33888;zk3:42888:43888
  zk2:
    image: confluentinc/cp-zookeeper:5.4.3
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zk1:22888:23888;zk2:32888:33888;zk3:42888:43888
  zk3:
    image: confluentinc/cp-zookeeper:5.4.3
    ports:
      - "42181:42181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 42181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zk1:22888:23888;zk2:32888:33888;zk3:42888:43888

  # --- Kafka brokers ------------------------------------------------------
  # Every broker defines two PLAINTEXT listeners:
  #   LISTENER_INTERNAL - advertised under the service name; used for
  #                       inter-broker traffic (KAFKA_INTER_BROKER_LISTENER_NAME).
  #   LISTENER_EXTERNAL - advertised as localhost:<port> for clients on the host.
  kafka-1:
    image: confluentinc/cp-kafka:5.4.3
    ports:
      - "19092:19092"
      - "19091:19091"
    depends_on:
      - zk1
      - zk2
      - zk3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zk1:22181,zk2:32181,zk3:42181
      KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://kafka-1:19091
      KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://localhost:19091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
  kafka-2:
    image: confluentinc/cp-kafka:5.4.3
    ports:
      # NOTE(review): 29092 is published, but no listener below binds to it
      # (the internal listener is on 19092) - confirm whether this mapping is
      # intended.
      - "29092:29092"
      - "29091:29091"
    depends_on:
      - zk1
      - zk2
      - zk3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zk1:22181,zk2:32181,zk3:42181
      KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-2:19092,LISTENER_EXTERNAL://kafka-2:29091
      KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-2:19092,LISTENER_EXTERNAL://localhost:29091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
  kafka-3:
    image: confluentinc/cp-kafka:5.4.3
    ports:
      # NOTE(review): 39092 is published, but no listener below binds to it
      # (the internal listener is on 19092) - confirm whether this mapping is
      # intended.
      - "39092:39092"
      - "39091:39091"
    depends_on:
      - zk1
      - zk2
      - zk3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zk1:22181,zk2:32181,zk3:42181
      KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-3:19092,LISTENER_EXTERNAL://kafka-3:39091
      KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-3:19092,LISTENER_EXTERNAL://localhost:39091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "context" | |
| "fmt" | |
| "time" | |
| "github.com/segmentio/kafka-go" | |
| ) | |
| func main() { | |
| topic := "foo" | |
| partition := 0 | |
| conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:19091", topic, partition) | |
| if err != nil { | |
| fmt.Println("failed to dial leader:", err) | |
| } | |
| conn.SetReadDeadline(time.Now().Add(10*time.Second)) | |
| batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max | |
| b := make([]byte, 10e3) // 10KB max per message | |
| for { | |
| _, err := batch.Read(b) | |
| if err != nil { | |
| fmt.Println(err) | |
| break | |
| } | |
| fmt.Println(string(b)) | |
| } | |
| if err := batch.Close(); err != nil { | |
| fmt.Println("failed to close batch:", err) | |
| } | |
| if err := conn.Close(); err != nil { | |
| fmt.Println("failed to close connection:", err) | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "time" | |
| "github.com/segmentio/kafka-go" | |
| "fmt" | |
| "context" | |
| ) | |
| func main() { | |
| topic := "foo" | |
| partition := 0 | |
| conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:19091", topic, partition) | |
| if err != nil { | |
| fmt.Println("failed to dial leader:", err) | |
| } | |
| conn.SetWriteDeadline(time.Now().Add(10*time.Second)) | |
| _, err = conn.WriteMessages( | |
| kafka.Message{Value: []byte("one!")}, | |
| kafka.Message{Value: []byte("two!")}, | |
| kafka.Message{Value: []byte("three!")}, | |
| ) | |
| if err != nil { | |
| fmt.Println("failed to write messages:", err) | |
| } | |
| if err := conn.Close(); err != nil { | |
| fmt.Println("failed to close writer:", err) | |
| } | |
| fmt.Println("Conn Close") | |
| fmt.Println(err) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment