#3737: Use composite buffers where possible

This commit is contained in:
Outfluencer 2024-09-09 21:01:19 +10:00 committed by md_5
parent 477ea5983c
commit b309e4ac50
No known key found for this signature in database
GPG Key ID: E8E901AC7C617C11
12 changed files with 123 additions and 23 deletions

View File

@ -18,4 +18,10 @@ public interface BungeeCipher
void cipher(ByteBuf in, ByteBuf out) throws GeneralSecurityException; void cipher(ByteBuf in, ByteBuf out) throws GeneralSecurityException;
ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws GeneralSecurityException; ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws GeneralSecurityException;
/*
* This indicates whether the input ByteBuf is allowed to be a CompositeByteBuf.
* If you need access to a memory address, you should not allow composite buffers.
*/
boolean allowComposite();
} }

View File

@ -2,6 +2,7 @@ package net.md_5.bungee.jni.cipher;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import javax.crypto.Cipher; import javax.crypto.Cipher;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
@ -12,10 +13,10 @@ public class JavaCipher implements BungeeCipher
{ {
private final Cipher cipher; private final Cipher cipher;
private static final ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal(); private static final FastThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal();
private static final ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal(); private static final FastThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal();
private static class EmptyByteThreadLocal extends ThreadLocal<byte[]> private static class EmptyByteThreadLocal extends FastThreadLocal<byte[]>
{ {
@Override @Override
@ -88,4 +89,10 @@ public class JavaCipher implements BungeeCipher
in.readBytes( heapIn, 0, readableBytes ); in.readBytes( heapIn, 0, readableBytes );
return heapIn; return heapIn;
} }
@Override
public boolean allowComposite()
{
return true;
}
} }

View File

@ -71,4 +71,10 @@ public class NativeCipher implements BungeeCipher
return heapOut; return heapOut;
} }
@Override
public boolean allowComposite()
{
return false;
}
} }

View File

@ -6,9 +6,17 @@ import java.util.zip.DataFormatException;
public interface BungeeZlib public interface BungeeZlib
{ {
public static final int OUTPUT_BUFFER_SIZE = 8192;
void init(boolean compress, int level); void init(boolean compress, int level);
void free(); void free();
void process(ByteBuf in, ByteBuf out) throws DataFormatException; void process(ByteBuf in, ByteBuf out) throws DataFormatException;
/*
* This indicates whether the input ByteBuf is allowed to be a CompositeByteBuf.
* If you need access to a memory address, you should not allow composite buffers.
*/
boolean allowComposite();
} }

View File

@ -73,4 +73,10 @@ public class JavaZlib implements BungeeZlib
inflater.reset(); inflater.reset();
} }
} }
@Override
public boolean allowComposite()
{
return true;
}
} }

View File

@ -55,7 +55,7 @@ public class NativeZlib implements BungeeZlib
while ( !nativeCompress.finished && ( compress || in.isReadable() ) ) while ( !nativeCompress.finished && ( compress || in.isReadable() ) )
{ {
out.ensureWritable( 8192 ); out.ensureWritable( OUTPUT_BUFFER_SIZE );
int processed; int processed;
try try
@ -74,4 +74,10 @@ public class NativeZlib implements BungeeZlib
nativeCompress.consumed = 0; nativeCompress.consumed = 0;
nativeCompress.finished = false; nativeCompress.finished = false;
} }
@Override
public boolean allowComposite()
{
return false;
}
} }

View File

@ -1,27 +1,38 @@
package net.md_5.bungee.protocol; package net.md_5.bungee.protocol;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import lombok.Setter;
/** /**
* Prepend length of the message as a Varint21 by writing length and data to a * Prepend length of the message as a Varint21 by writing length and data to a
* new buffer * new buffer
*/ */
@ChannelHandler.Sharable public class Varint21LengthFieldPrepender extends MessageToMessageEncoder<ByteBuf>
public class Varint21LengthFieldPrepender extends MessageToByteEncoder<ByteBuf>
{ {
@Setter
private boolean compose = true;
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception
{ {
int bodyLen = msg.readableBytes(); int bodyLen = msg.readableBytes();
int headerLen = varintSize( bodyLen ); int headerLen = varintSize( bodyLen );
out.ensureWritable( headerLen + bodyLen ); if ( compose )
{
DefinedPacket.writeVarInt( bodyLen, out ); ByteBuf buf = ctx.alloc().directBuffer( headerLen );
out.writeBytes( msg ); DefinedPacket.writeVarInt( bodyLen, buf );
list.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, buf, msg.retain() ) );
} else
{
ByteBuf buf = ctx.alloc().directBuffer( headerLen + bodyLen );
DefinedPacket.writeVarInt( bodyLen, buf );
buf.writeBytes( msg );
list.add( buf );
}
} }
static int varintSize(int paramInt) static int varintSize(int paramInt)

