From 0afc52c130baa8da3dfa2111e7228032d88bf01a Mon Sep 17 00:00:00 2001 From: md_5 Date: Thu, 7 Mar 2013 20:47:39 +1100 Subject: [PATCH] Upstream and downstream bridges are now uber sexy --- .../java/net/md_5/bungee/UserConnection.java | 305 +----------------- .../bungee/connection/CancelSendSignal.java | 17 + .../bungee/connection/DownstreamBridge.java | 188 +++++++++++ .../bungee/connection/UpstreamBridge.java | 64 ++++ 4 files changed, 274 insertions(+), 300 deletions(-) create mode 100644 proxy/src/main/java/net/md_5/bungee/connection/CancelSendSignal.java create mode 100644 proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java create mode 100644 proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java diff --git a/proxy/src/main/java/net/md_5/bungee/UserConnection.java b/proxy/src/main/java/net/md_5/bungee/UserConnection.java index 2673fcfc..645e56ea 100644 --- a/proxy/src/main/java/net/md_5/bungee/UserConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/UserConnection.java @@ -1,10 +1,5 @@ package net.md_5.bungee; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Collection; @@ -16,15 +11,15 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import lombok.Getter; +import lombok.Setter; import lombok.Synchronized; import net.md_5.bungee.api.ProxyServer; import net.md_5.bungee.api.config.ServerInfo; import net.md_5.bungee.api.connection.PendingConnection; import net.md_5.bungee.api.connection.ProxiedPlayer; -import net.md_5.bungee.api.event.ChatEvent; import net.md_5.bungee.api.event.PlayerDisconnectEvent; -import net.md_5.bungee.api.event.PluginMessageEvent; import net.md_5.bungee.api.event.ServerConnectEvent; +import net.md_5.bungee.connection.DownstreamBridge; import net.md_5.bungee.packet.*; public final class UserConnection extends GenericConnection implements ProxiedPlayer @@ -38,16 +33,15 @@ public final class UserConnection extends GenericConnection implements ProxiedPl private final PendingConnection pendingConnection; @Getter private ServerConnection server; - private UpstreamBridge upBridge; - private DownstreamBridge downBridge; // reconnect stuff private int clientEntityId; private int serverEntityId; private volatile boolean reconnecting; // ping stuff - private int trackingPingId; - private long pingTime; + public int trackingPingId; + public long pingTime; @Getter + @Setter private int ping = 1000; // Permissions private final Collection groups = new HashSet<>(); @@ -253,293 +247,4 @@ public final class UserConnection extends GenericConnection implements ProxiedPl { permissions.put( permission, value ); } - - private class UpstreamBridge extends Thread - { - - public UpstreamBridge() - { - super( "Upstream Bridge - " + name ); - } - - @Override - public void run() - { - while ( !socket.isClosed() ) - { - try - { - byte[] packet = stream.readPacket(); - boolean sendPacket = true; - int id = Util.getId( packet ); - - switch ( id ) - { - case 0x00: - if ( trackingPingId == new Packet0KeepAlive( packet ).id ) - { - int newPing = (int) ( System.currentTimeMillis() - pingTime ); - ProxyServer.getInstance().getTabListHandler().onPingChange( UserConnection.this, newPing ); - ping = newPing; - } - break; - case 0x03: - Packet3Chat chat = new Packet3Chat( packet ); - if ( chat.message.startsWith( "/" ) ) - { - sendPacket = !ProxyServer.getInstance().getPluginManager().dispatchCommand( UserConnection.this, chat.message.substring( 1 ) ); - } else - { - ChatEvent chatEvent = new ChatEvent( UserConnection.this, server, chat.message ); - ProxyServer.getInstance().getPluginManager().callEvent( chatEvent ); - sendPacket = !chatEvent.isCancelled(); - } - break; - case 0xFA: - // Call the onPluginMessage event - PacketFAPluginMessage message = new PacketFAPluginMessage( packet ); - - // Might matter in the future - if ( message.tag.equals( "BungeeCord" ) ) - { - continue; - } - - PluginMessageEvent event = new PluginMessageEvent( UserConnection.this, server, message.tag, message.data ); - ProxyServer.getInstance().getPluginManager().callEvent( event ); - - if ( event.isCancelled() ) - { - continue; - } - break; - } - - while ( !server.packetQueue.isEmpty() ) - { - DefinedPacket p = server.packetQueue.poll(); - if ( p != null ) - { - server.stream.write( p ); - } - } - - EntityMap.rewrite( packet, clientEntityId, serverEntityId ); - if ( sendPacket && !server.socket.isClosed() ) - { - server.stream.write( packet ); - } - - try - { - Thread.sleep( BungeeCord.getInstance().config.getSleepTime() ); - } catch ( InterruptedException ex ) - { - } - } catch ( IOException ex ) - { - disconnect( "Reached end of stream" ); - } catch ( Exception ex ) - { - disconnect( Util.exception( ex ) ); - } - } - } - } - - private class DownstreamBridge extends Thread - { - - public DownstreamBridge() - { - super( "Downstream Bridge - " + name ); - } - - @Override - public void run() - { - try - { - outer: - while ( !reconnecting ) - { - byte[] packet = server.stream.readPacket(); - int id = Util.getId( packet ); - - switch ( id ) - { - case 0x00: - trackingPingId = new Packet0KeepAlive( packet ).id; - pingTime = System.currentTimeMillis(); - break; - case 0x03: - Packet3Chat chat = new Packet3Chat( packet ); - ChatEvent chatEvent = new ChatEvent( server, UserConnection.this, chat.message ); - ProxyServer.getInstance().getPluginManager().callEvent( chatEvent ); - - if ( chatEvent.isCancelled() ) - { - continue; - } - break; - case 0xC9: - PacketC9PlayerListItem playerList = new PacketC9PlayerListItem( packet ); - if ( !ProxyServer.getInstance().getTabListHandler().onListUpdate( UserConnection.this, playerList.username, playerList.online, playerList.ping ) ) - { - continue; - } - break; - case 0xFA: - // Call the onPluginMessage event - PacketFAPluginMessage message = new PacketFAPluginMessage( packet ); - DataInputStream in = new DataInputStream( new ByteArrayInputStream( message.data ) ); - PluginMessageEvent event = new PluginMessageEvent( server, UserConnection.this, message.tag, message.data ); - ProxyServer.getInstance().getPluginManager().callEvent( event ); - - if ( event.isCancelled() ) - { - continue; - } - - if ( message.tag.equals( "BungeeCord" ) ) - { - String subChannel = in.readUTF(); - if ( subChannel.equals( "Forward" ) ) - { - String target = in.readUTF(); - String channel = in.readUTF(); - short len = in.readShort(); - byte[] data = new byte[ len ]; - in.readFully( data ); - - - ByteArrayOutputStream b = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream( b ); - out.writeUTF( channel ); - out.writeShort( data.length ); - out.write( data ); - - if ( target.equals( "ALL" ) ) - { - for ( ServerInfo server : BungeeCord.getInstance().getServers().values() ) - { - server.sendData( "BungeeCord", b.toByteArray() ); - } - } else - { - ServerInfo server = BungeeCord.getInstance().getServerInfo( target ); - if ( server != null ) - { - server.sendData( "BungeeCord", b.toByteArray() ); - } - } - } - if ( subChannel.equals( "Connect" ) ) - { - ServerInfo server = ProxyServer.getInstance().getServerInfo( in.readUTF() ); - if ( server != null ) - { - connect( server, true ); - break outer; - } - } - if ( subChannel.equals( "IP" ) ) - { - ByteArrayOutputStream b = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream( b ); - out.writeUTF( "IP" ); - out.writeUTF( getAddress().getHostString() ); - out.writeInt( getAddress().getPort() ); - getServer().sendData( "BungeeCord", b.toByteArray() ); - } - if ( subChannel.equals( "PlayerCount" ) ) - { - ServerInfo server = ProxyServer.getInstance().getServerInfo( in.readUTF() ); - if ( server != null ) - { - ByteArrayOutputStream b = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream( b ); - out.writeUTF( "PlayerCount" ); - out.writeUTF( server.getName() ); - out.writeInt( server.getPlayers().size() ); - getServer().sendData( "BungeeCord", b.toByteArray() ); - } - } - if ( subChannel.equals( "PlayerList" ) ) - { - ServerInfo server = ProxyServer.getInstance().getServerInfo( in.readUTF() ); - if ( server != null ) - { - ByteArrayOutputStream b = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream( b ); - out.writeUTF( "PlayerList" ); - out.writeUTF( server.getName() ); - - StringBuilder sb = new StringBuilder(); - for ( ProxiedPlayer p : server.getPlayers() ) - { - sb.append( p.getName() ); - sb.append( "," ); - } - out.writeUTF( sb.substring( 0, sb.length() - 1 ) ); - - getServer().sendData( "BungeeCord", b.toByteArray() ); - } - } - if ( subChannel.equals( "GetServers" ) ) - { - ByteArrayOutputStream b = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream( b ); - out.writeUTF( "GetServers" ); - - StringBuilder sb = new StringBuilder(); - for ( String server : ProxyServer.getInstance().getServers().keySet() ) - { - sb.append( server ); - sb.append( "," ); - } - out.writeUTF( sb.substring( 0, sb.length() - 1 ) ); - - getServer().sendData( "BungeeCord", b.toByteArray() ); - } - if ( subChannel.equals( "Message" ) ) - { - ProxiedPlayer target = ProxyServer.getInstance().getPlayer( in.readUTF() ); - if ( target != null ) - { - target.sendMessage( in.readUTF() ); - } - } - continue; - } - break; - case 0xFF: - disconnect( new PacketFFKick( packet ).message ); - break outer; - } - - while ( !packetQueue.isEmpty() ) - { - DefinedPacket p = packetQueue.poll(); - if ( p != null ) - { - stream.write( p ); - } - } - - EntityMap.rewrite( packet, serverEntityId, clientEntityId ); - stream.write( packet ); - - if ( nextServer != null ) - { - connect( nextServer, true ); - break outer; - } - } - } catch ( Exception ex ) - { - disconnect( Util.exception( ex ) ); - } - } - } } diff --git a/proxy/src/main/java/net/md_5/bungee/connection/CancelSendSignal.java b/proxy/src/main/java/net/md_5/bungee/connection/CancelSendSignal.java new file mode 100644 index 00000000..765b4bc8 --- /dev/null +++ b/proxy/src/main/java/net/md_5/bungee/connection/CancelSendSignal.java @@ -0,0 +1,17 @@ +package net.md_5.bungee.connection; + +class CancelSendSignal extends Error +{ + + @Override + public synchronized Throwable initCause(Throwable cause) + { + return this; + } + + @Override + public synchronized Throwable fillInStackTrace() + { + return this; + } +} diff --git a/proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java b/proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java new file mode 100644 index 00000000..695af1c6 --- /dev/null +++ b/proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java @@ -0,0 +1,188 @@ +package net.md_5.bungee.connection; + +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import lombok.RequiredArgsConstructor; +import net.md_5.bungee.UserConnection; +import net.md_5.bungee.api.ProxyServer; +import net.md_5.bungee.api.config.ServerInfo; +import net.md_5.bungee.api.connection.ProxiedPlayer; +import net.md_5.bungee.api.event.ChatEvent; +import net.md_5.bungee.api.event.PluginMessageEvent; +import net.md_5.bungee.packet.Packet0KeepAlive; +import net.md_5.bungee.packet.Packet3Chat; +import net.md_5.bungee.packet.PacketC9PlayerListItem; +import net.md_5.bungee.packet.PacketFAPluginMessage; +import net.md_5.bungee.packet.PacketFFKick; +import net.md_5.bungee.packet.PacketHandler; + +@RequiredArgsConstructor +public class DownstreamBridge extends PacketHandler +{ + + private final ProxyServer bungee; + private final UserConnection con; + + @Override + public void handle(Packet0KeepAlive alive) throws Exception + { + con.trackingPingId = alive.id; + } + + @Override + public void handle(Packet3Chat chat) throws Exception + { + ChatEvent chatEvent = new ChatEvent( con.getServer(), con, chat.message ); + bungee.getPluginManager().callEvent( chatEvent ); + + if ( chatEvent.isCancelled() ) + { + throw new CancelSendSignal(); + } + } + + @Override + public void handle(PacketC9PlayerListItem playerList) throws Exception + { + + if ( !bungee.getTabListHandler().onListUpdate( con, playerList.username, playerList.online, playerList.ping ) ) + { + throw new CancelSendSignal(); + } + } + + @Override + public void handle(PacketFAPluginMessage pluginMessage) throws Exception + { + ByteArrayDataInput in = ByteStreams.newDataInput( pluginMessage.data ); + PluginMessageEvent event = new PluginMessageEvent( con.getServer(), con, pluginMessage.tag, pluginMessage.data.clone() ); + + if ( bungee.getPluginManager().callEvent( event ).isCancelled() ) + { + throw new CancelSendSignal(); + } + + if ( pluginMessage.tag.equals( "BungeeCord" ) ) + { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + String subChannel = in.readUTF(); + + if ( subChannel.equals( "Forward" ) ) + { + // Read data from server + String target = in.readUTF(); + String channel = in.readUTF(); + short len = in.readShort(); + byte[] data = new byte[ len ]; + in.readFully( data ); + + // Prepare new data to send + out.writeUTF( channel ); + out.writeShort( data.length ); + out.write( data ); + byte[] payload = out.toByteArray(); + + // Null out stream, important as we don't want to send to ourselves + out = null; + + if ( target.equals( "ALL" ) ) + { + for ( ServerInfo server : bungee.getServers().values() ) + { + if ( server != con.getServer().getInfo() ) + { + server.sendData( "BungeeCord", payload ); + } + } + } else + { + ServerInfo server = bungee.getServerInfo( target ); + if ( server != null ) + { + server.sendData( "BungeeCord", payload ); + } + } + } + if ( subChannel.equals( "Connect" ) ) + { + ServerInfo server = bungee.getServerInfo( in.readUTF() ); + if ( server != null ) + { + connect( server, true ); + break outer; + } + } + if ( subChannel.equals( "IP" ) ) + { + out.writeUTF( "IP" ); + out.writeUTF( con.getAddress().getHostString() ); + out.writeInt( con.getAddress().getPort() ); + } + if ( subChannel.equals( "PlayerCount" ) ) + { + ServerInfo server = bungee.getServerInfo( in.readUTF() ); + if ( server != null ) + { + out.writeUTF( "PlayerCount" ); + out.writeUTF( server.getName() ); + out.writeInt( server.getPlayers().size() ); + } + } + if ( subChannel.equals( "PlayerList" ) ) + { + ServerInfo server = bungee.getServerInfo( in.readUTF() ); + if ( server != null ) + { + out.writeUTF( "PlayerList" ); + out.writeUTF( server.getName() ); + + StringBuilder sb = new StringBuilder(); + for ( ProxiedPlayer p : server.getPlayers() ) + { + sb.append( p.getName() ); + sb.append( "," ); + } + out.writeUTF( sb.substring( 0, sb.length() - 1 ) ); + } + } + if ( subChannel.equals( "GetServers" ) ) + { + out.writeUTF( "GetServers" ); + + StringBuilder sb = new StringBuilder(); + for ( String server : bungee.getServers().keySet() ) + { + sb.append( server ); + sb.append( "," ); + } + out.writeUTF( sb.substring( 0, sb.length() - 1 ) ); + } + if ( subChannel.equals( "Message" ) ) + { + ProxiedPlayer target = bungee.getPlayer( in.readUTF() ); + if ( target != null ) + { + target.sendMessage( in.readUTF() ); + } + } + + // Check we haven't set out to null, and we have written data, if so reply back back along the BungeeCord channel + if ( out != null ) + { + byte[] b = out.toByteArray(); + if ( b.length != 0 ) + { + con.getServer().sendData( "BungeeCord", b ); + } + } + } + } + + @Override + public void handle(PacketFFKick kick) throws Exception + { + con.disconnect( "[Kicked] " + kick.message ); + throw new CancelSendSignal(); + } +} diff --git a/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java b/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java new file mode 100644 index 00000000..e8c9eefc --- /dev/null +++ b/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java @@ -0,0 +1,64 @@ +package net.md_5.bungee.connection; + +import lombok.RequiredArgsConstructor; +import net.md_5.bungee.UserConnection; +import net.md_5.bungee.api.ProxyServer; +import net.md_5.bungee.api.event.ChatEvent; +import net.md_5.bungee.api.event.PluginMessageEvent; +import net.md_5.bungee.packet.Packet0KeepAlive; +import net.md_5.bungee.packet.Packet3Chat; +import net.md_5.bungee.packet.PacketFAPluginMessage; +import net.md_5.bungee.packet.PacketHandler; + +@RequiredArgsConstructor +public class UpstreamBridge extends PacketHandler +{ + + private final ProxyServer bungee; + private final UserConnection con; + + @Override + public void handle(Packet0KeepAlive alive) throws Exception + { + if ( alive.id == con.trackingPingId ) + { + int newPing = (int) ( System.currentTimeMillis() - con.pingTime ); + bungee.getTabListHandler().onPingChange( con, newPing ); + con.setPing( newPing ); + } + } + + @Override + public void handle(Packet3Chat chat) throws Exception + { + if ( chat.message.charAt( 0 ) == '/' ) + { + if ( bungee.getPluginManager().dispatchCommand( con, chat.message.substring( 1 ) ) ) + { + throw new CancelSendSignal(); + } + } else + { + ChatEvent chatEvent = new ChatEvent( con, con.getServer(), chat.message ); + if ( bungee.getPluginManager().callEvent( chatEvent ).isCancelled() ) + { + throw new CancelSendSignal(); + } + } + } + + @Override + public void handle(PacketFAPluginMessage pluginMessage) throws Exception + { + if ( pluginMessage.tag.equals( "BungeeCord" ) ) + { + throw new CancelSendSignal(); + } + + PluginMessageEvent event = new PluginMessageEvent( con, con.getServer(), pluginMessage.tag, pluginMessage.data.clone() ); + if ( bungee.getPluginManager().callEvent( event ).isCancelled() ) + { + throw new CancelSendSignal(); + } + } +}