#3766: Combine packet length prepending and compressor

This commit is contained in:
Janmm14 2025-02-15 15:20:00 +11:00 committed by md_5
parent 0070421549
commit 774a6fd68c
No known key found for this signature in database
GPG Key ID: E8E901AC7C617C11
9 changed files with 229 additions and 174 deletions

View File

@ -292,6 +292,31 @@ public abstract class DefinedPacket
}
}
public static void setVarInt(int value, ByteBuf output, int pos, int len)
{
switch ( len )
{
case 1:
output.setByte( pos, value );
break;
case 2:
output.setShort( pos, ( value & 0x7F | 0x80 ) << 8 | ( value >>> 7 & 0x7F ) );
break;
case 3:
output.setMedium( pos, ( value & 0x7F | 0x80 ) << 16 | ( value >>> 7 & 0x7F | 0x80 ) << 8 | ( value >>> 14 & 0x7F ) );
break;
case 4:
output.setInt( pos, ( value & 0x7F | 0x80 ) << 24 | ( value >>> 7 & 0x7F | 0x80 ) << 16 | ( value >>> 14 & 0x7F | 0x80 ) << 8 | ( value >>> 21 & 0x7F ) );
break;
case 5:
output.setInt( pos, ( value & 0x7F | 0x80 ) << 24 | ( value >>> 7 & 0x7F | 0x80 ) << 16 | ( value >>> 14 & 0x7F | 0x80 ) << 8 | ( value >>> 21 & 0x7F | 0x80 ) );
output.setByte( pos + 4, value >>> 28 );
break;
default:
throw new IllegalArgumentException( "Invalid varint len: " + len );
}
}
public static int readVarShort(ByteBuf buf)
{
int low = buf.readUnsignedShort();

View File

@ -1,26 +0,0 @@
package net.md_5.bungee.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
/**
* Prepend length of the message as a Varint21 using an extra buffer for the
* length, avoiding copying packet data
*/
@ChannelHandler.Sharable
public class Varint21LengthFieldExtraBufPrepender extends MessageToMessageEncoder<ByteBuf>
{
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception
{
int bodyLen = msg.readableBytes();
ByteBuf lenBuf = ctx.alloc().ioBuffer( Varint21LengthFieldPrepender.varintSize( bodyLen ) );
DefinedPacket.writeVarInt( bodyLen, lenBuf );
out.add( lenBuf );
out.add( msg.retain() );
}
}

View File

@ -1,58 +0,0 @@
package net.md_5.bungee.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import lombok.Setter;
/**
* Prepend length of the message as a Varint21 by writing length and data to a
* new buffer
*/
public class Varint21LengthFieldPrepender extends MessageToMessageEncoder<ByteBuf>
{
@Setter
private boolean compose = true;
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception
{
int bodyLen = msg.readableBytes();
int headerLen = varintSize( bodyLen );
if ( compose )
{
ByteBuf buf = ctx.alloc().directBuffer( headerLen );
DefinedPacket.writeVarInt( bodyLen, buf );
list.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, buf, msg.retain() ) );
} else
{
ByteBuf buf = ctx.alloc().directBuffer( headerLen + bodyLen );
DefinedPacket.writeVarInt( bodyLen, buf );
buf.writeBytes( msg );
list.add( buf );
}
}
static int varintSize(int paramInt)
{
if ( ( paramInt & 0xFFFFFF80 ) == 0 )
{
return 1;
}
if ( ( paramInt & 0xFFFFC000 ) == 0 )
{
return 2;
}
if ( ( paramInt & 0xFFE00000 ) == 0 )
{
return 3;
}
if ( ( paramInt & 0xF0000000 ) == 0 )
{
return 4;
}
return 5;
}
}

View File

@ -1,57 +0,0 @@
package net.md_5.bungee.compress;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import java.util.zip.Deflater;
import lombok.Getter;
import lombok.Setter;
import net.md_5.bungee.jni.zlib.BungeeZlib;
import net.md_5.bungee.protocol.DefinedPacket;
public class PacketCompressor extends MessageToMessageEncoder<ByteBuf>
{
@Getter
private final BungeeZlib zlib = CompressFactory.zlib.newInstance();
@Setter
private int threshold = 256;
@Setter
private boolean compose = true;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception
{
zlib.init( true, Deflater.DEFAULT_COMPRESSION );
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
{
zlib.free();
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception
{
int origSize = msg.readableBytes();
if ( origSize < threshold )
{
if ( compose )
{
// create a virtual buffer to avoid copying of data
out.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, ctx.alloc().directBuffer( 1 ).writeByte( 0 ), msg.retain() ) );
} else
{
out.add( ctx.alloc().directBuffer( origSize + 1 ).writeByte( 0 ).writeBytes( msg ) );
}
} else
{
ByteBuf buf = ctx.alloc().directBuffer( BungeeZlib.OUTPUT_BUFFER_SIZE );
DefinedPacket.writeVarInt( origSize, buf );
zlib.process( msg, buf );
out.add( buf );
}
}
}

View File

@ -506,7 +506,7 @@ public class InitialHandler extends PacketHandler implements PendingConnection
BungeeCipher decrypt = EncryptionUtil.getCipher( false, sharedKey );
ch.addBefore( PipelineUtils.FRAME_DECODER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) );
BungeeCipher encrypt = EncryptionUtil.getCipher( true, sharedKey );
ch.addBefore( PipelineUtils.FRAME_PREPENDER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) );
ch.addBefore( PipelineUtils.FRAME_PREPENDER_AND_COMPRESS, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) );
// disable use of composite buffers if we use natives
ch.updateComposite();

View File

@ -38,7 +38,7 @@ public class PingHandler extends PacketHandler
MinecraftEncoder encoder = new MinecraftEncoder( Protocol.HANDSHAKE, false, protocol );
channel.getHandle().pipeline().addAfter( PipelineUtils.FRAME_DECODER, PipelineUtils.PACKET_DECODER, new MinecraftDecoder( Protocol.STATUS, false, ProxyServer.getInstance().getProtocolVersion() ) );
channel.getHandle().pipeline().addAfter( PipelineUtils.FRAME_PREPENDER, PipelineUtils.PACKET_ENCODER, encoder );
channel.getHandle().pipeline().addAfter( PipelineUtils.FRAME_PREPENDER_AND_COMPRESS, PipelineUtils.PACKET_ENCODER, encoder );
channel.write( new Handshake( protocol, target.getAddress().getHostString(), target.getAddress().getPort(), 1 ) );

View File

