#2228: Implement basic backpressure on client

This commit is contained in:
md_5 2017-08-18 18:27:25 +10:00
parent fd675022c0
commit bd5a7e5b26
4 changed files with 36 additions and 0 deletions

View File

@ -1,6 +1,7 @@
package net.md_5.bungee.connection; package net.md_5.bungee.connection;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import net.md_5.bungee.BungeeCord; import net.md_5.bungee.BungeeCord;
@ -80,6 +81,22 @@ public class UpstreamBridge extends PacketHandler
} }
} }
@Override
public void writabilityChanged(ChannelWrapper channel) throws Exception
{
if ( con.getServer() != null )
{
Channel server = con.getServer().getCh().getHandle();
if ( channel.getHandle().isWritable() )
{
server.config().setAutoRead( true );
} else
{
server.config().setAutoRead( false );
}
}
}
@Override @Override
public boolean shouldHandle(PacketWrapper packet) throws Exception public boolean shouldHandle(PacketWrapper packet) throws Exception
{ {

View File

@ -64,6 +64,15 @@ public class HandlerBoss extends ChannelInboundHandlerAdapter
} }
} }
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
{
if ( handler != null )
{
handler.writabilityChanged( channel );
}
}
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{ {

View File

@ -28,4 +28,8 @@ public abstract class PacketHandler extends net.md_5.bungee.protocol.AbstractPac
public void disconnected(ChannelWrapper channel) throws Exception public void disconnected(ChannelWrapper channel) throws Exception
{ {
} }
public void writabilityChanged(ChannelWrapper channel) throws Exception
{
}
} }

View File

@ -7,6 +7,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup;
@ -115,6 +116,10 @@ public class PipelineUtils
return epoll ? EpollDatagramChannel.class : NioDatagramChannel.class; return epoll ? EpollDatagramChannel.class : NioDatagramChannel.class;
} }
private static final int LOW_MARK = Integer.getInteger( "net.md_5.bungee.high_mark", 2 << 18 ); // 0.5 mb
private static final int HIGH_MARK = Integer.getInteger( "net.md_5.bungee.low_mark", 2 << 20 ); // 2 mb
private static final WriteBufferWaterMark MARK = new WriteBufferWaterMark( LOW_MARK, HIGH_MARK );
public final static class Base extends ChannelInitializer<Channel> public final static class Base extends ChannelInitializer<Channel>
{ {
@ -129,6 +134,7 @@ public class PipelineUtils
// IP_TOS is not supported (Windows XP / Windows Server 2003) // IP_TOS is not supported (Windows XP / Windows Server 2003)
} }
ch.config().setAllocator( PooledByteBufAllocator.DEFAULT ); ch.config().setAllocator( PooledByteBufAllocator.DEFAULT );
ch.config().setWriteBufferWaterMark( MARK );
ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) ); ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) );
ch.pipeline().addLast( FRAME_DECODER, new Varint21FrameDecoder() ); ch.pipeline().addLast( FRAME_DECODER, new Varint21FrameDecoder() );