Last active
April 3, 2024 01:59
-
-
Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.
Revisions
-
mirzaakhena revised this gist
Jul 27, 2020 . 1 changed file with 0 additions and 147 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,147 +0,0 @@ -
mirzaakhena renamed this gist
Jul 27, 2020 . 1 changed file with 3 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -15,15 +15,15 @@ import org.springframework.core.env.Environment; @SpringBootApplication public class SSE { Logger logger = LoggerFactory.getLogger(SSE.class); @Autowired private Environment env; public static void main(String[] args) { SpringApplication.run(SSE.class, args); } private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; -
mirzaakhena renamed this gist
Jul 27, 2020 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
mirzaakhena revised this gist
Jul 27, 2020 . 1 changed file with 12 additions and 12 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -37,19 +37,27 @@ private void initAplikasi() { Client client = ClientBuilder.newClient(); this.tut = client.target(env.getProperty(PROP_HANDSHAKING_URL)); doHandshaking(); } private void doHandshaking() { logger.info(String.format("Handshaking...")); sse = SseEventSource.target(this.tut).build();// sse.register(this::onMessage, this::onError, this::onComplete); sse.open(); } private void onMessage(InboundSseEvent event) { logger.info(String.format("Message : %s | %s", event.getName(), event.readData())); } private void onError(Throwable t) { logger.error("Error : %s", t.getMessage()); } private void onComplete() { logger.info(String.format("Connection Closed!")); try { int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND)); Thread.sleep(retry * 1000L); @@ -58,14 +66,6 @@ private void rehandshaking() { doHandshaking(); } } -
mirzaakhena revised this gist
Jul 27, 2020 . 1 changed file with 2 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -29,15 +29,14 @@ public static void main(String[] args) { private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second"; private WebTarget tut; private SseEventSource sse; @PostConstruct private void initAplikasi() { Client client = ClientBuilder.newClient(); this.tut = client.target(env.getProperty(PROP_HANDSHAKING_URL)); doHandshaking(); } -
mirzaakhena revised this gist
Jul 27, 2020 . 1 changed file with 24 additions and 25 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -35,38 +35,37 @@ public static void main(String[] args) { @PostConstruct private void initAplikasi() { this.client = ClientBuilder.newClient(); this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL)); doHandshaking(); } private void doHandshaking() { logger.info(String.format("Handshaking...")); sse = SseEventSource.target(this.tut).build();// sse.register(this::onMessage, this::onError, this::rehandshaking); sse.open(); } private void rehandshaking() { logger.info(String.format("Connection Closed!")); try { int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND)); Thread.sleep(retry * 1000L); } catch (InterruptedException e) { } doHandshaking(); } private void onError(Throwable t) { logger.error("Error : %s", t.getMessage()); } private void onMessage(InboundSseEvent event) { logger.info(String.format("Message : %s", event.readData())); } } -
mirzaakhena revised this gist
Jul 27, 2020 . 1 changed file with 37 additions and 37 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -17,8 +17,8 @@ type NotificationCenter struct { // NewNotificationCenter ... func NewNotificationCenter() *NotificationCenter { return &NotificationCenter{ subscribersMu: &sync.Mutex{}, subscriberMessageChannelsID: make(map[string]chan interface{}), } } @@ -36,7 +36,7 @@ func (nc *NotificationCenter) Subscribe(id string) error { defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; exist { return fmt.Errorf("outlet %s already registered", id) } nc.subscriberMessageChannelsID[id] = make(chan interface{}) @@ -53,7 +53,7 @@ func (nc *NotificationCenter) Unsubscribe(id string) error { defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; !exist { return fmt.Errorf("outlet %s is not registered yet", id) } close(nc.subscriberMessageChannelsID[id]) @@ -71,7 +71,7 @@ func (nc *NotificationCenter) Notify(id string, message interface{}) error { defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; !exist { return fmt.Errorf("outlet %s is not registered", id) } nc.subscriberMessageChannelsID[id] <- message @@ -82,53 +82,53 @@ func (nc *NotificationCenter) Notify(id string, message interface{}) error { func handleSSE(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") // subscribe the id and channel if err := nc.Subscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } // unsubscribe if exit from this method defer func() { if err := nc.Unsubscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } message := fmt.Sprintf("id %s close connection", id) ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message}) }() // forever loop for listening message for { select { case message := <-nc.WaitForMessage(id): ctx.SSEvent("message", message) ctx.Writer.Flush() case <-ctx.Request.Context().Done(): return } } } } func messageHandler(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") message := fmt.Sprintf("Hello %s", id) if err := nc.Notify(id, message); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))}) } } -
mirzaakhena revised this gist
Jul 27, 2020 . 2 changed files with 102 additions and 104 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -17,42 +17,40 @@ @SpringBootApplication public class WebsocketdemoApplication { Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class); @Autowired private Environment env; public static void main(String[] args) { SpringApplication.run(WebsocketdemoApplication.class, args); } private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second"; private Client client; private WebTarget tut; private SseEventSource sse; @PostConstruct private void initAplikasi() { this.client = ClientBuilder.newClient(); this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL)); doHandshaking(); } private void doHandshaking() { logger.info(String.format("Start doHandshaking...")); int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND)); sse = SseEventSource.// target(this.tut).// reconnectingEvery(retry, TimeUnit.SECONDS).// build();// sse.register(this::onMessage, this::onError, () -> { logger.info(String.format("OnComplete")); try { Thread.sleep(retry * 1000L); } catch (InterruptedException e) { This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,146 +2,146 @@ package main import ( "fmt" "net/http" "sync" "github.com/gin-gonic/gin" ) // NotificationCenter ... type NotificationCenter struct { subscriberMessageChannelsID map[string]chan interface{} subscribersMu *sync.Mutex } // NewNotificationCenter ... func NewNotificationCenter() *NotificationCenter { return &NotificationCenter{ subscribersMu: &sync.Mutex{}, subscriberMessageChannelsID: make(map[string]chan interface{}), } } // WaitForMessage will blocking the process until returning the message func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} { return nc.subscriberMessageChannelsID[id] } // Subscribe ... func (nc *NotificationCenter) Subscribe(id string) error { fmt.Printf(">>>> subscribe %s\n", id) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; exist { return fmt.Errorf("outlet %s already registered", id) } nc.subscriberMessageChannelsID[id] = make(chan interface{}) return nil } // Unsubscribe ... func (nc *NotificationCenter) Unsubscribe(id string) error { fmt.Printf(">>>> unsubscribe %s\n", id) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; !exist { return fmt.Errorf("outlet %s is not registered yet", id) } close(nc.subscriberMessageChannelsID[id]) delete(nc.subscriberMessageChannelsID, id) return nil } // Notify ... func (nc *NotificationCenter) Notify(id string, message interface{}) error { fmt.Printf(">>>> send message to %s\n", id) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; !exist { return fmt.Errorf("outlet %s is not registered", id) } nc.subscriberMessageChannelsID[id] <- message return nil } func handleSSE(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") // subscribe the id and channel if err := nc.Subscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } // unsubscribe if exit from this method defer func() { if err := nc.Unsubscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } message := fmt.Sprintf("id %s close connection", id) ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message}) }() // forever loop for listening message for { select { case message := <-nc.WaitForMessage(id): ctx.SSEvent("message", message) ctx.Writer.Flush() case <-ctx.Request.Context().Done(): return } } } } func messageHandler(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") message := fmt.Sprintf("Hello %s", id) if err := nc.Notify(id, message); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))}) } } func main() { r := gin.Default() nc := NewNotificationCenter() r.GET("/message/:id", messageHandler(nc)) r.GET("/handshake/:id", handleSSE(nc)) r.Run(":3000") } -
mirzaakhena revised this gist
Jul 27, 2020 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,7 @@ package main import ( "fmt" "net/http" "sync" -
mirzaakhena revised this gist
Jul 27, 2020 . 1 changed file with 0 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -145,5 +145,3 @@ func main() { r.Run(":3000") } -
mirzaakhena revised this gist
Jul 27, 2020 . No changes.There are no files selected for viewing
-
mirzaakhena revised this gist
Jul 27, 2020 . 2 changed files with 90 additions and 104 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,4 @@ package com.mirzaakhena.websocketdemo; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; @@ -15,24 +15,23 @@ import org.springframework.core.env.Environment; @SpringBootApplication public class WebsocketdemoApplication { Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class); @Autowired private Environment env; public static void main(String[] args) { SpringApplication.run(WebsocketdemoApplication.class, args); } private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second"; private Client client; private WebTarget tut; private SseEventSource sse; @PostConstruct private void initAplikasi() { @@ -51,7 +50,7 @@ private void doHandshaking() { target(this.tut).// reconnectingEvery(retry, TimeUnit.SECONDS).// build();// sse.register(this::onMessage, this::onError, () -> { logger.info(String.format("OnComplete")); try { @@ -62,9 +61,9 @@ private void doHandshaking() { }); sse.open(); } private void onError(Throwable t) { logger.error("Error : %s", t.getMessage()); } private void onMessage(InboundSseEvent event) { @@ -74,6 +73,7 @@ private void onMessage(InboundSseEvent event) { } // application.properties // ----------------------- // server.port = 8080 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,160 +1,146 @@ package main import ( "fmt" "net/http" "sync" "github.com/gin-gonic/gin" ) // NotificationCenter ... type NotificationCenter struct { subscriberMessageChannelsID map[string]chan interface{} subscribersMu *sync.Mutex } // NewNotificationCenter ... func NewNotificationCenter() *NotificationCenter { return &NotificationCenter{ subscribersMu: &sync.Mutex{}, subscriberMessageChannelsID: make(map[string]chan interface{}), } } // WaitForMessage will blocking the process until returning the message func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} { return nc.subscriberMessageChannelsID[id] } // Subscribe ... func (nc *NotificationCenter) Subscribe(id string) error { fmt.Printf(">>>> subscribe %s\n", id) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; exist { return fmt.Errorf("outlet %s already registered", id) } nc.subscriberMessageChannelsID[id] = make(chan interface{}) return nil } // Unsubscribe ... func (nc *NotificationCenter) Unsubscribe(id string) error { fmt.Printf(">>>> unsubscribe %s\n", id) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; !exist { return fmt.Errorf("outlet %s is not registered yet", id) } close(nc.subscriberMessageChannelsID[id]) delete(nc.subscriberMessageChannelsID, id) return nil } // Notify ... func (nc *NotificationCenter) Notify(id string, message interface{}) error { fmt.Printf(">>>> send message to %s\n", id) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.subscriberMessageChannelsID[id]; !exist { return fmt.Errorf("outlet %s is not registered", id) } nc.subscriberMessageChannelsID[id] <- message return nil } func handleSSE(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") // subscribe the id and channel if err := nc.Subscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } // unsubscribe if exit from this method defer func() { if err := nc.Unsubscribe(id); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } message := fmt.Sprintf("id %s close connection", id) ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message}) }() // forever loop for listening message for { select { case message := <-nc.WaitForMessage(id): ctx.SSEvent("message", message) ctx.Writer.Flush() case <-ctx.Request.Context().Done(): return } } } } func messageHandler(nc *NotificationCenter) gin.HandlerFunc { return func(ctx *gin.Context) { id := ctx.Param("id") message := fmt.Sprintf("Hello %s", id) if err := nc.Notify(id, message); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))}) } } func main() { r := gin.Default() nc := NewNotificationCenter() r.GET("/message/:id", messageHandler(nc)) r.GET("/handshake/:id", handleSSE(nc)) r.Run(":3000") -
mirzaakhena revised this gist
Jul 25, 2020 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,4 @@ package com.mirzaakhena.ssedemo; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -
mirzaakhena revised this gist
Jul 25, 2020 . 2 changed files with 10 additions and 8 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -18,20 +18,21 @@ public class SSEApplication { Logger logger = LoggerFactory.getLogger(SSEApplication.class); private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second"; @Autowired private Environment env; private Client client; private WebTarget tut; private SseEventSource sse; // mvn spring-boot:run public static void main(String[] args) { SpringApplication.run(SSEApplication.class, args); } @PostConstruct private void initAplikasi() { This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -143,6 +143,7 @@ func messageHandler(nc Notifier) gin.HandlerFunc { } } // go run main.go func main() { r := gin.Default() -
mirzaakhena revised this gist
Jul 25, 2020 . 1 changed file with 18 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -78,3 +78,21 @@ private void onMessage(InboundSseEvent event) { // server.port = 8080 // app.handshaking_url = http://localhost:3000/handshake/0208 // app.handshaking_retry_in_second = 5 // pom.xml dependency // <dependencies> // <dependency> // <groupId>org.springframework.boot</groupId> // <artifactId>spring-boot-starter-web</artifactId> // </dependency> // <dependency> // <groupId>org.glassfish.jersey.media</groupId> // <artifactId>jersey-media-sse</artifactId> // <version>2.26</version> // </dependency> // <dependency> // <groupId>org.glassfish.jersey.inject</groupId> // <artifactId>jersey-hk2</artifactId> // <version>2.26</version> // </dependency> // </dependencies> -
mirzaakhena revised this gist
Jul 25, 2020 . 1 changed file with 6 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -148,9 +148,15 @@ func main() { r := gin.Default() nc := NewNotificationCenter() // test call with this // curl GET http://localhost:3000/message/0208 r.GET("/message/:outletCode", messageHandler(nc)) r.GET("/handshake/:outletCode", handleSSE(nc)) r.Run(":3000") } -
mirzaakhena revised this gist
Jul 25, 2020 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -17,13 +17,13 @@ @SpringBootApplication public class SSEApplication { Logger logger = LoggerFactory.getLogger(SSEApplication.class); @Autowired private Environment env; public static void main(String[] args) { SpringApplication.run(SSEApplication.class, args); } private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; -
mirzaakhena renamed this gist
Jul 25, 2020 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -15,7 +15,7 @@ import org.springframework.core.env.Environment; @SpringBootApplication public class SSEApplication { Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class); -
mirzaakhena created this gist
Jul 25, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,80 @@ package com.mirzaakhena.websocketdemo; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; import javax.ws.rs.sse.InboundSseEvent; import javax.ws.rs.sse.SseEventSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.core.env.Environment; @SpringBootApplication public class WebsocketdemoApplication { Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class); @Autowired private Environment env; public static void main(String[] args) { SpringApplication.run(WebsocketdemoApplication.class, args); } private final String PROP_HANDSHAKING_URL = "app.handshaking_url"; private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second"; private Client client; private WebTarget tut; private SseEventSource sse; @PostConstruct private void initAplikasi() { this.client = ClientBuilder.newClient(); this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL)); doHandshaking(); } private void doHandshaking() { logger.info(String.format("Start doHandshaking...")); int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND)); sse = SseEventSource.// target(this.tut).// reconnectingEvery(retry, TimeUnit.SECONDS).// build();// sse.register(this::onMessage, this::onError, () -> { logger.info(String.format("OnComplete")); try { Thread.sleep(retry * 1000L); } catch (InterruptedException e) { } doHandshaking(); }); sse.open(); } private void onError(Throwable t) { logger.info(String.format("Error?????")); } private void onMessage(InboundSseEvent event) { logger.info(String.format("Message : %s", event.readData())); } } // application.properties // ----------------------- // server.port = 8080 // app.handshaking_url = http://localhost:3000/handshake/0208 // app.handshaking_retry_in_second = 5 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,156 @@ // update version of rikonor // https://gist.github.com/rikonor/e53a33c27ed64861c91a095a59f0aa44 // and ismasan // https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c // // // for SSE implementation // fixing: // * adding unsubscribe event for // // improvement: // * adding identifier (outletCode) of each channel package main import ( "fmt" "io" "net/http" "sync" "github.com/gin-gonic/gin" ) // UnsubscribeFunc ... type UnsubscribeFunc func() error // Subscriber ... type Subscriber interface { Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) } // Notifier ... type Notifier interface { Notify(outletCode, message string) error } // NotificationCenter is implementation type NotificationCenter struct { outletMapChannel map[string]chan string subscribers map[chan string]struct{} subscribersMu *sync.Mutex } // NewNotificationCenter is constructor func NewNotificationCenter() *NotificationCenter { return &NotificationCenter{ subscribers: map[chan string]struct{}{}, subscribersMu: &sync.Mutex{}, outletMapChannel: make(map[string]chan string), } } // Subscribe ... func (nc *NotificationCenter) Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) { fmt.Printf("try subscribe %s\n", outletCode) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() if _, exist := nc.outletMapChannel[outletCode]; exist { return nil, fmt.Errorf("outlet %s already registered", outletCode) } nc.subscribers[c] = struct{}{} nc.outletMapChannel[outletCode] = c unsubscribeFn := func() error { fmt.Printf("unsubscribe %s\n", outletCode) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() delete(nc.subscribers, c) delete(nc.outletMapChannel, outletCode) return nil } return unsubscribeFn, nil } // Notify ... func (nc *NotificationCenter) Notify(outletCode, message string) error { fmt.Printf("notify %s\n", outletCode) nc.subscribersMu.Lock() defer nc.subscribersMu.Unlock() nc.outletMapChannel[outletCode] <- message return nil } func handleSSE(s Subscriber) gin.HandlerFunc { return func(ctx *gin.Context) { // prepare the channel c := make(chan string) // subscribe the outletCode and channel unsubscribeFn, err := s.Subscribe(ctx.Param("outletCode"), c) if err != nil { fmt.Printf("E %v\n", err.Error()) ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } // unsubscribe when exit this function defer func() { if err := unsubscribeFn(); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } }() // unsubscribe when client close the connections notify := ctx.Request.Context().Done() go func() { <-notify if err := unsubscribeFn(); err != nil { ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) return } }() // forever loop for listening message for { ctx.Stream(func(w io.Writer) bool { if message, ok := <-c; ok { ctx.SSEvent("message", message) return true } return false }) } } } func messageHandler(nc Notifier) gin.HandlerFunc { return func(ctx *gin.Context) { outletCode := ctx.Param("outletCode") nc.Notify(outletCode, fmt.Sprintf("Hello %s", outletCode)) ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("outletCode"))}) } } func main() { r := gin.Default() nc := NewNotificationCenter() r.GET("/message/:outletCode", messageHandler(nc)) r.GET("/handshake/:outletCode", handleSSE(nc)) r.Run(":3000") }