Update to netty 4 final, exciting!

This commit is contained in:
md_5 2013-07-17 16:47:49 +10:00
parent fb1cab499d
commit f510ab2a0b
7 changed files with 40 additions and 13 deletions

View File

@ -59,7 +59,7 @@
<properties> <properties>
<build.number>unknown</build.number> <build.number>unknown</build.number>
<netty.version>4.0.0.CR9</netty.version> <netty.version>4.0.1.Final</netty.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>

View File

@ -63,7 +63,7 @@ public class HttpClient
HttpRequest request = new DefaultHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.GET, path ); HttpRequest request = new DefaultHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.GET, path );
request.headers().set( HttpHeaders.Names.HOST, uri.getHost() ); request.headers().set( HttpHeaders.Names.HOST, uri.getHost() );
future.channel().write( request ); future.channel().writeAndFlush( request );
} else } else
{ {
callback.done( null, future.cause() ); callback.done( null, future.cause() );

View File

@ -31,7 +31,7 @@ public class HttpHandler extends SimpleChannelInboundHandler<HttpObject>
} }
@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
{ {
if ( msg instanceof HttpResponse ) if ( msg instanceof HttpResponse )
{ {

View File

@ -2,11 +2,8 @@ package net.md_5.bungee.netty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import lombok.Getter; import lombok.Getter;
public class ChannelWrapper public class ChannelWrapper
@ -15,6 +12,7 @@ public class ChannelWrapper
private final Channel ch; private final Channel ch;
@Getter @Getter
private volatile boolean closed; private volatile boolean closed;
private boolean flushNow = true;
public ChannelWrapper(ChannelHandlerContext ctx) public ChannelWrapper(ChannelHandlerContext ctx)
{ {
@ -26,14 +24,28 @@ public class ChannelWrapper
if ( !closed ) if ( !closed )
{ {
ch.write( packet ); ch.write( packet );
if ( flushNow )
{
ch.flush();
}
} }
} }
public synchronized void flushNow(boolean flush)
{
if ( !flushNow && flush )
{
ch.flush();
}
this.flushNow = flush;
}
public synchronized void close() public synchronized void close()
{ {
if ( !closed ) if ( !closed )
{ {
closed = true; closed = true;
ch.flush();
ch.close(); ch.close();
} }
} }
@ -41,6 +53,7 @@ public class ChannelWrapper
public void addBefore(String baseName, String name, ChannelHandler handler) public void addBefore(String baseName, String name, ChannelHandler handler)
{ {
Preconditions.checkState( ch.eventLoop().inEventLoop(), "cannot add handler outside of event loop" ); Preconditions.checkState( ch.eventLoop().inEventLoop(), "cannot add handler outside of event loop" );
ch.pipeline().flush();
ch.pipeline().addBefore( baseName, name, handler ); ch.pipeline().addBefore( baseName, name, handler );
} }

View File

@ -2,8 +2,8 @@ package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
import javax.crypto.Cipher; import javax.crypto.Cipher;
public class CipherDecoder extends MessageToMessageDecoder<ByteBuf> public class CipherDecoder extends MessageToMessageDecoder<ByteBuf>
@ -17,7 +17,7 @@ public class CipherDecoder extends MessageToMessageDecoder<ByteBuf>
} }
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageList<Object> out) throws Exception protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception
{ {
out.add( cipher.cipher( ctx, msg ) ); out.add( cipher.cipher( ctx, msg ) );
} }

View File

@ -3,9 +3,10 @@ package net.md_5.bungee.netty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.logging.Level; import java.util.logging.Level;
import net.md_5.bungee.api.ProxyServer; import net.md_5.bungee.api.ProxyServer;
import net.md_5.bungee.connection.CancelSendSignal; import net.md_5.bungee.connection.CancelSendSignal;
@ -22,6 +23,7 @@ public class HandlerBoss extends ChannelInboundHandlerAdapter
private ChannelWrapper channel; private ChannelWrapper channel;
private PacketHandler handler; private PacketHandler handler;
private final Queue<Object> msgs = new ArrayDeque<>();
public void setHandler(PacketHandler handler) public void setHandler(PacketHandler handler)
{ {
@ -59,10 +61,21 @@ public class HandlerBoss extends ChannelInboundHandlerAdapter
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{ {
for ( Object msg : msgs ) if ( ctx.channel().isActive() )
{ {
msgs.add( msg );
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
channel.flushNow( false );
while ( !msgs.isEmpty() )
{
Object msg = msgs.remove();
if ( handler != null && ctx.channel().isActive() ) if ( handler != null && ctx.channel().isActive() )
{ {
if ( msg instanceof PacketWrapper ) if ( msg instanceof PacketWrapper )
@ -85,6 +98,7 @@ public class HandlerBoss extends ChannelInboundHandlerAdapter
} }
} }
} }
channel.flushNow( true );
} }
@Override @Override

View File

@ -2,8 +2,8 @@ package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -28,7 +28,7 @@ public class PacketDecoder extends ReplayingDecoder<Void>
private Protocol protocol; private Protocol protocol;
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
{ {
// While we have enough data // While we have enough data
while ( true ) while ( true )