Skip to content

Instantly share code, notes, and snippets.

@mirzaakhena
Last active April 3, 2024 01:59
Show Gist options
  • Select an option

  • Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.

Select an option

Save mirzaakhena/c75caa8529e648b7fc7b4becd4ca6fe6 to your computer and use it in GitHub Desktop.

Revisions

  1. mirzaakhena revised this gist Jul 27, 2020. 1 changed file with 0 additions and 147 deletions.
    147 changes: 0 additions & 147 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -1,147 +0,0 @@
    package main

    import (
    "fmt"
    "net/http"
    "sync"

    "github.com/gin-gonic/gin"
    )

    // NotificationCenter ...
    type NotificationCenter struct {
    subscriberMessageChannelsID map[string]chan interface{}
    subscribersMu *sync.Mutex
    }

    // NewNotificationCenter ...
    func NewNotificationCenter() *NotificationCenter {
    return &NotificationCenter{
    subscribersMu: &sync.Mutex{},
    subscriberMessageChannelsID: make(map[string]chan interface{}),
    }
    }

    // WaitForMessage will blocking the process until returning the message
    func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} {
    return nc.subscriberMessageChannelsID[id]
    }

    // Subscribe ...
    func (nc *NotificationCenter) Subscribe(id string) error {

    fmt.Printf(">>>> subscribe %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; exist {
    return fmt.Errorf("outlet %s already registered", id)
    }

    nc.subscriberMessageChannelsID[id] = make(chan interface{})

    return nil
    }

    // Unsubscribe ...
    func (nc *NotificationCenter) Unsubscribe(id string) error {

    fmt.Printf(">>>> unsubscribe %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered yet", id)
    }

    close(nc.subscriberMessageChannelsID[id])
    delete(nc.subscriberMessageChannelsID, id)

    return nil
    }

    // Notify ...
    func (nc *NotificationCenter) Notify(id string, message interface{}) error {

    fmt.Printf(">>>> send message to %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered", id)
    }

    nc.subscriberMessageChannelsID[id] <- message

    return nil
    }

    func handleSSE(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {

    id := ctx.Param("id")

    // subscribe the id and channel
    if err := nc.Subscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }

    // unsubscribe if exit from this method
    defer func() {
    if err := nc.Unsubscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    message := fmt.Sprintf("id %s close connection", id)
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message})
    }()

    // forever loop for listening message
    for {

    select {

    case message := <-nc.WaitForMessage(id):
    ctx.SSEvent("message", message)
    ctx.Writer.Flush()

    case <-ctx.Request.Context().Done():
    return
    }

    }

    }
    }

    func messageHandler(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {
    id := ctx.Param("id")

    message := fmt.Sprintf("Hello %s", id)

    if err := nc.Notify(id, message); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))})
    }
    }

    func main() {

    r := gin.Default()

    nc := NewNotificationCenter()

    r.GET("/message/:id", messageHandler(nc))

    r.GET("/handshake/:id", handleSSE(nc))

    r.Run(":3000")

    }
  2. mirzaakhena renamed this gist Jul 27, 2020. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions SSE → SSE.java
    Original file line number Diff line number Diff line change
    @@ -15,15 +15,15 @@
    import org.springframework.core.env.Environment;

    @SpringBootApplication
    public class WebsocketdemoApplication {
    public class SSE {

    Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);
    Logger logger = LoggerFactory.getLogger(SSE.class);

    @Autowired
    private Environment env;

    public static void main(String[] args) {
    SpringApplication.run(WebsocketdemoApplication.class, args);
    SpringApplication.run(SSE.class, args);
    }

    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
  3. mirzaakhena renamed this gist Jul 27, 2020. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  4. mirzaakhena revised this gist Jul 27, 2020. 1 changed file with 12 additions and 12 deletions.
    24 changes: 12 additions & 12 deletions WebsocketdemoApplication.java.java
    Original file line number Diff line number Diff line change
    @@ -37,19 +37,27 @@ private void initAplikasi() {

    Client client = ClientBuilder.newClient();
    this.tut = client.target(env.getProperty(PROP_HANDSHAKING_URL));

    doHandshaking();
    }

    private void doHandshaking() {
    logger.info(String.format("Handshaking..."));
    sse = SseEventSource.target(this.tut).build();//
    sse.register(this::onMessage, this::onError, this::rehandshaking);
    sse.register(this::onMessage, this::onError, this::onComplete);
    sse.open();
    }

    private void rehandshaking() {
    logger.info(String.format("Connection Closed!"));
    private void onMessage(InboundSseEvent event) {
    logger.info(String.format("Message : %s | %s", event.getName(), event.readData()));
    }

    private void onError(Throwable t) {
    logger.error("Error : %s", t.getMessage());
    }

    private void onComplete() {
    logger.info(String.format("Connection Closed!"));
    try {
    int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND));
    Thread.sleep(retry * 1000L);
    @@ -58,14 +66,6 @@ private void rehandshaking() {
    doHandshaking();
    }

    private void onError(Throwable t) {
    logger.error("Error : %s", t.getMessage());
    }

    private void onMessage(InboundSseEvent event) {
    logger.info(String.format("Message : %s", event.readData()));
    }

    }


  5. mirzaakhena revised this gist Jul 27, 2020. 1 changed file with 2 additions and 3 deletions.
    5 changes: 2 additions & 3 deletions WebsocketdemoApplication.java.java
    Original file line number Diff line number Diff line change
    @@ -29,15 +29,14 @@ public static void main(String[] args) {
    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
    private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

    private Client client;
    private WebTarget tut;
    private SseEventSource sse;

    @PostConstruct
    private void initAplikasi() {

    this.client = ClientBuilder.newClient();
    this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL));
    Client client = ClientBuilder.newClient();
    this.tut = client.target(env.getProperty(PROP_HANDSHAKING_URL));

    doHandshaking();
    }
  6. mirzaakhena revised this gist Jul 27, 2020. 1 changed file with 24 additions and 25 deletions.
    49 changes: 24 additions & 25 deletions WebsocketdemoApplication.java.java
    Original file line number Diff line number Diff line change
    @@ -35,38 +35,37 @@ public static void main(String[] args) {

    @PostConstruct
    private void initAplikasi() {

    this.client = ClientBuilder.newClient();
    this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL));

    doHandshaking();
    }

    private void doHandshaking() {
    logger.info(String.format("Start doHandshaking..."));
    logger.info(String.format("Handshaking..."));
    sse = SseEventSource.target(this.tut).build();//
    sse.register(this::onMessage, this::onError, this::rehandshaking);
    sse.open();
    }

    int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND));
    sse = SseEventSource.//
    target(this.tut).//
    reconnectingEvery(retry, TimeUnit.SECONDS).//
    build();//

    sse.register(this::onMessage, this::onError, () -> {
    logger.info(String.format("OnComplete"));
    try {
    Thread.sleep(retry * 1000L);
    } catch (InterruptedException e) {
    }
    doHandshaking();
    });
    sse.open();
    }

    private void onError(Throwable t) {
    logger.error("Error : %s", t.getMessage());
    }

    private void onMessage(InboundSseEvent event) {
    logger.info(String.format("Message : %s", event.readData()));
    }
    private void rehandshaking() {
    logger.info(String.format("Connection Closed!"));
    try {
    int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND));
    Thread.sleep(retry * 1000L);
    } catch (InterruptedException e) {
    }
    doHandshaking();
    }

    private void onError(Throwable t) {
    logger.error("Error : %s", t.getMessage());
    }

    private void onMessage(InboundSseEvent event) {
    logger.info(String.format("Message : %s", event.readData()));
    }

    }

  7. mirzaakhena revised this gist Jul 27, 2020. 1 changed file with 37 additions and 37 deletions.
    74 changes: 37 additions & 37 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -17,8 +17,8 @@ type NotificationCenter struct {
    // NewNotificationCenter ...
    func NewNotificationCenter() *NotificationCenter {
    return &NotificationCenter{
    subscribersMu: &sync.Mutex{},
    subscriberMessageChannelsID: make(map[string]chan interface{}),
    subscribersMu: &sync.Mutex{},
    subscriberMessageChannelsID: make(map[string]chan interface{}),
    }
    }

    @@ -36,7 +36,7 @@ func (nc *NotificationCenter) Subscribe(id string) error {
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; exist {
    return fmt.Errorf("outlet %s already registered", id)
    return fmt.Errorf("outlet %s already registered", id)
    }

    nc.subscriberMessageChannelsID[id] = make(chan interface{})
    @@ -53,7 +53,7 @@ func (nc *NotificationCenter) Unsubscribe(id string) error {
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered yet", id)
    return fmt.Errorf("outlet %s is not registered yet", id)
    }

    close(nc.subscriberMessageChannelsID[id])
    @@ -71,7 +71,7 @@ func (nc *NotificationCenter) Notify(id string, message interface{}) error {
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered", id)
    return fmt.Errorf("outlet %s is not registered", id)
    }

    nc.subscriberMessageChannelsID[id] <- message
    @@ -82,53 +82,53 @@ func (nc *NotificationCenter) Notify(id string, message interface{}) error {
    func handleSSE(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {

    id := ctx.Param("id")
    id := ctx.Param("id")

    // subscribe the id and channel
    if err := nc.Subscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    // subscribe the id and channel
    if err := nc.Subscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }

    // unsubscribe if exit from this method
    defer func() {
    if err := nc.Unsubscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    message := fmt.Sprintf("id %s close connection", id)
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message})
    }()
    // unsubscribe if exit from this method
    defer func() {
    if err := nc.Unsubscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    message := fmt.Sprintf("id %s close connection", id)
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message})
    }()

    // forever loop for listening message
    for {
    // forever loop for listening message
    for {

    select {
    select {

    case message := <-nc.WaitForMessage(id):
    ctx.SSEvent("message", message)
    ctx.Writer.Flush()
    case message := <-nc.WaitForMessage(id):
    ctx.SSEvent("message", message)
    ctx.Writer.Flush()

    case <-ctx.Request.Context().Done():
    return
    }
    case <-ctx.Request.Context().Done():
    return
    }

    }
    }

    }
    }

    func messageHandler(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {
    id := ctx.Param("id")
    id := ctx.Param("id")

    message := fmt.Sprintf("Hello %s", id)
    message := fmt.Sprintf("Hello %s", id)

    if err := nc.Notify(id, message); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))})
    if err := nc.Notify(id, message); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))})
    }
    }

  8. mirzaakhena revised this gist Jul 27, 2020. 2 changed files with 102 additions and 104 deletions.
    54 changes: 26 additions & 28 deletions WebsocketdemoApplication.java.java
    Original file line number Diff line number Diff line change
    @@ -17,42 +17,40 @@
    @SpringBootApplication
    public class WebsocketdemoApplication {

    Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);
    Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);

    @Autowired
    private Environment env;
    @Autowired
    private Environment env;

    public static void main(String[] args) {
    SpringApplication.run(WebsocketdemoApplication.class, args);
    }

    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
    private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";
    public static void main(String[] args) {
    SpringApplication.run(WebsocketdemoApplication.class, args);
    }

    private Client client;
    private WebTarget tut;
    private SseEventSource sse;
    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
    private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

    @PostConstruct
    private void initAplikasi() {
    private Client client;
    private WebTarget tut;
    private SseEventSource sse;

    this.client = ClientBuilder.newClient();
    this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL));

    doHandshaking();
    }
    @PostConstruct
    private void initAplikasi() {
    this.client = ClientBuilder.newClient();
    this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL));
    doHandshaking();
    }

    private void doHandshaking() {
    logger.info(String.format("Start doHandshaking..."));
    private void doHandshaking() {
    logger.info(String.format("Start doHandshaking..."));

    int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND));
    sse = SseEventSource.//
    target(this.tut).//
    reconnectingEvery(retry, TimeUnit.SECONDS).//
    build();//
    int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND));
    sse = SseEventSource.//
    target(this.tut).//
    reconnectingEvery(retry, TimeUnit.SECONDS).//
    build();//

    sse.register(this::onMessage, this::onError, () -> {
    logger.info(String.format("OnComplete"));
    sse.register(this::onMessage, this::onError, () -> {
    logger.info(String.format("OnComplete"));
    try {
    Thread.sleep(retry * 1000L);
    } catch (InterruptedException e) {
    152 changes: 76 additions & 76 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -2,146 +2,146 @@ package main

    import (
    "fmt"
    "net/http"
    "sync"
    "net/http"
    "sync"

    "github.com/gin-gonic/gin"
    "github.com/gin-gonic/gin"
    )

    // NotificationCenter ...
    type NotificationCenter struct {
    subscriberMessageChannelsID map[string]chan interface{}
    subscribersMu *sync.Mutex
    subscriberMessageChannelsID map[string]chan interface{}
    subscribersMu *sync.Mutex
    }

    // NewNotificationCenter ...
    func NewNotificationCenter() *NotificationCenter {
    return &NotificationCenter{
    subscribersMu: &sync.Mutex{},
    subscriberMessageChannelsID: make(map[string]chan interface{}),
    }
    return &NotificationCenter{
    subscribersMu: &sync.Mutex{},
    subscriberMessageChannelsID: make(map[string]chan interface{}),
    }
    }

    // WaitForMessage will blocking the process until returning the message
    func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} {
    return nc.subscriberMessageChannelsID[id]
    return nc.subscriberMessageChannelsID[id]
    }

    // Subscribe ...
    func (nc *NotificationCenter) Subscribe(id string) error {

    fmt.Printf(">>>> subscribe %s\n", id)
    fmt.Printf(">>>> subscribe %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; exist {
    return fmt.Errorf("outlet %s already registered", id)
    }
    if _, exist := nc.subscriberMessageChannelsID[id]; exist {
    return fmt.Errorf("outlet %s already registered", id)
    }

    nc.subscriberMessageChannelsID[id] = make(chan interface{})
    nc.subscriberMessageChannelsID[id] = make(chan interface{})

    return nil
    return nil
    }

    // Unsubscribe ...
    func (nc *NotificationCenter) Unsubscribe(id string) error {

    fmt.Printf(">>>> unsubscribe %s\n", id)
    fmt.Printf(">>>> unsubscribe %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered yet", id)
    }
    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered yet", id)
    }

    close(nc.subscriberMessageChannelsID[id])
    delete(nc.subscriberMessageChannelsID, id)
    close(nc.subscriberMessageChannelsID[id])
    delete(nc.subscriberMessageChannelsID, id)

    return nil
    return nil
    }

    // Notify ...
    func (nc *NotificationCenter) Notify(id string, message interface{}) error {

    fmt.Printf(">>>> send message to %s\n", id)
    fmt.Printf(">>>> send message to %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered", id)
    }
    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered", id)
    }

    nc.subscriberMessageChannelsID[id] <- message
    nc.subscriberMessageChannelsID[id] <- message

    return nil
    return nil
    }

    func handleSSE(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {
    return func(ctx *gin.Context) {

    id := ctx.Param("id")
    id := ctx.Param("id")

    // subscribe the id and channel
    if err := nc.Subscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    // subscribe the id and channel
    if err := nc.Subscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }

    // unsubscribe if exit from this method
    defer func() {
    if err := nc.Unsubscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    message := fmt.Sprintf("id %s close connection", id)
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message})
    }()
    // unsubscribe if exit from this method
    defer func() {
    if err := nc.Unsubscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    message := fmt.Sprintf("id %s close connection", id)
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message})
    }()

    // forever loop for listening message
    for {
    // forever loop for listening message
    for {

    select {
    select {

    case message := <-nc.WaitForMessage(id):
    ctx.SSEvent("message", message)
    ctx.Writer.Flush()
    case message := <-nc.WaitForMessage(id):
    ctx.SSEvent("message", message)
    ctx.Writer.Flush()

    case <-ctx.Request.Context().Done():
    return
    }
    case <-ctx.Request.Context().Done():
    return
    }

    }
    }

    }
    }
    }

    func messageHandler(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {
    id := ctx.Param("id")
    return func(ctx *gin.Context) {
    id := ctx.Param("id")

    message := fmt.Sprintf("Hello %s", id)
    message := fmt.Sprintf("Hello %s", id)

    if err := nc.Notify(id, message); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))})
    }
    if err := nc.Notify(id, message); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))})
    }
    }

    func main() {

    r := gin.Default()
    r := gin.Default()

    nc := NewNotificationCenter()
    nc := NewNotificationCenter()

    r.GET("/message/:id", messageHandler(nc))
    r.GET("/message/:id", messageHandler(nc))

    r.GET("/handshake/:id", handleSSE(nc))
    r.GET("/handshake/:id", handleSSE(nc))

    r.Run(":3000")
    r.Run(":3000")

    }
  9. mirzaakhena revised this gist Jul 27, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion main.go
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,7 @@
    package main

    import (
    "fmt"
    "fmt"
    "net/http"
    "sync"

  10. mirzaakhena revised this gist Jul 27, 2020. 1 changed file with 0 additions and 2 deletions.
    2 changes: 0 additions & 2 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -145,5 +145,3 @@ func main() {
    r.Run(":3000")

    }


  11. mirzaakhena revised this gist Jul 27, 2020. No changes.
  12. mirzaakhena revised this gist Jul 27, 2020. 2 changed files with 90 additions and 104 deletions.
    30 changes: 15 additions & 15 deletions SSEApplication.java → WebsocketdemoApplication.java.java
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    package com.mirzaakhena.ssedemo;
    package com.mirzaakhena.websocketdemo;

    import java.util.concurrent.TimeUnit;
    import javax.annotation.PostConstruct;
    @@ -15,24 +15,23 @@
    import org.springframework.core.env.Environment;

    @SpringBootApplication
    public class SSEApplication {
    public class WebsocketdemoApplication {

    Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);

    Logger logger = LoggerFactory.getLogger(SSEApplication.class);

    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
    private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

    @Autowired
    private Environment env;

    public static void main(String[] args) {
    SpringApplication.run(WebsocketdemoApplication.class, args);
    }

    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
    private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

    private Client client;
    private WebTarget tut;
    private SseEventSource sse;

    // mvn spring-boot:run
    public static void main(String[] args) {
    SpringApplication.run(SSEApplication.class, args);
    }

    @PostConstruct
    private void initAplikasi() {
    @@ -51,7 +50,7 @@ private void doHandshaking() {
    target(this.tut).//
    reconnectingEvery(retry, TimeUnit.SECONDS).//
    build();//

    sse.register(this::onMessage, this::onError, () -> {
    logger.info(String.format("OnComplete"));
    try {
    @@ -62,9 +61,9 @@ private void doHandshaking() {
    });
    sse.open();
    }

    private void onError(Throwable t) {
    logger.info(String.format("Error?????"));
    logger.error("Error : %s", t.getMessage());
    }

    private void onMessage(InboundSseEvent event) {
    @@ -74,6 +73,7 @@ private void onMessage(InboundSseEvent event) {
    }



    // application.properties
    // -----------------------
    // server.port = 8080
    164 changes: 75 additions & 89 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -1,160 +1,146 @@
    // update version of rikonor
    // https://gist.github.com/rikonor/e53a33c27ed64861c91a095a59f0aa44
    // and ismasan
    // https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
    //
    //
    // for SSE implementation
    // fixing:
    // * adding unsubscribe event for
    //
    // improvement:
    // * adding identifier (outletCode) of each channel

    package main

    import (
    "fmt"
    "io"
    "net/http"
    "sync"

    "github.com/gin-gonic/gin"
    )

    // UnsubscribeFunc ...
    type UnsubscribeFunc func() error

    // Subscriber ...
    type Subscriber interface {
    Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error)
    }

    // Notifier ...
    type Notifier interface {
    Notify(outletCode, message string) error
    }

    // NotificationCenter is implementation
    // NotificationCenter ...
    type NotificationCenter struct {
    outletMapChannel map[string]chan string
    subscribers map[chan string]struct{}
    subscribersMu *sync.Mutex
    subscriberMessageChannelsID map[string]chan interface{}
    subscribersMu *sync.Mutex
    }

    // NewNotificationCenter is constructor
    // NewNotificationCenter ...
    func NewNotificationCenter() *NotificationCenter {
    return &NotificationCenter{
    subscribers: map[chan string]struct{}{},
    subscribersMu: &sync.Mutex{},
    outletMapChannel: make(map[string]chan string),
    subscribersMu: &sync.Mutex{},
    subscriberMessageChannelsID: make(map[string]chan interface{}),
    }
    }

    // WaitForMessage will blocking the process until returning the message
    func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} {
    return nc.subscriberMessageChannelsID[id]
    }

    // Subscribe ...
    func (nc *NotificationCenter) Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) {
    func (nc *NotificationCenter) Subscribe(id string) error {

    fmt.Printf("try subscribe %s\n", outletCode)
    fmt.Printf(">>>> subscribe %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.outletMapChannel[outletCode]; exist {
    return nil, fmt.Errorf("outlet %s already registered", outletCode)
    if _, exist := nc.subscriberMessageChannelsID[id]; exist {
    return fmt.Errorf("outlet %s already registered", id)
    }

    nc.subscribers[c] = struct{}{}
    nc.outletMapChannel[outletCode] = c
    nc.subscriberMessageChannelsID[id] = make(chan interface{})

    return nil
    }

    // Unsubscribe ...
    func (nc *NotificationCenter) Unsubscribe(id string) error {

    fmt.Printf(">>>> unsubscribe %s\n", id)

    unsubscribeFn := func() error {
    fmt.Printf("unsubscribe %s\n", outletCode)
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()
    delete(nc.subscribers, c)
    delete(nc.outletMapChannel, outletCode)
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    return nil
    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered yet", id)
    }

    return unsubscribeFn, nil
    close(nc.subscriberMessageChannelsID[id])
    delete(nc.subscriberMessageChannelsID, id)

    return nil
    }

    // Notify ...
    func (nc *NotificationCenter) Notify(outletCode, message string) error {
    fmt.Printf("notify %s\n", outletCode)
    func (nc *NotificationCenter) Notify(id string, message interface{}) error {

    fmt.Printf(">>>> send message to %s\n", id)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()
    nc.outletMapChannel[outletCode] <- message

    if _, exist := nc.subscriberMessageChannelsID[id]; !exist {
    return fmt.Errorf("outlet %s is not registered", id)
    }

    nc.subscriberMessageChannelsID[id] <- message

    return nil
    }

    func handleSSE(s Subscriber) gin.HandlerFunc {
    func handleSSE(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {

    // prepare the channel
    c := make(chan string)
    id := ctx.Param("id")

    // subscribe the outletCode and channel
    unsubscribeFn, err := s.Subscribe(ctx.Param("outletCode"), c)
    if err != nil {
    fmt.Printf("E %v\n", err.Error())
    // subscribe the id and channel
    if err := nc.Subscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }

    // unsubscribe when exit this function
    // unsubscribe if exit from this method
    defer func() {
    if err := unsubscribeFn(); err != nil {
    if err := nc.Unsubscribe(id); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }

    message := fmt.Sprintf("id %s close connection", id)
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message})
    }()

    // unsubscribe when client close the connections
    notify := ctx.Request.Context().Done()
    go func() {
    <-notify
    if err := unsubscribeFn(); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    // forever loop for listening message
    for {

    select {

    case message := <-nc.WaitForMessage(id):
    ctx.SSEvent("message", message)
    ctx.Writer.Flush()

    case <-ctx.Request.Context().Done():
    return
    }
    }()

    // forever loop for listening message
    for {
    ctx.Stream(func(w io.Writer) bool {
    if message, ok := <-c; ok {
    ctx.SSEvent("message", message)
    return true
    }
    return false
    })
    }

    }
    }

    func messageHandler(nc Notifier) gin.HandlerFunc {
    func messageHandler(nc *NotificationCenter) gin.HandlerFunc {
    return func(ctx *gin.Context) {
    outletCode := ctx.Param("outletCode")
    nc.Notify(outletCode, fmt.Sprintf("Hello %s", outletCode))
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("outletCode"))})
    id := ctx.Param("id")

    message := fmt.Sprintf("Hello %s", id)

    if err := nc.Notify(id, message); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))})
    }
    }

    // go run main.go
    func main() {

    r := gin.Default()

    nc := NewNotificationCenter()

    // test call with this
    // curl GET http://localhost:3000/message/0208
    r.GET("/message/:outletCode", messageHandler(nc))

    r.GET("/handshake/:outletCode", handleSSE(nc))

    r.GET("/message/:id", messageHandler(nc))

    r.GET("/handshake/:id", handleSSE(nc))

    r.Run(":3000")

  13. mirzaakhena revised this gist Jul 25, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion SSEApplication.java
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    package com.mirzaakhena.websocketdemo;
    package com.mirzaakhena.ssedemo;

    import java.util.concurrent.TimeUnit;
    import javax.annotation.PostConstruct;
  14. mirzaakhena revised this gist Jul 25, 2020. 2 changed files with 10 additions and 8 deletions.
    17 changes: 9 additions & 8 deletions SSEApplication.java
    Original file line number Diff line number Diff line change
    @@ -18,20 +18,21 @@
    public class SSEApplication {

    Logger logger = LoggerFactory.getLogger(SSEApplication.class);

    @Autowired
    private Environment env;

    public static void main(String[] args) {
    SpringApplication.run(SSEApplication.class, args);
    }


    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
    private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

    @Autowired
    private Environment env;

    private Client client;
    private WebTarget tut;
    private SseEventSource sse;

    // mvn spring-boot:run
    public static void main(String[] args) {
    SpringApplication.run(SSEApplication.class, args);
    }

    @PostConstruct
    private void initAplikasi() {
    1 change: 1 addition & 0 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -143,6 +143,7 @@ func messageHandler(nc Notifier) gin.HandlerFunc {
    }
    }

    // go run main.go
    func main() {

    r := gin.Default()
  15. mirzaakhena revised this gist Jul 25, 2020. 1 changed file with 18 additions and 0 deletions.
    18 changes: 18 additions & 0 deletions SSEApplication.java
    Original file line number Diff line number Diff line change
    @@ -78,3 +78,21 @@ private void onMessage(InboundSseEvent event) {
    // server.port = 8080
    // app.handshaking_url = http://localhost:3000/handshake/0208
    // app.handshaking_retry_in_second = 5

    // pom.xml dependency
    // <dependencies>
    // <dependency>
    // <groupId>org.springframework.boot</groupId>
    // <artifactId>spring-boot-starter-web</artifactId>
    // </dependency>
    // <dependency>
    // <groupId>org.glassfish.jersey.media</groupId>
    // <artifactId>jersey-media-sse</artifactId>
    // <version>2.26</version>
    // </dependency>
    // <dependency>
    // <groupId>org.glassfish.jersey.inject</groupId>
    // <artifactId>jersey-hk2</artifactId>
    // <version>2.26</version>
    // </dependency>
    // </dependencies>
  16. mirzaakhena revised this gist Jul 25, 2020. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -148,9 +148,15 @@ func main() {
    r := gin.Default()

    nc := NewNotificationCenter()

    // test call with this
    // curl GET http://localhost:3000/message/0208
    r.GET("/message/:outletCode", messageHandler(nc))

    r.GET("/handshake/:outletCode", handleSSE(nc))

    r.Run(":3000")

    }


  17. mirzaakhena revised this gist Jul 25, 2020. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions SSEApplication.java
    Original file line number Diff line number Diff line change
    @@ -17,13 +17,13 @@
    @SpringBootApplication
    public class SSEApplication {

    Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);
    Logger logger = LoggerFactory.getLogger(SSEApplication.class);

    @Autowired
    private Environment env;

    public static void main(String[] args) {
    SpringApplication.run(WebsocketdemoApplication.class, args);
    SpringApplication.run(SSEApplication.class, args);
    }

    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
  18. mirzaakhena renamed this gist Jul 25, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion WebsocketdemoApplication.java → SSEApplication.java
    Original file line number Diff line number Diff line change
    @@ -15,7 +15,7 @@
    import org.springframework.core.env.Environment;

    @SpringBootApplication
    public class WebsocketdemoApplication {
    public class SSEApplication {

    Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);

  19. mirzaakhena created this gist Jul 25, 2020.
    80 changes: 80 additions & 0 deletions WebsocketdemoApplication.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,80 @@
    package com.mirzaakhena.websocketdemo;

    import java.util.concurrent.TimeUnit;
    import javax.annotation.PostConstruct;
    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.WebTarget;
    import javax.ws.rs.sse.InboundSseEvent;
    import javax.ws.rs.sse.SseEventSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.core.env.Environment;

    @SpringBootApplication
    public class WebsocketdemoApplication {

    Logger logger = LoggerFactory.getLogger(WebsocketdemoApplication.class);

    @Autowired
    private Environment env;

    public static void main(String[] args) {
    SpringApplication.run(WebsocketdemoApplication.class, args);
    }

    private final String PROP_HANDSHAKING_URL = "app.handshaking_url";
    private final String PROP_HANDSHAKING_RETRY_IN_SECOND = "app.handshaking_retry_in_second";

    private Client client;
    private WebTarget tut;
    private SseEventSource sse;

    @PostConstruct
    private void initAplikasi() {

    this.client = ClientBuilder.newClient();
    this.tut = this.client.target(env.getProperty(PROP_HANDSHAKING_URL));

    doHandshaking();
    }

    private void doHandshaking() {
    logger.info(String.format("Start doHandshaking..."));

    int retry = Integer.parseInt(env.getProperty(PROP_HANDSHAKING_RETRY_IN_SECOND));
    sse = SseEventSource.//
    target(this.tut).//
    reconnectingEvery(retry, TimeUnit.SECONDS).//
    build();//

    sse.register(this::onMessage, this::onError, () -> {
    logger.info(String.format("OnComplete"));
    try {
    Thread.sleep(retry * 1000L);
    } catch (InterruptedException e) {
    }
    doHandshaking();
    });
    sse.open();
    }

    private void onError(Throwable t) {
    logger.info(String.format("Error?????"));
    }

    private void onMessage(InboundSseEvent event) {
    logger.info(String.format("Message : %s", event.readData()));
    }

    }


    // application.properties
    // -----------------------
    // server.port = 8080
    // app.handshaking_url = http://localhost:3000/handshake/0208
    // app.handshaking_retry_in_second = 5
    156 changes: 156 additions & 0 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,156 @@
    // update version of rikonor
    // https://gist.github.com/rikonor/e53a33c27ed64861c91a095a59f0aa44
    // and ismasan
    // https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
    //
    //
    // for SSE implementation
    // fixing:
    // * adding unsubscribe event for
    //
    // improvement:
    // * adding identifier (outletCode) of each channel

    package main

    import (
    "fmt"
    "io"
    "net/http"
    "sync"

    "github.com/gin-gonic/gin"
    )

    // UnsubscribeFunc ...
    type UnsubscribeFunc func() error

    // Subscriber ...
    type Subscriber interface {
    Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error)
    }

    // Notifier ...
    type Notifier interface {
    Notify(outletCode, message string) error
    }

    // NotificationCenter is implementation
    type NotificationCenter struct {
    outletMapChannel map[string]chan string
    subscribers map[chan string]struct{}
    subscribersMu *sync.Mutex
    }

    // NewNotificationCenter is constructor
    func NewNotificationCenter() *NotificationCenter {
    return &NotificationCenter{
    subscribers: map[chan string]struct{}{},
    subscribersMu: &sync.Mutex{},
    outletMapChannel: make(map[string]chan string),
    }
    }

    // Subscribe ...
    func (nc *NotificationCenter) Subscribe(outletCode string, c chan string) (UnsubscribeFunc, error) {

    fmt.Printf("try subscribe %s\n", outletCode)

    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    if _, exist := nc.outletMapChannel[outletCode]; exist {
    return nil, fmt.Errorf("outlet %s already registered", outletCode)
    }

    nc.subscribers[c] = struct{}{}
    nc.outletMapChannel[outletCode] = c

    unsubscribeFn := func() error {
    fmt.Printf("unsubscribe %s\n", outletCode)
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()
    delete(nc.subscribers, c)
    delete(nc.outletMapChannel, outletCode)

    return nil
    }

    return unsubscribeFn, nil
    }

    // Notify ...
    func (nc *NotificationCenter) Notify(outletCode, message string) error {
    fmt.Printf("notify %s\n", outletCode)
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()
    nc.outletMapChannel[outletCode] <- message
    return nil
    }

    func handleSSE(s Subscriber) gin.HandlerFunc {
    return func(ctx *gin.Context) {

    // prepare the channel
    c := make(chan string)

    // subscribe the outletCode and channel
    unsubscribeFn, err := s.Subscribe(ctx.Param("outletCode"), c)
    if err != nil {
    fmt.Printf("E %v\n", err.Error())
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }

    // unsubscribe when exit this function
    defer func() {
    if err := unsubscribeFn(); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }

    }()

    // unsubscribe when client close the connections
    notify := ctx.Request.Context().Done()
    go func() {
    <-notify
    if err := unsubscribeFn(); err != nil {
    ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
    return
    }
    }()

    // forever loop for listening message
    for {
    ctx.Stream(func(w io.Writer) bool {
    if message, ok := <-c; ok {
    ctx.SSEvent("message", message)
    return true
    }
    return false
    })
    }

    }
    }

    func messageHandler(nc Notifier) gin.HandlerFunc {
    return func(ctx *gin.Context) {
    outletCode := ctx.Param("outletCode")
    nc.Notify(outletCode, fmt.Sprintf("Hello %s", outletCode))
    ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("outletCode"))})
    }
    }

    func main() {

    r := gin.Default()

    nc := NewNotificationCenter()
    r.GET("/message/:outletCode", messageHandler(nc))
    r.GET("/handshake/:outletCode", handleSSE(nc))

    r.Run(":3000")

    }