@ -11,7 +11,6 @@ import java.util.logging.Level;
import lombok.Getter;
import lombok.Setter;
import net.md_5.bungee.api.ProxyServer;
import net.md_5.bungee.compress.PacketCompressor;
import net.md_5.bungee.compress.PacketDecompressor;
import net.md_5.bungee.netty.cipher.CipherEncoder;
import net.md_5.bungee.protocol.DefinedPacket;
@ -19,7 +18,6 @@ import net.md_5.bungee.protocol.MinecraftDecoder;
import net.md_5.bungee.protocol.MinecraftEncoder;
import net.md_5.bungee.protocol.PacketWrapper;
import net.md_5.bungee.protocol.Protocol;
import net.md_5.bungee.protocol.Varint21LengthFieldPrepender;
import net.md_5.bungee.protocol.packet.Kick;
public class ChannelWrapper
@ -180,13 +178,13 @@ public class ChannelWrapper
public void setCompressionThreshold(int compressionThreshold)
{
PacketCompressor compressor = ch.pipeline().get( PacketCompressor.class );
LengthPrependerAndCompressor compressor = ch.pipeline().get( LengthPrependerAndCompressor.class );
PacketDecompressor decompressor = ch.pipeline().get( PacketDecompressor.class );
if ( compressionThreshold >= 0 )
{
if ( compressor == null )
if ( !compressor.isCompress() )
{
addBefore( PipelineUtils.PACKET_ENCODER, "compress", compressor = new PacketCompressor() );
compressor.setCompress( true );
}
compressor.setThreshold( compressionThreshold );
@ -196,10 +194,7 @@ public class ChannelWrapper
}
} else
{
if ( compressor != null )
{
ch.pipeline().remove( "compress" );
}
compressor.setCompress( false );
if ( decompressor != null )
{
ch.pipeline().remove( "decompress" );
@ -216,26 +211,16 @@ public class ChannelWrapper
public void updateComposite()
{
CipherEncoder cipherEncoder = ch.pipeline().get( CipherEncoder.class );
PacketCompressor packetCompressor = ch.pipeline().get( PacketCompressor.class );
Varint21LengthFieldPrepender prepender = ch.pipeline().get( Varint21LengthFieldPrepender.class );
LengthPrependerAndCompressor prependerAndCompressor = ch.pipeline().get( LengthPrependerAndCompressor.class );
boolean compressorCompose = cipherEncoder == null || cipherEncoder.getCipher().allowComposite();
boolean prependerCompose = compressorCompose && ( packetCompressor == null || packetCompressor.getZlib().allowComposite() );
if ( prepender != null )
if ( prependerAndCompressor != null )
{
ProxyServer.getInstance().getLogger().log( Level.FINE, "set prepender compose to {0} for {1}", new Object[]
{
prependerCompose, ch
} );
prepender.setCompose( prependerCompose );
}
if ( packetCompressor != null )
{
ProxyServer.getInstance().getLogger().log( Level.FINE, "set packetCompressor compose to {0} for {1}", new Object[]
{
compressorCompose, ch
} );
packetCompressor.setCompose( compressorCompose );
prependerAndCompressor.setCompose( compressorCompose );
}
}

View File

@ -0,0 +1,189 @@
package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import java.util.zip.Deflater;
import lombok.Setter;
import net.md_5.bungee.compress.CompressFactory;
import net.md_5.bungee.jni.zlib.BungeeZlib;
import net.md_5.bungee.protocol.DefinedPacket;
/**
* prepends length of message and optionally compresses message beforehand
* <br>
* combining these operations allows to keep space infront of compressed data for length varint
*/
public class LengthPrependerAndCompressor extends MessageToMessageEncoder<ByteBuf>
{
// reasonable to not support length varints > 4 byte (268435455 byte > 268MB)
// if ever changed to smaller than 4, also change varintSize method to check for that
private static final byte MAX_SUPPORTED_VARINT_LENGTH_LEN = 4;
private static final byte FLAG_COMPRESS = 0x01;
/**
* overridden by FLAG_TWO_BUFFERS if set
*/
private static final byte FLAG_COMPOSE = 0x02;
/**
* overwrites FLAG_COMPOSE if set
*/
private static final byte FLAG_TWO_BUFFERS = 0x04;
public LengthPrependerAndCompressor(boolean compose, boolean twoBuffers)
{
setCompose( compose );
setTwoBuffers( twoBuffers );
}
private BungeeZlib zlib;
@Setter
private int threshold = 256;
private byte flags = FLAG_COMPOSE;
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception
{
int oldBodyLen = msg.readableBytes();
final byte flags = this.flags;
if ( ( flags & FLAG_COMPRESS ) != 0 )
{
if ( oldBodyLen < threshold )
{
byte lengthLen = varintSize( oldBodyLen + 1 );
if ( ( flags & FLAG_TWO_BUFFERS ) != 0 )
{
ByteBuf lenBuf = ctx.alloc().directBuffer( lengthLen );
DefinedPacket.writeVarInt( oldBodyLen + 1, lenBuf );
lenBuf.writeByte( 0 ); // indicates uncompressed
out.add( lenBuf );
out.add( msg.retain() );
} else if ( ( flags & FLAG_COMPOSE ) != 0 )
{
// create a virtual buffer to avoid copying of data
ByteBuf pre = ctx.alloc().directBuffer( lengthLen + 1 );
DefinedPacket.writeVarInt( oldBodyLen + 1, pre );
pre.writeByte( 0 ); // indicates uncompressed
out.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, pre, msg.retain() ) );
} else
{
ByteBuf buf = ctx.alloc().directBuffer( lengthLen + 1 + oldBodyLen );
DefinedPacket.writeVarInt( oldBodyLen + 1, buf );
out.add( buf.writeByte( 0 ).writeBytes( msg ) ); // 0 indicates uncompressed
}
} else
{
ByteBuf buf = ctx.alloc().directBuffer( BungeeZlib.OUTPUT_BUFFER_SIZE + MAX_SUPPORTED_VARINT_LENGTH_LEN + varintSize( oldBodyLen ) );
buf.writerIndex( MAX_SUPPORTED_VARINT_LENGTH_LEN ); // Reserve space for packet length varint
DefinedPacket.writeVarInt( oldBodyLen, buf ); // write uncompressed length
zlib.process( msg, buf ); // compress data to buf
// write varint length of compressed directly infront of compressed data
// leaves potential unused bytes at buffer start
int writerIndex = buf.writerIndex();
int compressedLen = writerIndex - MAX_SUPPORTED_VARINT_LENGTH_LEN;
byte lengthLen = varintSize( compressedLen );
int lengthStart = MAX_SUPPORTED_VARINT_LENGTH_LEN - lengthLen;
DefinedPacket.setVarInt( compressedLen, buf, lengthStart, lengthLen );
buf.readerIndex( lengthStart ); // set start of buffer to ignore potential unused bytes before length
out.add( buf );
}
} else
{
byte lengthLen = varintSize( oldBodyLen );
if ( ( flags & FLAG_TWO_BUFFERS ) != 0 )
{
ByteBuf lenBuf = ctx.alloc().directBuffer( lengthLen );
DefinedPacket.writeVarInt( oldBodyLen, lenBuf );
out.add( lenBuf );
out.add( msg.retain() );
} else if ( ( flags & FLAG_COMPOSE ) != 0 )
{
// create a virtual buffer to avoid copying of data
ByteBuf pre = ctx.alloc().directBuffer( lengthLen );
DefinedPacket.writeVarInt( oldBodyLen, pre );
out.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, pre, msg.retain() ) );
} else
{
ByteBuf buf = ctx.alloc().directBuffer( lengthLen + oldBodyLen );
DefinedPacket.writeVarInt( oldBodyLen, buf );
out.add( buf.writeBytes( msg ) ); // 0 indicates uncompressed
}
}
}
public void setCompose(boolean compose)
{
if ( compose )
{
flags |= FLAG_COMPOSE;
} else
{
flags &= ~FLAG_COMPOSE;
}
}
public boolean isCompress()
{
return ( flags & FLAG_COMPRESS ) != 0;
}
public void setCompress(boolean compress)
{
if ( compress )
{
BungeeZlib zlib = this.zlib;
if ( zlib == null )
{
this.zlib = zlib = CompressFactory.zlib.newInstance();
}
zlib.init( true, Deflater.DEFAULT_COMPRESSION );
flags |= FLAG_COMPRESS;
} else
{
flags &= ~FLAG_COMPRESS;
if ( zlib != null )
{
zlib.free();
}
}
}
public void setTwoBuffers(boolean twoBuffers)
{
if ( twoBuffers )
{
flags |= FLAG_TWO_BUFFERS;
} else
{
flags &= ~FLAG_TWO_BUFFERS;
}
}
private static byte varintSize(int value)
{
if ( ( value & 0xFFFFFF80 ) == 0 )
{
return 1;
}
if ( ( value & 0xFFFFC000 ) == 0 )
{
return 2;
}
if ( ( value & 0xFFE00000 ) == 0 )
{
return 3;
}
if ( ( value & 0xF0000000 ) == 0 )
{
return 4;
}
if ( MAX_SUPPORTED_VARINT_LENGTH_LEN < 5 )
{
throw new IllegalArgumentException( "Packet length " + value + " longer than supported (max. 268435455 for 4 byte varint)" );
}
return 5;
}
}

View File

@ -48,8 +48,6 @@ import net.md_5.bungee.protocol.MinecraftDecoder;
import net.md_5.bungee.protocol.MinecraftEncoder;
import net.md_5.bungee.protocol.Protocol;
import net.md_5.bungee.protocol.Varint21FrameDecoder;
import net.md_5.bungee.protocol.Varint21LengthFieldExtraBufPrepender;
import net.md_5.bungee.protocol.Varint21LengthFieldPrepender;
import net.md_5.bungee.protocol.channel.BungeeChannelInitializer;
import net.md_5.bungee.protocol.channel.ChannelAcceptor;
@ -79,8 +77,8 @@ public class PipelineUtils
BASE.accept( ch );
ch.pipeline().addBefore( FRAME_DECODER, LEGACY_DECODER, new LegacyDecoder() );
ch.pipeline().addAfter( FRAME_DECODER, PACKET_DECODER, new MinecraftDecoder( Protocol.HANDSHAKE, true, ProxyServer.getInstance().getProtocolVersion() ) );
ch.pipeline().addAfter( FRAME_PREPENDER, PACKET_ENCODER, new MinecraftEncoder( Protocol.HANDSHAKE, true, ProxyServer.getInstance().getProtocolVersion() ) );
ch.pipeline().addBefore( FRAME_PREPENDER, LEGACY_KICKER, legacyKicker );
ch.pipeline().addAfter( FRAME_PREPENDER_AND_COMPRESS, PACKET_ENCODER, new MinecraftEncoder( Protocol.HANDSHAKE, true, ProxyServer.getInstance().getProtocolVersion() ) );
ch.pipeline().addBefore( FRAME_PREPENDER_AND_COMPRESS, LEGACY_KICKER, legacyKicker );
ch.pipeline().get( HandlerBoss.class ).setHandler( new InitialHandler( BungeeCord.getInstance(), listener ) );
if ( listener.isProxyProtocol() )
@ -95,7 +93,7 @@ public class PipelineUtils
{
PipelineUtils.BASE_SERVERSIDE.accept( ch );
ch.pipeline().addAfter( PipelineUtils.FRAME_DECODER, PipelineUtils.PACKET_DECODER, new MinecraftDecoder( Protocol.HANDSHAKE, false, ProxyServer.getInstance().getProtocolVersion() ) );
ch.pipeline().addAfter( PipelineUtils.FRAME_PREPENDER, PipelineUtils.PACKET_ENCODER, new MinecraftEncoder( Protocol.HANDSHAKE, false, ProxyServer.getInstance().getProtocolVersion() ) );
ch.pipeline().addAfter( PipelineUtils.FRAME_PREPENDER_AND_COMPRESS, PipelineUtils.PACKET_ENCODER, new MinecraftEncoder( Protocol.HANDSHAKE, false, ProxyServer.getInstance().getProtocolVersion() ) );
return true;
} ) );
@ -106,7 +104,6 @@ public class PipelineUtils
private static final ChannelAcceptor BASE = new Base( false );
private static final ChannelAcceptor BASE_SERVERSIDE = new Base( true );
private static final KickStringWriter legacyKicker = new KickStringWriter();
private static final Varint21LengthFieldExtraBufPrepender serverFramePrepender = new Varint21LengthFieldExtraBufPrepender();
public static final String TIMEOUT_HANDLER = "timeout";
public static final String PACKET_DECODER = "packet-decoder";
public static final String PACKET_ENCODER = "packet-encoder";
@ -114,7 +111,7 @@ public class PipelineUtils
public static final String ENCRYPT_HANDLER = "encrypt";
public static final String DECRYPT_HANDLER = "decrypt";
public static final String FRAME_DECODER = "frame-decoder";
public static final String FRAME_PREPENDER = "frame-prepender";
public static final String FRAME_PREPENDER_AND_COMPRESS = "frame-prepender-compress";
public static final String LEGACY_DECODER = "legacy-decoder";
public static final String LEGACY_KICKER = "legacy-kick";
@ -216,8 +213,8 @@ public class PipelineUtils
ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) );
// No encryption bungee -> server, therefore use extra buffer to avoid copying everything for length prepending
// Not used bungee -> client as header would need to be encrypted separately through expensive JNI call
ch.pipeline().addLast( FRAME_PREPENDER, ( toServer ) ? serverFramePrepender : new Varint21LengthFieldPrepender() );
// TODO: evaluate difference compose vs two buffers
ch.pipeline().addLast( FRAME_PREPENDER_AND_COMPRESS, new LengthPrependerAndCompressor( true, toServer ) );
ch.pipeline().addLast( BOSS_HANDLER, new HandlerBoss() );
return true;