Skip to content

Instantly share code, notes, and snippets.

@exu
Forked from schmohlio/sse.go
Created June 15, 2020 07:56
Show Gist options
  • Save exu/2f6cd04143aa6ffd2461b8f9782b9d66 to your computer and use it in GitHub Desktop.
Save exu/2f6cd04143aa6ffd2461b8f9782b9d66 to your computer and use it in GitHub Desktop.

Revisions

  1. @schmohlio schmohlio revised this gist Nov 15, 2016. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions sse.go
    Original file line number Diff line number Diff line change
    @@ -121,6 +121,7 @@ func (broker *Broker) listen() {
    select {
    case clientMessageChan <- event:
    case <-time.After(patience):
    log.Print("Skipping client.")
    }
    }
    }
  2. @schmohlio schmohlio revised this gist Nov 15, 2016. 1 changed file with 25 additions and 12 deletions.
    37 changes: 25 additions & 12 deletions sse.go
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,8 @@
    // v2 of the great example of SSE in go by @ismasan.
    // includes fixes:
    // * infinite loop ending in panic
    // * closing a client twice
    // * potentially blocked listen() from closing a connection during multiplex step.
    package main

    import (
    @@ -7,6 +12,10 @@ import (
    "time"
    )

    // the amount of time to wait when pushing a message to
    // a slow client or a client that closed after `range clients` started.
    const patience time.Duration = time.Second*1

    // Example SSE server in Golang.
    // $ go run sse.go

    @@ -23,6 +32,7 @@ type Broker struct {

    // Client connections registry
    clients map[chan []byte]bool

    }

    func NewServer() (broker *Broker) {
    @@ -71,19 +81,19 @@ func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    // Listen to connection close and un-register messageChan
    notify := rw.(http.CloseNotifier).CloseNotify()

    go func() {
    <-notify
    broker.closingClients <- messageChan
    }()

    for {
    select {
    case <-notify:
    return
    default:

    // Write to the ResponseWriter
    // Server Sent Events compatible
    fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)
    // Write to the ResponseWriter
    // Server Sent Events compatible
    fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)

    // Flush the data immediatly instead of buffering it for later.
    flusher.Flush()
    // Flush the data immediatly instead of buffering it for later.
    flusher.Flush()
    }
    }

    }
    @@ -108,7 +118,10 @@ func (broker *Broker) listen() {
    // We got a new event from the outside!
    // Send event to all connected clients
    for clientMessageChan, _ := range broker.clients {
    clientMessageChan <- event
    select {
    case clientMessageChan <- event:
    case <-time.After(patience):
    }
    }
    }
    }
    @@ -130,4 +143,4 @@ func main() {

    log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))

    }
    }
  3. Ismael Celis revised this gist May 22, 2014. 1 changed file with 1 addition and 2 deletions.
    3 changes: 1 addition & 2 deletions sse.go
    Original file line number Diff line number Diff line change
    @@ -65,7 +65,6 @@ func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    // Remove this client from the map of connected clients
    // when this handler exits.
    defer func() {
    fmt.Println("HERE.")
    broker.closingClients <- messageChan
    }()

    @@ -83,7 +82,7 @@ func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    // Server Sent Events compatible
    fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)

    // Flush the data inmediatly instead of buffering it for later.
    // Flush the data immediatly instead of buffering it for later.
    flusher.Flush()
    }

  4. Ismael Celis revised this gist May 13, 2014. 1 changed file with 50 additions and 51 deletions.
    101 changes: 50 additions & 51 deletions sse.go
    Original file line number Diff line number Diff line change
    @@ -4,7 +4,7 @@ import (
    "fmt"
    "log"
    "net/http"
    "time"
    "time"
    )

    // Example SSE server in Golang.
    @@ -26,66 +26,66 @@ type Broker struct {
    }

    func NewServer() (broker *Broker) {
    // Instantiate a broker
    // Instantiate a broker
    broker = &Broker{
    Notifier: make(chan []byte, 1),
    newClients: make(chan chan []byte),
    closingClients: make(chan chan []byte),
    clients: make(map[chan []byte]bool),
    }

    // Set it running - listening and broadcasting events
    // Set it running - listening and broadcasting events
    go broker.listen()

    return
    }

    func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

    // Make sure that the writer supports flushing.
    //
    flusher, ok := rw.(http.Flusher)

    if !ok {
    http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
    return
    }
    // Make sure that the writer supports flushing.
    //
    flusher, ok := rw.(http.Flusher)

    rw.Header().Set("Content-Type", "text/event-stream")
    rw.Header().Set("Cache-Control", "no-cache")
    rw.Header().Set("Connection", "keep-alive")
    rw.Header().Set("Access-Control-Allow-Origin", "*")
    if !ok {
    http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
    return
    }

    rw.Header().Set("Content-Type", "text/event-stream")
    rw.Header().Set("Cache-Control", "no-cache")
    rw.Header().Set("Connection", "keep-alive")
    rw.Header().Set("Access-Control-Allow-Origin", "*")

    // Each connection registers its own message channel with the Broker's connections registry
    messageChan := make(chan []byte)
    // Each connection registers its own message channel with the Broker's connections registry
    messageChan := make(chan []byte)

    // Signal the broker that we have a new connection
    broker.newClients <- messageChan
    // Signal the broker that we have a new connection
    broker.newClients <- messageChan

    // Remove this client from the map of connected clients
    // when this handler exits.
    defer func() {
    fmt.Println("HERE.")
    broker.closingClients <- messageChan
    }()
    // Remove this client from the map of connected clients
    // when this handler exits.
    defer func() {
    fmt.Println("HERE.")
    broker.closingClients <- messageChan
    }()

    // Listen to connection close and un-register messageChan
    notify := rw.(http.CloseNotifier).CloseNotify()
    // Listen to connection close and un-register messageChan
    notify := rw.(http.CloseNotifier).CloseNotify()

    go func() {
    <-notify
    broker.closingClients <- messageChan
    }()
    go func() {
    <-notify
    broker.closingClients <- messageChan
    }()

    for {
    for {

    // Write to the ResponseWriter
    // Server Sent Events compatible
    fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)
    // Write to the ResponseWriter
    // Server Sent Events compatible
    fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)

    // Flush the data inmediatly instead of buffering it for later.
    flusher.Flush()
    }
    // Flush the data inmediatly instead of buffering it for later.
    flusher.Flush()
    }

    }

    @@ -106,7 +106,7 @@ func (broker *Broker) listen() {
    log.Printf("Removed client. %d registered clients", len(broker.clients))
    case event := <-broker.Notifier:

    // We got a new event from the outside!
    // We got a new event from the outside!
    // Send event to all connected clients
    for clientMessageChan, _ := range broker.clients {
    clientMessageChan <- event
    @@ -116,20 +116,19 @@ func (broker *Broker) listen() {

    }


    func main() {

    broker := NewServer()
    broker := NewServer()

    go func() {
    for {
    time.Sleep(time.Second * 2)
    eventString := fmt.Sprintf("the time is %v", time.Now())
    log.Println("Receiving event")
    broker.Notifier <- []byte(eventString)
    }
    }()
    go func() {
    for {
    time.Sleep(time.Second * 2)
    eventString := fmt.Sprintf("the time is %v", time.Now())
    log.Println("Receiving event")
    broker.Notifier <- []byte(eventString)
    }
    }()

    log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))
    log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))

    }
    }
  5. Ismael Celis renamed this gist May 13, 2014. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  6. Ismael Celis created this gist May 13, 2014.
    135 changes: 135 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,135 @@
    package main

    import (
    "fmt"
    "log"
    "net/http"
    "time"
    )

    // Example SSE server in Golang.
    // $ go run sse.go

    type Broker struct {

    // Events are pushed to this channel by the main events-gathering routine
    Notifier chan []byte

    // New client connections
    newClients chan chan []byte

    // Closed client connections
    closingClients chan chan []byte

    // Client connections registry
    clients map[chan []byte]bool
    }

    func NewServer() (broker *Broker) {
    // Instantiate a broker
    broker = &Broker{
    Notifier: make(chan []byte, 1),
    newClients: make(chan chan []byte),
    closingClients: make(chan chan []byte),
    clients: make(map[chan []byte]bool),
    }

    // Set it running - listening and broadcasting events
    go broker.listen()

    return
    }

    func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

    // Make sure that the writer supports flushing.
    //
    flusher, ok := rw.(http.Flusher)

    if !ok {
    http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
    return
    }

    rw.Header().Set("Content-Type", "text/event-stream")
    rw.Header().Set("Cache-Control", "no-cache")
    rw.Header().Set("Connection", "keep-alive")
    rw.Header().Set("Access-Control-Allow-Origin", "*")

    // Each connection registers its own message channel with the Broker's connections registry
    messageChan := make(chan []byte)

    // Signal the broker that we have a new connection
    broker.newClients <- messageChan

    // Remove this client from the map of connected clients
    // when this handler exits.
    defer func() {
    fmt.Println("HERE.")
    broker.closingClients <- messageChan
    }()

    // Listen to connection close and un-register messageChan
    notify := rw.(http.CloseNotifier).CloseNotify()

    go func() {
    <-notify
    broker.closingClients <- messageChan
    }()

    for {

    // Write to the ResponseWriter
    // Server Sent Events compatible
    fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)

    // Flush the data inmediatly instead of buffering it for later.
    flusher.Flush()
    }

    }

    func (broker *Broker) listen() {
    for {
    select {
    case s := <-broker.newClients:

    // A new client has connected.
    // Register their message channel
    broker.clients[s] = true
    log.Printf("Client added. %d registered clients", len(broker.clients))
    case s := <-broker.closingClients:

    // A client has dettached and we want to
    // stop sending them messages.
    delete(broker.clients, s)
    log.Printf("Removed client. %d registered clients", len(broker.clients))
    case event := <-broker.Notifier:

    // We got a new event from the outside!
    // Send event to all connected clients
    for clientMessageChan, _ := range broker.clients {
    clientMessageChan <- event
    }
    }
    }

    }


    func main() {

    broker := NewServer()

    go func() {
    for {
    time.Sleep(time.Second * 2)
    eventString := fmt.Sprintf("the time is %v", time.Now())
    log.Println("Receiving event")
    broker.Notifier <- []byte(eventString)
    }
    }()

    log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))

    }