Last active
April 3, 2024 01:59
-
-
Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.
Server Sent Events Java Client Implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "fmt" | |
| "net/http" | |
| "sync" | |
| "github.com/gin-gonic/gin" | |
| ) | |
// subscription is the per-subscriber state tracked by NotificationCenter.
type subscription struct {
	// messages carries values from Notify to whoever reads WaitForMessage.
	messages chan interface{}
	// done is closed by Unsubscribe to release senders parked in Notify.
	done chan struct{}
	// senders counts the Notify calls currently attempting a send, so that
	// Unsubscribe can wait for them before closing messages.
	senders sync.WaitGroup
}

// NotificationCenter routes messages to subscribers identified by a string
// id. Each subscriber owns one unbuffered channel; all methods are safe for
// concurrent use.
type NotificationCenter struct {
	subscriberMessageChannelsID map[string]*subscription
	subscribersMu               *sync.Mutex
}

// NewNotificationCenter returns an empty, ready-to-use NotificationCenter.
func NewNotificationCenter() *NotificationCenter {
	return &NotificationCenter{
		subscribersMu:               &sync.Mutex{},
		subscriberMessageChannelsID: make(map[string]*subscription),
	}
}

// WaitForMessage returns the receive side of the channel registered for id.
// Receiving from it blocks until Notify delivers a message or Unsubscribe
// closes the channel. If id is not subscribed the result is a nil channel,
// on which a receive blocks forever.
func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} {
	// The map is mutated by Subscribe/Unsubscribe, so even a read needs the lock.
	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	sub, exist := nc.subscriberMessageChannelsID[id]
	if !exist {
		return nil
	}
	return sub.messages
}

// Subscribe registers id and creates its message channel. It returns an
// error if id is already registered.
func (nc *NotificationCenter) Subscribe(id string) error {
	fmt.Printf(">>>> subscribe %s\n", id)

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	if _, exist := nc.subscriberMessageChannelsID[id]; exist {
		return fmt.Errorf("outlet %s already registered", id)
	}

	nc.subscriberMessageChannelsID[id] = &subscription{
		messages: make(chan interface{}),
		done:     make(chan struct{}),
	}
	return nil
}

// Unsubscribe removes id and closes its message channel. Notify calls that
// are still waiting for the subscriber to receive are released with an
// error first. It returns an error if id is not registered.
func (nc *NotificationCenter) Unsubscribe(id string) error {
	fmt.Printf(">>>> unsubscribe %s\n", id)

	nc.subscribersMu.Lock()
	sub, exist := nc.subscriberMessageChannelsID[id]
	if !exist {
		nc.subscribersMu.Unlock()
		return fmt.Errorf("outlet %s is not registered yet", id)
	}
	// Once the entry is gone no new sender can register on this subscription.
	delete(nc.subscriberMessageChannelsID, id)
	nc.subscribersMu.Unlock()

	// Wake pending senders and wait for them to leave before closing the
	// message channel; closing it earlier would make their send panic.
	close(sub.done)
	sub.senders.Wait()
	close(sub.messages)
	return nil
}

// Notify delivers message to the subscriber registered under id. It blocks
// until the subscriber receives the message or is unsubscribed, and returns
// an error if id is not registered (or stops being registered while waiting).
func (nc *NotificationCenter) Notify(id string, message interface{}) error {
	fmt.Printf(">>>> send message to %s\n", id)

	nc.subscribersMu.Lock()
	sub, exist := nc.subscriberMessageChannelsID[id]
	if exist {
		// Registered under the lock so Unsubscribe is guaranteed to see it.
		sub.senders.Add(1)
	}
	nc.subscribersMu.Unlock()

	if !exist {
		return fmt.Errorf("outlet %s is not registered", id)
	}
	defer sub.senders.Done()

	// The send happens outside the lock: a subscriber that is slow or gone
	// must not stall Subscribe/Unsubscribe/Notify for every other id.
	select {
	case sub.messages <- message:
		return nil
	case <-sub.done:
		return fmt.Errorf("outlet %s is not registered", id)
	}
}
| func handleSSE(nc *NotificationCenter) gin.HandlerFunc { | |
| return func(ctx *gin.Context) { | |
| id := ctx.Param("id") | |
| // subscribe the id and channel | |
| if err := nc.Subscribe(id); err != nil { | |
| ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
| return | |
| } | |
| // unsubscribe if exit from this method | |
| defer func() { | |
| if err := nc.Unsubscribe(id); err != nil { | |
| ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
| return | |
| } | |
| message := fmt.Sprintf("id %s close connection", id) | |
| ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message}) | |
| }() | |
| // forever loop for listening message | |
| for { | |
| select { | |
| case message := <-nc.WaitForMessage(id): | |
| ctx.SSEvent("message", message) | |
| ctx.Writer.Flush() | |
| case <-ctx.Request.Context().Done(): | |
| return | |
| } | |
| } | |
| } | |
| } | |
| func messageHandler(nc *NotificationCenter) gin.HandlerFunc { | |
| return func(ctx *gin.Context) { | |
| id := ctx.Param("id") | |
| message := fmt.Sprintf("Hello %s", id) | |
| if err := nc.Notify(id, message); err != nil { | |
| ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
| return | |
| } | |
| ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))}) | |
| } | |
| } | |
| func main() { | |
| r := gin.Default() | |
| nc := NewNotificationCenter() | |
| r.GET("/message/:id", messageHandler(nc)) | |
| r.GET("/handshake/:id", handleSSE(nc)) | |
| r.Run(":3000") | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.mirzaakhena.websocketdemo; | |
| import java.util.concurrent.TimeUnit; | |
| import javax.annotation.PostConstruct; | |
| import javax.ws.rs.client.Client; | |
| import javax.ws.rs.client.ClientBuilder; | |
| import javax.ws.rs.client.WebTarget; | |
| import javax.ws.rs.sse.InboundSseEvent; | |
| import javax.ws.rs.sse.SseEventSource; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.beans.factory.annotation.Autowired; | |
| import org.springframework.boot.SpringApplication; | |
| import org.springframework.boot.autoconfigure.SpringBootApplication; | |
| import org.springframework.core.env.Environment; | |
| @SpringBootApplication | |
| public class WebsocketdemoApplication { | |
| Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class); | |
| @Autowired | |
| private Environment env; | |
| public static void main(String[] args) { | |
| SpringApplication.run(WebsocketdemoApplication.class, args); | |
| } | |
| private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; | |
| private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second"; | |
| private Client client; | |
| private WebTarget tut; | |
| private SseEventSource sse; | |
| @PostConstruct | |
| private void initAplikasi() { | |
| this.client = ClientBuilder.newClient(); | |
| this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL)); | |
| doHandshaking(); | |
| } | |
| private void doHandshaking() { | |
| logger.info(String.format("Start doHandshaking...")); | |
| int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND)); | |
| sse = SseEventSource.// | |
| target(this.tut).// | |
| reconnectingEvery(retry, TimeUnit.SECONDS).// | |
| build();// | |
| sse.register(this::onMessage, this::onError, () -> { | |
| logger.info(String.format("OnComplete")); | |
| try { | |
| Thread.sleep(retry * 1000L); | |
| } catch (InterruptedException e) { | |
| } | |
| doHandshaking(); | |
| }); | |
| sse.open(); | |
| } | |
| private void onError(Throwable t) { | |
| logger.error("Error : %s", t.getMessage()); | |
| } | |
| private void onMessage(InboundSseEvent event) { | |
| logger.info(String.format("Message : %s", event.readData())); | |
| } | |
| } | |
| // application.properties | |
| // ----------------------- | |
| // server.port = 8080 | |
| // app.handshaking_url = http://localhost:3000/handshake/0208 | |
| // app.handshaking_retry_in_second = 5 | |
| // pom.xml dependency | |
| // <dependencies> | |
| // <dependency> | |
| // <groupId>org.springframework.boot</groupId> | |
| // <artifactId>spring-boot-starter-web</artifactId> | |
| // </dependency> | |
| // <dependency> | |
| // <groupId>org.glassfish.jersey.media</groupId> | |
| // <artifactId>jersey-media-sse</artifactId> | |
| // <version>2.26</version> | |
| // </dependency> | |
| // <dependency> | |
| // <groupId>org.glassfish.jersey.inject</groupId> | |
| // <artifactId>jersey-hk2</artifactId> | |
| // <version>2.26</version> | |
| // </dependency> | |
| // </dependencies> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment