Skip to content

Instantly share code, notes, and snippets.

@mirzaakhena
Last active April 3, 2024 01:59
Show Gist options
  • Select an option

  • Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.

Select an option

Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.
Server-Sent Events: Go (Gin) server and Java client implementation
package main
import (
	"fmt"
	"log"
	"net/http"
	"sync"

	"github.com/gin-gonic/gin"
)
// subscriber holds the delivery state for a single subscribed id.
type subscriber struct {
	// messages carries notifications to whoever receives from WaitForMessage.
	messages chan interface{}
	// done is closed by Unsubscribe so that blocked senders can give up.
	done chan struct{}
	// senders counts Notify calls that may still send on messages; it lets
	// Unsubscribe close messages only after every such call has returned.
	senders sync.WaitGroup
}

// NotificationCenter routes messages to subscribers identified by a string id.
// Each subscribed id owns one unbuffered channel; all methods are safe for
// concurrent use.
type NotificationCenter struct {
	subscriberMessageChannelsID map[string]*subscriber
	subscribersMu               *sync.Mutex
}

// NewNotificationCenter returns an empty, ready-to-use NotificationCenter.
func NewNotificationCenter() *NotificationCenter {
	return &NotificationCenter{
		subscribersMu:               &sync.Mutex{},
		subscriberMessageChannelsID: make(map[string]*subscriber),
	}
}

// WaitForMessage returns the receive-only channel on which messages for id
// are delivered. The call itself does not block; the caller blocks by
// receiving from the returned channel. If id is not subscribed the result is
// a nil channel, and receiving from a nil channel blocks forever.
func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} {
	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	sub, exist := nc.subscriberMessageChannelsID[id]
	if !exist {
		return nil
	}
	return sub.messages
}

// Subscribe registers id and creates its message channel. It returns an
// error if id is already registered.
func (nc *NotificationCenter) Subscribe(id string) error {
	fmt.Printf(">>>> subscribe %s\n", id)

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	if _, exist := nc.subscriberMessageChannelsID[id]; exist {
		return fmt.Errorf("outlet %s already registered", id)
	}
	nc.subscriberMessageChannelsID[id] = &subscriber{
		messages: make(chan interface{}),
		done:     make(chan struct{}),
	}
	return nil
}

// Unsubscribe removes id and closes its message channel. It returns an error
// if id is not registered. Notify calls still blocked on this id are released
// with an error before the channel is closed.
func (nc *NotificationCenter) Unsubscribe(id string) error {
	fmt.Printf(">>>> unsubscribe %s\n", id)

	nc.subscribersMu.Lock()
	sub, exist := nc.subscriberMessageChannelsID[id]
	if exist {
		delete(nc.subscriberMessageChannelsID, id)
		// Wake up any sender that is waiting for a receiver.
		close(sub.done)
	}
	nc.subscribersMu.Unlock()

	if !exist {
		return fmt.Errorf("outlet %s is not registered yet", id)
	}

	// Closing a channel with a pending send panics, so wait until every
	// in-flight Notify has left its select. No new sender can appear because
	// the id has already been removed from the map.
	sub.senders.Wait()
	close(sub.messages)
	return nil
}

