package hms.test.springakkaintegration.akka import akka.actor.typed.* import akka.actor.typed.javadsl.ActorContext import akka.actor.typed.javadsl.AskPattern import akka.actor.typed.javadsl.Behaviors import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.ServiceKey import kotlinx.coroutines.future.await import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.web.reactive.function.server.RouterFunction import org.springframework.web.reactive.function.server.ServerResponse import org.springframework.web.reactive.function.server.bodyValueAndAwait import org.springframework.web.reactive.function.server.coRouter import java.time.Duration /** * Spring configurations to bind actor system and routes */ @Configuration class TestActorSystemConfig { @Bean fun actorSystem(): ActorSystem = ActorSystem.create(MasterActor.create(), "test-actor-system") @Bean fun actorRoute(actorSystem: ActorSystem) = SpringRouterExtension.apply(actorSystem).actorRoute() } //--- Actor system definition start --- class MasterActor(val ctx: ActorContext, val echoActorRef: ActorRef) { companion object { fun create() = Behaviors.setup { ctx -> val echoActorRef = ctx.spawn(Behaviors.supervise(EchoActor.create()) .onFailure(SupervisorStrategy.resume()), "echo-actor" ) MasterActor(ctx, echoActorRef).execute() } } fun execute() = Behaviors.same() } data class Pong( val message: String ) sealed class EchoActorCommand { data class Ping(val message: String, val replyTo: ActorRef): EchoActorCommand() } class EchoActor(val ctx: ActorContext) { companion object { val echoServiceKey = ServiceKey.create(EchoActorCommand::class.java, "echo-service") fun create(): Behavior = Behaviors.setup { ctx -> ctx.system.receptionist().tell(Receptionist.register(echoServiceKey, ctx.self)) EchoActor(ctx).execute() } } fun execute(): Behavior = Behaviors.receiveMessage { command -> when (command) { is EchoActorCommand.Ping -> { command.replyTo.tell(Pong(command.message)) Behaviors.same() } } } } //--- Actor system definition end --- //--- Extension definition start --- object SpringRouterExtension: ExtensionId() { override fun createExtension(system: ActorSystem<*>): AkkaRouterFunction { return AkkaRouterFunction(system) } } class AkkaRouterFunction(val system: ActorSystem<*>): Extension { private lateinit var echoActor: ActorRef suspend fun withEchoActor(lambda: suspend (echoActor: RecipientRef) -> T): T { if (!this::echoActor.isInitialized) { echoActor = AskPattern.ask( system.receptionist(), akka.japi.function.Function { replyTo -> Receptionist.find(EchoActor.echoServiceKey, replyTo) }, Duration.ofSeconds(10), system.scheduler() ).await().getServiceInstances(EchoActor.echoServiceKey).toList().first() } return lambda(echoActor) } fun actorRoute(): RouterFunction { return coRouter { GET("/echo/{msg}") { serverRequest -> val msg = serverRequest.pathVariable("msg") withEchoActor { echoActor -> val pong = AskPattern.ask( echoActor, akka.japi.function.Function { replyTo -> EchoActorCommand.Ping(msg, replyTo = replyTo) }, Duration.ofSeconds(10), system.scheduler() ).await() ok().bodyValueAndAwait(pong.message) } } } } } //--- Extension definition end ---