From bd5a7e5b26b698c57e6f99933452bfc0659269f9 Mon Sep 17 00:00:00 2001 From: md_5 Date: Fri, 18 Aug 2017 18:27:25 +1000 Subject: [PATCH] #2228: Implement basic backpressure on client --- .../md_5/bungee/connection/UpstreamBridge.java | 17 +++++++++++++++++ .../java/net/md_5/bungee/netty/HandlerBoss.java | 9 +++++++++ .../net/md_5/bungee/netty/PacketHandler.java | 4 ++++ .../net/md_5/bungee/netty/PipelineUtils.java | 6 ++++++ 4 files changed, 36 insertions(+) diff --git a/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java b/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java index 92d1da05..167f1f88 100644 --- a/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java +++ b/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java @@ -1,6 +1,7 @@ package net.md_5.bungee.connection; import com.google.common.base.Preconditions; +import io.netty.channel.Channel; import java.util.ArrayList; import java.util.List; import net.md_5.bungee.BungeeCord; @@ -80,6 +81,22 @@ public class UpstreamBridge extends PacketHandler } } + @Override + public void writabilityChanged(ChannelWrapper channel) throws Exception + { + if ( con.getServer() != null ) + { + Channel server = con.getServer().getCh().getHandle(); + if ( channel.getHandle().isWritable() ) + { + server.config().setAutoRead( true ); + } else + { + server.config().setAutoRead( false ); + } + } + } + @Override public boolean shouldHandle(PacketWrapper packet) throws Exception { diff --git a/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java b/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java index 93776626..bf8ec29f 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java @@ -64,6 +64,15 @@ public class HandlerBoss extends ChannelInboundHandlerAdapter } } + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception + { + if ( handler != null ) + { + handler.writabilityChanged( channel ); + } + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { diff --git a/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java b/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java index 7958eaa3..7bd223d9 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java @@ -28,4 +28,8 @@ public abstract class PacketHandler extends net.md_5.bungee.protocol.AbstractPac public void disconnected(ChannelWrapper channel) throws Exception { } + + public void writabilityChanged(ChannelWrapper channel) throws Exception + { + } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java b/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java index f239c7d8..5f4020cc 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java @@ -7,6 +7,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -115,6 +116,10 @@ public class PipelineUtils return epoll ? EpollDatagramChannel.class : NioDatagramChannel.class; } + private static final int LOW_MARK = Integer.getInteger( "net.md_5.bungee.high_mark", 2 << 18 ); // 0.5 mb + private static final int HIGH_MARK = Integer.getInteger( "net.md_5.bungee.low_mark", 2 << 20 ); // 2 mb + private static final WriteBufferWaterMark MARK = new WriteBufferWaterMark( LOW_MARK, HIGH_MARK ); + public final static class Base extends ChannelInitializer { @@ -129,6 +134,7 @@ public class PipelineUtils // IP_TOS is not supported (Windows XP / Windows Server 2003) } ch.config().setAllocator( PooledByteBufAllocator.DEFAULT ); + ch.config().setWriteBufferWaterMark( MARK ); ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) ); ch.pipeline().addLast( FRAME_DECODER, new Varint21FrameDecoder() );