// Notify delivers message to the subscriber registered under id, blocking
// until the subscriber receives it. It returns an error if id is not
// registered, or if id is unsubscribed while the send is still pending.
func (nc *NotificationCenter) Notify(id string, message interface{}) error {
	fmt.Printf(">>>> send message to %s\n", id)

	// Hold the lock only for the lookup: blocking on the send while holding
	// it would prevent Unsubscribe (and everything else) from making progress.
	nc.subscribersMu.Lock()
	sub, exist := nc.subscriberMessageChannelsID[id]
	if exist {
		sub.senders.Add(1)
	}
	nc.subscribersMu.Unlock()

	if !exist {
		return fmt.Errorf("outlet %s is not registered", id)
	}
	defer sub.senders.Done()

	select {
	case sub.messages <- message:
		return nil
	case <-sub.done:
		return fmt.Errorf("outlet %s is not registered", id)
	}
}
// handleSSE returns a gin handler that subscribes the ":id" path parameter to
// nc and streams every message for that id to the client as a server-sent
// event named "message". The handler returns, and the id is unsubscribed,
// when the request context is done or the message channel is closed.
func handleSSE(nc *NotificationCenter) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		id := ctx.Param("id")

		// Register the id; a failure (e.g. duplicate id) is reported to the
		// client before any event-stream data has been written.
		if err := nc.Subscribe(id); err != nil {
			ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
			return
		}

		// Release the registration on every exit path. Nothing is written to
		// the response here: by now it is either an event stream already in
		// progress or a connection the client has dropped, so a JSON body
		// would only corrupt the stream.
		defer func() {
			if err := nc.Unsubscribe(id); err != nil {
				fmt.Printf(">>>> unsubscribe %s failed: %v\n", id, err)
			}
		}()

		// The channel and the cancellation signal do not change for the
		// lifetime of the subscription, so look them up once.
		messages := nc.WaitForMessage(id)
		done := ctx.Request.Context().Done()

		for {
			select {
			case message, ok := <-messages:
				if !ok {
					// Channel closed: stop instead of emitting empty events.
					return
				}
				ctx.SSEvent("message", message)
				ctx.Writer.Flush()
			case <-done:
				return
			}
		}
	}
}
// messageHandler returns a gin handler that sends the greeting "Hello <id>"
// through nc to the subscriber named by the ":id" path parameter. It answers
// 400 with the error text when the notification fails and 200 with a
// confirmation message otherwise.
func messageHandler(nc *NotificationCenter) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		target := ctx.Param("id")

		err := nc.Notify(target, fmt.Sprintf("Hello %s", target))
		if err != nil {
			ctx.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
			return
		}

		confirmation := fmt.Sprintf("message send to %s", ctx.Param("id"))
		ctx.JSON(http.StatusOK, gin.H{"message": confirmation})
	}
}
// main builds the gin router, registers the message and handshake routes
// against a shared NotificationCenter, and serves on port 3000. It exits with
// a non-zero status if the server cannot start or stops with an error.
func main() {
	r := gin.Default()
	nc := NewNotificationCenter()

	r.GET("/message/:id", messageHandler(nc))
	r.GET("/handshake/:id", handleSSE(nc))

	// Run only returns on failure (e.g. the port is already in use); do not
	// let that pass silently with exit status 0.
	if err := r.Run(":3000"); err != nil {
		log.Fatalf("server stopped: %v", err)
	}
}
package com.mirzaakhena.websocketdemo;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
/**
 * Spring Boot application that acts as a Server-Sent Events client.
 *
 * <p>After the bean is constructed it opens an SSE connection to the URL
 * configured under {@code app.handshaking_url} and logs every received event.
 * When the event source completes, it waits for the number of seconds
 * configured under {@code app.handshaking_retry_in_second} and connects again.
 */
@SpringBootApplication
public class WebsocketdemoApplication {

    private static final Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);

    /** Property key holding the SSE endpoint URL. */
    private static final String PROP_HANDSHAKING_URL = "app.handshaking_url";

    /** Property key holding the reconnect delay, in seconds. */
    private static final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

    @Autowired
    private Environment env;

    private Client client;
    private WebTarget tut;
    private SseEventSource sse;

    public static void main(String[] args) {
        SpringApplication.run(WebsocketdemoApplication.class, args);
    }

    /**
     * Creates the JAX-RS client and target for the configured endpoint, then
     * starts the first handshake.
     *
     * @throws IllegalStateException if the endpoint URL property is missing
     */
    @PostConstruct
    private void initAplikasi() {
        this.client = ClientBuilder.newClient();
        // getRequiredProperty names the missing key instead of failing later
        // with a bare NullPointerException inside target(...).
        this.tut = this.client.target(env.getRequiredProperty(PROP_HANDSHAKING_URL));
        doHandshaking();
    }

    /**
     * Builds a new SSE event source for the target, registers the message,
     * error and completion callbacks, and opens the connection.
     *
     * @throws IllegalStateException if the retry property is missing
     */
    private void doHandshaking() {
        logger.info("Start doHandshaking...");

        int retry = env.getRequiredProperty(PROP_HANDSHAKING_RETRY_IN_SECOND, Integer.class);

        sse = SseEventSource
                .target(this.tut)
                .reconnectingEvery(retry, TimeUnit.SECONDS)
                .build();

        sse.register(this::onMessage, this::onError, () -> {
            logger.info("OnComplete");
            try {
                Thread.sleep(retry * 1000L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the owning thread and stop
                // retrying rather than swallowing the signal.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting to retry the handshake; giving up");
                return;
            }
            doHandshaking();
        });

        sse.open();
    }

    /**
     * Logs an error reported by the event source.
     *
     * @param t the failure reported by the SSE client
     */
    private void onError(Throwable t) {
        // SLF4J substitutes "{}" placeholders, not printf-style "%s"; passing
        // the throwable last also logs its stack trace.
        logger.error("Error : {}", t.getMessage(), t);
    }

    /**
     * Logs the payload of a received event.
     *
     * @param event the inbound server-sent event
     */
    private void onMessage(InboundSseEvent event) {
        logger.info("Message : {}", event.readData());
    }
}
// application.properties
// -----------------------
// server.port = 8080
// app.handshaking_url = http://localhost:3000/handshake/0208
// app.handshaking_retry_in_second = 5
// pom.xml dependency
// <dependencies>
// <dependency>
// <groupId>org.springframework.boot</groupId>
// <artifactId>spring-boot-starter-web</artifactId>
// </dependency>
// <dependency>
// <groupId>org.glassfish.jersey.media</groupId>
// <artifactId>jersey-media-sse</artifactId>
// <version>2.26</version>
// </dependency>
// <dependency>
// <groupId>org.glassfish.jersey.inject</groupId>
// <artifactId>jersey-hk2</artifactId>
// <version>2.26</version>
// </dependency>
// </dependencies>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment