// Command main consumes deliveries from the durable "priority" queue, which is
// bound to the "logs_top" topic exchange on a local AMQP broker, printing the
// priority of each delivery and acknowledging it.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/streadway/amqp"
)

// main runs the consumer and terminates the process with a non-zero status if
// it returns an error. All cleanup is deferred inside run, so it has already
// executed by the time failOnError may exit.
func main() {
	failOnError(run())
}

// run connects to the broker, declares the topology, and processes deliveries
// until the process receives an interrupt or the delivery channel is closed.
//
// It returns nil after an interrupt-triggered shutdown and a non-nil error if
// setup fails or the delivery channel closes before an interrupt arrives.
func run() error {
	const (
		queueKey = "priority"      // queue name, binding key, and consumer tag
		timeout  = 3 * time.Second // upper bound on waiting for the worker at shutdown
	)

	// NOTE(review): the same table is also passed to the exchange declaration,
	// the binding, and the consume call, exactly as the original code did;
	// "x-max-priority" looks like a queue-only argument — confirm it is wanted
	// on the other calls.
	args := amqp.Table{"x-max-priority": 9}

	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		return fmt.Errorf("dialing broker: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("opening channel: %w", err)
	}
	defer ch.Close()

	// Prefetch a single unacknowledged delivery at a time.
	if err := ch.Qos(1, 0, false); err != nil {
		return fmt.Errorf("setting qos: %w", err)
	}

	err = ch.ExchangeDeclare(
		"logs_top", // name
		"topic",    // kind
		true,       // durable
		false,      // autoDelete
		false,      // internal
		false,      // noWait
		args,
	)
	if err != nil {
		return fmt.Errorf("declaring exchange: %w", err)
	}

	msgs, err := consumer(ch, queueKey, args)
	if err != nil {
		return err
	}

	// signal.Notify never blocks when sending, so the channel needs a buffer
	// or an interrupt arriving before the receive below could be dropped.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt)
	defer signal.Stop(stop)

	// done is closed when the worker has drained the delivery channel.
	done := make(chan struct{})
	go func() {
		defer close(done)
		log.Println("Waiting message.")
		for msg := range msgs {
			fmt.Println(msg.Priority)
			if err := msg.Ack(false); err != nil {
				log.Printf("acknowledging delivery: %s", err)
			}
			time.Sleep(time.Second * 1)
		}
	}()

	select {
	case <-stop:
	case <-done:
		// The delivery channel closed without an interrupt; nothing more will
		// ever arrive, so report it instead of blocking forever.
		return errors.New("delivery channel closed unexpectedly")
	}
	log.Println("Shutting down.")

	// Cancel the consumer by its tag so the delivery channel is closed and the
	// worker's range loop can end.
	if err := ch.Cancel(queueKey, false); err != nil {
		log.Printf("cancelling consumer: %s", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Give the worker a bounded amount of time to finish the delivery in hand.
	select {
	case <-done:
	case <-ctx.Done():
		log.Println("timed out waiting for the consumer to stop")
	}
	return nil
}

// consumer declares a durable queue named key, binds it to the "logs_top"
// exchange using key as the binding key, and starts consuming from it with
// manual acknowledgement, using key as the consumer tag. args is passed
// unchanged to the declare, bind, and consume calls.
//
// It returns the delivery channel, or a nil channel and a wrapped error naming
// the first step that failed.
func consumer(ch *amqp.Channel, key string, args map[string]interface{}) (<-chan amqp.Delivery, error) {
	q, err := ch.QueueDeclare(
		key,   // name
		true,  // durable
		false, // autoDelete
		false, // exclusive
		false, // noWait
		args,
	)
	if err != nil {
		return nil, fmt.Errorf("declaring queue %q: %w", key, err)
	}

	err = ch.QueueBind(
		q.Name,     // queue
		key,        // binding key
		"logs_top", // exchange
		false,      // noWait
		args,
	)
	if err != nil {
		return nil, fmt.Errorf("binding queue %q: %w", q.Name, err)
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		key,    // consumer tag
		false,  // autoAck
		false,  // exclusive
		false,  // noLocal
		false,  // noWait
		args,
	)
	if err != nil {
		return nil, fmt.Errorf("consuming from queue %q: %w", q.Name, err)
	}
	return msgs, nil
}

// failOnError logs err and exits the process with a non-zero status when err
// is non-nil; it does nothing for a nil error. Because it exits via
// log.Fatalf, deferred calls in the caller do not run, so call it only where
// no cleanup is pending.
func failOnError(err error) {
	if err != nil {
		log.Fatalf("%s", err)
	}
}