Created
October 25, 2015 12:40
-
-
Save anonymous/28accfa8e5f3fe187c4d to your computer and use it in GitHub Desktop.
Revisions
-
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,61 @@ package app.actors import akka.event.Logging import akka.http.scaladsl._ import akka.http.scaladsl.model.HttpHeader.ParsingResult import akka.http.scaladsl.model._ import akka.stream.ActorMaterializer import akka.typed.ScalaDSL._ import akka.typed._ import argonaut.Argonaut._ import implicits.actor._ import infrastructure.GCM import launch.RTConfig import scala.util.Try import scalaz.Scalaz._ object GCMSender { sealed trait In case class Send(message: GCM.Message) extends In object Internal { case class GCMComplete( message: GCM.Message, result: Try[HttpResponse] ) extends In } def behaviour( authHeader: HttpHeader, httpMaterializer: ActorMaterializer ): Behavior[Send] = ContextAware[In] { ctx => val untypedSystem = ctx.system.asUntyped val http = Http(untypedSystem) val log = Logging(untypedSystem, ctx.self.asUntyped) val headers = Vector(authHeader) Static { case Send(m) => log.info("Sending GCM message: {}", m) val body = m.asJson.nospaces log.debug("GCM message as JSON: {}", body) val future = http.singleRequest(HttpRequest( HttpMethods.POST, "https://gcm-http.googleapis.com/gcm/send", headers, HttpEntity(MediaTypes.`application/json`, body) ))(httpMaterializer) // Logging isn't thread safe. import ctx.executionContext future.onComplete(r => ctx.self ! GCMSender.Internal.GCMComplete(m, r)) case Internal.GCMComplete(message, result) => log.info("GCM response for {}: {}", message, result) } }.narrow def authHeader(key: RTConfig.GCM.Key) = HttpHeader.parse("Authorization", s"key=${key.value}") match { case ParsingResult.Ok(header, _) => header.right case ParsingResult.Error(error) => s"Cannot turn '$key' into HTTP header: $error".left } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,341 @@ package app.actors.game import akka.event.LoggingAdapter import akka.typed.ScalaDSL._ import akka.typed._ import app.actors.NetClient import app.models.game._ import app.models.game.events._ import app.models.game.world.WObject.Id import app.models.game.world._ import app.models.game.world.maps.WorldMaterializer import app.models.game.world.props.ExtractionSpeed import implicits._ import implicits.actor._ import org.joda.time.DateTime import utils.data.{NonEmptyVector, Timeframe} import scala.concurrent.duration._ import scala.language.{existentials, implicitConversions} import scalaz.Scalaz._ import scalaz._ object GameActor { type Ref = akka.typed.ActorRef[In] private[this] type NetClientJoinedRef = ActorRef[NetClient.LoggedInState.GameJoined] type NetClientOutRef = ActorRef[NetClient.InGameState.FromGameActor] sealed trait Message case class ClientData(human: Human, replyTo: NetClientOutRef) sealed trait In extends Message object In { case class Join( clientData: ClientData, replyJoined: NetClientJoinedRef ) extends In case class Warp( clientData: ClientData, position: Vect2, warpable: WarpableCompanion.Some ) extends In /* path does not include objects position and ends in target position */ case class Move( clientData: ClientData, id: WObject.Id, path: NonEmptyVector[Vect2] ) extends In case class Attack( clientData: ClientData, id: WObject.Id, target: Vect2 \/ WObject.Id ) extends In case class MoveAttack( move: Move, target: Vect2 \/ WObject.Id ) extends In case class Special( clientData: ClientData, id: WObject.Id ) extends In case class ToggleWaitingForRoundEnd(clientData: ClientData) extends In case class Concede(clientData: ClientData) extends In } private[this] sealed trait Internal extends Message private[this] object Internal { case object CheckTurnTime extends Internal } sealed trait NetClientOut implicit def asNetClient(msg: NetClientOut): NetClient.InGameState.FromGameActor = NetClient.InGameState.FromGameActor(msg) object NetClientOut { case class Init( id: World.Id, bounds: Bounds, objects: WorldObjs.All, warpZonePoints: Iterable[Vect2], visiblePoints: Iterable[Vect2], selfTeam: Team, otherTeams: Iterable[Team], self: HumanState, others: Iterable[(Player, Option[HumanState])], warpableObjects: Iterable[WarpableStats], objectives: RemainingObjectives, currentTurn: TurnStartedEvt, extractionSpeeds: Set[ExtractionSpeed] ) extends NetClientOut case class Events(events: Vector[FinalEvent]) extends NetClientOut case class Error(error: String) extends NetClientOut } private[this] def initMsg(human: Human, gaGame: GameActorGame) (implicit log: LoggingAdapter): String \/ NetClientOut.Init = { val game = gaGame.game val visibleGame = game.visibleBy(human) val states = visibleGame.states val resourceMap = visibleGame.world.resourcesMap def stateFor(p: Player): String \/ HumanState = for { gameState <- states.get(p). toRightDisjunction(s"can't get game state for $p in $states") resources <- resourceMap.get(p). toRightDisjunction(s"can't get game state for $p in $resourceMap") } yield HumanState(resources, visibleGame.world.populationFor(p), gameState) stateFor(human).map { selfState => NetClientOut.Init( game.world.id, visibleGame.world.bounds, visibleGame.world.objects ++ game.world.noLongerVisibleImmovableObjectsFor(human.team), visibleGame.world.warpZoneMap.map.keys.map(_._1), visibleGame.world.visibilityMap.map.keys.map(_._1), human.team, game.world.teams - human.team, selfState, (game.world.players - human).map { player => player -> ( if (player.isFriendOf(human)) stateFor(player).toOption else None ) }, selfState.gameState.canWarp, game.remainingObjectives(human.team), TurnStartedEvt(gaGame.currentPlayer, gaGame.currentTurnTimeframe), ExtractionSpeed.values ) } } private def events( human: Human, ref: NetClientOutRef, events: Events )(implicit log: LoggingAdapter): Unit = { log.debug("### Dispatching events for {} ###", human) log.debug("Events ({}):", events.size) val viewedEvents = events.flatMap { event => log.debug("* {}", event) val viewed = event.asViewedBy(human) if (log.isDebugEnabled) viewed.foreach(log.debug("*** {}", _)) viewed } ref ! NetClientOut.Events(viewedEvents) } case class StartingHuman( human: Human, resources: Resources, replyJoined: NetClientJoinedRef, client: NetClientOutRef ) { def game = Game.StartingPlayer(human, resources) } def behavior( worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings], aiTeam: Team, starting: Set[GameActor.StartingHuman] ): Behavior[In] = ContextAware[Message] { ctx => implicit val log = ctx.createLogging() def scheduleCheckTurnTime() = ctx.schedule(1.second, ctx.self, Internal.CheckTurnTime) log.debug( "initializing game actor: starting={} turnTimer={}, aiTeam={}", starting, turnTimerSettings, aiTeam ) var clients = starting.map(data => data.human -> data.client).toMap var game: GameActorGame = { val humanTeams = starting.map(_.human.team) val world = worldMaterializer.materialize(humanTeams).right_! log.debug("World initialized to {}", world) val objectives = Map( aiTeam -> Objectives( destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects) ) ) ++ humanTeams.map { _ -> Objectives( // gatherResources = Some(Objective.GatherResources(world, Resources(200), Percentage(0.15))), // collectVps = Some(Objective.CollectVPs(VPS(10))), destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects) ) }.toMap log.debug("Objectives initialized to {}", objectives) SemiRealtimeGame( world, starting.map(_.game), objectives, turnTimerSettings.map(WithCurrentTime(_, DateTime.now)) ).fold( err => throw new IllegalStateException(s"Cannot initialize game: $err"), evented => { log.debug("Turn based game initialized to {}", evented) starting.foreach { data => val init = initMsg(data.human, evented.value).right_! data.replyJoined ! NetClient.LoggedInState.GameJoined(data.human, ctx.self) // We need to init the game to starting state. data.client ! init events(data.human, data.client, evented.events) } starting.foreach { data => events(data.human, data.client, evented.events) } evented.value } ) } def checkedTurnTimes = game.checkTurnTimes(DateTime.now) def update( requester: NetClientOutRef, f: GameActorGame => GameActorGame.Result ): Behavior[Message] = { log.debug("Updating game by a request from {}", requester) val afterTimeCheck = checkedTurnTimes afterTimeCheck.value.fold( _ => postGameChange(afterTimeCheck), tbg => f(tbg).map(evt => afterTimeCheck.events ++: evt).fold( err => { log.error(err) requester ! NetClientOut.Error(err) Same }, postGameChange ) ) } def postGameChange( evented: Evented[Winner \/ GameActorGame] ): Behavior[Message] = { dispatchEvents(evented.events) evented.value.fold( winner => { log.info("Game is finished, won by {}", winner) Stopped }, g => { game = g Same } ) } def dispatchEvents(events: Events): Unit = { if (events.nonEmpty) clients.foreach { case (human, ref) => GameActor.events(human, ref, events) } } var turnTimerChecker = scheduleCheckTurnTime() Full { case Sig(_, PostStop) => turnTimerChecker.cancel() Same case Msg(_, msg) => msg match { case Internal.CheckTurnTime => postGameChange(checkedTurnTimes) turnTimerChecker = scheduleCheckTurnTime() Same case In.Join(ClientData(human, replyTo), joinedRef) => joinedRef ! NetClient.LoggedInState.GameJoined(human, ctx.self) def doInit(gaGame: GameActorGame): Unit = { replyTo ! initMsg(human, gaGame).right_! clients += human -> replyTo } if (game.isJoined(human)) { log.info("Rejoining {} to {}", human, ctx.self) doInit(game) } else { log.error("Unknown human trying to join the game: {}", human) } Same case In.Warp(clientData, position, warpable) => update(clientData.replyTo, _.warp(clientData.human, position, warpable, DateTime.now)) case In.Move(clientData, id, path) => update(clientData.replyTo, _.move(clientData.human, id, path, DateTime.now)) case In.Attack(clientData, id, target) => update(clientData.replyTo, _.attack(clientData.human, id, target, DateTime.now)) case In.MoveAttack(move, target) => update( move.clientData.replyTo, _.moveAttack(move.clientData.human, move.id, move.path, target, DateTime.now) ) case In.Special(clientData, id) => update(clientData.replyTo, _.special(clientData.human, id, DateTime.now)) case In.ToggleWaitingForRoundEnd(clientData) => update(clientData.replyTo, _.toggleWaitingForRoundEnd(clientData.human, DateTime.now)) case In.Concede(clientData) => update(clientData.replyTo, _.concede(clientData.human, DateTime.now)) } } }.narrow } object GameActorGame { type Result = Game.ResultT[Winner \/ GameActorGame] } trait GameActorGame { import GameActorGame._ def warp(human: Human, position: Vect2, warpable: WarpableCompanion.Some, now: DateTime) (implicit log: LoggingAdapter): Result def move(human: Human, id: WObject.Id, path: NonEmptyVector[Vect2], now: DateTime) (implicit log: LoggingAdapter): Result def special(human: Human, id: WObject.Id, now: DateTime)(implicit log: LoggingAdapter): Result def attack(human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id, now: DateTime) (implicit log: LoggingAdapter): Result def moveAttack( human: Human, id: Id, path: NonEmptyVector[Vect2], target: Vect2 \/ WObject.Id, now: DateTime )(implicit log: LoggingAdapter): Result def toggleWaitingForRoundEnd(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result def concede(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result def game: Game def isJoined(human: Human)(implicit log: LoggingAdapter): Boolean def currentPlayer: Player def currentTurnTimeframe: Option[Timeframe] def currentTurnStartedEvt = TurnStartedEvt(currentPlayer, currentTurnTimeframe) def checkTurnTimes(time: DateTime)(implicit log: LoggingAdapter) : Evented[Winner \/ GameActorGame] } trait GameActorGameStarter[GAGame <: GameActorGame] { type StartedGame = String \/ Evented[GAGame] def apply( world: World, starting: Set[Game.StartingPlayer], objectives: Game.ObjectivesMap, turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]] )(implicit log: LoggingAdapter): StartedGame = { val game = Game(world, starting, objectives) game.flatMap(apply(_, turnTimerSettings)) } def apply(game: Game, turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]) (implicit log: LoggingAdapter): StartedGame = { val turnTimers = turnTimerSettings.map(_.map(TurnTimers(game.world.humans, _))) startNewGame(game, turnTimers) } protected[this] def startNewGame( game: Game, turnTimers: Option[WithCurrentTime[TurnTimers]] )(implicit log: LoggingAdapter): String \/ Evented[GAGame] } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,359 @@ package app.actors.game import java.util.UUID import akka.event.Logging import akka.typed.ScalaDSL._ import akka.typed._ import app.actors.NetClient.LoggedInState.JoinGame.{PvPMode, Mode} import app.actors.game.GameActor.{ClientData, StartingHuman} import app.actors.{GCMSender, NetClient} import app.models.User import app.models.game.world.maps.{GameMaps, SingleplayerMap, WorldMaterializer} import app.models.game.world.{ExtractorStats, Resources, World} import app.models.game.{Bot, Human, Team, TurnTimers} import implicits._, implicits.actor._ import infrastructure.GCM import launch.RTConfig import org.joda.time.DateTime import spire.math.UInt import scala.concurrent.duration._ import scalaz.Scalaz._ import scalaz.effect.IO object GamesManagerActor { type Ref = ActorRef[In] val StartingResources = ExtractorStats.cost * Resources(4) sealed trait Message sealed trait In extends Message object In { case class FromNetClient(msg: NetClient.GamesManagerFwd) extends In // After user connects to the server, he should check whether he is in game or not. case class CheckUserStatus(user: User, client: NetClient.LoggedInRef) extends In // Game joining case class Join( user: User, mode: NetClient.LoggedInState.JoinGame.Mode, replyTo: NetClient.LoggedInRef ) extends In case class CancelJoinGame( user: User, replyTo: ActorRef[NetClient.LoggedInState.JoinGameCancelled.type] ) extends In // Stats report for control client case class StatsReport(replyTo: ActorRef[StatsReportData]) extends In case object ShutdownInitiated extends In } private[this] sealed trait Internal extends Message private[this] object Internal { case object CleanupBackgroundWaitingList extends Internal /* Check if we can shutdown. */ case object CheckShutdown extends Internal case class GameTerminated(ref: GameActor.Ref) extends Internal case class ClientTerminated(ref: NetClient.LoggedInRef) extends Internal } // TODO: proper singleplayer // object PVEGame { // sealed trait PresetTeam { // def gameTeam: Team // } // object PresetTeam { // object Red extends PresetTeam { val gameTeam = Team() } // object Blue extends PresetTeam { val gameTeam = Team() } // } // // val empty = PVEGame(None, Set.empty, Set.empty) // } // case class PVEGame(ref: Option[ActorRef], redTeamPlayers: Set[User], blueTeamPlayers: Set[User]) { // def giveTeam: PVEGame.PresetTeam = // redTeamPlayers.size ?|? blueTeamPlayers.size match { // case Ordering.LT => PVEGame.PresetTeam.Red // case Ordering.GT => PVEGame.PresetTeam.Blue // case Ordering.EQ => if (Random.chance(0.5)) PVEGame.PresetTeam.Red else PVEGame.PresetTeam.Blue // } // // def add(user: User, team: PresetTeam): PVEGame = team match { // case PresetTeam.Red => copy(redTeamPlayers = redTeamPlayers + user) // case PresetTeam.Blue => copy(blueTeamPlayers = blueTeamPlayers + user) // } // } case class StatsReportData(users: UInt, games: UInt) case class BackgroundToken(value: String) extends AnyVal object BackgroundToken { val newToken = IO { BackgroundToken(UUID.randomUUID().toString) } } case class WaitingListEntry( user: User, client: NetClient.LoggedInRef, backgroundToken: BackgroundToken ) private def joinGame( game: GameActor.Ref, human: Human, client: NetClient.LoggedInRef ): Unit = game ! GameActor.In.Join(ClientData(human, client), client) def behaviour( maps: GameMaps, gcm: Option[(ActorRef[GCMSender.Send], RTConfig.GCM)] )(implicit rtConfig: RTConfig): Behavior[In] = ContextAware[Message] { ctx => val log = ctx.createLogging() def scheduleCleanup(): Unit = ctx.schedule(1.second, ctx.self, Internal.CleanupBackgroundWaitingList) def scheduleShutdownMode(): Unit = ctx.schedule(1.second, ctx.self, Internal.CheckShutdown) var waitingList = Vector.empty[WaitingListEntry] // token -> last heartbeat var waitingInBackground = Map.empty[BackgroundToken, DateTime] var user2game = Map.empty[User, (GameActor.Ref, Human)] var game2humans = Map.empty[GameActor.Ref, Set[Human]] def removeBackgroundToken(token: BackgroundToken): Unit = { log.info("Removing background token: {}", token) waitingInBackground -= token notifyGCM() } def notifyGCM(): Unit = { gcm.foreach { case (ref, cfg) => val foreground = GCM.Data.SearchingForOpponent.InForeground(UInt(waitingList.size)) val background = GCM.Data.SearchingForOpponent.InBackground(UInt(waitingInBackground.size)) ref ! GCMSender.Send( GCM.searchingForOpponent(foreground, background, cfg.searchForOpponentTTL) ) } } def noExistingGame( user: User, mode: Mode, client: NetClient.LoggedInRef ): Unit = { mode match { case Mode.Singleplayer => //launchRandomGenerated(user, client) launchPVE(user, client) case pvp: PvPMode => val token = BackgroundToken.newToken.unsafePerformIO() val entry = WaitingListEntry(user, client, token) waitingList :+= entry if (waitingList.size < pvp.playersNeeded) { log.debug( "Added {} from {} to {} waiting list: {}", user, client, mode, waitingList ) notifyGCM() ctx.watchWith(client, Internal.ClientTerminated(client)) client ! NetClient.LoggedInState.WaitingListJoined(token) } else fromWaitingList(pvp) } } def launchPVE(user: User, client: NetClient.LoggedInRef) = { // TODO: proper PVE // val team = pveGame.giveTeam // if (pveGame.ref.isEmpty) { // val game = createGame( // maps.pve.random, Some(TurnTimers.Settings()), Team(), // Set(StartingHuman(Human(user, team.gameTeam), StartingResources, client)) // ) // pveGame = pveGame.copy(ref = Some(game)) // } // pveGame = pveGame.add(user, team) createGame( maps.pve.random, None, Team(), Set(StartingHuman(Human(user, Team()), StartingResources, client, client)) ) } // def launchRandomGenerated(user: User, client: NetClient.LoggedInRef) = { // val materializer = SingleplayerMap { data => implicit log => // val npcTeam = Team() // val npcBot = Bot(npcTeam) // val spawnerBot = Bot(npcTeam) // World.create( // data.humanTeam, () => npcBot, () => spawnerBot, staticObjectsKnownAtStart = false // ) // } // createGame( // materializer, None, Team(), // Set(StartingHuman(Human(user, Team()), StartingResources, client, client)) // ) // } def fromWaitingList(mode: PvPMode): Unit = { val (entries, newWaitingList) = waitingList.splitAt(mode.playersNeeded) waitingList = newWaitingList notifyGCM() val teams = Vector.fill(mode.teams)(Team()) val players = entries.zipWithIndex.map { case (entry, idx) => val team = teams.wrapped(idx) StartingHuman( Human(entry.user, team), StartingResources, entry.client, entry.client ) }.toSet log.debug( "Fetched {} from waiting list for mode {}, rest={}", players, mode, newWaitingList ) // TODO: will fail if we have more teams than any of the maps support val map = maps.pvpMapFor(mode.playersNeeded).right_!.unsafePerformIO() val npcTeam = Team() createGame(map, Some(TurnTimers.Settings()), npcTeam, players) } def createGame( worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings], npcTeam: Team, starting: Set[GameActor.StartingHuman] ): GameActor.Ref = { val game = ctx.spawnAnonymous(Props(GameActor.behavior( worldMaterializer, turnTimerSettings, npcTeam, starting ))) ctx.watchWith(game, Internal.GameTerminated(game)) starting.foreach { data => user2game += data.human.user -> ((game, data.human)) } game2humans += game -> starting.map(_.human) log.info("Game {} created for {}", game, starting) game } scheduleCleanup() Total[Message] { case Internal.CleanupBackgroundWaitingList => val now = DateTime.now() val expiredKeys = waitingInBackground.keys.filter { token => val lastBeat = waitingInBackground(token) val timePassed = now - lastBeat val active = timePassed <= rtConfig.gamesManager.backgroundHeartbeatTTL.duration if (! active) log.debug( "Timing out background token {}: {} > {}", token, timePassed, rtConfig.gamesManager.backgroundHeartbeatTTL.duration ) !active } expiredKeys.foreach(waitingInBackground -= _) if (expiredKeys.nonEmpty) notifyGCM() scheduleCleanup() Same case In.ShutdownInitiated => log.info("Shutdown mode initiated.") scheduleShutdownMode() Same case Internal.CheckShutdown => val games = game2humans.size log.debug("Checking for shutdown state, games: {}", games) if (games === 0) { log.info("No games alive, shutting down.") ctx.system.terminate() Stopped } else { scheduleShutdownMode() Same } case In.CheckUserStatus(user, client) => user2game.get(user).foreach { case (game, human) => log.info("{} joining game {} on user status check", human, game) joinGame(game, human, client) } Same case In.Join(user, mode, replyTo) => user2game.get(user).fold2( { if (waitingList.exists(_.user === user)) log.warning( "Not joining a new game, because {} is already in a waiting list, ref: {}", user, replyTo ) else noExistingGame(user, mode, replyTo) }, { case (game, human) => log.info("{} joining game {} on game join", human, game) joinGame(game, human, replyTo) } ) Same case In.CancelJoinGame(user, replyTo) => waitingList.indexWhere(_.user === user) match { case -1 => log.warning("Not cancelling join game, because {} is not in a waiting list.", user) case idx => val entry = waitingList(idx) ctx.unwatch(entry.client) waitingList = waitingList.removeAt(idx) notifyGCM() replyTo ! NetClient.LoggedInState.JoinGameCancelled } Same case In.StatsReport(replyTo) => replyTo ! StatsReportData(UInt(user2game.size), UInt(game2humans.size)) Same case In.FromNetClient(NetClient.NotLoggedInState.CancelBackgroundToken(token)) => removeBackgroundToken(token) Same case In.FromNetClient(NetClient.MsgHandlerConnectionIn.BackgroundSFO(kind, token)) => if (waitingInBackground contains token) { kind match { case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Heartbeat => waitingInBackground += token -> DateTime.now() log.debug("Background heartbeat from {}", token) case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Cancel => removeBackgroundToken(token) } } else { // TODO: should we tell sender that his heartbeat was expired? log.info("Ignoring background {} from unknown token: {}", kind, token) } Same case Internal.ClientTerminated(ref) => waitingList.zipWithIndex.collectFirst { case (entry @ WaitingListEntry(_, `ref`, _), idx) => (entry, idx) }.foreach { case (entry, idx) => log.info("{} going into background", entry) waitingList = waitingList.removeAt(idx) waitingInBackground += entry.backgroundToken -> DateTime.now() notifyGCM() } Same case Internal.GameTerminated(ref) => game2humans.get(ref) match { case Some(humans) => log.info("Game {} terminated for humans {}", ref, humans) game2humans -= ref humans.foreach { human => user2game -= human.user } case None => log.warning( "Game {} terminated, but can't find it in our state!", ref ) } Same } }.narrow } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,186 @@ package app.actors import java.nio.ByteOrder import akka.io.Tcp import akka.typed.ScalaDSL._ import akka.typed._ import akka.util.ByteString import akka.{actor => untyped} import app.protobuf.parsing.Parsing import app.protobuf.serializing.Serializing import implicits.actor._ import utils.network.IntFramedPipeline import utils.network.IntFramedPipeline.Frame import scala.language.implicitConversions import scalaz.Scalaz._ import scalaz._ object MsgHandler { type Ref = ActorRef[In] sealed trait Message sealed trait In extends Message object In { sealed trait Control extends In object Control { case object ShutdownInitiated extends Control with NetClientFwd } case class FromNetClient(msg: NetClient.MsgHandlerOut) extends In } // Messages forwarded to NetClient sealed trait NetClientFwd implicit def asNetClientFwd(msg: NetClientFwd): NetClient.MsgHandlerIn.FwdFromMsgHandler = NetClient.MsgHandlerIn.FwdFromMsgHandler(msg) private[this] sealed trait Internal extends Message private[this] object Internal { case class Tcp(msg: akka.io.Tcp.Event) extends Internal case object Ack extends akka.io.Tcp.Event with Internal } def spawn( name: String, ctx: ActorContext[_], connection: untyped.ActorRef, netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn], maxToClientBufferSize: Int = 1024 * 1024 )(implicit byteOrder: ByteOrder) = { lazy val tcpAdapter: ActorRef[Tcp.Event] = ctx.spawn( Props(ContextAware[Tcp.Event] { tcpCtx => tcpCtx.watch(main) Full { case Msg(_, msg) => main ! Internal.Tcp(msg) Same case Sig(_, Terminated(`main`)) => Stopped } }), s"$name-tcp-adapter" ) lazy val main: ActorRef[Message] = { val bridge = TypedUntypedActorBridge(connection, tcpAdapter.asUntyped) ctx.spawn( Props(behavior(bridge, netClientBehavior, maxToClientBufferSize)), name ) } (main: Ref, tcpAdapter) } private[this] def behavior( connection: TypedUntypedActorBridge, netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn], maxToClientBufferSize: Int )(implicit byteOrder: ByteOrder): Behavior[Message] = ContextAware { ctx => implicit val log = ctx.createLogging() val pipeline = new MsgHandlerPipeline val lowWatermark = maxToClientBufferSize / 4 val highWatermark = maxToClientBufferSize * 3 / 4 var storage = Vector.empty[ByteString] var stored = 0 var closing = false var suspended = false val netClient = ctx.spawn(Props(netClientBehavior(ctx.self)), "net-client") ctx.watch(netClient) ctx.watch(connection.raw) val common = Partial[Message] { case Internal.Tcp(Tcp.Received(data)) => pipeline.unserialize(data).foreach { case -\/(err) => log.error(err) case \/-(msg) => netClient ! msg } Same case msg: In.Control.ShutdownInitiated.type => netClient ! msg Same } lazy val buffering = Partial[Message] { case In.FromNetClient(msg) => buffer(pipeline.serialize(msg)) case Internal.Ack => acknowledge() case Internal.Tcp(msg: Tcp.ConnectionClosed) => log.info(s"closing = true by {}.", msg) closing = true Same } lazy val normal = Partial[Message] { case In.FromNetClient(msg) => val data = pipeline.serialize(msg) buffer(data) connection ! Tcp.Write(data, Internal.Ack) buffering case Internal.Tcp(msg: Tcp.ConnectionClosed) => log.info(s"Connection closed by {}.", msg) Stopped } def buffer(data: ByteString): Behavior[Message] = { storage :+= data stored += data.size if (stored > maxToClientBufferSize) { log.warning(s"drop connection to [{}] (buffer overrun)", connection) Stopped } else if (stored > highWatermark) { log.debug(s"suspending reading") connection ! Tcp.SuspendReading suspended = true Same } else Same } def acknowledge(): Behavior[Message] = { require(storage.nonEmpty, "storage was empty") val size = storage.head.size stored -= size storage = storage.tail if (suspended && stored < lowWatermark) { log.debug("resuming reading") connection ! Tcp.ResumeReading suspended = false } if (storage.isEmpty) { if (closing) Stopped else normal } else { connection ! Tcp.Write(storage.head, Internal.Ack) Same } } Or(common, normal) } } class MsgHandlerPipeline(implicit byteOrder: ByteOrder) { private[this] val intFramed = new IntFramedPipeline() def unserialize(data: ByteString) = intFramed.fromFramedData(data).map { frame => Parsing.parse(frame.data).leftMap(err => s"Cannot decode $frame into message: $err") } def serialize(data: NetClient.MsgHandlerOut) = data |> Serializing.serialize |> Frame |> intFramed.withFrameSize } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,364 @@ package app.actors import java.util.UUID import akka.typed._, ScalaDSL._, AskPattern._ import app.actors.game.GameActor.ClientData import netmsg.ProtoChecksum import spire.math.UInt import scala.concurrent.Future import scala.concurrent.duration._ import akka.util.Timeout import app.actors.net_client._ import app.actors.game.{GamesManagerActor, GameActor} import app.models._ import app.models.game.Human import app.persistence.tables.Tables import implicits._, implicits.actor._ import scalaz._, Scalaz._ import app.persistence.DBDriver._ import org.joda.time.DateTime import scala.language.implicitConversions import scala.util.Try object NetClient { type Ref = ActorRef[In] type LoggedInRef = ActorRef[IsLoggedIn] type GameInMsg = GameActor.ClientData => GameActor.In sealed trait In // Messages forwarded to GamesManager sealed trait GamesManagerFwd { _: In => } implicit def asGamesManager(msg: GamesManagerFwd): GamesManagerActor.In.FromNetClient = GamesManagerActor.In.FromNetClient(msg) // Messages that come from MsgHandler. sealed trait MsgHandlerIn extends In object MsgHandlerIn { case class FwdFromMsgHandler(msg: MsgHandler.NetClientFwd) extends MsgHandlerIn } // Messages sent to MsgHandler sealed trait MsgHandlerOut implicit def asMsgHandler(msg: MsgHandlerOut): MsgHandler.In.FromNetClient = MsgHandler.In.FromNetClient(msg) // Messages that come via the TCP connection sealed trait MsgHandlerConnectionIn extends MsgHandlerIn // Messages that come from the game client sealed trait GameClientIn extends MsgHandlerConnectionIn sealed trait GameClientOut extends MsgHandlerOut // Messages that are defined in management protobuf. sealed trait ManagementIn extends GameClientIn sealed trait ManagementOut extends GameClientOut // Messages that come from the control client case class Control(key: ControlSecretKey, msg: Control.In) extends MsgHandlerConnectionIn object Control { sealed trait In object In { case object Shutdown extends In case object Status extends In } sealed trait Out extends MsgHandlerOut object Out { case class GenericReply(success: Boolean, message: Option[String]) extends Out object GenericReply { val success = GenericReply(success = true, None) def error(msg: String) = GenericReply(success = false, Some(msg)) } case class Status( tcpClients: Option[UInt], playingUsers: Option[UInt], games: Option[UInt] ) extends Out { override def toString = { def asStr(o: Option[UInt]) = o.fold2("-", _.toString()) s"Status[tcp clients: ${asStr(tcpClients)}, playing users: ${ asStr(playingUsers)}, games: ${asStr(games)}]" } } } } object MsgHandlerConnectionIn { case class TimeSync(clientNow: DateTime) extends GameClientIn case class TimeSyncReply(clientNow: DateTime, serverNow: DateTime) extends GameClientOut // Background searching for opponent heartbeat case class BackgroundSFO( kind: BackgroundSFO.Kind, token: GamesManagerActor.BackgroundToken ) extends MsgHandlerConnectionIn with GamesManagerFwd object BackgroundSFO { sealed trait Kind object Kind { case object Heartbeat extends Kind case object Cancel extends Kind } } } sealed trait NotLoggedInState extends MsgHandlerConnectionIn object NotLoggedInState { case object ProtoVersionCheck extends NotLoggedInState with GameClientIn case class ProtoVersionCheckReply(checksum: String) extends GameClientOut // After client connects in it should cancel the active background token. case class CancelBackgroundToken( token: GamesManagerActor.BackgroundToken ) extends NotLoggedInState with GamesManagerFwd with ManagementIn case object AutoRegister extends NotLoggedInState with ManagementIn case class Login(credentials: Credentials) extends NotLoggedInState with ManagementIn sealed trait LoginResponse extends ManagementOut object LoginResponse { case object InvalidCredentials extends LoginResponse case class LoggedIn( user: User, token: SessionToken, autogenerated: Boolean ) extends LoginResponse } } sealed trait IsLoggedIn extends In sealed trait LoggedInState extends IsLoggedIn object LoggedInState { object JoinGame { sealed trait Mode sealed trait PvPMode extends Mode { def playersPerTeam: Int def teams: Int def playersNeeded = teams * playersPerTeam } object Mode { case object Singleplayer extends Mode case object OneVsOne extends PvPMode { def playersPerTeam = 1; def teams = 2 } } } case class JoinGame(mode: JoinGame.Mode) extends LoggedInState with ManagementIn case class GameJoined(human: Human, game: GameActor.Ref) extends LoggedInState with ManagementOut case object CancelJoinGame extends LoggedInState with ManagementIn case object JoinGameCancelled extends LoggedInState with ManagementOut case class CheckNameAvailability(name: String) extends LoggedInState with ManagementIn case class CheckNameAvailabilityResponse( name: String, available: Boolean ) extends ManagementOut case class Register( username: String, password: PlainPassword, email: String ) extends LoggedInState with ManagementIn case class RegisterResponse(newToken: Option[SessionToken]) extends ManagementOut case class WaitingListJoined(token: GamesManagerActor.BackgroundToken) extends LoggedInState with ManagementOut } sealed trait InGameState extends IsLoggedIn object InGameState { case class FromMsgHandler(msg: GameInMsg) extends InGameState with GameClientIn case class FromGameActor(msg: GameActor.NetClientOut) extends InGameState with GameClientOut } def behavior( msgHandler: MsgHandler.Ref, gamesManager: GamesManagerActor.Ref, server: Server.Ref, controlKey: ControlSecretKey, db: Database ): Behavior[In] = ContextAware[In] { ctx => val log = ctx.createLogging() ctx.watch(msgHandler) def handleControl(c: Control): Future[Control.Out] = { if (c.key === controlKey) c.msg match { case Control.In.Shutdown => server ! Server.In.Unbind Future.successful(Control.Out.GenericReply.success) case Control.In.Status => import ctx.executionContext implicit val timeout = Timeout(3.seconds) def asOption[A](f: Future[A]) = f.map(Some.apply).recover { case e => log.error("Error while asking: {}", e) None } val clientsCountF = (server ? Server.In.ReportClientCount) |> asOption val gamesCountF = (gamesManager ? GamesManagerActor.In.StatsReport) |> asOption (clientsCountF zip gamesCountF).map { case (clients, gameManagerOpt) => Control.Out.Status( clients, gameManagerOpt.map(_.users), gameManagerOpt.map(_.games) ) } } else Future.successful( Control.Out.GenericReply.error(s"Invalid control key '${c.key}'") ) } var shutdownInitiated = false var inGameOpt = Option.empty[(GameActor.Ref, Human)] val common = Full[In] { case Sig(_, PostStop) => if (shutdownInitiated) { inGameOpt.foreach { case (gameRef, human) => // Auto-concede if lost connection when shutdown is initiated. log.info("Auto conceding because lost connection in shutdown mode.") gameRef ! GameActor.In.Concede(ClientData(human, ctx.self)) } } Same case Msg(_, MsgHandlerConnectionIn.TimeSync(clientNow)) => msgHandler ! MsgHandlerConnectionIn.TimeSyncReply(clientNow, DateTime.now) Same case Msg(_, m: MsgHandlerConnectionIn.BackgroundSFO) => gamesManager ! m Same case Msg(_, MsgHandlerIn.FwdFromMsgHandler(MsgHandler.In.Control.ShutdownInitiated)) => shutdownInitiated = true Same case Msg(_, c: Control) => import ctx.executionContext handleControl(c).onComplete { case util.Success(m) => msgHandler ! m case util.Failure(err) => log.error("Error while handling control message {}: {}", c, err) } Same } lazy val notLoggedIn = { def logIn( self: ActorRef[In], user: User, sessionToken: SessionToken, autogenerated: Boolean ) = { msgHandler ! NotLoggedInState.LoginResponse.LoggedIn(user, sessionToken, autogenerated) gamesManager ! GamesManagerActor.In.CheckUserStatus(user, self) loggedIn(user) } Partial[In] { case msg: NotLoggedInState => msg match { case NotLoggedInState.ProtoVersionCheck => msgHandler ! NotLoggedInState.ProtoVersionCheckReply(ProtoChecksum.checksum) Same case m: NotLoggedInState.CancelBackgroundToken => gamesManager ! m Same case NotLoggedInState.AutoRegister => val password = PlainPassword(UUID.randomUUID().shortStr) val sessionToken = SessionToken.random() val id = UUID.randomUUID() val user = User(id, s"autogen-${id.shortStr}") val credentials = Credentials(user.name, password) db.withSession { implicit session => Tables.users. map(t => (t.user, t.sessionToken, t.password, t.email)). insert((user, sessionToken.value, password.encrypted, None)) } logIn(ctx.self, user, sessionToken, autogenerated = true) case NotLoggedInState.Login(credentials) => val optQ = Tables.users. filter(t => t.name === credentials.name). map(t => (t.id, t.sessionToken, t.email, t.password)) val idOpt = db.withSession(optQ.firstOption(_)).filter { case (_, sessionToken, _, pwHash) => credentials.check(sessionToken, pwHash) }.map(t => (t._1, SessionToken(t._2), t._3.isEmpty)) idOpt.fold2( { msgHandler ! NotLoggedInState.LoginResponse.InvalidCredentials Same }, { case (id, token, autogenerated) => logIn(ctx.self, User(id, credentials.name), token, autogenerated) } ) } } } def loggedIn(user: User): Behavior[In] = Partial[In] { case msg: LoggedInState => msg match { case LoggedInState.CheckNameAvailability(name) => val query = Tables.users.map(_.name).filter(_ === name).exists val exists = db.withSession(query.run(_)) msgHandler ! LoggedInState.CheckNameAvailabilityResponse(name, ! exists) Same case LoggedInState.Register(username, password, email) => val token = SessionToken.random() val query = Tables.users. filter(t => t.id === user.id && t.email.isEmpty). map(t => (t.name, t.email, t.password, t.sessionToken)) val success = Try { db.withSession(query.update(( username, Some(email), password.encrypted, token.value ))(_)) }.getOrElse(0) === 1 msgHandler ! LoggedInState.RegisterResponse(if (success) Some(token) else None) Same case LoggedInState.JoinGame(mode) => gamesManager ! GamesManagerActor.In.Join(user, mode, ctx.self) Same case msg: LoggedInState.WaitingListJoined => msgHandler ! msg Same case msg @ LoggedInState.GameJoined(human, game) => msgHandler ! msg inGame(user, human, game) case LoggedInState.CancelJoinGame => gamesManager ! GamesManagerActor.In.CancelJoinGame(user, ctx.self) Same case msg: LoggedInState.JoinGameCancelled.type => msgHandler ! msg Same } } def inGame(user: User, human: Human, game: GameActor.Ref) = { inGameOpt = Some((game, human)) ctx.watch(game) Full[In] { case Sig(_, Terminated(`game`)) => log.error("Game was terminated") inGameOpt = None loggedIn(user) case Msg(_, msg: InGameState) => msg match { case InGameState.FromMsgHandler(msgFn) => val msg = msgFn(ClientData(human, ctx.self)) game ! msg case gameMsg: InGameState.FromGameActor => msgHandler ! gameMsg } Same } } Or(common, notLoggedIn) }.narrow } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,122 @@ package app.actors import java.net.InetSocketAddress import java.nio.ByteOrder import akka.io.{IO, Tcp} import akka.typed.ScalaDSL._ import akka.typed._ import akka.{actor => untyped} import app.actors.game.GamesManagerActor import app.persistence.DBDriver import implicits._ import implicits.actor._ import launch.RTConfig import spire.math.UInt object Server { type Ref = ActorRef[In] sealed trait Message sealed trait In extends Message object In { case class ReportClientCount(replyTo: ActorRef[UInt]) extends In case object Unbind extends In } private[this] sealed trait Internal extends Message private[this] object Internal { case class Tcp( msg: akka.io.Tcp.Event, sender: untyped.ActorRef ) extends Internal case class MsgHandlerTerminated(ref: MsgHandler.Ref) extends Internal } object Out { } def behavior( rtConfig: RTConfig, gamesManager: GamesManagerActor.Ref, db: DBDriver.Database )(implicit byteOrder: ByteOrder): Behavior[In] = ContextAware[Message] { ctx => def port = rtConfig.port val log = ctx.createLogging() val tcpAdapter = ctx.spawnAdapterUTRef(Internal.Tcp).asUntyped { // If we want to access manager outside of this scope we probably need to wrap // it in a bridge. val manager = IO(Tcp)(ctx.system.asUntyped) manager.tell( Tcp.Bind(tcpAdapter, new InetSocketAddress(port.signed)), tcpAdapter ) } /* Actor that is handling our bound socket. */ var socketRef = Option.empty[TypedUntypedActorBridge] var msgHandlers = Set.empty[MsgHandler.Ref] Full { case Msg(_, msg) => msg match { case In.Unbind => socketRef.fold2( log.error("Can't unbind, socket not bound to {}", port), ref => { log.debug("Received a request to unbind, forwarding to {}", ref) ref ! Tcp.Unbind } ) Same case In.ReportClientCount(replyTo) => replyTo ! UInt(msgHandlers.size) Same case Internal.Tcp(Tcp.Bound(localAddress), sender) => socketRef = Some(TypedUntypedActorBridge(sender, tcpAdapter)) log.info("Server bound to {}", localAddress) Same case Internal.Tcp(Tcp.Unbound, _) => socketRef = None log.info("Socket to port {} unbound, initiating shutdown.", port) msgHandlers.foreach(_ ! MsgHandler.In.Control.ShutdownInitiated) gamesManager ! GamesManagerActor.In.ShutdownInitiated Same case Internal.Tcp(Tcp.CommandFailed(b: Tcp.Bind), _) => log.error(s"Cannot bind to ${b.localAddress}!") Stopped case Internal.Tcp(Tcp.Connected(remote, local), connection) => log.info(s"Client connected from $remote.") val (msgHandler, tcpAdapter) = MsgHandler.spawn( s"${remote.getHostString}-${remote.getPort}", ctx, connection, handlerRef => NetClient.behavior( handlerRef, gamesManager, ctx.self, rtConfig.controlKey, db ).narrow ) msgHandlers += msgHandler ctx.watchWith(msgHandler, Internal.MsgHandlerTerminated(msgHandler)) connection ! Tcp.Register(tcpAdapter.asUntyped, keepOpenOnPeerClosed = true) Same case Internal.MsgHandlerTerminated(ref) => msgHandlers -= ref Same case Internal.Tcp(_, _) => Unhandled } case Sig(_, PostStop) => log.info("Shutting down actor system because server has stopped.") ctx.system.terminate() Stopped } }.narrow } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,7 @@ package implicits.actor import akka.actor.ActorRef case class TypedUntypedActorBridge(raw: ActorRef, sender: ActorRef) { def !(msg: Any) = raw.tell(msg, sender) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,7 @@ package implicits.actor import akka.actor._ private[actor] class UTRefMessageWrapper[A, B](f: (A, ActorRef) => B) extends Actor { def receive = { case msg => context.parent ! f(msg.asInstanceOf[A], sender()) } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,56 @@ package implicits import akka.event.Logging import akka.typed.{ActorContext, ActorRef, ActorSystem} import akka.{actor => untyped} package object actor { object TypedActorSystemExts { private val asUntyped = classOf[ActorSystem[_]].getDeclaredMethod("untyped") } implicit class TypedActorSystemExts(val as: ActorSystem[_]) extends AnyVal { def asUntyped = TypedActorSystemExts.asUntyped.invoke(as).asInstanceOf[akka.actor.ActorSystem] } object TypedActorRefExts { private val asUntyped = classOf[ActorRef[_]].getDeclaredMethod("untypedRef") } implicit class TypedActorRefExts(val ref: ActorRef[_]) extends AnyVal { def asUntyped = TypedActorRefExts.asUntyped.invoke(ref).asInstanceOf[akka.actor.ActorRef] } implicit class TypedActorContextExts[A](val ctx: ActorContext[A]) extends AnyVal { def createLogging() = Logging(ctx.system.asUntyped, ctx.self.asUntyped) /** As `spawnAdapter` but gives access to the untyped ActorRef which sent * the original message. */ def spawnAdapterUTRef[B](f: (B, untyped.ActorRef) => A) = ActorRef[B](ctx.actorOf(untyped.Props(new UTRefMessageWrapper(f)))) /** Watch `ref` and send `msg` to self when it terminates. */ def watchWith[B](ref: ActorRef[B], msg: A): ActorRef[B] = { watchWith(ref, msg, ctx.self) ref } /** Watch `ref` and send `msg` to `sendTo` when it terminates. */ def watchWith[B, C](ref: ActorRef[B], msg: C, sendTo: ActorRef[C]): ActorRef[B] = { import akka.typed._ import ScalaDSL._ ctx.spawnAnonymous(Props(ContextAware[Unit] { anonCtx => anonCtx.watch(ref) Full { case Sig(_, Terminated(`ref`)) => sendTo ! msg Stopped } })) ref } } }