Skip to content

Instantly share code, notes, and snippets.

@karanchuri
Created November 25, 2020 13:56
Show Gist options
  • Save karanchuri/02f8fed113b5776c7d1c646c4f584a43 to your computer and use it in GitHub Desktop.
Save karanchuri/02f8fed113b5776c7d1c646c4f584a43 to your computer and use it in GitHub Desktop.
kafka golang demo
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "0.0.0.0:19091",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"myTopic1"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "0.0.0.0:19091"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "myTopic1"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
fmt.Println(word)
}
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
}
version: '2'
services:
zk1:
image: confluentinc/cp-zookeeper:5.4.3
ports:
- "22181:22181"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 22181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zk1:22888:23888;zk2:32888:33888;zk3:42888:43888
zk2:
image: confluentinc/cp-zookeeper:5.4.3
ports:
- "32181:32181"
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zk1:22888:23888;zk2:32888:33888;zk3:42888:43888
zk3:
image: confluentinc/cp-zookeeper:5.4.3
ports:
- "42181:42181"
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 42181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zk1:22888:23888;zk2:32888:33888;zk3:42888:43888
kafka-1:
image: confluentinc/cp-kafka:5.4.3
ports:
- "19092:19092"
- "19091:19091"
depends_on:
- zk1
- zk2
- zk3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zk1:22181,zk2:32181,zk3:42181
KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://kafka-1:19091
KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://localhost:19091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
kafka-2:
image: confluentinc/cp-kafka:5.4.3
ports:
- "29092:29092"
- "29091:29091"
depends_on:
- zk1
- zk2
- zk3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zk1:22181,zk2:32181,zk3:42181
KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-2:19092,LISTENER_EXTERNAL://kafka-2:29091
KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-2:19092,LISTENER_EXTERNAL://localhost:29091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
kafka-3:
image: confluentinc/cp-kafka:5.4.3
ports:
- "39092:39092"
- "39091:39091"
depends_on:
- zk1
- zk2
- zk3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zk1:22181,zk2:32181,zk3:42181
KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-3:19092,LISTENER_EXTERNAL://kafka-3:39091
KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-3:19092,LISTENER_EXTERNAL://localhost:39091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
package main
import (
"context"
"fmt"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
topic := "foo"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:19091", topic, partition)
if err != nil {
fmt.Println("failed to dial leader:", err)
}
conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
_, err := batch.Read(b)
if err != nil {
fmt.Println(err)
break
}
fmt.Println(string(b))
}
if err := batch.Close(); err != nil {
fmt.Println("failed to close batch:", err)
}
if err := conn.Close(); err != nil {
fmt.Println("failed to close connection:", err)
}
}
package main
import (
"time"
"github.com/segmentio/kafka-go"
"fmt"
"context"
)
func main() {
topic := "foo"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:19091", topic, partition)
if err != nil {
fmt.Println("failed to dial leader:", err)
}
conn.SetWriteDeadline(time.Now().Add(10*time.Second))
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
fmt.Println("failed to write messages:", err)
}
if err := conn.Close(); err != nil {
fmt.Println("failed to close writer:", err)
}
fmt.Println("Conn Close")
fmt.Println(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment