Last active
April 3, 2024 01:59
-
-
Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.
Server Sent Events Java Client Implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // updated version of the gist by rikonor | |
| // https://gist.github.com/rikonor/e53a33c27ed64861c91a095a59f0aa44 | |
| // and ismasan | |
| // https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c | |
| // | |
| // | |
| // for SSE implementation | |
| // fixing: | |
| // * adding an unsubscribe event for when the client closes the connection | |
| // | |
| // improvement: | |
| // * adding identifier (outletCode) of each channel | |
| package main | |
import (
	"fmt"
	"io"
	"log"
	"net/http"
	"sync"

	"github.com/gin-gonic/gin"
)
// UnsubscribeFunc removes the subscription it was returned for. It is safe to
// call more than once; only the first call has an effect.
type UnsubscribeFunc func() error

// Subscriber registers a channel that will receive the messages addressed to
// one outlet.
type Subscriber interface {
	Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error)
}

// Notifier delivers a message to the subscriber of one outlet.
type Notifier interface {
	Notify(outletCode, message string) error
}

// Compile-time checks that NotificationCenter implements both interfaces.
var (
	_ Subscriber = (*NotificationCenter)(nil)
	_ Notifier   = (*NotificationCenter)(nil)
)

// subscription is the bookkeeping kept for one registered outlet.
type subscription struct {
	// ch is the channel supplied by the subscriber; messages are sent on it.
	ch chan string
	// done is closed by the unsubscribe function so that a Notify blocked on
	// ch can give up instead of waiting forever.
	done chan struct{}
}

// NotificationCenter is an in-memory Subscriber and Notifier that allows at
// most one subscription per outletCode. The zero value is ready to use.
type NotificationCenter struct {
	// subscribersMu guards subscribers.
	subscribersMu sync.Mutex
	// subscribers maps an outletCode to its single active subscription.
	subscribers map[string]*subscription
}

// NewNotificationCenter returns an empty NotificationCenter.
func NewNotificationCenter() *NotificationCenter {
	return &NotificationCenter{
		subscribers: make(map[string]*subscription),
	}
}

// Subscribe registers c as the receiver of messages for outletCode.
//
// It returns an error when c is nil or when outletCode already has a
// subscription. On success the returned UnsubscribeFunc removes exactly this
// subscription: calling it again, or calling it after another subscriber has
// taken over the same outletCode, is a no-op. The channel c is never closed
// by the NotificationCenter.
func (nc *NotificationCenter) Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) {
	fmt.Printf("try subscribe %s\n", outletCode)

	if c == nil {
		// A send on a nil channel blocks forever, so reject it up front.
		return nil, fmt.Errorf("outlet %s: channel must not be nil", outletCode)
	}

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	if _, exist := nc.subscribers[outletCode]; exist {
		return nil, fmt.Errorf("outlet %s already registered", outletCode)
	}
	if nc.subscribers == nil {
		// Lazily initialise so the zero value works.
		nc.subscribers = make(map[string]*subscription)
	}

	sub := &subscription{ch: c, done: make(chan struct{})}
	nc.subscribers[outletCode] = sub

	unsubscribeFn := func() error {
		nc.subscribersMu.Lock()
		defer nc.subscribersMu.Unlock()

		// Only remove the entry if it is still ours. This makes the function
		// idempotent (done is closed once) and stops a stale call from
		// deleting a newer subscription for the same outletCode.
		if nc.subscribers[outletCode] != sub {
			return nil
		}
		fmt.Printf("unsubscribe %s\n", outletCode)
		delete(nc.subscribers, outletCode)
		close(sub.done)
		return nil
	}
	return unsubscribeFn, nil
}

// Notify sends message to the subscriber registered for outletCode.
//
// It returns an error when no subscriber is registered, or when the
// subscriber unsubscribes before the message could be handed over. The send
// happens without holding the internal lock, so a slow receiver delays only
// this call and never blocks Subscribe, unsubscribe, or notifications to
// other outlets. Notify still blocks while the subscriber stays registered
// but does not receive.
func (nc *NotificationCenter) Notify(outletCode, message string) error {
	fmt.Printf("notify %s\n", outletCode)

	nc.subscribersMu.Lock()
	sub, ok := nc.subscribers[outletCode]
	nc.subscribersMu.Unlock()

	if !ok {
		return fmt.Errorf("outlet %s is not registered", outletCode)
	}

	select {
	case sub.ch <- message:
		return nil
	case <-sub.done:
		return fmt.Errorf("outlet %s unsubscribed before the message was delivered", outletCode)
	}
}
| func handleSSE(s Subscriber) gin.HandlerFunc { | |
| return func(ctx *gin.Context) { | |
| // prepare the channel | |
| c := make(chan string) | |
| // subscribe the outletCode and channel | |
| unsubscribeFn, err := s.Subscribe(ctx.Param("outletCode"), c) | |
| if err != nil { | |
| fmt.Printf("E %v\n", err.Error()) | |
| ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
| return | |
| } | |
| // unsubscribe when exit this function | |
| defer func() { | |
| if err := unsubscribeFn(); err != nil { | |
| ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
| return | |
| } | |
| }() | |
| // unsubscribe when client close the connections | |
| notify := ctx.Request.Context().Done() | |
| go func() { | |
| <-notify | |
| if err := unsubscribeFn(); err != nil { | |
| ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
| return | |
| } | |
| }() | |
| // forever loop for listening message | |
| for { | |
| ctx.Stream(func(w io.Writer) bool { | |
| if message, ok := <-c; ok { | |
| ctx.SSEvent("message", message) | |
| return true | |
| } | |
| return false | |
| }) | |
| } | |
| } | |
| } | |
| func messageHandler(nc Notifier) gin.HandlerFunc { | |
| return func(ctx *gin.Context) { | |
| outletCode := ctx.Param("outletCode") | |
| nc.Notify(outletCode, fmt.Sprintf("Hello %s", outletCode)) | |
| ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("outletCode"))}) | |
| } | |
| } | |
| func main() { | |
| r := gin.Default() | |
| nc := NewNotificationCenter() | |
| r.GET("/message/:outletCode", messageHandler(nc)) | |
| r.GET("/handshake/:outletCode", handleSSE(nc)) | |
| r.Run(":3000") | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.mirzaakhena.websocketdemo; | |
| import java.util.concurrent.TimeUnit; | |
| import javax.annotation.PostConstruct; | |
| import javax.ws.rs.client.Client; | |
| import javax.ws.rs.client.ClientBuilder; | |
| import javax.ws.rs.client.WebTarget; | |
| import javax.ws.rs.sse.InboundSseEvent; | |
| import javax.ws.rs.sse.SseEventSource; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.beans.factory.annotation.Autowired; | |
| import org.springframework.boot.SpringApplication; | |
| import org.springframework.boot.autoconfigure.SpringBootApplication; | |
| import org.springframework.core.env.Environment; | |
| @SpringBootApplication | |
| public class SSEApplication { | |
| Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class); | |
| @Autowired | |
| private Environment env; | |
| public static void main(String[] args) { | |
| SpringApplication.run(WebsocketdemoApplication.class, args); | |
| } | |
| private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; | |
| private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second"; | |
| private Client client; | |
| private WebTarget tut; | |
| private SseEventSource sse; | |
| @PostConstruct | |
| private void initAplikasi() { | |
| this.client = ClientBuilder.newClient(); | |
| this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL)); | |
| doHandshaking(); | |
| } | |
| private void doHandshaking() { | |
| logger.info(String.format("Start doHandshaking...")); | |
| int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND)); | |
| sse = SseEventSource.// | |
| target(this.tut).// | |
| reconnectingEvery(retry, TimeUnit.SECONDS).// | |
| build();// | |
| sse.register(this::onMessage, this::onError, () -> { | |
| logger.info(String.format("OnComplete")); | |
| try { | |
| Thread.sleep(retry * 1000L); | |
| } catch (InterruptedException e) { | |
| } | |
| doHandshaking(); | |
| }); | |
| sse.open(); | |
| } | |
| private void onError(Throwable t) { | |
| logger.info(String.format("Error?????")); | |
| } | |
| private void onMessage(InboundSseEvent event) { | |
| logger.info(String.format("Message : %s", event.readData())); | |
| } | |
| } | |
| // application.properties | |
| // ----------------------- | |
| // server.port = 8080 | |
| // app.handshaking_url = http://localhost:3000/handshake/0208 | |
| // app.handshaking_retry_in_second = 5 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment