// update version of rikonor // https://gist.github.com/rikonor/e53a33c27ed64861c91a095a59f0aa44 // and ismasan // https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c // // // for SSE implementation // fixing: // * adding unsubscribe event for // // improvement: // * adding identifier (outletCode) of each channel package main import ( "fmt" "io" "net/http" "sync" "github.com/gin-gonic/gin" ) // UnsubscribeFunc ... type UnsubscribeFunc func() error // Subscriber ... type Subscriber interface { Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) } // Notifier ... type Notifier interface { Notify(outletCode, message string) error } // NotificationCenter is implementation type NotificationCenter struct { outletMapChannel map[string]chan string subscribers map[chan string]struct{} subscribersMu *sync.Mutex } // NewNotificationCenter is constructor func NewNotificationCenter() *NotificationCenter { return &NotificationCenter{ subscribers: map[chan string]struct{}{}, subscribersMu: &sync.Mutex{}, outletMapChannel: make(map[string]chan string), } } // Subscribe ... func (nc *NotificationCenter) Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) { fmt.Printf("try subscribe %s\n", outletCode) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.outletMapChannel[outletCode]; exist { return nil, fmt.Errorf("outlet %s already registered", outletCode) } nc.subscribers[c] = struct{}{} nc.outletMapChannel[outletCode] = c unsubscribeFn := func() error { fmt.Printf("unsubscribe %s\n", outletCode) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() delete(nc.subscribers, c) delete(nc.outletMapChannel, outletCode) return nil } return unsubscribeFn, nil } // Notify ... func (nc *NotificationCenter) Notify(outletCode, message string) error { fmt.Printf("notify %s\n", outletCode) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() nc.outletMapChannel[outletCode] <- message return nil } func handleSSE(s Subscriber) gin.HandlerFunc { return func(ctx *gin.Context) { // prepare the channel c := make(chan string) // subscribe the outletCode and channel unsubscribeFn, err := s.Subscribe(ctx.Param("outletCode"), c) if err != nil { fmt.Printf("E %v\n", err.Error()) ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } // unsubscribe when exit this function defer func() { if err := unsubscribeFn(); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } }() // unsubscribe when client close the connections notify := ctx.Request.Context().Done() go func() { <-notify if err := unsubscribeFn(); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } }() // forever loop for listening message for { ctx.Stream(func(w io.Writer) bool { if message, ok := <-c; ok { ctx.SSEvent("message", message) return true } return false }) } } } func messageHandler(nc Notifier) gin.HandlerFunc { return func(ctx *gin.Context) { outletCode := ctx.Param("outletCode") nc.Notify(outletCode, fmt.Sprintf("Hello %s", outletCode)) ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("outletCode"))}) } } func main() { r := gin.Default() nc := NewNotificationCenter() // test call with this // curl GET http://localhost:3000/message/0208 r.GET("/message/:outletCode", messageHandler(nc)) r.GET("/handshake/:outletCode", handleSSE(nc)) r.Run(":3000") }