Update to Netty CR5, boasts very nice performance and should hopefully fix many of the issues we have seen.

This commit is contained in:
md_5 2013-06-15 21:12:15 +10:00
parent 6a60376033
commit 7a79bd0816
13 changed files with 78 additions and 241 deletions

View File

@ -59,7 +59,7 @@
<properties> <properties>
<build.number>unknown</build.number> <build.number>unknown</build.number>
<netty.version>4.0.0.CR1</netty.version> <netty.version>4.0.0.CR5</netty.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>

View File

@ -88,7 +88,7 @@ public class BungeeCord extends ProxyServer
* Thread pools. * Thread pools.
*/ */
public final ScheduledThreadPoolExecutor executors = new BungeeThreadPool( new ThreadFactoryBuilder().setNameFormat( "Bungee Pool Thread #%1$d" ).build() ); public final ScheduledThreadPoolExecutor executors = new BungeeThreadPool( new ThreadFactoryBuilder().setNameFormat( "Bungee Pool Thread #%1$d" ).build() );
public final MultithreadEventLoopGroup eventLoops = new NioEventLoopGroup( 0, new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread #%1$d" ).build() ); public final MultithreadEventLoopGroup eventLoops = new NioEventLoopGroup( Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread #%1$d" ).build() );
/** /**
* locations.yml save thread. * locations.yml save thread.
*/ */

View File

@ -27,6 +27,7 @@ import net.md_5.bungee.netty.CipherDecoder;
import net.md_5.bungee.netty.CipherEncoder; import net.md_5.bungee.netty.CipherEncoder;
import net.md_5.bungee.netty.PacketDecoder; import net.md_5.bungee.netty.PacketDecoder;
import net.md_5.bungee.netty.PacketHandler; import net.md_5.bungee.netty.PacketHandler;
import net.md_5.bungee.netty.PipelineUtils;
import net.md_5.bungee.protocol.Forge; import net.md_5.bungee.protocol.Forge;
import net.md_5.bungee.protocol.packet.DefinedPacket; import net.md_5.bungee.protocol.packet.DefinedPacket;
import net.md_5.bungee.protocol.packet.Packet1Login; import net.md_5.bungee.protocol.packet.Packet1Login;
@ -219,7 +220,7 @@ public class ServerConnector extends PacketHandler
ch.write( new PacketFCEncryptionResponse( shared, token ) ); ch.write( new PacketFCEncryptionResponse( shared, token ) );
Cipher encrypt = EncryptionUtil.getCipher( Cipher.ENCRYPT_MODE, secretkey ); Cipher encrypt = EncryptionUtil.getCipher( Cipher.ENCRYPT_MODE, secretkey );
ch.getHandle().pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); ch.getHandle().pipeline().addBefore( PipelineUtils.DECRYPT_HANDLER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) );
thisState = State.ENCRYPT_RESPONSE; thisState = State.ENCRYPT_RESPONSE;
} else } else
@ -234,7 +235,7 @@ public class ServerConnector extends PacketHandler
Preconditions.checkState( thisState == State.ENCRYPT_RESPONSE, "Not expecting ENCRYPT_RESPONSE" ); Preconditions.checkState( thisState == State.ENCRYPT_RESPONSE, "Not expecting ENCRYPT_RESPONSE" );
Cipher decrypt = EncryptionUtil.getCipher( Cipher.DECRYPT_MODE, secretkey ); Cipher decrypt = EncryptionUtil.getCipher( Cipher.DECRYPT_MODE, secretkey );
ch.getHandle().pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); ch.getHandle().pipeline().addBefore( PipelineUtils.PACKET_DECODE_HANDLER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) );
ch.write( user.getPendingConnection().getForgeLogin() ); ch.write( user.getPendingConnection().getForgeLogin() );

View File

@ -37,6 +37,7 @@ import net.md_5.bungee.netty.CipherDecoder;
import net.md_5.bungee.netty.CipherEncoder; import net.md_5.bungee.netty.CipherEncoder;
import net.md_5.bungee.netty.PacketDecoder; import net.md_5.bungee.netty.PacketDecoder;
import net.md_5.bungee.netty.PacketHandler; import net.md_5.bungee.netty.PacketHandler;
import net.md_5.bungee.netty.PipelineUtils;
import net.md_5.bungee.protocol.Forge; import net.md_5.bungee.protocol.Forge;
import net.md_5.bungee.protocol.packet.DefinedPacket; import net.md_5.bungee.protocol.packet.DefinedPacket;
import net.md_5.bungee.protocol.packet.Packet1Login; import net.md_5.bungee.protocol.packet.Packet1Login;
@ -174,7 +175,7 @@ public class InitialHandler extends PacketHandler implements PendingConnection
sharedKey = EncryptionUtil.getSecret( encryptResponse, request ); sharedKey = EncryptionUtil.getSecret( encryptResponse, request );
Cipher decrypt = EncryptionUtil.getCipher( Cipher.DECRYPT_MODE, sharedKey ); Cipher decrypt = EncryptionUtil.getCipher( Cipher.DECRYPT_MODE, sharedKey );
ch.getHandle().pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); ch.getHandle().pipeline().addBefore( PipelineUtils.PACKET_DECODE_HANDLER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) );
if ( BungeeCord.getInstance().config.isOnlineMode() ) if ( BungeeCord.getInstance().config.isOnlineMode() )
{ {
@ -247,7 +248,7 @@ public class InitialHandler extends PacketHandler implements PendingConnection
try try
{ {
Cipher encrypt = EncryptionUtil.getCipher( Cipher.ENCRYPT_MODE, sharedKey ); Cipher encrypt = EncryptionUtil.getCipher( Cipher.ENCRYPT_MODE, sharedKey );
ch.getHandle().pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); ch.getHandle().pipeline().addBefore( PipelineUtils.DECRYPT_HANDLER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) );
} catch ( GeneralSecurityException ex ) } catch ( GeneralSecurityException ex )
{ {
disconnect( "Cipher error: " + Util.exception( ex ) ); disconnect( "Cipher error: " + Util.exception( ex ) );

View File

@ -10,19 +10,17 @@ public class ChannelWrapper
private final Channel ch; private final Channel ch;
@Getter @Getter
private volatile boolean closed; private volatile boolean closed;
private final ReusableChannelPromise promise;
public ChannelWrapper(ChannelHandlerContext ctx) public ChannelWrapper(ChannelHandlerContext ctx)
{ {
this.ch = ctx.channel(); this.ch = ctx.channel();
this.promise = new ReusableChannelPromise( ctx );
} }
public synchronized void write(Object packet) public synchronized void write(Object packet)
{ {
if ( !closed ) if ( !closed )
{ {
ch.write( packet, promise ); ch.write( packet );
} }
} }

View File

@ -1,6 +1,7 @@
package net.md_5.bungee.netty; package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import javax.crypto.Cipher; import javax.crypto.Cipher;
import javax.crypto.ShortBufferException; import javax.crypto.ShortBufferException;
import lombok.AccessLevel; import lombok.AccessLevel;
@ -51,4 +52,21 @@ public class CipherBase
} }
out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) ); out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) );
} }
protected ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws ShortBufferException
{
byte[] heapIn = heapInLocal.get();
int readableBytes = in.readableBytes();
if ( heapIn.length < readableBytes )
{
heapIn = new byte[ readableBytes ];
heapInLocal.set( heapIn );
}
in.readBytes( heapIn, 0, readableBytes );
ByteBuf out = ctx.alloc().heapBuffer( cipher.getOutputSize( readableBytes ) );
out.writerIndex( cipher.update( heapIn, 0, readableBytes, out.array(), out.arrayOffset() ) );
return out;
}
} }

View File

@ -2,10 +2,11 @@ package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToByteDecoder; import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import javax.crypto.Cipher; import javax.crypto.Cipher;
public class CipherDecoder extends ByteToByteDecoder public class CipherDecoder extends ByteToMessageDecoder
{ {
private final CipherBase cipher; private final CipherBase cipher;
@ -16,8 +17,8 @@ public class CipherDecoder extends ByteToByteDecoder
} }
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception
{ {
cipher.cipher( in, out ); out.add( cipher.cipher( ctx, in ) );
} }
} }

View File

@ -2,10 +2,10 @@ package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import javax.crypto.Cipher; import javax.crypto.Cipher;
public class CipherEncoder extends ByteToByteEncoder public class CipherEncoder extends MessageToByteEncoder<ByteBuf>
{ {
private final CipherBase cipher; private final CipherBase cipher;
@ -16,8 +16,8 @@ public class CipherEncoder extends ByteToByteEncoder
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception
{ {
cipher.cipher( in, out ); cipher.cipher( msg, out );
} }
} }

View File

@ -2,7 +2,8 @@ package net.md_5.bungee.netty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException; import java.io.IOException;
import java.util.logging.Level; import java.util.logging.Level;
@ -16,7 +17,7 @@ import net.md_5.bungee.connection.PingHandler;
* channels to maintain simple states, and only call the required, adapted * channels to maintain simple states, and only call the required, adapted
* methods when the channel is connected. * methods when the channel is connected.
*/ */
public class HandlerBoss extends ChannelInboundMessageHandlerAdapter<Object> public class HandlerBoss extends ChannelInboundHandlerAdapter
{ {
private ChannelWrapper channel; private ChannelWrapper channel;
@ -53,7 +54,9 @@ public class HandlerBoss extends ChannelInboundMessageHandlerAdapter<Object>
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception
{
for ( Object msg : msgs )
{ {
if ( ctx.channel().isActive() ) if ( ctx.channel().isActive() )
{ {
@ -77,6 +80,7 @@ public class HandlerBoss extends ChannelInboundMessageHandlerAdapter<Object>
} }
} }
} }
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception

View File

@ -1,19 +1,12 @@
package net.md_5.bungee.netty; package net.md_5.bungee.netty;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOperationHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
public class OutboundHandler extends ChannelOperationHandlerAdapter public class OutboundBoss extends ChannelOutboundHandlerAdapter
{ {
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
{
ctx.flush( promise );
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{ {

View File

@ -1,8 +1,8 @@
package net.md_5.bungee.netty; package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.ReplayingDecoder;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
@ -28,7 +28,7 @@ public class PacketDecoder extends ReplayingDecoder<Void>
private Protocol protocol; private Protocol protocol;
@Override @Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception
{ {
// While we have enough data // While we have enough data
while ( true ) while ( true )
@ -53,10 +53,10 @@ public class PacketDecoder extends ReplayingDecoder<Void>
// Store our decoded message // Store our decoded message
if ( packet != null ) if ( packet != null )
{ {
return( new PacketWrapper( packet, buf ) ); out.add( new PacketWrapper( packet, buf ) );
} else } else
{ {
return( buf ); out.add( buf );
} }
} }
} }

View File

@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
@ -44,6 +45,14 @@ public class PipelineUtils
public static final Base BASE = new Base(); public static final Base BASE = new Base();
private static final DefinedPacketEncoder packetEncoder = new DefinedPacketEncoder(); private static final DefinedPacketEncoder packetEncoder = new DefinedPacketEncoder();
private static final ByteArrayEncoder arrayEncoder = new ByteArrayEncoder(); private static final ByteArrayEncoder arrayEncoder = new ByteArrayEncoder();
public static String OUTBOUND_BOSS_HANDLER = "outbound-boss";
public static String TIMEOUT_HANDLER = "timeout";
public static String PACKET_DECODE_HANDLER = "packet-decoder";
public static String PACKET_ENCODE_HANDLER = "packet-encoder";
public static String ARRAY_ENCODE_HANDLER = "array-encoder";
public static String BOSS_HANDLER = "inbound-boss";
public static String ENCRYPT_HANDLER = "encrypt";
public static String DECRYPT_HANDLER = "decrypt";
public final static class Base extends ChannelInitializer<Channel> public final static class Base extends ChannelInitializer<Channel>
{ {
@ -59,12 +68,12 @@ public class PipelineUtils
// IP_TOS is not supported (Windows XP / Windows Server 2003) // IP_TOS is not supported (Windows XP / Windows Server 2003)
} }
ch.pipeline().addLast( "outbound", new OutboundHandler() ); ch.pipeline().addLast( OUTBOUND_BOSS_HANDLER, new OutboundBoss() );
ch.pipeline().addLast( "timer", new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) ); ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) );
ch.pipeline().addLast( "decoder", new PacketDecoder( Vanilla.getInstance() ) ); ch.pipeline().addLast( PACKET_DECODE_HANDLER, new PacketDecoder( Vanilla.getInstance() ) );
ch.pipeline().addLast( "packet-encoder", packetEncoder ); ch.pipeline().addLast( PACKET_ENCODE_HANDLER, packetEncoder );
ch.pipeline().addLast( "array-encoder", arrayEncoder ); ch.pipeline().addLast( ARRAY_ENCODE_HANDLER, arrayEncoder );
ch.pipeline().addLast( "handler", new HandlerBoss() ); ch.pipeline().addLast( BOSS_HANDLER, new HandlerBoss() );
} }
}; };
} }

View File

@ -1,188 +0,0 @@
package net.md_5.bungee.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import lombok.RequiredArgsConstructor;
import net.md_5.bungee.api.ProxyServer;
@RequiredArgsConstructor
public class ReusableChannelPromise implements ChannelPromise
{
private final ChannelHandlerContext ctx;
@Override
public Channel channel()
{
return ctx.channel();
}
@Override
public ChannelPromise setSuccess(Void result)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise setSuccess()
{
return this;
}
@Override
public boolean trySuccess()
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise setFailure(Throwable cause)
{
if ( !( cause instanceof IOException ) )
{
ctx.fireExceptionCaught( cause );
}
return this;
}
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise sync() throws InterruptedException
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise syncUninterruptibly()
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise await() throws InterruptedException
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public ChannelPromise awaitUninterruptibly()
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean isSuccess()
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public Throwable cause()
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public Void getNow()
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean isCancelled()
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean isDone()
{
return false;
}
@Override
public Void get() throws InterruptedException, ExecutionException
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean trySuccess(Void result)
{
throw new UnsupportedOperationException( "Not supported yet." );
}
@Override
public boolean tryFailure(Throwable cause)
{
ProxyServer.getInstance().getLogger().log( Level.WARNING, "Exception in tryFailure(..)", cause );
return true;
}
}