Created
November 5, 2020 15:34
-
-
Save tiranuom/16d0aa171f998f7aeae38c2e45b2d846 to your computer and use it in GitHub Desktop.
This gist shows how to bind a Spring router function to an Akka actor system through an Akka extension.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package hms.test.springakkaintegration.akka | |
| import akka.actor.typed.* | |
| import akka.actor.typed.javadsl.ActorContext | |
| import akka.actor.typed.javadsl.AskPattern | |
| import akka.actor.typed.javadsl.Behaviors | |
| import akka.actor.typed.receptionist.Receptionist | |
| import akka.actor.typed.receptionist.ServiceKey | |
| import kotlinx.coroutines.future.await | |
| import org.springframework.context.annotation.Bean | |
| import org.springframework.context.annotation.Configuration | |
| import org.springframework.web.reactive.function.server.RouterFunction | |
| import org.springframework.web.reactive.function.server.ServerResponse | |
| import org.springframework.web.reactive.function.server.bodyValueAndAwait | |
| import org.springframework.web.reactive.function.server.coRouter | |
| import java.time.Duration | |
/**
 * Spring configuration that creates the Akka actor system and publishes the
 * actor-backed HTTP routes as beans.
 */
@Configuration
class TestActorSystemConfig {

    /**
     * Creates the typed actor system named "test-actor-system" with the behavior
     * returned by [MasterActor.create] as its guardian.
     *
     * The destroy method is named explicitly: Spring's inferred destroy-method lookup
     * only considers `close`/`shutdown`, so without it the actor system would not be
     * terminated by Spring when the application context closes.
     */
    @Bean(destroyMethod = "terminate")
    fun actorSystem(): ActorSystem<Any> =
        ActorSystem.create(MasterActor.create(), "test-actor-system")

    /**
     * Registers (or looks up) [SpringRouterExtension] on the given actor system and
     * exposes the routes it provides.
     *
     * @param actorSystem the actor system bean created by [actorSystem]
     * @return the router function built by [AkkaRouterFunction.actorRoute]
     */
    @Bean
    fun actorRoute(actorSystem: ActorSystem<Any>): RouterFunction<ServerResponse> =
        SpringRouterExtension.apply(actorSystem).actorRoute()
}
| //--- Actor system definition start --- | |
/**
 * Guardian actor of the system: on start-up it spawns a supervised [EchoActor]
 * child and afterwards handles no messages of its own.
 *
 * @property ctx the actor context this guardian was set up with
 * @property echoActorRef reference to the spawned "echo-actor" child
 */
class MasterActor(val ctx: ActorContext<Any>, val echoActorRef: ActorRef<EchoActorCommand>) {

    /**
     * Behavior of the guardian once its child has been spawned.
     *
     * `Behaviors.same` is only meaningful as the result of processing a message
     * ("keep the current behavior"); it is not a valid initial behavior. The guardian
     * has no message protocol, so it uses `Behaviors.empty`, which treats every
     * incoming message as unhandled.
     */
    fun execute(): Behavior<Any> = Behaviors.empty<Any>()

    companion object {
        /**
         * Builds the guardian behavior: spawns [EchoActor] under the name "echo-actor",
         * supervised with a resume strategy, then switches to [execute].
         */
        fun create(): Behavior<Any> = Behaviors.setup<Any> { ctx ->
            val supervisedEcho = Behaviors.supervise(EchoActor.create())
                .onFailure(SupervisorStrategy.resume())
            val echoActorRef = ctx.spawn(supervisedEcho, "echo-actor")
            MasterActor(ctx, echoActorRef).execute()
        }
    }
}
/**
 * Reply sent by [EchoActor] for a ping.
 *
 * @property message the text carried by the reply
 */
data class Pong(val message: String)
/**
 * Closed set of messages understood by [EchoActor].
 */
sealed class EchoActorCommand {

    /**
     * Asks the echo actor to answer with a [Pong].
     *
     * @property message text to be echoed back
     * @property replyTo recipient of the resulting [Pong]
     */
    data class Ping(
        val message: String,
        val replyTo: ActorRef<Pong>
    ) : EchoActorCommand()
}
/**
 * Actor that answers every [EchoActorCommand.Ping] with a [Pong] carrying the same message.
 *
 * @property ctx the actor context this actor was set up with
 */
class EchoActor(val ctx: ActorContext<EchoActorCommand>) {

    /** Message-handling behavior: dispatches each command to its handler. */
    fun execute(): Behavior<EchoActorCommand> = Behaviors.receiveMessage { command ->
        when (command) {
            is EchoActorCommand.Ping -> onPing(command)
        }
    }

