From 69d618c6484434aafd492d7dfa3f1f322d031f94 Mon Sep 17 00:00:00 2001 From: md_5 Date: Sat, 9 Feb 2013 17:48:42 +1100 Subject: [PATCH] Use custom packet out stream to ease transition to a netty channel when it is required --- .../net/md_5/bungee/BungeeServerInfo.java | 4 +-- .../net/md_5/bungee/GenericConnection.java | 10 +++---- .../java/net/md_5/bungee/InitialHandler.java | 22 +++++++--------- .../net/md_5/bungee/ServerConnection.java | 25 ++++++++---------- .../java/net/md_5/bungee/UserConnection.java | 26 +++++++++---------- ...cketInputStream.java => PacketStream.java} | 26 ++++++++++++++++--- 6 files changed, 63 insertions(+), 50 deletions(-) rename proxy/src/main/java/net/md_5/bungee/packet/{PacketInputStream.java => PacketStream.java} (78%) diff --git a/proxy/src/main/java/net/md_5/bungee/BungeeServerInfo.java b/proxy/src/main/java/net/md_5/bungee/BungeeServerInfo.java index 74133f81..3bfd1062 100644 --- a/proxy/src/main/java/net/md_5/bungee/BungeeServerInfo.java +++ b/proxy/src/main/java/net/md_5/bungee/BungeeServerInfo.java @@ -14,7 +14,7 @@ import net.md_5.bungee.api.connection.Server; import net.md_5.bungee.packet.DefinedPacket; import net.md_5.bungee.packet.PacketFAPluginMessage; import net.md_5.bungee.packet.PacketFFKick; -import net.md_5.bungee.packet.PacketInputStream; +import net.md_5.bungee.packet.PacketStream; public class BungeeServerInfo extends ServerInfo { @@ -57,7 +57,7 @@ public class BungeeServerInfo extends ServerInfo out.write(0xFE); out.write(0x01); } - try (PacketInputStream in = new PacketInputStream(socket.getInputStream())) + try (PacketStream in = new PacketStream(socket.getInputStream())) { PacketFFKick response = new PacketFFKick(in.readPacket()); diff --git a/proxy/src/main/java/net/md_5/bungee/GenericConnection.java b/proxy/src/main/java/net/md_5/bungee/GenericConnection.java index a4e89301..067cb8d5 100644 --- a/proxy/src/main/java/net/md_5/bungee/GenericConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/GenericConnection.java @@ -8,7 +8,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import static net.md_5.bungee.Logger.$; import net.md_5.bungee.packet.PacketFFKick; -import net.md_5.bungee.packet.PacketInputStream; +import net.md_5.bungee.packet.PacketStream; /** * Class to represent a Minecraft connection. @@ -19,8 +19,7 @@ public class GenericConnection { protected final Socket socket; - protected final PacketInputStream in; - protected final OutputStream out; + protected final PacketStream stream; @Getter public String name; @Getter @@ -40,15 +39,14 @@ public class GenericConnection log("disconnected with " + reason); try { - out.write(new PacketFFKick("[Proxy] " + reason).getPacket()); + stream.write(new PacketFFKick("[Proxy] " + reason)); } catch (IOException ex) { } finally { try { - out.flush(); - out.close(); + socket.shutdownOutput(); socket.close(); } catch (IOException ioe) { diff --git a/proxy/src/main/java/net/md_5/bungee/InitialHandler.java b/proxy/src/main/java/net/md_5/bungee/InitialHandler.java index dd030343..237b2b2b 100644 --- a/proxy/src/main/java/net/md_5/bungee/InitialHandler.java +++ b/proxy/src/main/java/net/md_5/bungee/InitialHandler.java @@ -24,7 +24,7 @@ import net.md_5.bungee.packet.PacketFDEncryptionRequest; import net.md_5.bungee.packet.PacketFEPing; import net.md_5.bungee.packet.PacketFFKick; import net.md_5.bungee.packet.PacketHandler; -import net.md_5.bungee.packet.PacketInputStream; +import net.md_5.bungee.packet.PacketStream; import org.bouncycastle.crypto.io.CipherInputStream; import org.bouncycastle.crypto.io.CipherOutputStream; @@ -34,8 +34,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo private final Socket socket; @Getter private final ListenerInfo listener; - private PacketInputStream in; - private OutputStream out; + private PacketStream stream; private Packet2Handshake handshake; private PacketFDEncryptionRequest request; private State thisState = State.HANDSHAKE; @@ -44,8 +43,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo { this.socket = socket; this.listener = info; - in = new PacketInputStream(socket.getInputStream()); - out = socket.getOutputStream(); + stream = new PacketStream(socket.getInputStream(), socket.getOutputStream()); } private enum State @@ -88,7 +86,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo Preconditions.checkState(thisState == State.HANDSHAKE, "Not expecting HANDSHAKE"); this.handshake = handshake; request = EncryptionUtil.encryptRequest(); - out.write(request.getPacket()); + stream.write(request); thisState = State.ENCRYPT; } @@ -117,9 +115,9 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo disconnect(event.getCancelReason()); } - out.write(new PacketFCEncryptionResponse().getPacket()); - in = new PacketInputStream(new CipherInputStream(socket.getInputStream(), EncryptionUtil.getCipher(false, shared))); - out = new CipherOutputStream(socket.getOutputStream(), EncryptionUtil.getCipher(true, shared)); + stream.write(new PacketFCEncryptionResponse()); + stream = new PacketStream(new CipherInputStream(socket.getInputStream(), + EncryptionUtil.getCipher(false, shared)), new CipherOutputStream(socket.getOutputStream(), EncryptionUtil.getCipher(true, shared))); thisState = State.LOGIN; } @@ -129,7 +127,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo { Preconditions.checkState(thisState == State.LOGIN, "Not expecting LOGIN"); - UserConnection userCon = new UserConnection(socket, this, in, out, handshake); + UserConnection userCon = new UserConnection(socket, this, stream, handshake); String server = ProxyServer.getInstance().getReconnectHandler().getServer(userCon); ServerInfo s = BungeeCord.getInstance().config.getServers().get(server); userCon.connect(s, true); @@ -144,7 +142,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo { while (thisState != State.FINISHED) { - byte[] buf = in.readPacket(); + byte[] buf = stream.readPacket(); DefinedPacket packet = DefinedPacket.packet(buf); packet.handle(this); } @@ -161,7 +159,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo thisState = State.FINISHED; try { - out.write(new PacketFFKick(reason).getPacket()); + stream.write(new PacketFFKick(reason)); } catch (IOException ioe) { } finally diff --git a/proxy/src/main/java/net/md_5/bungee/ServerConnection.java b/proxy/src/main/java/net/md_5/bungee/ServerConnection.java index 25c4748f..6c24ac99 100644 --- a/proxy/src/main/java/net/md_5/bungee/ServerConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/ServerConnection.java @@ -1,6 +1,5 @@ package net.md_5.bungee; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Queue; @@ -18,9 +17,8 @@ import net.md_5.bungee.packet.Packet1Login; import net.md_5.bungee.packet.Packet2Handshake; import net.md_5.bungee.packet.PacketCDClientStatus; import net.md_5.bungee.packet.PacketFAPluginMessage; -import net.md_5.bungee.packet.PacketFDEncryptionRequest; import net.md_5.bungee.packet.PacketFFKick; -import net.md_5.bungee.packet.PacketInputStream; +import net.md_5.bungee.packet.PacketStream; /** * Class representing a connection from the proxy to the server; ie upstream. @@ -33,9 +31,9 @@ public class ServerConnection extends GenericConnection implements Server public final Packet1Login loginPacket; public Queue packetQueue = new ConcurrentLinkedQueue<>(); - public ServerConnection(Socket socket, ServerInfo info, PacketInputStream in, OutputStream out, Packet1Login loginPacket) + public ServerConnection(Socket socket, ServerInfo info, PacketStream stream, Packet1Login loginPacket) { - super(socket, in, out); + super(socket, stream); this.info = info; this.loginPacket = loginPacket; } @@ -48,30 +46,29 @@ public class ServerConnection extends GenericConnection implements Server socket.connect(info.getAddress(), BungeeCord.getInstance().config.getTimeout()); BungeeCord.getInstance().setSocketOptions(socket); - PacketInputStream in = new PacketInputStream(socket.getInputStream()); - OutputStream out = socket.getOutputStream(); + PacketStream stream = new PacketStream(socket.getInputStream(), socket.getOutputStream()); - out.write(handshake.getPacket()); - out.write(new PacketCDClientStatus((byte) 0).getPacket()); - in.readPacket(); + stream.write(handshake); + stream.write(new PacketCDClientStatus((byte) 0)); + stream.readPacket(); - byte[] loginResponse = in.readPacket(); + byte[] loginResponse = stream.readPacket(); if (Util.getId(loginResponse) == 0xFF) { throw new KickException("[Kicked] " + new PacketFFKick(loginResponse).message); } Packet1Login login = new Packet1Login(loginResponse); - ServerConnection server = new ServerConnection(socket, info, in, out, login); + ServerConnection server = new ServerConnection(socket, info, stream, login); ServerConnectedEvent event = new ServerConnectedEvent(user, server); ProxyServer.getInstance().getPluginManager().callEvent(event); - out.write(BungeeCord.getInstance().registerChannels().getPacket()); + stream.write(BungeeCord.getInstance().registerChannels()); Queue packetQueue = ((BungeeServerInfo) info).getPacketQueue(); while (!packetQueue.isEmpty()) { - out.write(packetQueue.poll().getPacket()); + stream.write(packetQueue.poll()); } return server; diff --git a/proxy/src/main/java/net/md_5/bungee/UserConnection.java b/proxy/src/main/java/net/md_5/bungee/UserConnection.java index fe354479..0a5b5bc3 100644 --- a/proxy/src/main/java/net/md_5/bungee/UserConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/UserConnection.java @@ -54,9 +54,9 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer // Hack for connect timings private ServerInfo nextServer; - public UserConnection(Socket socket, PendingConnection pendingConnection, PacketInputStream in, OutputStream out, Packet2Handshake handshake) + public UserConnection(Socket socket, PendingConnection pendingConnection, PacketStream stream, Packet2Handshake handshake) { - super(socket, in, out); + super(socket, stream); this.handshake = handshake; this.pendingConnection = pendingConnection; name = handshake.username; @@ -103,8 +103,8 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer if (server != null) { - out.write(new Packet9Respawn((byte) 1, (byte) 0, (byte) 0, (short) 256, "DEFAULT").getPacket()); - out.write(new Packet9Respawn((byte) -1, (byte) 0, (byte) 0, (short) 256, "DEFAULT").getPacket()); + stream.write(new Packet9Respawn((byte) 1, (byte) 0, (byte) 0, (short) 256, "DEFAULT")); + stream.write(new Packet9Respawn((byte) -1, (byte) 0, (byte) 0, (short) 256, "DEFAULT")); } ServerConnection newServer = ServerConnection.connect(this, target, handshake, true); @@ -113,8 +113,8 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer // Once again, first connection clientEntityId = newServer.loginPacket.entityId; serverEntityId = newServer.loginPacket.entityId; - out.write(newServer.loginPacket.getPacket()); - out.write(BungeeCord.getInstance().registerChannels().getPacket()); + stream.write(newServer.loginPacket); + stream.write(BungeeCord.getInstance().registerChannels()); upBridge = new UpstreamBridge(); upBridge.start(); @@ -133,7 +133,7 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer Packet1Login login = newServer.loginPacket; serverEntityId = login.entityId; - out.write(new Packet9Respawn(login.dimension, login.difficulty, login.gameMode, (short) 256, login.levelType).getPacket()); + stream.write(new Packet9Respawn(login.dimension, login.difficulty, login.gameMode, (short) 256, login.levelType)); } // Reconnect process has finished, lets get the player moving again @@ -259,7 +259,7 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer { try { - byte[] packet = in.readPacket(); + byte[] packet = stream.readPacket(); boolean sendPacket = true; int id = Util.getId(packet); @@ -310,14 +310,14 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer DefinedPacket p = server.packetQueue.poll(); if (p != null) { - server.out.write(p.getPacket()); + server.stream.write(p); } } EntityMap.rewrite(packet, clientEntityId, serverEntityId); if (sendPacket && !server.socket.isClosed()) { - server.out.write(packet); + server.stream.write(packet); } } catch (IOException ex) { @@ -346,7 +346,7 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer outer: while (!reconnecting) { - byte[] packet = server.in.readPacket(); + byte[] packet = server.stream.readPacket(); int id = Util.getId(packet); switch (id) @@ -502,12 +502,12 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer DefinedPacket p = packetQueue.poll(); if (p != null) { - out.write(p.getPacket()); + stream.write(p); } } EntityMap.rewrite(packet, serverEntityId, clientEntityId); - out.write(packet); + stream.write(packet); if (nextServer != null) { diff --git a/proxy/src/main/java/net/md_5/bungee/packet/PacketInputStream.java b/proxy/src/main/java/net/md_5/bungee/packet/PacketStream.java similarity index 78% rename from proxy/src/main/java/net/md_5/bungee/packet/PacketInputStream.java rename to proxy/src/main/java/net/md_5/bungee/packet/PacketStream.java index 1016189f..e5183ca7 100644 --- a/proxy/src/main/java/net/md_5/bungee/packet/PacketInputStream.java +++ b/proxy/src/main/java/net/md_5/bungee/packet/PacketStream.java @@ -5,24 +5,44 @@ import java.io.DataInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import lombok.Getter; +import lombok.Setter; import net.md_5.mendax.datainput.DataInputPacketReader; -import org.bouncycastle.crypto.io.CipherInputStream; /** * A specialized input stream to parse packets using the Mojang packet * definitions and then return them as a byte array. */ -public class PacketInputStream implements AutoCloseable +public class PacketStream implements AutoCloseable { private final DataInputStream dataInput; + @Getter + private OutputStream out; private final TrackingInputStream tracker; private final byte[] buffer = new byte[1 << 18]; - public PacketInputStream(InputStream in) + public PacketStream(InputStream in) + { + this(in, null); + } + + public PacketStream(InputStream in, OutputStream out) { tracker = new TrackingInputStream(in); dataInput = new DataInputStream(tracker); + this.out = out; + } + + public void write(byte[] b) throws IOException + { + out.write(b); + } + + public void write(DefinedPacket packet) throws IOException + { + out.write(packet.getPacket()); } /**