View File

@ -2,18 +2,23 @@ package net.md_5.bungee.compress;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import net.md_5.bungee.jni.zlib.BungeeZlib; import net.md_5.bungee.jni.zlib.BungeeZlib;
import net.md_5.bungee.protocol.DefinedPacket; import net.md_5.bungee.protocol.DefinedPacket;
public class PacketCompressor extends MessageToByteEncoder<ByteBuf> public class PacketCompressor extends MessageToMessageEncoder<ByteBuf>
{ {
@Getter
private final BungeeZlib zlib = CompressFactory.zlib.newInstance(); private final BungeeZlib zlib = CompressFactory.zlib.newInstance();
@Setter @Setter
private int threshold = 256; private int threshold = 256;
@Setter
private boolean compose = true;
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception public void handlerAdded(ChannelHandlerContext ctx) throws Exception
@ -28,18 +33,25 @@ public class PacketCompressor extends MessageToByteEncoder<ByteBuf>
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception
{ {
int origSize = msg.readableBytes(); int origSize = msg.readableBytes();
if ( origSize < threshold ) if ( origSize < threshold )
{ {
DefinedPacket.writeVarInt( 0, out ); if ( compose )
out.writeBytes( msg ); {
// create a virtual buffer to avoid copying of data
out.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, ctx.alloc().directBuffer( 1 ).writeByte( 0 ), msg.retain() ) );
} else } else
{ {
DefinedPacket.writeVarInt( origSize, out ); out.add( ctx.alloc().directBuffer( origSize + 1 ).writeByte( 0 ).writeBytes( msg ) );
}
zlib.process( msg, out ); } else
{
ByteBuf buf = ctx.alloc().directBuffer( BungeeZlib.OUTPUT_BUFFER_SIZE );
DefinedPacket.writeVarInt( origSize, buf );
zlib.process( msg, buf );
out.add( buf );
} }
} }
} }

View File

@ -508,6 +508,8 @@ public class InitialHandler extends PacketHandler implements PendingConnection
ch.addBefore( PipelineUtils.FRAME_DECODER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) ); ch.addBefore( PipelineUtils.FRAME_DECODER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) );
BungeeCipher encrypt = EncryptionUtil.getCipher( true, sharedKey ); BungeeCipher encrypt = EncryptionUtil.getCipher( true, sharedKey );
ch.addBefore( PipelineUtils.FRAME_PREPENDER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) ); ch.addBefore( PipelineUtils.FRAME_PREPENDER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) );
// disable use of composite buffers if we use natives
ch.updateComposite();
String encName = URLEncoder.encode( InitialHandler.this.getName(), "UTF-8" ); String encName = URLEncoder.encode( InitialHandler.this.getName(), "UTF-8" );

View File

