Skip to content

Instantly share code, notes, and snippets.

@mirzaakhena
Last active April 3, 2024 01:59
Show Gist options
  • Select an option

  • Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.

Select an option

Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.
Server Sent Events Java Client Implementation
// Updated version of the gists by rikonor
// https://gist.github.com/rikonor/e53a33c27ed64861c91a095a59f0aa44
// and ismasan
// https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
//
//
// Server-Sent Events (SSE) implementation.
// Fixes:
// * adds an unsubscribe step that runs when the client closes the connection
//
// Improvements:
// * adds an identifier (outletCode) for each channel
package main
import (
"fmt"
"io"
"net/http"
"sync"
"github.com/gin-gonic/gin"
)
// UnsubscribeFunc is the cancellation function handed back by
// Subscriber.Subscribe. Calling it removes the registration it was created
// for; a non-nil error reports that the removal failed.
type UnsubscribeFunc func() error
// Subscriber is the registration side of the notification center: it lets a
// caller attach a channel to an outlet code.
type Subscriber interface {
// Subscribe associates channel c with outletCode. On success it returns the
// function that undoes the registration; otherwise it returns an error.
Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error)
}
// Notifier is the publishing side of the notification center: it lets a
// caller push a message to the outlet identified by an outlet code.
type Notifier interface {
// Notify sends message to the outlet identified by outletCode and returns
// an error if the message could not be sent.
Notify(outletCode, message string) error
}
// NotificationCenter is an in-memory implementation of Subscriber and
// Notifier. It keeps at most one subscriber channel per outlet code and
// delivers each notification to that channel.
//
// All maps are guarded by subscribersMu. A NotificationCenter must be created
// with NewNotificationCenter; the zero value has nil maps and is not usable.
type NotificationCenter struct {
	// outletMapChannel maps an outlet code to the channel registered for it.
	outletMapChannel map[string]chan string
	// outletDone maps an outlet code to a channel that is closed when the
	// outlet's subscriber unsubscribes, so a pending Notify can give up.
	outletDone map[string]chan struct{}
	// subscribers is the set of all currently registered channels.
	subscribers   map[chan string]struct{}
	subscribersMu *sync.Mutex
}

// NewNotificationCenter returns an empty, ready-to-use NotificationCenter.
func NewNotificationCenter() *NotificationCenter {
	return &NotificationCenter{
		subscribers:      map[chan string]struct{}{},
		subscribersMu:    &sync.Mutex{},
		outletMapChannel: make(map[string]chan string),
		outletDone:       make(map[string]chan struct{}),
	}
}

// Subscribe registers channel c as the receiver for outletCode.
//
// It returns an error when c is nil or when outletCode already has a
// subscriber. On success it returns an UnsubscribeFunc that removes the
// registration. The returned function is safe to call more than once and from
// several goroutines: only the first call has an effect, so a late duplicate
// call can never remove a newer subscriber that reused the same outlet code.
func (nc *NotificationCenter) Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) {
	fmt.Printf("try subscribe %s\n", outletCode)

	// A nil channel can never receive; registering it would make every
	// Notify for this outlet block.
	if c == nil {
		return nil, fmt.Errorf("outlet %s: channel must not be nil", outletCode)
	}

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	if _, exist := nc.outletMapChannel[outletCode]; exist {
		return nil, fmt.Errorf("outlet %s already registered", outletCode)
	}

	done := make(chan struct{})
	nc.subscribers[c] = struct{}{}
	nc.outletMapChannel[outletCode] = c
	nc.outletDone[outletCode] = done

	var once sync.Once
	unsubscribeFn := func() error {
		once.Do(func() {
			fmt.Printf("unsubscribe %s\n", outletCode)

			nc.subscribersMu.Lock()
			defer nc.subscribersMu.Unlock()

			delete(nc.subscribers, c)
			delete(nc.outletMapChannel, outletCode)
			delete(nc.outletDone, outletCode)
			// Wake up any Notify that is still waiting to deliver to c.
			close(done)
		})
		return nil
	}

	return unsubscribeFn, nil
}

// Notify delivers message to the subscriber registered for outletCode.
//
// It returns an error when no subscriber is registered for outletCode, or when
// the subscriber unsubscribes before the message could be handed over.
// Otherwise it blocks until the subscriber's channel accepts the message.
func (nc *NotificationCenter) Notify(outletCode, message string) error {
	fmt.Printf("notify %s\n", outletCode)

	// Only the lookup happens under the lock. Sending while holding the lock
	// would block Subscribe and every unsubscribe call until the receiver
	// picks the message up.
	nc.subscribersMu.Lock()
	c, ok := nc.outletMapChannel[outletCode]
	done := nc.outletDone[outletCode]
	nc.subscribersMu.Unlock()

	// Without this check the send below would target a nil channel and block
	// forever.
	if !ok {
		return fmt.Errorf("outlet %s is not registered", outletCode)
	}

	select {
	case c <- message:
		return nil
	case <-done:
		return fmt.Errorf("outlet %s unsubscribed before the message was delivered", outletCode)
	}
}
// handleSSE returns a gin handler that opens a Server-Sent Events stream for
// the outlet named by the "outletCode" path parameter.
//
// The handler registers a fresh channel with s and forwards every message
// received on it to the client as an SSE event named "message". If the
// subscription is refused it answers 400 with the error text. The handler
// returns, and the subscription is released, as soon as the request context
// is cancelled or the channel is closed.
func handleSSE(s Subscriber) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		// prepare the channel
		c := make(chan string)

		// subscribe the outletCode and channel
		unsubscribeFn, err := s.Subscribe(ctx.Param("outletCode"), c)
		if err != nil {
			fmt.Printf("E %v\n", err.Error())
			ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
			return
		}

		// Unsubscribe when the handler exits. By then the event stream may
		// already have been written, so a failure is logged rather than sent
		// as a JSON body.
		defer func() {
			if err := unsubscribeFn(); err != nil {
				fmt.Printf("E %v\n", err.Error())
			}
		}()

		// done fires when the request context is cancelled.
		done := ctx.Request.Context().Done()

		// Stream calls the step function repeatedly until it returns false,
		// so no outer loop is needed. Selecting on done keeps the receive
		// from blocking after the request has ended.
		ctx.Stream(func(w io.Writer) bool {
			select {
			case <-done:
				return false
			case message, ok := <-c:
				if !ok {
					return false
				}
				ctx.SSEvent("message", message)
				return true
			}
		})
	}
}
// messageHandler returns a gin handler that sends the greeting
// "Hello <outletCode>" to the outlet named by the "outletCode" path parameter.
//
// It answers 200 with a confirmation when nc.Notify succeeds, and 404 with the
// error text when nc.Notify reports a failure.
func messageHandler(nc Notifier) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		outletCode := ctx.Param("outletCode")

		// Surface a failed notification instead of always claiming success.
		if err := nc.Notify(outletCode, fmt.Sprintf("Hello %s", outletCode)); err != nil {
			fmt.Printf("E %v\n", err.Error())
			ctx.JSON(http.StatusNotFound, map[string]interface{}{"message": err.Error()})
			return
		}

		ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", outletCode)})
	}
}
// main wires one NotificationCenter into a gin router and serves it on :3000.
//
// Routes:
//   - GET /handshake/:outletCode opens the SSE stream for an outlet.
//   - GET /message/:outletCode sends a greeting to that outlet's stream.
func main() {
	r := gin.Default()
	nc := NewNotificationCenter()

	r.GET("/message/:outletCode", messageHandler(nc))
	r.GET("/handshake/:outletCode", handleSSE(nc))

	// Do not drop the error from Run (for example, the port being in use):
	// abort loudly so the process exits with a non-zero status.
	if err := r.Run(":3000"); err != nil {
		panic(fmt.Errorf("running server on :3000: %w", err))
	}
}
package com.mirzaakhena.websocketdemo;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
/**
 * Spring Boot application that opens a Server-Sent Events connection to the
 * URL configured under {@code app.handshaking_url} and logs every event it
 * receives.
 *
 * <p>The connection is opened once the bean has been constructed. When the
 * event source reports completion, the application waits
 * {@code app.handshaking_retry_in_second} seconds and performs the handshake
 * again.
 */
@SpringBootApplication
public class SSEApplication {

	// Bound to this class (the original referenced a class that is not declared here).
	Logger logger = LoggerFactory.getLogger(SSEApplication.class);

	@Autowired
	private Environment env;

	/**
	 * Starts the Spring application context with this class as its primary source.
	 *
	 * @param args command-line arguments forwarded to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(SSEApplication.class, args);
	}

	private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
	private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

	private Client client;
	private WebTarget tut;
	private SseEventSource sse;

	/**
	 * Builds the JAX-RS client and target from configuration, then opens the
	 * first SSE connection.
	 *
	 * @throws IllegalStateException if {@code app.handshaking_url} is not configured
	 */
	@PostConstruct
	private void initAplikasi() {
		this.client = ClientBuilder.newClient();
		// getRequiredProperty fails with a message naming the missing key
		// instead of passing null to target().
		this.tut = this.client.target(env.getRequiredProperty(PROP_HANDSHAKING_URL));
		doHandshaking();
	}

	/**
	 * Creates a new event source for the configured target, registers the
	 * message, error and completion callbacks, and opens the connection.
	 *
	 * @throws IllegalStateException if {@code app.handshaking_retry_in_second}
	 *         is not configured
	 */
	private void doHandshaking() {
		logger.info("Start doHandshaking...");

		int retry = env.getRequiredProperty(PROP_HANDSHAKING_RETRY_IN_SECOND, Integer.class);

		// NOTE(review): the previous event source, if any, is replaced without
		// being closed explicitly - confirm whether the JAX-RS implementation
		// in use releases it after completion.
		sse = SseEventSource.//
				target(this.tut).//
				reconnectingEvery(retry, TimeUnit.SECONDS).//
				build();//

		sse.register(this::onMessage, this::onError, () -> {
			logger.info("OnComplete");
			try {
				Thread.sleep(retry * 1000L);
			} catch (InterruptedException e) {
				// Keep the interrupt visible to the owning thread and stop
				// reconnecting; an interrupt is treated as a shutdown request.
				Thread.currentThread().interrupt();
				return;
			}
			doHandshaking();
		});

		sse.open();
	}

	/**
	 * Logs a failure reported by the event source, including its cause.
	 *
	 * @param t the error reported by the event source
	 */
	private void onError(Throwable t) {
		logger.error("Error?????", t);
	}

	/**
	 * Logs the payload of a received event.
	 *
	 * @param event the inbound SSE event
	 */
	private void onMessage(InboundSseEvent event) {
		logger.info("Message : {}", event.readData());
	}
}
// application.properties
// -----------------------
// server.port = 8080
// app.handshaking_url = http://localhost:3000/handshake/0208
// app.handshaking_retry_in_second = 5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment