From 7a79bd08162ea9b6f9b301e3e3233f0cb60bf2f4 Mon Sep 17 00:00:00 2001 From: md_5 Date: Sat, 15 Jun 2013 21:12:15 +1000 Subject: [PATCH] Update to Netty CR5, boasts very nice performance and should hopefully fix many of the issues we have seen. --- pom.xml | 2 +- .../main/java/net/md_5/bungee/BungeeCord.java | 2 +- .../java/net/md_5/bungee/ServerConnector.java | 5 +- .../bungee/connection/InitialHandler.java | 5 +- .../net/md_5/bungee/netty/ChannelWrapper.java | 4 +- .../net/md_5/bungee/netty/CipherBase.java | 18 ++ .../net/md_5/bungee/netty/CipherDecoder.java | 9 +- .../net/md_5/bungee/netty/CipherEncoder.java | 8 +- .../net/md_5/bungee/netty/HandlerBoss.java | 38 ++-- ...OutboundHandler.java => OutboundBoss.java} | 11 +- .../net/md_5/bungee/netty/PacketDecoder.java | 8 +- .../net/md_5/bungee/netty/PipelineUtils.java | 21 +- .../bungee/netty/ReusableChannelPromise.java | 188 ------------------ 13 files changed, 78 insertions(+), 241 deletions(-) rename proxy/src/main/java/net/md_5/bungee/netty/{OutboundHandler.java => OutboundBoss.java} (54%) delete mode 100644 proxy/src/main/java/net/md_5/bungee/netty/ReusableChannelPromise.java diff --git a/pom.xml b/pom.xml index 1623896c..e944a4aa 100644 --- a/pom.xml +++ b/pom.xml @@ -59,7 +59,7 @@ unknown - 4.0.0.CR1 + 4.0.0.CR5 UTF-8 diff --git a/proxy/src/main/java/net/md_5/bungee/BungeeCord.java b/proxy/src/main/java/net/md_5/bungee/BungeeCord.java index bab6868e..609e6674 100644 --- a/proxy/src/main/java/net/md_5/bungee/BungeeCord.java +++ b/proxy/src/main/java/net/md_5/bungee/BungeeCord.java @@ -88,7 +88,7 @@ public class BungeeCord extends ProxyServer * Thread pools. */ public final ScheduledThreadPoolExecutor executors = new BungeeThreadPool( new ThreadFactoryBuilder().setNameFormat( "Bungee Pool Thread #%1$d" ).build() ); - public final MultithreadEventLoopGroup eventLoops = new NioEventLoopGroup( 0, new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread #%1$d" ).build() ); + public final MultithreadEventLoopGroup eventLoops = new NioEventLoopGroup( Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread #%1$d" ).build() ); /** * locations.yml save thread. */ diff --git a/proxy/src/main/java/net/md_5/bungee/ServerConnector.java b/proxy/src/main/java/net/md_5/bungee/ServerConnector.java index 4292993a..c3560a2d 100644 --- a/proxy/src/main/java/net/md_5/bungee/ServerConnector.java +++ b/proxy/src/main/java/net/md_5/bungee/ServerConnector.java @@ -27,6 +27,7 @@ import net.md_5.bungee.netty.CipherDecoder; import net.md_5.bungee.netty.CipherEncoder; import net.md_5.bungee.netty.PacketDecoder; import net.md_5.bungee.netty.PacketHandler; +import net.md_5.bungee.netty.PipelineUtils; import net.md_5.bungee.protocol.Forge; import net.md_5.bungee.protocol.packet.DefinedPacket; import net.md_5.bungee.protocol.packet.Packet1Login; @@ -219,7 +220,7 @@ public class ServerConnector extends PacketHandler ch.write( new PacketFCEncryptionResponse( shared, token ) ); Cipher encrypt = EncryptionUtil.getCipher( Cipher.ENCRYPT_MODE, secretkey ); - ch.getHandle().pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); + ch.getHandle().pipeline().addBefore( PipelineUtils.DECRYPT_HANDLER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) ); thisState = State.ENCRYPT_RESPONSE; } else @@ -234,7 +235,7 @@ public class ServerConnector extends PacketHandler Preconditions.checkState( thisState == State.ENCRYPT_RESPONSE, "Not expecting ENCRYPT_RESPONSE" ); Cipher decrypt = EncryptionUtil.getCipher( Cipher.DECRYPT_MODE, secretkey ); - ch.getHandle().pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); + ch.getHandle().pipeline().addBefore( PipelineUtils.PACKET_DECODE_HANDLER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) ); ch.write( user.getPendingConnection().getForgeLogin() ); diff --git a/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java b/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java index a2d00532..44bc56df 100644 --- a/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java +++ b/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java @@ -37,6 +37,7 @@ import net.md_5.bungee.netty.CipherDecoder; import net.md_5.bungee.netty.CipherEncoder; import net.md_5.bungee.netty.PacketDecoder; import net.md_5.bungee.netty.PacketHandler; +import net.md_5.bungee.netty.PipelineUtils; import net.md_5.bungee.protocol.Forge; import net.md_5.bungee.protocol.packet.DefinedPacket; import net.md_5.bungee.protocol.packet.Packet1Login; @@ -174,7 +175,7 @@ public class InitialHandler extends PacketHandler implements PendingConnection sharedKey = EncryptionUtil.getSecret( encryptResponse, request ); Cipher decrypt = EncryptionUtil.getCipher( Cipher.DECRYPT_MODE, sharedKey ); - ch.getHandle().pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); + ch.getHandle().pipeline().addBefore( PipelineUtils.PACKET_DECODE_HANDLER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) ); if ( BungeeCord.getInstance().config.isOnlineMode() ) { @@ -247,7 +248,7 @@ public class InitialHandler extends PacketHandler implements PendingConnection try { Cipher encrypt = EncryptionUtil.getCipher( Cipher.ENCRYPT_MODE, sharedKey ); - ch.getHandle().pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); + ch.getHandle().pipeline().addBefore( PipelineUtils.DECRYPT_HANDLER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) ); } catch ( GeneralSecurityException ex ) { disconnect( "Cipher error: " + Util.exception( ex ) ); diff --git a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java index 072c9b51..646a879b 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java @@ -10,19 +10,17 @@ public class ChannelWrapper private final Channel ch; @Getter private volatile boolean closed; - private final ReusableChannelPromise promise; public ChannelWrapper(ChannelHandlerContext ctx) { this.ch = ctx.channel(); - this.promise = new ReusableChannelPromise( ctx ); } public synchronized void write(Object packet) { if ( !closed ) { - ch.write( packet, promise ); + ch.write( packet ); } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/CipherBase.java b/proxy/src/main/java/net/md_5/bungee/netty/CipherBase.java index f5999cdc..e571d1b3 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/CipherBase.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/CipherBase.java @@ -1,6 +1,7 @@ package net.md_5.bungee.netty; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; import javax.crypto.Cipher; import javax.crypto.ShortBufferException; import lombok.AccessLevel; @@ -51,4 +52,21 @@ public class CipherBase } out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) ); } + + protected ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws ShortBufferException + { + byte[] heapIn = heapInLocal.get(); + int readableBytes = in.readableBytes(); + if ( heapIn.length < readableBytes ) + { + heapIn = new byte[ readableBytes ]; + heapInLocal.set( heapIn ); + } + in.readBytes( heapIn, 0, readableBytes ); + + ByteBuf out = ctx.alloc().heapBuffer( cipher.getOutputSize( readableBytes ) ); + out.writerIndex( cipher.update( heapIn, 0, readableBytes, out.array(), out.arrayOffset() ) ); + + return out; + } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/CipherDecoder.java b/proxy/src/main/java/net/md_5/bungee/netty/CipherDecoder.java index cc4055b3..37b629a1 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/CipherDecoder.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/CipherDecoder.java @@ -2,10 +2,11 @@ package net.md_5.bungee.netty; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToByteDecoder; +import io.netty.channel.MessageList; +import io.netty.handler.codec.ByteToMessageDecoder; import javax.crypto.Cipher; -public class CipherDecoder extends ByteToByteDecoder +public class CipherDecoder extends ByteToMessageDecoder { private final CipherBase cipher; @@ -16,8 +17,8 @@ public class CipherDecoder extends ByteToByteDecoder } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList out) throws Exception { - cipher.cipher( in, out ); + out.add( cipher.cipher( ctx, in ) ); } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/CipherEncoder.java b/proxy/src/main/java/net/md_5/bungee/netty/CipherEncoder.java index 6d5232e3..75adea97 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/CipherEncoder.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/CipherEncoder.java @@ -2,10 +2,10 @@ package net.md_5.bungee.netty; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToByteEncoder; +import io.netty.handler.codec.MessageToByteEncoder; import javax.crypto.Cipher; -public class CipherEncoder extends ByteToByteEncoder +public class CipherEncoder extends MessageToByteEncoder { private final CipherBase cipher; @@ -16,8 +16,8 @@ public class CipherEncoder extends ByteToByteEncoder } @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { - cipher.cipher( in, out ); + cipher.cipher( msg, out ); } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java b/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java index c729d976..25892f61 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java @@ -2,7 +2,8 @@ package net.md_5.bungee.netty; import com.google.common.base.Preconditions; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.MessageList; import io.netty.handler.timeout.ReadTimeoutException; import java.io.IOException; import java.util.logging.Level; @@ -16,7 +17,7 @@ import net.md_5.bungee.connection.PingHandler; * channels to maintain simple states, and only call the required, adapted * methods when the channel is connected. */ -public class HandlerBoss extends ChannelInboundMessageHandlerAdapter +public class HandlerBoss extends ChannelInboundHandlerAdapter { private ChannelWrapper channel; @@ -53,27 +54,30 @@ public class HandlerBoss extends ChannelInboundMessageHandlerAdapter } @Override - public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception + public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - if ( ctx.channel().isActive() ) + for ( Object msg : msgs ) { - if ( msg instanceof PacketWrapper ) + if ( ctx.channel().isActive() ) { - boolean sendPacket = true; - try + if ( msg instanceof PacketWrapper ) { - ( (PacketWrapper) msg ).packet.handle( handler ); - } catch ( CancelSendSignal ex ) + boolean sendPacket = true; + try + { + ( (PacketWrapper) msg ).packet.handle( handler ); + } catch ( CancelSendSignal ex ) + { + sendPacket = false; + } + if ( sendPacket ) + { + handler.handle( ( (PacketWrapper) msg ).buf ); + } + } else { - sendPacket = false; + handler.handle( (byte[]) msg ); } - if ( sendPacket ) - { - handler.handle( ( (PacketWrapper) msg ).buf ); - } - } else - { - handler.handle( (byte[]) msg ); } } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/OutboundHandler.java b/proxy/src/main/java/net/md_5/bungee/netty/OutboundBoss.java similarity index 54% rename from proxy/src/main/java/net/md_5/bungee/netty/OutboundHandler.java rename to proxy/src/main/java/net/md_5/bungee/netty/OutboundBoss.java index 53789451..f0e4771c 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/OutboundHandler.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/OutboundBoss.java @@ -1,19 +1,12 @@ package net.md_5.bungee.netty; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOperationHandlerAdapter; -import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelOutboundHandlerAdapter; import java.nio.channels.ClosedChannelException; -public class OutboundHandler extends ChannelOperationHandlerAdapter +public class OutboundBoss extends ChannelOutboundHandlerAdapter { - @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception - { - ctx.flush( promise ); - } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { diff --git a/proxy/src/main/java/net/md_5/bungee/netty/PacketDecoder.java b/proxy/src/main/java/net/md_5/bungee/netty/PacketDecoder.java index 3f208951..6c4ef785 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/PacketDecoder.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/PacketDecoder.java @@ -1,8 +1,8 @@ package net.md_5.bungee.netty; import io.netty.buffer.ByteBuf; -import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.MessageList; import io.netty.handler.codec.ReplayingDecoder; import lombok.AllArgsConstructor; import lombok.Getter; @@ -28,7 +28,7 @@ public class PacketDecoder extends ReplayingDecoder private Protocol protocol; @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList out) throws Exception { // While we have enough data while ( true ) @@ -53,10 +53,10 @@ public class PacketDecoder extends ReplayingDecoder // Store our decoded message if ( packet != null ) { - return( new PacketWrapper( packet, buf ) ); + out.add( new PacketWrapper( packet, buf ) ); } else { - return( buf ); + out.add( buf ); } } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java b/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java index afd28114..c25724b6 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java @@ -4,6 +4,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.AttributeKey; @@ -44,6 +45,14 @@ public class PipelineUtils public static final Base BASE = new Base(); private static final DefinedPacketEncoder packetEncoder = new DefinedPacketEncoder(); private static final ByteArrayEncoder arrayEncoder = new ByteArrayEncoder(); + public static String OUTBOUND_BOSS_HANDLER = "outbound-boss"; + public static String TIMEOUT_HANDLER = "timeout"; + public static String PACKET_DECODE_HANDLER = "packet-decoder"; + public static String PACKET_ENCODE_HANDLER = "packet-encoder"; + public static String ARRAY_ENCODE_HANDLER = "array-encoder"; + public static String BOSS_HANDLER = "inbound-boss"; + public static String ENCRYPT_HANDLER = "encrypt"; + public static String DECRYPT_HANDLER = "decrypt"; public final static class Base extends ChannelInitializer { @@ -59,12 +68,12 @@ public class PipelineUtils // IP_TOS is not supported (Windows XP / Windows Server 2003) } - ch.pipeline().addLast( "outbound", new OutboundHandler() ); - ch.pipeline().addLast( "timer", new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) ); - ch.pipeline().addLast( "decoder", new PacketDecoder( Vanilla.getInstance() ) ); - ch.pipeline().addLast( "packet-encoder", packetEncoder ); - ch.pipeline().addLast( "array-encoder", arrayEncoder ); - ch.pipeline().addLast( "handler", new HandlerBoss() ); + ch.pipeline().addLast( OUTBOUND_BOSS_HANDLER, new OutboundBoss() ); + ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) ); + ch.pipeline().addLast( PACKET_DECODE_HANDLER, new PacketDecoder( Vanilla.getInstance() ) ); + ch.pipeline().addLast( PACKET_ENCODE_HANDLER, packetEncoder ); + ch.pipeline().addLast( ARRAY_ENCODE_HANDLER, arrayEncoder ); + ch.pipeline().addLast( BOSS_HANDLER, new HandlerBoss() ); } }; } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/ReusableChannelPromise.java b/proxy/src/main/java/net/md_5/bungee/netty/ReusableChannelPromise.java deleted file mode 100644 index a4c44a17..00000000 --- a/proxy/src/main/java/net/md_5/bungee/netty/ReusableChannelPromise.java +++ /dev/null @@ -1,188 +0,0 @@ -package net.md_5.bungee.netty; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import lombok.RequiredArgsConstructor; -import net.md_5.bungee.api.ProxyServer; - -@RequiredArgsConstructor -public class ReusableChannelPromise implements ChannelPromise -{ - - private final ChannelHandlerContext ctx; - - @Override - public Channel channel() - { - return ctx.channel(); - } - - @Override - public ChannelPromise setSuccess(Void result) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise setSuccess() - { - return this; - } - - @Override - public boolean trySuccess() - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise setFailure(Throwable cause) - { - if ( !( cause instanceof IOException ) ) - { - ctx.fireExceptionCaught( cause ); - } - return this; - } - - @Override - public ChannelPromise addListener(GenericFutureListener> listener) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise addListeners(GenericFutureListener>... listeners) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise removeListener(GenericFutureListener> listener) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise removeListeners(GenericFutureListener>... listeners) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise sync() throws InterruptedException - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise syncUninterruptibly() - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise await() throws InterruptedException - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public ChannelPromise awaitUninterruptibly() - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean isSuccess() - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public Throwable cause() - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean await(long timeout, TimeUnit unit) throws InterruptedException - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean await(long timeoutMillis) throws InterruptedException - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean awaitUninterruptibly(long timeout, TimeUnit unit) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean awaitUninterruptibly(long timeoutMillis) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public Void getNow() - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean isCancelled() - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean isDone() - { - return false; - } - - @Override - public Void get() throws InterruptedException, ExecutionException - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean trySuccess(Void result) - { - throw new UnsupportedOperationException( "Not supported yet." ); - } - - @Override - public boolean tryFailure(Throwable cause) - { - ProxyServer.getInstance().getLogger().log( Level.WARNING, "Exception in tryFailure(..)", cause ); - return true; - } -}