Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save tiranuom/16d0aa171f998f7aeae38c2e45b2d846 to your computer and use it in GitHub Desktop.
Save tiranuom/16d0aa171f998f7aeae38c2e45b2d846 to your computer and use it in GitHub Desktop.
This gist shows how to bind spring router function to akka actor system through extension.
package hms.test.springakkaintegration.akka
import akka.actor.typed.*
import akka.actor.typed.javadsl.ActorContext
import akka.actor.typed.javadsl.AskPattern
import akka.actor.typed.javadsl.Behaviors
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import kotlinx.coroutines.future.await
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.bodyValueAndAwait
import org.springframework.web.reactive.function.server.coRouter
import java.time.Duration
/**
* Spring configurations to bind actor system and routes
*/
@Configuration
class TestActorSystemConfig {
@Bean
fun actorSystem(): ActorSystem<Any> = ActorSystem.create(MasterActor.create(), "test-actor-system")
@Bean
fun actorRoute(actorSystem: ActorSystem<Any>) = SpringRouterExtension.apply(actorSystem).actorRoute()
}
//--- Actor system definition start ---
class MasterActor(val ctx: ActorContext<Any>, val echoActorRef: ActorRef<EchoActorCommand>) {
companion object {
fun create() = Behaviors.setup<Any> { ctx ->
val echoActorRef = ctx.spawn(Behaviors.supervise(EchoActor.create())
.onFailure(SupervisorStrategy.resume()),
"echo-actor"
)
MasterActor(ctx, echoActorRef).execute()
}
}
fun execute() = Behaviors.same<Any>()
}
data class Pong(
val message: String
)
sealed class EchoActorCommand {
data class Ping(val message: String, val replyTo: ActorRef<Pong>): EchoActorCommand()
}
class EchoActor(val ctx: ActorContext<EchoActorCommand>) {
companion object {
val echoServiceKey = ServiceKey.create(EchoActorCommand::class.java, "echo-service")
fun create(): Behavior<EchoActorCommand> = Behaviors.setup { ctx ->
ctx.system.receptionist().tell(Receptionist.register(echoServiceKey, ctx.self))
EchoActor(ctx).execute()
}
}
fun execute(): Behavior<EchoActorCommand> = Behaviors.receiveMessage { command ->
when (command) {
is EchoActorCommand.Ping -> {
command.replyTo.tell(Pong(command.message))
Behaviors.same()
}
}
}
}
//--- Actor system definition end ---
//--- Extension definition start ---
object SpringRouterExtension: ExtensionId<AkkaRouterFunction>() {
override fun createExtension(system: ActorSystem<*>): AkkaRouterFunction {
return AkkaRouterFunction(system)
}
}
class AkkaRouterFunction(val system: ActorSystem<*>): Extension {
private lateinit var echoActor: ActorRef<EchoActorCommand>
suspend fun <T> withEchoActor(lambda: suspend (echoActor: RecipientRef<EchoActorCommand>) -> T): T {
if (!this::echoActor.isInitialized) {
echoActor = AskPattern.ask<Receptionist.Command, Receptionist.Listing>(
system.receptionist(),
akka.japi.function.Function { replyTo ->
Receptionist.find(EchoActor.echoServiceKey, replyTo)
},
Duration.ofSeconds(10),
system.scheduler()
).await().getServiceInstances(EchoActor.echoServiceKey).toList().first()
}
return lambda(echoActor)
}
fun actorRoute(): RouterFunction<ServerResponse> {
return coRouter {
GET("/echo/{msg}") { serverRequest ->
val msg = serverRequest.pathVariable("msg")
withEchoActor { echoActor ->
val pong = AskPattern.ask<EchoActorCommand, Pong>(
echoActor,
akka.japi.function.Function { replyTo ->
EchoActorCommand.Ping(msg, replyTo = replyTo)
},
Duration.ofSeconds(10),
system.scheduler()
).await()
ok().bodyValueAndAwait(pong.message)
}
}
}
}
}
//--- Extension definition end ---
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment