package main

import (
	"fmt"
	"net/http"
	"sync"

	"github.com/gin-gonic/gin"
)

// NotificationCenter routes messages to subscribers. Every subscriber is
// identified by a string id and owns one unbuffered channel.
type NotificationCenter struct {
	// subscriberMessageChannelsID maps a subscriber id to its channel.
	// Writers hold both subscribersMu and channelsMu, so holding either one
	// is enough to read the map safely.
	subscriberMessageChannelsID map[string]chan interface{}
	// subscribersMu serializes Subscribe, Unsubscribe and Notify.
	subscribersMu *sync.Mutex
	// channelsMu lets WaitForMessage read the map without subscribersMu,
	// which Notify keeps locked for as long as its send is pending.
	channelsMu sync.RWMutex
}

// NewNotificationCenter returns an empty NotificationCenter that is ready
// for use.
func NewNotificationCenter() *NotificationCenter {
	return &NotificationCenter{
		subscribersMu:               &sync.Mutex{},
		subscriberMessageChannelsID: make(map[string]chan interface{}),
	}
}

// WaitForMessage returns the receive side of the channel registered for id;
// the caller waits for a message by receiving from it. The channel is closed
// by Unsubscribe. When id is not subscribed the result is a nil channel, on
// which a receive blocks forever.
func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} {
	// Deliberately not subscribersMu: taking it here would block behind a
	// Notify that is waiting for this very caller to receive.
	nc.channelsMu.RLock()
	defer nc.channelsMu.RUnlock()

	return nc.subscriberMessageChannelsID[id]
}

// Subscribe registers id with a fresh unbuffered channel. It returns an error
// when id is already registered.
func (nc *NotificationCenter) Subscribe(id string) error {
	fmt.Printf(">>>> subscribe %s\n", id)

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	if _, exist := nc.subscriberMessageChannelsID[id]; exist {
		return fmt.Errorf("outlet %s already registered", id)
	}

	ch := make(chan interface{})
	nc.channelsMu.Lock()
	nc.subscriberMessageChannelsID[id] = ch
	nc.channelsMu.Unlock()

	return nil
}

// Unsubscribe removes id and closes its channel, which wakes any receiver
// still waiting on it. It returns an error when id is not registered.
func (nc *NotificationCenter) Unsubscribe(id string) error {
	fmt.Printf(">>>> unsubscribe %s\n", id)

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	ch, exist := nc.subscriberMessageChannelsID[id]
	if !exist {
		return fmt.Errorf("outlet %s is not registered yet", id)
	}

	nc.channelsMu.Lock()
	delete(nc.subscriberMessageChannelsID, id)
	nc.channelsMu.Unlock()

	close(ch)
	return nil
}
func (nc *NotificationCenter) Notify(id string, message interface{}) error { fmt.Printf(">>>> send message to %s\n", id) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; !exist { return fmt.Errorf("outlet %s is not registered", id) } nc.subscriberMessageChannelsID[id] <- message return nil } func handleSSE(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") // subscribe the id and channel if err := nc.Subscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } // unsubscribe if exit from this method defer func() { if err := nc.Unsubscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } message := fmt.Sprintf("id %s close connection", id) ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message}) }() // forever loop for listening message for { select { case message := <-nc.WaitForMessage(id): ctx.SSEvent("message", message) ctx.Writer.Flush() case <-ctx.Request.Context().Done(): return } } } } func messageHandler(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") message := fmt.Sprintf("Hello %s", id) if err := nc.Notify(id, message); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))}) } } func main() { r := gin.Default() nc := NewNotificationCenter() r.GET("/message/:id", messageHandler(nc)) r.GET("/handshake/:id", handleSSE(nc)) r.Run(":3000") }