@ -7,15 +7,19 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import net.md_5.bungee.api.ProxyServer;
import net.md_5.bungee.compress.PacketCompressor; import net.md_5.bungee.compress.PacketCompressor;
import net.md_5.bungee.compress.PacketDecompressor; import net.md_5.bungee.compress.PacketDecompressor;
import net.md_5.bungee.netty.cipher.CipherEncoder;
import net.md_5.bungee.protocol.DefinedPacket; import net.md_5.bungee.protocol.DefinedPacket;
import net.md_5.bungee.protocol.MinecraftDecoder; import net.md_5.bungee.protocol.MinecraftDecoder;
import net.md_5.bungee.protocol.MinecraftEncoder; import net.md_5.bungee.protocol.MinecraftEncoder;
import net.md_5.bungee.protocol.PacketWrapper; import net.md_5.bungee.protocol.PacketWrapper;
import net.md_5.bungee.protocol.Protocol; import net.md_5.bungee.protocol.Protocol;
import net.md_5.bungee.protocol.Varint21LengthFieldPrepender;
import net.md_5.bungee.protocol.packet.Kick; import net.md_5.bungee.protocol.packet.Kick;
public class ChannelWrapper public class ChannelWrapper
@ -187,5 +191,36 @@ public class ChannelWrapper
{ {
ch.pipeline().remove( "decompress" ); ch.pipeline().remove( "decompress" );
} }
// disable use of composite buffers if we use natives
updateComposite();
}
/*
* Should be called on encryption add and on compressor add or remove
*/
public void updateComposite()
{
CipherEncoder cipherEncoder = ch.pipeline().get( CipherEncoder.class );
PacketCompressor packetCompressor = ch.pipeline().get( PacketCompressor.class );
Varint21LengthFieldPrepender prepender = ch.pipeline().get( Varint21LengthFieldPrepender.class );
boolean compressorCompose = cipherEncoder == null || cipherEncoder.getCipher().allowComposite();
boolean prependerCompose = compressorCompose && ( packetCompressor == null || packetCompressor.getZlib().allowComposite() );
if ( prepender != null )
{
ProxyServer.getInstance().getLogger().log( Level.FINE, "set prepender compose to {0} for {1}", new Object[]
{
prependerCompose, ch
} );
prepender.setCompose( prependerCompose );
}
if ( packetCompressor != null )
{
ProxyServer.getInstance().getLogger().log( Level.FINE, "set packetCompressor compose to {0} for {1}", new Object[]
{
compressorCompose, ch
} );
packetCompressor.setCompose( compressorCompose );
}
} }
} }

View File

@ -93,7 +93,6 @@ public class PipelineUtils
public static final Base BASE = new Base( false ); public static final Base BASE = new Base( false );
public static final Base BASE_SERVERSIDE = new Base( true ); public static final Base BASE_SERVERSIDE = new Base( true );
private static final KickStringWriter legacyKicker = new KickStringWriter(); private static final KickStringWriter legacyKicker = new KickStringWriter();
private static final Varint21LengthFieldPrepender framePrepender = new Varint21LengthFieldPrepender();
private static final Varint21LengthFieldExtraBufPrepender serverFramePrepender = new Varint21LengthFieldExtraBufPrepender(); private static final Varint21LengthFieldExtraBufPrepender serverFramePrepender = new Varint21LengthFieldExtraBufPrepender();
public static final String TIMEOUT_HANDLER = "timeout"; public static final String TIMEOUT_HANDLER = "timeout";
public static final String PACKET_DECODER = "packet-decoder"; public static final String PACKET_DECODER = "packet-decoder";
@ -202,7 +201,7 @@ public class PipelineUtils
ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) ); ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) );
// No encryption bungee -> server, therefore use extra buffer to avoid copying everything for length prepending // No encryption bungee -> server, therefore use extra buffer to avoid copying everything for length prepending
// Not used bungee -> client as header would need to be encrypted separately through expensive JNI call // Not used bungee -> client as header would need to be encrypted separately through expensive JNI call
ch.pipeline().addLast( FRAME_PREPENDER, ( toServer ) ? serverFramePrepender : framePrepender ); ch.pipeline().addLast( FRAME_PREPENDER, ( toServer ) ? serverFramePrepender : new Varint21LengthFieldPrepender() );
ch.pipeline().addLast( BOSS_HANDLER, new HandlerBoss() ); ch.pipeline().addLast( BOSS_HANDLER, new HandlerBoss() );
} }

View File

@ -3,6 +3,7 @@ package net.md_5.bungee.netty.cipher;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import net.md_5.bungee.jni.cipher.BungeeCipher; import net.md_5.bungee.jni.cipher.BungeeCipher;
@ -10,6 +11,7 @@ import net.md_5.bungee.jni.cipher.BungeeCipher;
public class CipherEncoder extends MessageToByteEncoder<ByteBuf> public class CipherEncoder extends MessageToByteEncoder<ByteBuf>
{ {
@Getter
private final BungeeCipher cipher; private final BungeeCipher cipher;
@Override @Override