Created
September 18, 2015 20:33
-
-
Save dzhulk/b7b22c30ecb8f0febb71 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import java.util.concurrent.ConcurrentHashMap | |
| import akka.actor._ | |
| import io.netty.bootstrap.ServerBootstrap | |
| import io.netty.buffer.Unpooled | |
| import io.netty.channel.nio.NioEventLoopGroup | |
| import io.netty.channel.socket.SocketChannel | |
| import io.netty.channel.socket.nio.NioServerSocketChannel | |
| import io.netty.channel._ | |
| import io.netty.handler.codec.http._ | |
| import io.netty.handler.codec.http.HttpHeaders.Names._ | |
| import io.netty.handler.codec.http.HttpHeaders.Values._ | |
/**
 * Program entry point: starts the Jetty-based API runner on port 8080 with context path "/".
 *
 * An explicit `main` method is used instead of the `App` trait so that the blocking
 * `run()` call is not executed as part of the object's initialisation body.
 */
object Main {
  def main(args: Array[String]): Unit = {
    // Alternative Netty-based server, kept for reference:
    // new APIServerRunner(8080).run()
    new APIJettyRunner(8080, "/").run()
  }
}
/** Holder for the single actor system (named "default") used by this file. */
object DefaultActorSystem {
  // Implicit definitions carry an explicit type so implicit resolution does not depend on inference.
  implicit val system: ActorSystem = ActorSystem("default")
}
| //case class Action() | |
| //case class HttpRequestContainer(headers: HttpHeaders, ) | |
/**
 * One-shot actor that answers a single HTTP request with a fixed plain-text body.
 *
 * On receiving [[ChannelActor.Run]] it writes the response to `channel` and then stops itself.
 *
 * @param channel the Netty channel the response is written to
 * @param request the request being answered; only its keep-alive setting is consulted
 */
class ChannelActor(channel: Channel, request: HttpRequest) extends Actor {
  import java.nio.charset.StandardCharsets

  /**
   * Builds the 200 OK response and writes it to the channel.
   *
   * When the request does not ask for keep-alive, the channel is closed once the write completes;
   * otherwise a `Connection: keep-alive` header is added and the channel is left open.
   */
  def execute(): Unit = {
    // Encode with an explicit charset instead of the platform default.
    val body = Unpooled.wrappedBuffer("FUCK YOU!\n".getBytes(StandardCharsets.UTF_8))
    val response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body)
    response.headers().set(CONTENT_TYPE, "text/plain")
    response.headers().set(CONTENT_LENGTH, response.content.readableBytes)

    if (HttpHeaders.isKeepAlive(request)) {
      response.headers().set(CONNECTION, KEEP_ALIVE)
      channel.writeAndFlush(response)
    } else {
      channel.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE)
    }
  }

  def receive: Receive = {
    case ChannelActor.Run =>
      execute()
      // The actor serves exactly one response.
      context.stop(self)
  }
}
/** Registry that associates each channel with the actor created for its most recent request. */
object ChannelActor {

  /** Message instructing a [[ChannelActor]] to write its response. */
  case object Run

  /** Channel-to-actor lookup table; a later `putClient` for the same channel replaces the entry. */
  val clients = new ConcurrentHashMap[Channel, ActorRef]()

  /**
   * Creates a new [[ChannelActor]] for the given channel and request and records it in `clients`.
   *
   * @param channel the channel used as the registry key
   * @param request the request handed to the new actor
   */
  def putClient(channel: Channel, request: HttpRequest): Unit = {
    val props = Props(new ChannelActor(channel, request))
    val handler = DefaultActorSystem.system.actorOf(props)
    clients.put(channel, handler)
  }

  /** Looks up the actor recorded for `channel`; `None` when the channel has no entry. */
  def findActor(channel: Channel) = Option(clients.get(channel))
}
/** Stateless helpers used by the [[APIServerHandler]] channel handler. */
object APIServerHandler {

  /** Registers a new [[ChannelActor]] for the context's channel and the received request. */
  def handleHttpRequest(ctx: ChannelHandlerContext, request: HttpRequest): Unit =
    ChannelActor.putClient(ctx.channel, request)

  /**
   * Sends [[ChannelActor.Run]] to the actor registered for the context's channel.
   * When no actor is registered, an empty buffer is flushed and the channel is closed.
   */
  def handleLastConnection(ctx: ChannelHandlerContext): Unit = {
    val registered = ChannelActor.findActor(ctx.channel)
    registered.foreach(_ ! ChannelActor.Run)
    if (registered.isEmpty) {
      ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE)
    }
  }
}
/**
 * Inbound channel handler that creates a [[ChannelActor]] for every decoded `HttpRequest`
 * and triggers it once the current read has completed.
 *
 * Lifecycle callbacks forward to `super` so that the events continue along the pipeline.
 */
class APIServerHandler extends SimpleChannelInboundHandler[java.lang.Object] {
  import APIServerHandler._

  /** Debug hook invoked at the start of most callbacks; currently a no-op. */
  def check(meth: String): Unit = {
    // println(meth + " " + this.hashCode())
  }

  override def channelRegistered(ctx: ChannelHandlerContext): Unit = {
    check("channelRegistered")
    super.channelRegistered(ctx)
  }

  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    check("channelReadComplete")
    handleLastConnection(ctx)
    super.channelReadComplete(ctx)
  }

  override def channelRead0(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
    check("channelRead0")
    msg match {
      case request: HttpRequest => handleHttpRequest(ctx, request)
      case _ => // every other message (e.g. HTTP content chunks) is ignored
    }
  }

  /** Drops the channel's registry entry, prints the stack trace and closes the channel. */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    ChannelActor.clients.remove(ctx.channel)
    cause.printStackTrace()
    ctx.close()
  }

  override def channelUnregistered(ctx: ChannelHandlerContext): Unit = {
    check("channelUnregistered")
    // Forget the actor reference once the channel is gone.
    ChannelActor.clients.remove(ctx.channel)
    super.channelUnregistered(ctx)
  }
}
/**
 * Netty-based HTTP server bound to a single port.
 *
 * @param port TCP port the server binds to
 */
class APIServerRunner(port: Int) {

  /** Installs the HTTP codec followed by an [[APIServerHandler]] on every accepted channel. */
  class Initializer extends ChannelInitializer[SocketChannel] {
    override def initChannel(ch: SocketChannel): Unit = {
      ch.pipeline
        .addLast(new HttpServerCodec)
        .addLast(new APIServerHandler)
    }
  }

  /**
   * Binds the server and blocks until the server channel is closed.
   *
   * Non-fatal failures are printed and swallowed so the method returns normally;
   * fatal VM errors propagate. Both event loop groups are always shut down on exit.
   */
  def run(): Unit = {
    import scala.util.control.NonFatal

    val bossGroup = new NioEventLoopGroup(2)
    val workerGroup = new NioEventLoopGroup(4)
    try {
      new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(classOf[NioServerSocketChannel])
        .childHandler(new Initializer)
        .option(ChannelOption.SO_BACKLOG, Integer.valueOf(128))
        .bind(port)
        .sync
        .channel
        .closeFuture
        .sync
    } catch {
      case e: InterruptedException =>
        e.printStackTrace()
        // Restore the interrupt status so the caller can observe the interruption.
        Thread.currentThread().interrupt()
      case NonFatal(e) =>
        e.printStackTrace()
    } finally {
      workerGroup.shutdownGracefully()
      bossGroup.shutdownGracefully()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment