Skip to content

Instantly share code, notes, and snippets.

@adamw
Created May 10, 2024 17:01
Show Gist options
  • Save adamw/8eba6ae90dd497bcddccc55e7083c50d to your computer and use it in GitHub Desktop.
Save adamw/8eba6ae90dd497bcddccc55e7083c50d to your computer and use it in GitHub Desktop.

Revisions

  1. adamw created this gist May 10, 2024.
    75 changes: 75 additions & 0 deletions ws.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,75 @@
    //> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.10.7

    import ox.channels.{Actor, ActorRef, Channel, ChannelClosed, Default, DefaultResult, selectOrClosed}
    import ox.{fork, releaseAfterScope, supervised}
    import sttp.tapir.*
    import sttp.tapir.CodecFormat.*
    import sttp.tapir.server.netty.sync.{Id, NettySyncServer, OxStreams}

    import java.util.UUID

    type ChatMemberId = UUID

    case class ChatMember(id: ChatMemberId, channel: Channel[Message])
    object ChatMember:
    def create: ChatMember = ChatMember(UUID.randomUUID(), Channel.bufferedDefault[Message])

    class ChatRoom:
    private var members: Map[ChatMemberId, ChatMember] = Map()

    def connected(m: ChatMember): Unit =
    members = members + (m.id -> m)
    println(s"Connected: ${m.id}, number of members: ${members.size}")

    def disconnected(m: ChatMember): Unit =
    members = members - m.id
    println(s"Disconnected: ${m.id}, number of members: ${members.size}")

    def incoming(message: Message): Unit =
    println(s"Broadcasting: ${message.v}")
    members = members.flatMap { (id, member) =>
    selectOrClosed(member.channel.sendClause(message), Default(())) match
    case member.channel.Sent() => Some((id, member))
    case _: ChannelClosed =>
    println(s"Channel of member $id closed, removing from members")
    None
    case DefaultResult(_) =>
    println(s"Buffer for member $id full, not sending message")
    Some((id, member))
    }

    //

    case class Message(v: String) // could be more complex, e.g. JSON including nickname + message
    given Codec[String, Message, TextPlain] = Codec.string.map(Message(_))(_.v)

    val chatEndpoint = endpoint.get
    .in("chat")
    .out(webSocketBody[Message, TextPlain, Message, TextPlain](OxStreams))

    def chatProcessor(a: ActorRef[ChatRoom]): OxStreams.Pipe[Message, Message] =
    incoming => {
    val member = ChatMember.create

    a.tell(_.connected(member))

    fork {
    incoming.foreach { msg =>
    a.tell(_.incoming(msg))
    }
    }

    releaseAfterScope {
    member.channel.done()
    a.tell(_.disconnected(member))
    }

    member.channel
    }

    @main def chatWsServer(): Unit =
    supervised {
    val chatActor = Actor.create(new ChatRoom)
    val chatServerEndpoint = chatEndpoint.serverLogicSuccess[Id](_ => chatProcessor(chatActor))
    NettySyncServer().addEndpoint(chatServerEndpoint).startAndWait()
    }