diff --git a/proxy/src/main/java/net/md_5/bungee/ServerConnection.java b/proxy/src/main/java/net/md_5/bungee/ServerConnection.java index b4cc85b2..54254a9b 100644 --- a/proxy/src/main/java/net/md_5/bungee/ServerConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/ServerConnection.java @@ -5,7 +5,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import lombok.Data; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -32,7 +31,7 @@ public class ServerConnection implements Server private final boolean forgeServer = false; @Getter private final Queue keepAlives = new ArrayDeque<>(); - private final Queue packetQueue = new ConcurrentLinkedQueue<>(); + private final Queue packetQueue = new ArrayDeque<>(); private final Unsafe unsafe = new Unsafe() { @@ -45,23 +44,37 @@ public class ServerConnection implements Server public void sendPacketQueued(DefinedPacket packet) { - Protocol encodeProtocol = ch.getEncodeProtocol(); - if ( !encodeProtocol.TO_SERVER.hasPacket( packet.getClass(), ch.getEncodeVersion() ) ) + ch.scheduleIfNecessary( () -> { - packetQueue.add( packet ); - } else - { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + Protocol encodeProtocol = ch.getEncodeProtocol(); + if ( !encodeProtocol.TO_SERVER.hasPacket( packet.getClass(), ch.getEncodeVersion() ) ) + { + packetQueue.add( packet ); + } else + { + unsafe().sendPacket( packet ); + } + } ); } public void sendQueuedPackets() { - DefinedPacket packet; - while ( ( packet = packetQueue.poll() ) != null ) + ch.scheduleIfNecessary( () -> { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + DefinedPacket packet; + while ( ( packet = packetQueue.poll() ) != null ) + { + unsafe().sendPacket( packet ); + } + } ); } @Override diff --git a/proxy/src/main/java/net/md_5/bungee/UserConnection.java b/proxy/src/main/java/net/md_5/bungee/UserConnection.java index 2359e864..8f121aa9 100644 --- a/proxy/src/main/java/net/md_5/bungee/UserConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/UserConnection.java @@ -11,6 +11,7 @@ import io.netty.channel.ChannelOption; import io.netty.util.internal.PlatformDependent; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -21,7 +22,6 @@ import java.util.Objects; import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import lombok.Getter; import lombok.NonNull; @@ -146,7 +146,7 @@ public final class UserConnection implements ProxiedPlayer @Setter private ForgeServerHandler forgeServerHandler; /*========================================================================*/ - private final Queue packetQueue = new ConcurrentLinkedQueue<>(); + private final Queue packetQueue = new ArrayDeque<>(); private final Unsafe unsafe = new Unsafe() { @Override @@ -186,23 +186,37 @@ public final class UserConnection implements ProxiedPlayer public void sendPacketQueued(DefinedPacket packet) { - Protocol encodeProtocol = ch.getEncodeProtocol(); - if ( !encodeProtocol.TO_CLIENT.hasPacket( packet.getClass(), getPendingConnection().getVersion() ) ) + ch.scheduleIfNecessary( () -> { - packetQueue.add( packet ); - } else - { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + Protocol encodeProtocol = ch.getEncodeProtocol(); + if ( !encodeProtocol.TO_CLIENT.hasPacket( packet.getClass(), getPendingConnection().getVersion() ) ) + { + packetQueue.add( packet ); + } else + { + unsafe().sendPacket( packet ); + } + } ); } public void sendQueuedPackets() { - DefinedPacket packet; - while ( ( packet = packetQueue.poll() ) != null ) + ch.scheduleIfNecessary( () -> { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + DefinedPacket packet; + while ( ( packet = packetQueue.poll() ) != null ) + { + unsafe().sendPacket( packet ); + } + } ); } @Deprecated diff --git a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java index 682bb807..a15ba5df 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java @@ -42,23 +42,22 @@ public class ChannelWrapper public Protocol getDecodeProtocol() { - return ch.pipeline().get( MinecraftDecoder.class ).getProtocol(); + return getMinecraftDecoder().getProtocol(); } public void setDecodeProtocol(Protocol protocol) { - ch.pipeline().get( MinecraftDecoder.class ).setProtocol( protocol ); + getMinecraftDecoder().setProtocol( protocol ); } public Protocol getEncodeProtocol() { - return ch.pipeline().get( MinecraftEncoder.class ).getProtocol(); - + return getMinecraftEncoder().getProtocol(); } public void setEncodeProtocol(Protocol protocol) { - ch.pipeline().get( MinecraftEncoder.class ).setProtocol( protocol ); + getMinecraftEncoder().setProtocol( protocol ); } public void setProtocol(Protocol protocol) @@ -69,13 +68,23 @@ public class ChannelWrapper public void setVersion(int protocol) { - ch.pipeline().get( MinecraftDecoder.class ).setProtocolVersion( protocol ); - ch.pipeline().get( MinecraftEncoder.class ).setProtocolVersion( protocol ); + getMinecraftDecoder().setProtocolVersion( protocol ); + getMinecraftEncoder().setProtocolVersion( protocol ); + } + + public MinecraftDecoder getMinecraftDecoder() + { + return ch.pipeline().get( MinecraftDecoder.class ); + } + + public MinecraftEncoder getMinecraftEncoder() + { + return ch.pipeline().get( MinecraftEncoder.class ); } public int getEncodeVersion() { - return ch.pipeline().get( MinecraftEncoder.class ).getProtocolVersion(); + return getMinecraftEncoder().getProtocolVersion(); } public void write(Object packet) @@ -223,4 +232,15 @@ public class ChannelWrapper packetCompressor.setCompose( compressorCompose ); } } + + public void scheduleIfNecessary(Runnable task) + { + if ( ch.eventLoop().inEventLoop() ) + { + task.run(); + return; + } + + ch.eventLoop().execute( task ); + } }