    /** Sends the ping's message back to its `replyTo` and keeps the current behavior. */
    private fun onPing(ping: EchoActorCommand.Ping): Behavior<EchoActorCommand> {
        val reply = Pong(ping.message)
        ping.replyTo.tell(reply)
        return Behaviors.same()
    }

    companion object {
        /** Key under which the echo actor registers itself with the receptionist. */
        val echoServiceKey = ServiceKey.create(EchoActorCommand::class.java, "echo-service")

        /**
         * Builds the echo actor behavior: registers the actor with the system
         * receptionist under [echoServiceKey], then starts handling messages.
         */
        fun create(): Behavior<EchoActorCommand> = Behaviors.setup { ctx ->
            val registration = Receptionist.register(echoServiceKey, ctx.self)
            ctx.system.receptionist().tell(registration)
            EchoActor(ctx).execute()
        }
    }
}
| //--- Actor system definition end --- | |
| //--- Extension definition start --- | |
/**
 * Akka extension id whose extension instance is an [AkkaRouterFunction].
 */
object SpringRouterExtension : ExtensionId<AkkaRouterFunction>() {

    /** Creates the [AkkaRouterFunction] bound to the given actor system. */
    override fun createExtension(system: ActorSystem<*>): AkkaRouterFunction =
        AkkaRouterFunction(system)
}
/**
 * Akka extension that exposes the echo actor through a Spring coroutine router.
 *
 * The echo actor reference is looked up through the receptionist on first use and
 * cached for later calls.
 *
 * @property system the actor system this extension belongs to
 */
class AkkaRouterFunction(val system: ActorSystem<*>) : Extension {

    /** Timeout applied to both the receptionist lookup and the ping request. */
    private val askTimeout: Duration = Duration.ofSeconds(10)

    /**
     * Cached echo actor reference; `null` until the first successful lookup.
     *
     * Volatile because this extension instance is shared and route handlers are
     * presumably invoked from more than one thread (NOTE(review): confirm against the
     * WebFlux runtime in use). Concurrent first calls may each perform the lookup;
     * that is harmless because every lookup resolves the same service key.
     */
    @Volatile
    private var cachedEchoActor: ActorRef<EchoActorCommand>? = null

    /**
     * Runs [lambda] with a reference to the echo actor, resolving and caching the
     * reference on first use.
     *
     * @param lambda action to run against the echo actor
     * @return whatever [lambda] returns
     * @throws NoSuchElementException if no echo actor is registered with the receptionist
     */
    suspend fun <T> withEchoActor(lambda: suspend (echoActor: RecipientRef<EchoActorCommand>) -> T): T {
        // Only a successful lookup is cached, so a failed one is retried on the next call.
        val echoActor = cachedEchoActor ?: lookupEchoActor().also { cachedEchoActor = it }
        return lambda(echoActor)
    }

    /**
     * Builds the router: `GET /echo/{msg}` sends `msg` to the echo actor and responds
     * with the message of the returned [Pong].
     */
    fun actorRoute(): RouterFunction<ServerResponse> = coRouter {
        GET("/echo/{msg}") { serverRequest ->
            val msg = serverRequest.pathVariable("msg")
            val pong = withEchoActor { echoActor -> ping(echoActor, msg) }
            ok().bodyValueAndAwait(pong.message)
        }
    }

    /**
     * Asks the receptionist for the actors registered under [EchoActor.echoServiceKey]
     * and returns the first one.
     */
    private suspend fun lookupEchoActor(): ActorRef<EchoActorCommand> {
        val listing = AskPattern.ask<Receptionist.Command, Receptionist.Listing>(
            system.receptionist(),
            akka.japi.function.Function { replyTo ->
                Receptionist.find(EchoActor.echoServiceKey, replyTo)
            },
            askTimeout,
            system.scheduler()
        ).await()
        // The listing is empty when the echo actor has not registered yet; fail with a
        // descriptive message instead of the bare "List is empty." from first().
        return listing.getServiceInstances(EchoActor.echoServiceKey).firstOrNull()
            ?: throw NoSuchElementException(
                "No actor is registered with the receptionist for ${EchoActor.echoServiceKey}"
            )
    }

    /** Sends a [EchoActorCommand.Ping] carrying [msg] and suspends until the [Pong] arrives. */
    private suspend fun ping(echoActor: RecipientRef<EchoActorCommand>, msg: String): Pong =
        AskPattern.ask<EchoActorCommand, Pong>(
            echoActor,
            akka.japi.function.Function { replyTo ->
                EchoActorCommand.Ping(msg, replyTo = replyTo)
            },
            askTimeout,
            system.scheduler()
        ).await()
}
| //--- Extension definition end --- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment