Skip to content

Instantly share code, notes, and snippets.

Created October 25, 2015 12:40
Show Gist options
  • Save anonymous/28accfa8e5f3fe187c4d to your computer and use it in GitHub Desktop.
Save anonymous/28accfa8e5f3fe187c4d to your computer and use it in GitHub Desktop.

Revisions

  1. @invalid-email-address Anonymous created this gist Oct 25, 2015.
    61 changes: 61 additions & 0 deletions GCMSender.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    package app.actors

    import akka.event.Logging
    import akka.http.scaladsl._
    import akka.http.scaladsl.model.HttpHeader.ParsingResult
    import akka.http.scaladsl.model._
    import akka.stream.ActorMaterializer
    import akka.typed.ScalaDSL._
    import akka.typed._
    import argonaut.Argonaut._
    import implicits.actor._
    import infrastructure.GCM
    import launch.RTConfig

    import scala.util.Try
    import scalaz.Scalaz._

    object GCMSender {
    sealed trait In
    case class Send(message: GCM.Message) extends In

    object Internal {
    case class GCMComplete(
    message: GCM.Message, result: Try[HttpResponse]
    ) extends In
    }

    def behaviour(
    authHeader: HttpHeader, httpMaterializer: ActorMaterializer
    ): Behavior[Send] =
    ContextAware[In] { ctx =>
    val untypedSystem = ctx.system.asUntyped
    val http = Http(untypedSystem)
    val log = Logging(untypedSystem, ctx.self.asUntyped)
    val headers = Vector(authHeader)

    Static {
    case Send(m) =>
    log.info("Sending GCM message: {}", m)
    val body = m.asJson.nospaces
    log.debug("GCM message as JSON: {}", body)
    val future = http.singleRequest(HttpRequest(
    HttpMethods.POST, "https://gcm-http.googleapis.com/gcm/send",
    headers, HttpEntity(MediaTypes.`application/json`, body)
    ))(httpMaterializer)

    // Logging isn't thread safe.
    import ctx.executionContext
    future.onComplete(r => ctx.self ! GCMSender.Internal.GCMComplete(m, r))
    case Internal.GCMComplete(message, result) =>
    log.info("GCM response for {}: {}", message, result)
    }
    }.narrow

    def authHeader(key: RTConfig.GCM.Key) =
    HttpHeader.parse("Authorization", s"key=${key.value}") match {
    case ParsingResult.Ok(header, _) => header.right
    case ParsingResult.Error(error) =>
    s"Cannot turn '$key' into HTTP header: $error".left
    }
    }
    341 changes: 341 additions & 0 deletions GameActor.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,341 @@
    package app.actors.game

    import akka.event.LoggingAdapter
    import akka.typed.ScalaDSL._
    import akka.typed._
    import app.actors.NetClient
    import app.models.game._
    import app.models.game.events._
    import app.models.game.world.WObject.Id
    import app.models.game.world._
    import app.models.game.world.maps.WorldMaterializer
    import app.models.game.world.props.ExtractionSpeed
    import implicits._
    import implicits.actor._
    import org.joda.time.DateTime
    import utils.data.{NonEmptyVector, Timeframe}

    import scala.concurrent.duration._
    import scala.language.{existentials, implicitConversions}
    import scalaz.Scalaz._
    import scalaz._

    object GameActor {
    type Ref = akka.typed.ActorRef[In]
    private[this] type NetClientJoinedRef = ActorRef[NetClient.LoggedInState.GameJoined]
    type NetClientOutRef = ActorRef[NetClient.InGameState.FromGameActor]

    sealed trait Message

    case class ClientData(human: Human, replyTo: NetClientOutRef)

    sealed trait In extends Message
    object In {
    case class Join(
    clientData: ClientData, replyJoined: NetClientJoinedRef
    ) extends In
    case class Warp(
    clientData: ClientData, position: Vect2, warpable: WarpableCompanion.Some
    ) extends In
    /* path does not include objects position and ends in target position */
    case class Move(
    clientData: ClientData, id: WObject.Id, path: NonEmptyVector[Vect2]
    ) extends In
    case class Attack(
    clientData: ClientData, id: WObject.Id, target: Vect2 \/ WObject.Id
    ) extends In
    case class MoveAttack(
    move: Move, target: Vect2 \/ WObject.Id
    ) extends In
    case class Special(
    clientData: ClientData, id: WObject.Id
    ) extends In
    case class ToggleWaitingForRoundEnd(clientData: ClientData) extends In
    case class Concede(clientData: ClientData) extends In
    }

    private[this] sealed trait Internal extends Message
    private[this] object Internal {
    case object CheckTurnTime extends Internal
    }

    sealed trait NetClientOut
    implicit def asNetClient(msg: NetClientOut): NetClient.InGameState.FromGameActor =
    NetClient.InGameState.FromGameActor(msg)

    object NetClientOut {
    case class Init(
    id: World.Id, bounds: Bounds, objects: WorldObjs.All,
    warpZonePoints: Iterable[Vect2], visiblePoints: Iterable[Vect2],
    selfTeam: Team, otherTeams: Iterable[Team],
    self: HumanState, others: Iterable[(Player, Option[HumanState])],
    warpableObjects: Iterable[WarpableStats],
    objectives: RemainingObjectives, currentTurn: TurnStartedEvt,
    extractionSpeeds: Set[ExtractionSpeed]
    ) extends NetClientOut
    case class Events(events: Vector[FinalEvent]) extends NetClientOut
    case class Error(error: String) extends NetClientOut
    }

    private[this] def initMsg(human: Human, gaGame: GameActorGame)
    (implicit log: LoggingAdapter): String \/ NetClientOut.Init = {
    val game = gaGame.game
    val visibleGame = game.visibleBy(human)
    val states = visibleGame.states
    val resourceMap = visibleGame.world.resourcesMap
    def stateFor(p: Player): String \/ HumanState = for {
    gameState <- states.get(p).
    toRightDisjunction(s"can't get game state for $p in $states")
    resources <- resourceMap.get(p).
    toRightDisjunction(s"can't get game state for $p in $resourceMap")
    } yield HumanState(resources, visibleGame.world.populationFor(p), gameState)

    stateFor(human).map { selfState =>
    NetClientOut.Init(
    game.world.id, visibleGame.world.bounds,
    visibleGame.world.objects ++
    game.world.noLongerVisibleImmovableObjectsFor(human.team),
    visibleGame.world.warpZoneMap.map.keys.map(_._1),
    visibleGame.world.visibilityMap.map.keys.map(_._1),
    human.team, game.world.teams - human.team, selfState,
    (game.world.players - human).map { player =>
    player -> (
    if (player.isFriendOf(human)) stateFor(player).toOption
    else None
    )
    },
    selfState.gameState.canWarp,
    game.remainingObjectives(human.team),
    TurnStartedEvt(gaGame.currentPlayer, gaGame.currentTurnTimeframe),
    ExtractionSpeed.values
    )
    }
    }

    private def events(
    human: Human, ref: NetClientOutRef, events: Events
    )(implicit log: LoggingAdapter): Unit = {
    log.debug("### Dispatching events for {} ###", human)
    log.debug("Events ({}):", events.size)
    val viewedEvents = events.flatMap { event =>
    log.debug("* {}", event)
    val viewed = event.asViewedBy(human)
    if (log.isDebugEnabled) viewed.foreach(log.debug("*** {}", _))
    viewed
    }

    ref ! NetClientOut.Events(viewedEvents)
    }

    case class StartingHuman(
    human: Human, resources: Resources,
    replyJoined: NetClientJoinedRef, client: NetClientOutRef
    ) {
    def game = Game.StartingPlayer(human, resources)
    }

    def behavior(
    worldMaterializer: WorldMaterializer,
    turnTimerSettings: Option[TurnTimers.Settings],
    aiTeam: Team, starting: Set[GameActor.StartingHuman]
    ): Behavior[In] = ContextAware[Message] { ctx =>
    implicit val log = ctx.createLogging()

    def scheduleCheckTurnTime() =
    ctx.schedule(1.second, ctx.self, Internal.CheckTurnTime)

    log.debug(
    "initializing game actor: starting={} turnTimer={}, aiTeam={}",
    starting, turnTimerSettings, aiTeam
    )

    var clients = starting.map(data => data.human -> data.client).toMap

    var game: GameActorGame = {
    val humanTeams = starting.map(_.human.team)
    val world = worldMaterializer.materialize(humanTeams).right_!
    log.debug("World initialized to {}", world)
    val objectives = Map(
    aiTeam -> Objectives(
    destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
    )
    ) ++ humanTeams.map { _ -> Objectives(
    // gatherResources = Some(Objective.GatherResources(world, Resources(200), Percentage(0.15))),
    // collectVps = Some(Objective.CollectVPs(VPS(10))),
    destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
    ) }.toMap
    log.debug("Objectives initialized to {}", objectives)
    SemiRealtimeGame(
    world, starting.map(_.game), objectives,
    turnTimerSettings.map(WithCurrentTime(_, DateTime.now))
    ).fold(
    err => throw new IllegalStateException(s"Cannot initialize game: $err"),
    evented => {
    log.debug("Turn based game initialized to {}", evented)
    starting.foreach { data =>
    val init = initMsg(data.human, evented.value).right_!

    data.replyJoined ! NetClient.LoggedInState.GameJoined(data.human, ctx.self)
    // We need to init the game to starting state.
    data.client ! init
    events(data.human, data.client, evented.events)
    }
    starting.foreach { data =>
    events(data.human, data.client, evented.events)
    }
    evented.value
    }
    )
    }

    def checkedTurnTimes = game.checkTurnTimes(DateTime.now)

    def update(
    requester: NetClientOutRef, f: GameActorGame => GameActorGame.Result
    ): Behavior[Message] = {
    log.debug("Updating game by a request from {}", requester)

    val afterTimeCheck = checkedTurnTimes
    afterTimeCheck.value.fold(
    _ => postGameChange(afterTimeCheck),
    tbg => f(tbg).map(evt => afterTimeCheck.events ++: evt).fold(
    err => {
    log.error(err)
    requester ! NetClientOut.Error(err)
    Same
    },
    postGameChange
    )
    )
    }

    def postGameChange(
    evented: Evented[Winner \/ GameActorGame]
    ): Behavior[Message] = {
    dispatchEvents(evented.events)
    evented.value.fold(
    winner => {
    log.info("Game is finished, won by {}", winner)
    Stopped
    },
    g => {
    game = g
    Same
    }
    )
    }

    def dispatchEvents(events: Events): Unit = {
    if (events.nonEmpty) clients.foreach { case (human, ref) =>
    GameActor.events(human, ref, events)
    }
    }

    var turnTimerChecker = scheduleCheckTurnTime()

    Full {
    case Sig(_, PostStop) =>
    turnTimerChecker.cancel()
    Same

    case Msg(_, msg) => msg match {
    case Internal.CheckTurnTime =>
    postGameChange(checkedTurnTimes)
    turnTimerChecker = scheduleCheckTurnTime()
    Same

    case In.Join(ClientData(human, replyTo), joinedRef) =>
    joinedRef ! NetClient.LoggedInState.GameJoined(human, ctx.self)
    def doInit(gaGame: GameActorGame): Unit = {
    replyTo ! initMsg(human, gaGame).right_!
    clients += human -> replyTo
    }

    if (game.isJoined(human)) {
    log.info("Rejoining {} to {}", human, ctx.self)
    doInit(game)
    }
    else {
    log.error("Unknown human trying to join the game: {}", human)
    }
    Same

    case In.Warp(clientData, position, warpable) =>
    update(clientData.replyTo, _.warp(clientData.human, position, warpable, DateTime.now))
    case In.Move(clientData, id, path) =>
    update(clientData.replyTo, _.move(clientData.human, id, path, DateTime.now))
    case In.Attack(clientData, id, target) =>
    update(clientData.replyTo, _.attack(clientData.human, id, target, DateTime.now))
    case In.MoveAttack(move, target) =>
    update(
    move.clientData.replyTo,
    _.moveAttack(move.clientData.human, move.id, move.path, target, DateTime.now)
    )
    case In.Special(clientData, id) =>
    update(clientData.replyTo, _.special(clientData.human, id, DateTime.now))
    case In.ToggleWaitingForRoundEnd(clientData) =>
    update(clientData.replyTo, _.toggleWaitingForRoundEnd(clientData.human, DateTime.now))
    case In.Concede(clientData) =>
    update(clientData.replyTo, _.concede(clientData.human, DateTime.now))
    }
    }
    }.narrow
    }

    object GameActorGame {
    type Result = Game.ResultT[Winner \/ GameActorGame]
    }
    trait GameActorGame {
    import GameActorGame._

    def warp(human: Human, position: Vect2, warpable: WarpableCompanion.Some, now: DateTime)
    (implicit log: LoggingAdapter): Result

    def move(human: Human, id: WObject.Id, path: NonEmptyVector[Vect2], now: DateTime)
    (implicit log: LoggingAdapter): Result

    def special(human: Human, id: WObject.Id, now: DateTime)(implicit log: LoggingAdapter): Result

    def attack(human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id, now: DateTime)
    (implicit log: LoggingAdapter): Result

    def moveAttack(
    human: Human, id: Id, path: NonEmptyVector[Vect2], target: Vect2 \/ WObject.Id,
    now: DateTime
    )(implicit log: LoggingAdapter): Result

    def toggleWaitingForRoundEnd(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result
    def concede(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result

    def game: Game
    def isJoined(human: Human)(implicit log: LoggingAdapter): Boolean
    def currentPlayer: Player
    def currentTurnTimeframe: Option[Timeframe]
    def currentTurnStartedEvt = TurnStartedEvt(currentPlayer, currentTurnTimeframe)

    def checkTurnTimes(time: DateTime)(implicit log: LoggingAdapter)
    : Evented[Winner \/ GameActorGame]
    }

    trait GameActorGameStarter[GAGame <: GameActorGame] {
    type StartedGame = String \/ Evented[GAGame]

    def apply(
    world: World, starting: Set[Game.StartingPlayer],
    objectives: Game.ObjectivesMap,
    turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]
    )(implicit log: LoggingAdapter): StartedGame = {
    val game = Game(world, starting, objectives)
    game.flatMap(apply(_, turnTimerSettings))
    }

    def apply(game: Game, turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]])
    (implicit log: LoggingAdapter): StartedGame = {
    val turnTimers = turnTimerSettings.map(_.map(TurnTimers(game.world.humans, _)))
    startNewGame(game, turnTimers)
    }

    protected[this] def startNewGame(
    game: Game, turnTimers: Option[WithCurrentTime[TurnTimers]]
    )(implicit log: LoggingAdapter): String \/ Evented[GAGame]
    }
    359 changes: 359 additions & 0 deletions GamesManagerActor.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,359 @@
    package app.actors.game

    import java.util.UUID

    import akka.event.Logging
    import akka.typed.ScalaDSL._
    import akka.typed._
    import app.actors.NetClient.LoggedInState.JoinGame.{PvPMode, Mode}
    import app.actors.game.GameActor.{ClientData, StartingHuman}
    import app.actors.{GCMSender, NetClient}
    import app.models.User
    import app.models.game.world.maps.{GameMaps, SingleplayerMap, WorldMaterializer}
    import app.models.game.world.{ExtractorStats, Resources, World}
    import app.models.game.{Bot, Human, Team, TurnTimers}
    import implicits._, implicits.actor._
    import infrastructure.GCM
    import launch.RTConfig
    import org.joda.time.DateTime
    import spire.math.UInt

    import scala.concurrent.duration._
    import scalaz.Scalaz._
    import scalaz.effect.IO

    object GamesManagerActor {
    type Ref = ActorRef[In]

    val StartingResources = ExtractorStats.cost * Resources(4)

    sealed trait Message

    sealed trait In extends Message
    object In {
    case class FromNetClient(msg: NetClient.GamesManagerFwd) extends In

    // After user connects to the server, he should check whether he is in game or not.
    case class CheckUserStatus(user: User, client: NetClient.LoggedInRef) extends In

    // Game joining
    case class Join(
    user: User, mode: NetClient.LoggedInState.JoinGame.Mode,
    replyTo: NetClient.LoggedInRef
    ) extends In
    case class CancelJoinGame(
    user: User, replyTo: ActorRef[NetClient.LoggedInState.JoinGameCancelled.type]
    ) extends In

    // Stats report for control client
    case class StatsReport(replyTo: ActorRef[StatsReportData]) extends In

    case object ShutdownInitiated extends In
    }

    private[this] sealed trait Internal extends Message
    private[this] object Internal {
    case object CleanupBackgroundWaitingList extends Internal
    /* Check if we can shutdown. */
    case object CheckShutdown extends Internal

    case class GameTerminated(ref: GameActor.Ref) extends Internal
    case class ClientTerminated(ref: NetClient.LoggedInRef) extends Internal
    }

    // TODO: proper singleplayer
    // object PVEGame {
    // sealed trait PresetTeam {
    // def gameTeam: Team
    // }
    // object PresetTeam {
    // object Red extends PresetTeam { val gameTeam = Team() }
    // object Blue extends PresetTeam { val gameTeam = Team() }
    // }
    //
    // val empty = PVEGame(None, Set.empty, Set.empty)
    // }
    // case class PVEGame(ref: Option[ActorRef], redTeamPlayers: Set[User], blueTeamPlayers: Set[User]) {
    // def giveTeam: PVEGame.PresetTeam =
    // redTeamPlayers.size ?|? blueTeamPlayers.size match {
    // case Ordering.LT => PVEGame.PresetTeam.Red
    // case Ordering.GT => PVEGame.PresetTeam.Blue
    // case Ordering.EQ => if (Random.chance(0.5)) PVEGame.PresetTeam.Red else PVEGame.PresetTeam.Blue
    // }
    //
    // def add(user: User, team: PresetTeam): PVEGame = team match {
    // case PresetTeam.Red => copy(redTeamPlayers = redTeamPlayers + user)
    // case PresetTeam.Blue => copy(blueTeamPlayers = blueTeamPlayers + user)
    // }
    // }

    case class StatsReportData(users: UInt, games: UInt)

    case class BackgroundToken(value: String) extends AnyVal
    object BackgroundToken {
    val newToken = IO { BackgroundToken(UUID.randomUUID().toString) }
    }

    case class WaitingListEntry(
    user: User, client: NetClient.LoggedInRef,
    backgroundToken: BackgroundToken
    )

    private def joinGame(
    game: GameActor.Ref, human: Human, client: NetClient.LoggedInRef
    ): Unit = game ! GameActor.In.Join(ClientData(human, client), client)

    def behaviour(
    maps: GameMaps, gcm: Option[(ActorRef[GCMSender.Send], RTConfig.GCM)]
    )(implicit rtConfig: RTConfig): Behavior[In] = ContextAware[Message] { ctx =>
    val log = ctx.createLogging()

    def scheduleCleanup(): Unit =
    ctx.schedule(1.second, ctx.self, Internal.CleanupBackgroundWaitingList)

    def scheduleShutdownMode(): Unit =
    ctx.schedule(1.second, ctx.self, Internal.CheckShutdown)

    var waitingList = Vector.empty[WaitingListEntry]
    // token -> last heartbeat
    var waitingInBackground = Map.empty[BackgroundToken, DateTime]
    var user2game = Map.empty[User, (GameActor.Ref, Human)]
    var game2humans = Map.empty[GameActor.Ref, Set[Human]]

    def removeBackgroundToken(token: BackgroundToken): Unit = {
    log.info("Removing background token: {}", token)
    waitingInBackground -= token
    notifyGCM()
    }

    def notifyGCM(): Unit = {
    gcm.foreach { case (ref, cfg) =>
    val foreground = GCM.Data.SearchingForOpponent.InForeground(UInt(waitingList.size))
    val background = GCM.Data.SearchingForOpponent.InBackground(UInt(waitingInBackground.size))
    ref ! GCMSender.Send(
    GCM.searchingForOpponent(foreground, background, cfg.searchForOpponentTTL)
    )
    }
    }

    def noExistingGame(
    user: User, mode: Mode, client: NetClient.LoggedInRef
    ): Unit = {
    mode match {
    case Mode.Singleplayer =>
    //launchRandomGenerated(user, client)
    launchPVE(user, client)
    case pvp: PvPMode =>
    val token = BackgroundToken.newToken.unsafePerformIO()
    val entry = WaitingListEntry(user, client, token)
    waitingList :+= entry
    if (waitingList.size < pvp.playersNeeded) {
    log.debug(
    "Added {} from {} to {} waiting list: {}",
    user, client, mode, waitingList
    )
    notifyGCM()
    ctx.watchWith(client, Internal.ClientTerminated(client))
    client ! NetClient.LoggedInState.WaitingListJoined(token)
    }
    else fromWaitingList(pvp)
    }
    }

    def launchPVE(user: User, client: NetClient.LoggedInRef) = {
    // TODO: proper PVE
    // val team = pveGame.giveTeam
    // if (pveGame.ref.isEmpty) {
    // val game = createGame(
    // maps.pve.random, Some(TurnTimers.Settings()), Team(),
    // Set(StartingHuman(Human(user, team.gameTeam), StartingResources, client))
    // )
    // pveGame = pveGame.copy(ref = Some(game))
    // }
    // pveGame = pveGame.add(user, team)

    createGame(
    maps.pve.random, None, Team(),
    Set(StartingHuman(Human(user, Team()), StartingResources, client, client))
    )
    }

    // def launchRandomGenerated(user: User, client: NetClient.LoggedInRef) = {
    // val materializer = SingleplayerMap { data => implicit log =>
    // val npcTeam = Team()
    // val npcBot = Bot(npcTeam)
    // val spawnerBot = Bot(npcTeam)
    // World.create(
    // data.humanTeam, () => npcBot, () => spawnerBot, staticObjectsKnownAtStart = false
    // )
    // }
    // createGame(
    // materializer, None, Team(),
    // Set(StartingHuman(Human(user, Team()), StartingResources, client, client))
    // )
    // }

    def fromWaitingList(mode: PvPMode): Unit = {
    val (entries, newWaitingList) = waitingList.splitAt(mode.playersNeeded)
    waitingList = newWaitingList
    notifyGCM()

    val teams = Vector.fill(mode.teams)(Team())
    val players = entries.zipWithIndex.map { case (entry, idx) =>
    val team = teams.wrapped(idx)
    StartingHuman(
    Human(entry.user, team), StartingResources, entry.client, entry.client
    )
    }.toSet

    log.debug(
    "Fetched {} from waiting list for mode {}, rest={}", players, mode, newWaitingList
    )
    // TODO: will fail if we have more teams than any of the maps support
    val map = maps.pvpMapFor(mode.playersNeeded).right_!.unsafePerformIO()
    val npcTeam = Team()

    createGame(map, Some(TurnTimers.Settings()), npcTeam, players)
    }

    def createGame(
    worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings],
    npcTeam: Team, starting: Set[GameActor.StartingHuman]
    ): GameActor.Ref = {
    val game = ctx.spawnAnonymous(Props(GameActor.behavior(
    worldMaterializer, turnTimerSettings, npcTeam, starting
    )))
    ctx.watchWith(game, Internal.GameTerminated(game))
    starting.foreach { data =>
    user2game += data.human.user -> ((game, data.human))
    }
    game2humans += game -> starting.map(_.human)
    log.info("Game {} created for {}", game, starting)
    game
    }

    scheduleCleanup()
    Total[Message] {
    case Internal.CleanupBackgroundWaitingList =>
    val now = DateTime.now()
    val expiredKeys = waitingInBackground.keys.filter { token =>
    val lastBeat = waitingInBackground(token)
    val timePassed = now - lastBeat
    val active = timePassed <= rtConfig.gamesManager.backgroundHeartbeatTTL.duration
    if (! active) log.debug(
    "Timing out background token {}: {} > {}",
    token, timePassed, rtConfig.gamesManager.backgroundHeartbeatTTL.duration
    )
    !active
    }
    expiredKeys.foreach(waitingInBackground -= _)
    if (expiredKeys.nonEmpty) notifyGCM()

    scheduleCleanup()
    Same

    case In.ShutdownInitiated =>
    log.info("Shutdown mode initiated.")
    scheduleShutdownMode()
    Same

    case Internal.CheckShutdown =>
    val games = game2humans.size
    log.debug("Checking for shutdown state, games: {}", games)
    if (games === 0) {
    log.info("No games alive, shutting down.")
    ctx.system.terminate()
    Stopped
    }
    else {
    scheduleShutdownMode()
    Same
    }

    case In.CheckUserStatus(user, client) =>
    user2game.get(user).foreach { case (game, human) =>
    log.info("{} joining game {} on user status check", human, game)
    joinGame(game, human, client)
    }
    Same

    case In.Join(user, mode, replyTo) =>
    user2game.get(user).fold2(
    {
    if (waitingList.exists(_.user === user)) log.warning(
    "Not joining a new game, because {} is already in a waiting list, ref: {}",
    user, replyTo
    )
    else noExistingGame(user, mode, replyTo)
    },
    { case (game, human) =>
    log.info("{} joining game {} on game join", human, game)
    joinGame(game, human, replyTo)
    }
    )
    Same

    case In.CancelJoinGame(user, replyTo) =>
    waitingList.indexWhere(_.user === user) match {
    case -1 =>
    log.warning("Not cancelling join game, because {} is not in a waiting list.", user)
    case idx =>
    val entry = waitingList(idx)
    ctx.unwatch(entry.client)
    waitingList = waitingList.removeAt(idx)
    notifyGCM()
    replyTo ! NetClient.LoggedInState.JoinGameCancelled
    }
    Same

    case In.StatsReport(replyTo) =>
    replyTo ! StatsReportData(UInt(user2game.size), UInt(game2humans.size))
    Same

    case In.FromNetClient(NetClient.NotLoggedInState.CancelBackgroundToken(token)) =>
    removeBackgroundToken(token)
    Same

    case In.FromNetClient(NetClient.MsgHandlerConnectionIn.BackgroundSFO(kind, token)) =>
    if (waitingInBackground contains token) {
    kind match {
    case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Heartbeat =>
    waitingInBackground += token -> DateTime.now()
    log.debug("Background heartbeat from {}", token)
    case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Cancel =>
    removeBackgroundToken(token)
    }
    }
    else {
    // TODO: should we tell sender that his heartbeat was expired?
    log.info("Ignoring background {} from unknown token: {}", kind, token)
    }
    Same

    case Internal.ClientTerminated(ref) =>
    waitingList.zipWithIndex.collectFirst {
    case (entry @ WaitingListEntry(_, `ref`, _), idx) =>
    (entry, idx)
    }.foreach { case (entry, idx) =>
    log.info("{} going into background", entry)
    waitingList = waitingList.removeAt(idx)
    waitingInBackground += entry.backgroundToken -> DateTime.now()
    notifyGCM()
    }
    Same

    case Internal.GameTerminated(ref) =>
    game2humans.get(ref) match {
    case Some(humans) =>
    log.info("Game {} terminated for humans {}", ref, humans)
    game2humans -= ref
    humans.foreach { human => user2game -= human.user }
    case None =>
    log.warning(
    "Game {} terminated, but can't find it in our state!", ref
    )
    }
    Same
    }
    }.narrow
    }
    186 changes: 186 additions & 0 deletions MsgHandler.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,186 @@
    package app.actors

    import java.nio.ByteOrder

    import akka.io.Tcp
    import akka.typed.ScalaDSL._
    import akka.typed._
    import akka.util.ByteString
    import akka.{actor => untyped}
    import app.protobuf.parsing.Parsing
    import app.protobuf.serializing.Serializing
    import implicits.actor._
    import utils.network.IntFramedPipeline
    import utils.network.IntFramedPipeline.Frame

    import scala.language.implicitConversions
    import scalaz.Scalaz._
    import scalaz._

    object MsgHandler {
    type Ref = ActorRef[In]

    sealed trait Message

    sealed trait In extends Message
    object In {
    sealed trait Control extends In
    object Control {
    case object ShutdownInitiated extends Control with NetClientFwd
    }
    case class FromNetClient(msg: NetClient.MsgHandlerOut) extends In
    }

    // Messages forwarded to NetClient
    sealed trait NetClientFwd
    implicit def asNetClientFwd(msg: NetClientFwd): NetClient.MsgHandlerIn.FwdFromMsgHandler =
    NetClient.MsgHandlerIn.FwdFromMsgHandler(msg)

    private[this] sealed trait Internal extends Message
    private[this] object Internal {
    case class Tcp(msg: akka.io.Tcp.Event) extends Internal
    case object Ack extends akka.io.Tcp.Event with Internal
    }

    def spawn(
    name: String, ctx: ActorContext[_],
    connection: untyped.ActorRef,
    netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn],
    maxToClientBufferSize: Int = 1024 * 1024
    )(implicit byteOrder: ByteOrder) = {
    lazy val tcpAdapter: ActorRef[Tcp.Event] = ctx.spawn(
    Props(ContextAware[Tcp.Event] { tcpCtx =>
    tcpCtx.watch(main)
    Full {
    case Msg(_, msg) =>
    main ! Internal.Tcp(msg)
    Same
    case Sig(_, Terminated(`main`)) =>
    Stopped
    }
    }),
    s"$name-tcp-adapter"
    )
    lazy val main: ActorRef[Message] = {
    val bridge = TypedUntypedActorBridge(connection, tcpAdapter.asUntyped)
    ctx.spawn(
    Props(behavior(bridge, netClientBehavior, maxToClientBufferSize)),
    name
    )
    }
    (main: Ref, tcpAdapter)
    }

    private[this] def behavior(
    connection: TypedUntypedActorBridge,
    netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn],
    maxToClientBufferSize: Int
    )(implicit byteOrder: ByteOrder): Behavior[Message] = ContextAware { ctx =>
    implicit val log = ctx.createLogging()

    val pipeline = new MsgHandlerPipeline
    val lowWatermark = maxToClientBufferSize / 4
    val highWatermark = maxToClientBufferSize * 3 / 4

    var storage = Vector.empty[ByteString]
    var stored = 0
    var closing = false
    var suspended = false

    val netClient = ctx.spawn(Props(netClientBehavior(ctx.self)), "net-client")
    ctx.watch(netClient)
    ctx.watch(connection.raw)

    val common = Partial[Message] {
    case Internal.Tcp(Tcp.Received(data)) =>
    pipeline.unserialize(data).foreach {
    case -\/(err) => log.error(err)
    case \/-(msg) => netClient ! msg
    }
    Same

    case msg: In.Control.ShutdownInitiated.type =>
    netClient ! msg
    Same
    }

    lazy val buffering = Partial[Message] {
    case In.FromNetClient(msg) =>
    buffer(pipeline.serialize(msg))

    case Internal.Ack =>
    acknowledge()

    case Internal.Tcp(msg: Tcp.ConnectionClosed) =>
    log.info(s"closing = true by {}.", msg)
    closing = true
    Same
    }

    lazy val normal = Partial[Message] {
    case In.FromNetClient(msg) =>
    val data = pipeline.serialize(msg)

    buffer(data)
    connection ! Tcp.Write(data, Internal.Ack)
    buffering

    case Internal.Tcp(msg: Tcp.ConnectionClosed) =>
    log.info(s"Connection closed by {}.", msg)
    Stopped
    }

    def buffer(data: ByteString): Behavior[Message] = {
    storage :+= data
    stored += data.size

    if (stored > maxToClientBufferSize) {
    log.warning(s"drop connection to [{}] (buffer overrun)", connection)
    Stopped
    }
    else if (stored > highWatermark) {
    log.debug(s"suspending reading")
    connection ! Tcp.SuspendReading
    suspended = true
    Same
    }
    else Same
    }

    def acknowledge(): Behavior[Message] = {
    require(storage.nonEmpty, "storage was empty")

    val size = storage.head.size
    stored -= size

    storage = storage.tail

    if (suspended && stored < lowWatermark) {
    log.debug("resuming reading")
    connection ! Tcp.ResumeReading
    suspended = false
    }

    if (storage.isEmpty) {
    if (closing) Stopped else normal
    }
    else {
    connection ! Tcp.Write(storage.head, Internal.Ack)
    Same
    }
    }

    Or(common, normal)
    }
    }

    class MsgHandlerPipeline(implicit byteOrder: ByteOrder) {
    private[this] val intFramed = new IntFramedPipeline()

    def unserialize(data: ByteString) = intFramed.fromFramedData(data).map { frame =>
    Parsing.parse(frame.data).leftMap(err => s"Cannot decode $frame into message: $err")
    }

    def serialize(data: NetClient.MsgHandlerOut) =
    data |> Serializing.serialize |> Frame |> intFramed.withFrameSize
    }
    364 changes: 364 additions & 0 deletions NetClient.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,364 @@
    package app.actors

    import java.util.UUID

    import akka.typed._, ScalaDSL._, AskPattern._
    import app.actors.game.GameActor.ClientData
    import netmsg.ProtoChecksum
    import spire.math.UInt
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import akka.util.Timeout
    import app.actors.net_client._
    import app.actors.game.{GamesManagerActor, GameActor}
    import app.models._
    import app.models.game.Human
    import app.persistence.tables.Tables
    import implicits._, implicits.actor._
    import scalaz._, Scalaz._
    import app.persistence.DBDriver._
    import org.joda.time.DateTime
    import scala.language.implicitConversions

    import scala.util.Try

    object NetClient {
    type Ref = ActorRef[In]
    type LoggedInRef = ActorRef[IsLoggedIn]
    type GameInMsg = GameActor.ClientData => GameActor.In

    sealed trait In

    // Messages forwarded to GamesManager
    sealed trait GamesManagerFwd { _: In => }
    implicit def asGamesManager(msg: GamesManagerFwd): GamesManagerActor.In.FromNetClient =
    GamesManagerActor.In.FromNetClient(msg)

    // Messages that come from MsgHandler.
    sealed trait MsgHandlerIn extends In
    object MsgHandlerIn {
    case class FwdFromMsgHandler(msg: MsgHandler.NetClientFwd) extends MsgHandlerIn
    }

    // Messages sent to MsgHandler
    sealed trait MsgHandlerOut
    implicit def asMsgHandler(msg: MsgHandlerOut): MsgHandler.In.FromNetClient =
    MsgHandler.In.FromNetClient(msg)

    // Messages that come via the TCP connection
    sealed trait MsgHandlerConnectionIn extends MsgHandlerIn
    // Messages that come from the game client
    sealed trait GameClientIn extends MsgHandlerConnectionIn
    sealed trait GameClientOut extends MsgHandlerOut
    // Messages that are defined in management protobuf.
    sealed trait ManagementIn extends GameClientIn
    sealed trait ManagementOut extends GameClientOut

    // Messages that come from the control client
    case class Control(key: ControlSecretKey, msg: Control.In)
    extends MsgHandlerConnectionIn
    object Control {
    sealed trait In
    object In {
    case object Shutdown extends In
    case object Status extends In
    }

    sealed trait Out extends MsgHandlerOut
    object Out {
    case class GenericReply(success: Boolean, message: Option[String]) extends Out
    object GenericReply {
    val success = GenericReply(success = true, None)
    def error(msg: String) = GenericReply(success = false, Some(msg))
    }

    case class Status(
    tcpClients: Option[UInt], playingUsers: Option[UInt], games: Option[UInt]
    ) extends Out {
    override def toString = {
    def asStr(o: Option[UInt]) = o.fold2("-", _.toString())
    s"Status[tcp clients: ${asStr(tcpClients)}, playing users: ${
    asStr(playingUsers)}, games: ${asStr(games)}]"
    }
    }
    }
    }

    object MsgHandlerConnectionIn {
    case class TimeSync(clientNow: DateTime) extends GameClientIn
    case class TimeSyncReply(clientNow: DateTime, serverNow: DateTime) extends GameClientOut

    // Background searching for opponent heartbeat
    case class BackgroundSFO(
    kind: BackgroundSFO.Kind, token: GamesManagerActor.BackgroundToken
    ) extends MsgHandlerConnectionIn with GamesManagerFwd
    object BackgroundSFO {
    sealed trait Kind
    object Kind {
    case object Heartbeat extends Kind
    case object Cancel extends Kind
    }
    }
    }

    sealed trait NotLoggedInState extends MsgHandlerConnectionIn
    object NotLoggedInState {
    case object ProtoVersionCheck extends NotLoggedInState with GameClientIn
    case class ProtoVersionCheckReply(checksum: String) extends GameClientOut

    // After client connects in it should cancel the active background token.
    case class CancelBackgroundToken(
    token: GamesManagerActor.BackgroundToken
    ) extends NotLoggedInState with GamesManagerFwd with ManagementIn

    case object AutoRegister extends NotLoggedInState with ManagementIn

    case class Login(credentials: Credentials) extends NotLoggedInState with ManagementIn
    sealed trait LoginResponse extends ManagementOut
    object LoginResponse {
    case object InvalidCredentials extends LoginResponse
    case class LoggedIn(
    user: User, token: SessionToken, autogenerated: Boolean
    ) extends LoginResponse
    }
    }

    sealed trait IsLoggedIn extends In

    sealed trait LoggedInState extends IsLoggedIn
    object LoggedInState {
    object JoinGame {
    sealed trait Mode
    sealed trait PvPMode extends Mode {
    def playersPerTeam: Int
    def teams: Int
    def playersNeeded = teams * playersPerTeam
    }

    object Mode {
    case object Singleplayer extends Mode
    case object OneVsOne extends PvPMode { def playersPerTeam = 1; def teams = 2 }
    }
    }
    case class JoinGame(mode: JoinGame.Mode) extends LoggedInState with ManagementIn
    case class GameJoined(human: Human, game: GameActor.Ref) extends LoggedInState with ManagementOut

    case object CancelJoinGame extends LoggedInState with ManagementIn
    case object JoinGameCancelled extends LoggedInState with ManagementOut

    case class CheckNameAvailability(name: String) extends LoggedInState with ManagementIn
    case class CheckNameAvailabilityResponse(
    name: String, available: Boolean
    ) extends ManagementOut

    case class Register(
    username: String, password: PlainPassword, email: String
    ) extends LoggedInState with ManagementIn
    case class RegisterResponse(newToken: Option[SessionToken]) extends ManagementOut

    case class WaitingListJoined(token: GamesManagerActor.BackgroundToken)
    extends LoggedInState with ManagementOut
    }

    sealed trait InGameState extends IsLoggedIn
    object InGameState {
    case class FromMsgHandler(msg: GameInMsg) extends InGameState with GameClientIn
    case class FromGameActor(msg: GameActor.NetClientOut) extends InGameState with GameClientOut
    }

    def behavior(
    msgHandler: MsgHandler.Ref, gamesManager: GamesManagerActor.Ref,
    server: Server.Ref, controlKey: ControlSecretKey,
    db: Database
    ): Behavior[In] = ContextAware[In] { ctx =>
    val log = ctx.createLogging()
    ctx.watch(msgHandler)

    def handleControl(c: Control): Future[Control.Out] = {
    if (c.key === controlKey) c.msg match {
    case Control.In.Shutdown =>
    server ! Server.In.Unbind
    Future.successful(Control.Out.GenericReply.success)
    case Control.In.Status =>
    import ctx.executionContext

    implicit val timeout = Timeout(3.seconds)

    def asOption[A](f: Future[A]) = f.map(Some.apply).recover {
    case e =>
    log.error("Error while asking: {}", e)
    None
    }

    val clientsCountF = (server ? Server.In.ReportClientCount) |> asOption
    val gamesCountF = (gamesManager ? GamesManagerActor.In.StatsReport) |> asOption

    (clientsCountF zip gamesCountF).map {
    case (clients, gameManagerOpt) => Control.Out.Status(
    clients, gameManagerOpt.map(_.users), gameManagerOpt.map(_.games)
    )
    }
    }
    else Future.successful(
    Control.Out.GenericReply.error(s"Invalid control key '${c.key}'")
    )
    }

    var shutdownInitiated = false
    var inGameOpt = Option.empty[(GameActor.Ref, Human)]

    val common = Full[In] {
    case Sig(_, PostStop) =>
    if (shutdownInitiated) {
    inGameOpt.foreach { case (gameRef, human) =>
    // Auto-concede if lost connection when shutdown is initiated.
    log.info("Auto conceding because lost connection in shutdown mode.")
    gameRef ! GameActor.In.Concede(ClientData(human, ctx.self))
    }
    }
    Same

    case Msg(_, MsgHandlerConnectionIn.TimeSync(clientNow)) =>
    msgHandler ! MsgHandlerConnectionIn.TimeSyncReply(clientNow, DateTime.now)
    Same

    case Msg(_, m: MsgHandlerConnectionIn.BackgroundSFO) =>
    gamesManager ! m
    Same

    case Msg(_, MsgHandlerIn.FwdFromMsgHandler(MsgHandler.In.Control.ShutdownInitiated)) =>
    shutdownInitiated = true
    Same

    case Msg(_, c: Control) =>
    import ctx.executionContext
    handleControl(c).onComplete {
    case util.Success(m) => msgHandler ! m
    case util.Failure(err) => log.error("Error while handling control message {}: {}", c, err)
    }
    Same
    }

    lazy val notLoggedIn = {
    def logIn(
    self: ActorRef[In], user: User, sessionToken: SessionToken,
    autogenerated: Boolean
    ) = {
    msgHandler ! NotLoggedInState.LoginResponse.LoggedIn(user, sessionToken, autogenerated)
    gamesManager ! GamesManagerActor.In.CheckUserStatus(user, self)
    loggedIn(user)
    }

    Partial[In] {
    case msg: NotLoggedInState => msg match {
    case NotLoggedInState.ProtoVersionCheck =>
    msgHandler ! NotLoggedInState.ProtoVersionCheckReply(ProtoChecksum.checksum)
    Same

    case m: NotLoggedInState.CancelBackgroundToken =>
    gamesManager ! m
    Same

    case NotLoggedInState.AutoRegister =>
    val password = PlainPassword(UUID.randomUUID().shortStr)
    val sessionToken = SessionToken.random()
    val id = UUID.randomUUID()
    val user = User(id, s"autogen-${id.shortStr}")
    val credentials = Credentials(user.name, password)
    db.withSession { implicit session =>
    Tables.users.
    map(t => (t.user, t.sessionToken, t.password, t.email)).
    insert((user, sessionToken.value, password.encrypted, None))
    }
    logIn(ctx.self, user, sessionToken, autogenerated = true)

    case NotLoggedInState.Login(credentials) =>
    val optQ = Tables.users.
    filter(t => t.name === credentials.name).
    map(t => (t.id, t.sessionToken, t.email, t.password))
    val idOpt = db.withSession(optQ.firstOption(_)).filter {
    case (_, sessionToken, _, pwHash) =>
    credentials.check(sessionToken, pwHash)
    }.map(t => (t._1, SessionToken(t._2), t._3.isEmpty))

    idOpt.fold2(
    {
    msgHandler ! NotLoggedInState.LoginResponse.InvalidCredentials
    Same
    },
    { case (id, token, autogenerated) =>
    logIn(ctx.self, User(id, credentials.name), token, autogenerated)
    }
    )
    }
    }
    }

    def loggedIn(user: User): Behavior[In] = Partial[In] {
    case msg: LoggedInState => msg match {
    case LoggedInState.CheckNameAvailability(name) =>
    val query = Tables.users.map(_.name).filter(_ === name).exists
    val exists = db.withSession(query.run(_))
    msgHandler ! LoggedInState.CheckNameAvailabilityResponse(name, ! exists)
    Same

    case LoggedInState.Register(username, password, email) =>
    val token = SessionToken.random()
    val query = Tables.users.
    filter(t => t.id === user.id && t.email.isEmpty).
    map(t => (t.name, t.email, t.password, t.sessionToken))
    val success = Try {
    db.withSession(query.update((
    username, Some(email), password.encrypted, token.value
    ))(_))
    }.getOrElse(0) === 1
    msgHandler ! LoggedInState.RegisterResponse(if (success) Some(token) else None)
    Same

    case LoggedInState.JoinGame(mode) =>
    gamesManager ! GamesManagerActor.In.Join(user, mode, ctx.self)
    Same

    case msg: LoggedInState.WaitingListJoined =>
    msgHandler ! msg
    Same

    case msg @ LoggedInState.GameJoined(human, game) =>
    msgHandler ! msg
    inGame(user, human, game)

    case LoggedInState.CancelJoinGame =>
    gamesManager ! GamesManagerActor.In.CancelJoinGame(user, ctx.self)
    Same

    case msg: LoggedInState.JoinGameCancelled.type =>
    msgHandler ! msg
    Same
    }
    }

    def inGame(user: User, human: Human, game: GameActor.Ref) = {
    inGameOpt = Some((game, human))
    ctx.watch(game)
    Full[In] {
    case Sig(_, Terminated(`game`)) =>
    log.error("Game was terminated")
    inGameOpt = None
    loggedIn(user)

    case Msg(_, msg: InGameState) =>
    msg match {
    case InGameState.FromMsgHandler(msgFn) =>
    val msg = msgFn(ClientData(human, ctx.self))
    game ! msg
    case gameMsg: InGameState.FromGameActor =>
    msgHandler ! gameMsg
    }
    Same
    }
    }

    Or(common, notLoggedIn)
    }.narrow
    }

    122 changes: 122 additions & 0 deletions Server.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,122 @@
    package app.actors

    import java.net.InetSocketAddress
    import java.nio.ByteOrder

    import akka.io.{IO, Tcp}
    import akka.typed.ScalaDSL._
    import akka.typed._
    import akka.{actor => untyped}
    import app.actors.game.GamesManagerActor
    import app.persistence.DBDriver
    import implicits._
    import implicits.actor._
    import launch.RTConfig
    import spire.math.UInt

    object Server {
    type Ref = ActorRef[In]

    sealed trait Message

    sealed trait In extends Message
    object In {
    case class ReportClientCount(replyTo: ActorRef[UInt]) extends In
    case object Unbind extends In
    }

    private[this] sealed trait Internal extends Message
    private[this] object Internal {
    case class Tcp(
    msg: akka.io.Tcp.Event, sender: untyped.ActorRef
    ) extends Internal
    case class MsgHandlerTerminated(ref: MsgHandler.Ref) extends Internal
    }

    object Out {
    }

    def behavior(
    rtConfig: RTConfig, gamesManager: GamesManagerActor.Ref,
    db: DBDriver.Database
    )(implicit byteOrder: ByteOrder): Behavior[In] = ContextAware[Message] { ctx =>
    def port = rtConfig.port

    val log = ctx.createLogging()

    val tcpAdapter = ctx.spawnAdapterUTRef(Internal.Tcp).asUntyped

    {
    // If we want to access manager outside of this scope we probably need to wrap
    // it in a bridge.
    val manager = IO(Tcp)(ctx.system.asUntyped)
    manager.tell(
    Tcp.Bind(tcpAdapter, new InetSocketAddress(port.signed)),
    tcpAdapter
    )
    }

    /* Actor that is handling our bound socket. */
    var socketRef = Option.empty[TypedUntypedActorBridge]
    var msgHandlers = Set.empty[MsgHandler.Ref]

    Full {
    case Msg(_, msg) => msg match {
    case In.Unbind =>
    socketRef.fold2(
    log.error("Can't unbind, socket not bound to {}", port),
    ref => {
    log.debug("Received a request to unbind, forwarding to {}", ref)
    ref ! Tcp.Unbind
    }
    )
    Same

    case In.ReportClientCount(replyTo) =>
    replyTo ! UInt(msgHandlers.size)
    Same

    case Internal.Tcp(Tcp.Bound(localAddress), sender) =>
    socketRef = Some(TypedUntypedActorBridge(sender, tcpAdapter))
    log.info("Server bound to {}", localAddress)
    Same

    case Internal.Tcp(Tcp.Unbound, _) =>
    socketRef = None
    log.info("Socket to port {} unbound, initiating shutdown.", port)
    msgHandlers.foreach(_ ! MsgHandler.In.Control.ShutdownInitiated)
    gamesManager ! GamesManagerActor.In.ShutdownInitiated
    Same

    case Internal.Tcp(Tcp.CommandFailed(b: Tcp.Bind), _) =>
    log.error(s"Cannot bind to ${b.localAddress}!")
    Stopped

    case Internal.Tcp(Tcp.Connected(remote, local), connection) =>
    log.info(s"Client connected from $remote.")
    val (msgHandler, tcpAdapter) = MsgHandler.spawn(
    s"${remote.getHostString}-${remote.getPort}", ctx, connection,
    handlerRef => NetClient.behavior(
    handlerRef, gamesManager, ctx.self, rtConfig.controlKey, db
    ).narrow
    )
    msgHandlers += msgHandler
    ctx.watchWith(msgHandler, Internal.MsgHandlerTerminated(msgHandler))
    connection ! Tcp.Register(tcpAdapter.asUntyped, keepOpenOnPeerClosed = true)
    Same

    case Internal.MsgHandlerTerminated(ref) =>
    msgHandlers -= ref
    Same

    case Internal.Tcp(_, _) =>
    Unhandled
    }

    case Sig(_, PostStop) =>
    log.info("Shutting down actor system because server has stopped.")
    ctx.system.terminate()
    Stopped
    }
    }.narrow
    }
    7 changes: 7 additions & 0 deletions TypedUntypedActorBridge.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    package implicits.actor

    import akka.actor.ActorRef

    case class TypedUntypedActorBridge(raw: ActorRef, sender: ActorRef) {
    def !(msg: Any) = raw.tell(msg, sender)
    }
    7 changes: 7 additions & 0 deletions UTRefMessageWrapper.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    package implicits.actor

    import akka.actor._

    private[actor] class UTRefMessageWrapper[A, B](f: (A, ActorRef) => B) extends Actor {
    def receive = { case msg => context.parent ! f(msg.asInstanceOf[A], sender()) }
    }
    56 changes: 56 additions & 0 deletions package.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,56 @@
    package implicits

    import akka.event.Logging
    import akka.typed.{ActorContext, ActorRef, ActorSystem}
    import akka.{actor => untyped}

    package object actor {
    object TypedActorSystemExts {
    private val asUntyped =
    classOf[ActorSystem[_]].getDeclaredMethod("untyped")
    }
    implicit class TypedActorSystemExts(val as: ActorSystem[_]) extends AnyVal {
    def asUntyped =
    TypedActorSystemExts.asUntyped.invoke(as).asInstanceOf[akka.actor.ActorSystem]
    }

    object TypedActorRefExts {
    private val asUntyped =
    classOf[ActorRef[_]].getDeclaredMethod("untypedRef")
    }
    implicit class TypedActorRefExts(val ref: ActorRef[_]) extends AnyVal {
    def asUntyped =
    TypedActorRefExts.asUntyped.invoke(ref).asInstanceOf[akka.actor.ActorRef]
    }

    implicit class TypedActorContextExts[A](val ctx: ActorContext[A]) extends AnyVal {
    def createLogging() = Logging(ctx.system.asUntyped, ctx.self.asUntyped)

    /** As `spawnAdapter` but gives access to the untyped ActorRef which sent
    * the original message. */
    def spawnAdapterUTRef[B](f: (B, untyped.ActorRef) => A) =
    ActorRef[B](ctx.actorOf(untyped.Props(new UTRefMessageWrapper(f))))

    /** Watch `ref` and send `msg` to self when it terminates. */
    def watchWith[B](ref: ActorRef[B], msg: A): ActorRef[B] = {
    watchWith(ref, msg, ctx.self)
    ref
    }

    /** Watch `ref` and send `msg` to `sendTo` when it terminates. */
    def watchWith[B, C](ref: ActorRef[B], msg: C, sendTo: ActorRef[C]): ActorRef[B] = {
    import akka.typed._
    import ScalaDSL._
    ctx.spawnAnonymous(Props(ContextAware[Unit] { anonCtx =>
    anonCtx.watch(ref)

    Full {
    case Sig(_, Terminated(`ref`)) =>
    sendTo ! msg
    Stopped
    }
    }))
    ref
    }
    }
    }