Skip to content

Instantly share code, notes, and snippets.

@talk114
Forked from ochinchina/NettyConnection.java
Created June 12, 2017 16:21
Show Gist options
  • Select an option

  • Save talk114/2e26b9839bb1c35de4235af0e678e744 to your computer and use it in GitHub Desktop.

Select an option

Save talk114/2e26b9839bb1c35de4235af0e678e744 to your computer and use it in GitHub Desktop.

Revisions

  1. @ochinchina ochinchina revised this gist Aug 4, 2015. 1 changed file with 19 additions and 7 deletions.
    26 changes: 19 additions & 7 deletions NettyConnection.java
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,5 @@
    package com.nokia.nls.nmif;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.Channel;
    @@ -71,8 +73,25 @@ private void doConnect() {
    bootstrap.connect( addr_ ).addListener(this);
    } else {//good, the connection is ok
    channel_ = future.channel();
    //add a listener to detect the connection lost
    addCloseDetectListener( channel_ );
    connectionEstablished();

    }
    }

    private void addCloseDetectListener(Channel channel) {
    //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
    channel.closeFuture().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future )
    throws Exception {
    connectionLost();
    scheduleConnect( 5 );
    }

    });

    }
    });
    }catch( Exception ex ) {
    @@ -102,13 +121,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
    handleMessage( new String( b ) );
    }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx)
    throws Exception {
    connectionLost();
    scheduleConnect( 5 );
    }

    };
    }
  2. @ochinchina ochinchina revised this gist Aug 3, 2015. 1 changed file with 144 additions and 0 deletions.
    144 changes: 144 additions & 0 deletions NettyConnection.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,144 @@
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.util.Timer;
    import java.util.TimerTask;

    public class NettyConnection {
    private Bootstrap bootstrap = new Bootstrap();
    private SocketAddress addr_;
    private Channel channel_;
    private Timer timer_;

    public NettyConnection( String host, int port, Timer timer ) {
    this( new InetSocketAddress( host, port ), timer );
    }
    public NettyConnection( SocketAddress addr, Timer timer ) {
    this.addr_ = addr;
    this.timer_ = timer;
    bootstrap.group( new NioEventLoopGroup() );
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast( createNMMessageHandler() );
    }
    });

    scheduleConnect( 10 );
    }

    public void send(String msg) throws IOException {
    if( channel_ != null && channel_.isActive() ) {
    ByteBuf buf = channel_.alloc().buffer().writeBytes( msg.getBytes() );
    channel_.writeAndFlush( buf );
    } else {
    throw new IOException( "Can't send message to inactive connection");
    }
    }

    public void close() {
    try {
    channel_.close().sync();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }


    private void doConnect() {
    try {
    ChannelFuture f = bootstrap.connect( addr_ );
    f.addListener( new ChannelFutureListener() {
    @Override public void operationComplete(ChannelFuture future) throws Exception {
    if( !future.isSuccess() ) {//if is not successful, reconnect
    future.channel().close();
    bootstrap.connect( addr_ ).addListener(this);
    } else {//good, the connection is ok
    channel_ = future.channel();
    connectionEstablished();
    }
    }
    });
    }catch( Exception ex ) {
    scheduleConnect( 1000 );

    }
    }

    private void scheduleConnect( long millis ) {
    timer_.schedule( new TimerTask() {
    @Override
    public void run() {
    doConnect();
    }
    }, millis );
    }

    private ChannelHandler createNMMessageHandler() {
    return new ChannelInboundHandlerAdapter () {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf buf = (ByteBuf)msg;
    int n = buf.readableBytes();
    if( n > 0 ) {
    byte[] b = new byte[n];
    buf.readBytes(b);
    handleMessage( new String( b ) );
    }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx)
    throws Exception {
    connectionLost();
    scheduleConnect( 5 );
    }

    };
    }





    public void handleMessage(String msg) {
    System.out.println( msg );

    }

    public void connectionLost() {
    System.out.println("connectionLost()" );
    }

    public void connectionEstablished() {
    try {
    send( "hello");
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public static void main( String...args ) {
    NettyConnection conn = new NettyConnection( "127.0.0.1", 34567, new Timer() );

    for( ; ; ) {
    try { Thread.sleep( 100 ); } catch( Exception ex ) {}
    }
    }
    }
  3. @ochinchina ochinchina created this gist Aug 3, 2015.
    40 changes: 40 additions & 0 deletions Server.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class Server {

    private ServerSocket serverSock;

    public Server(int port ) throws IOException {
    serverSock = new ServerSocket( port );

    for( ; ; ) {
    Socket sock = serverSock.accept();
    System.out.println( "socket accepted:" + sock );
    try {
    byte[] buf = new byte[5];

    int n = sock.getInputStream().read( buf );
    System.out.println( new String( buf, 0, n ));
    sock.getOutputStream().write( "world".getBytes() );
    sock.getOutputStream().flush();
    Thread.sleep( 1000 );
    sock.close();
    }catch( Exception ex ) {

    }
    }
    }

    public static void main( String ... args ) {
    try {
    new Server( 34567 );
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }


    }