Skip to content

Instantly share code, notes, and snippets.

@umkh
Last active May 3, 2021 04:57
Show Gist options
  • Save umkh/e69d88b9903d021a6932aeb2d09243a8 to your computer and use it in GitHub Desktop.
Save umkh/e69d88b9903d021a6932aeb2d09243a8 to your computer and use it in GitHub Desktop.
Golang with RabbitMQ. Example to declare a priority queue.
package main
import (
"context"
"fmt"
"github.com/streadway/amqp"
"log"
"os"
"os/signal"
"time"
)
func main() {
args := make(amqp.Table)
args["x-max-priority"] = 9
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err)
defer conn.Close()
ch, err := conn.Channel()
failOnError(err)
defer ch.Close()
err = ch.Qos(1, 0, false)
failOnError(err)
err = ch.ExchangeDeclare(
"logs_top",
"topic",
true,
false,
false,
false,
args,
)
failOnError(err)
msgs, err := consumer(ch, "priority", args)
failOnError(err)
stop := make(chan os.Signal)
signal.Notify(stop, os.Interrupt)
go func() {
log.Println("Waiting message.")
for msg := range msgs {
fmt.Println(msg.Priority)
msg.Ack(false)
time.Sleep(time.Second * 1)
}
}()
<-stop
log.Println("Shutting down.")
const timeout = 3 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
<-ctx.Done()
}
// consumer ...
func consumer(ch *amqp.Channel, key string, args map[string]interface{}) (<-chan amqp.Delivery, error) {
q, err := ch.QueueDeclare(
key,
true,
false,
false,
false,
args,
)
failOnError(err)
err = ch.QueueBind(
q.Name,
key,
"logs_top",
false,
args,
)
failOnError(err)
msgs, err := ch.Consume(
q.Name,
key,
false,
false,
false,
false,
args,
)
return msgs, err
}
// failOnError ...
func failOnError(err error) {
if err != nil {
log.Printf("%s", err)
}
}
package main
import (
"github.com/streadway/amqp"
"log"
"os"
"strconv"
)
func main() {
args := make(amqp.Table)
args["x-max-priority"] = 9
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Println(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Println(err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_top",
"topic",
true,
false,
false,
false,
args,
)
if err != nil {
log.Println(err)
}
priority, _ := strconv.Atoi(os.Args[1])
count, _ := strconv.Atoi(os.Args[2])
for i := 0; i < count; i++ {
err := ch.Publish(
"logs_top",
"priority",
false,
false,
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
Body: []byte(os.Args[1]),
Priority: uint8(priority),
},
)
if err != nil {
log.Println(err)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment