Use custom packet out stream to ease transition to a netty channel when it is required

This commit is contained in:
md_5 2013-02-09 17:48:42 +11:00
parent fbede036d8
commit 69d618c648
6 changed files with 63 additions and 50 deletions

View File

@ -14,7 +14,7 @@ import net.md_5.bungee.api.connection.Server;
import net.md_5.bungee.packet.DefinedPacket;
import net.md_5.bungee.packet.PacketFAPluginMessage;
import net.md_5.bungee.packet.PacketFFKick;
import net.md_5.bungee.packet.PacketInputStream;
import net.md_5.bungee.packet.PacketStream;
public class BungeeServerInfo extends ServerInfo
{
@ -57,7 +57,7 @@ public class BungeeServerInfo extends ServerInfo
out.write(0xFE);
out.write(0x01);
}
try (PacketInputStream in = new PacketInputStream(socket.getInputStream()))
try (PacketStream in = new PacketStream(socket.getInputStream()))
{
PacketFFKick response = new PacketFFKick(in.readPacket());

View File

@ -8,7 +8,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import static net.md_5.bungee.Logger.$;
import net.md_5.bungee.packet.PacketFFKick;
import net.md_5.bungee.packet.PacketInputStream;
import net.md_5.bungee.packet.PacketStream;
/**
* Class to represent a Minecraft connection.
@ -19,8 +19,7 @@ public class GenericConnection
{
protected final Socket socket;
protected final PacketInputStream in;
protected final OutputStream out;
protected final PacketStream stream;
@Getter
public String name;
@Getter
@ -40,15 +39,14 @@ public class GenericConnection
log("disconnected with " + reason);
try
{
out.write(new PacketFFKick("[Proxy] " + reason).getPacket());
stream.write(new PacketFFKick("[Proxy] " + reason));
} catch (IOException ex)
{
} finally
{
try
{
out.flush();
out.close();
socket.shutdownOutput();
socket.close();
} catch (IOException ioe)
{

View File

@ -24,7 +24,7 @@ import net.md_5.bungee.packet.PacketFDEncryptionRequest;
import net.md_5.bungee.packet.PacketFEPing;
import net.md_5.bungee.packet.PacketFFKick;
import net.md_5.bungee.packet.PacketHandler;
import net.md_5.bungee.packet.PacketInputStream;
import net.md_5.bungee.packet.PacketStream;
import org.bouncycastle.crypto.io.CipherInputStream;
import org.bouncycastle.crypto.io.CipherOutputStream;
@ -34,8 +34,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo
private final Socket socket;
@Getter
private final ListenerInfo listener;
private PacketInputStream in;
private OutputStream out;
private PacketStream stream;
private Packet2Handshake handshake;
private PacketFDEncryptionRequest request;
private State thisState = State.HANDSHAKE;
@ -44,8 +43,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo
{
this.socket = socket;
this.listener = info;
in = new PacketInputStream(socket.getInputStream());
out = socket.getOutputStream();
stream = new PacketStream(socket.getInputStream(), socket.getOutputStream());
}
private enum State
@ -88,7 +86,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo
Preconditions.checkState(thisState == State.HANDSHAKE, "Not expecting HANDSHAKE");
this.handshake = handshake;
request = EncryptionUtil.encryptRequest();
out.write(request.getPacket());
stream.write(request);
thisState = State.ENCRYPT;
}
@ -117,9 +115,9 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo
disconnect(event.getCancelReason());
}
out.write(new PacketFCEncryptionResponse().getPacket());
in = new PacketInputStream(new CipherInputStream(socket.getInputStream(), EncryptionUtil.getCipher(false, shared)));
out = new CipherOutputStream(socket.getOutputStream(), EncryptionUtil.getCipher(true, shared));
stream.write(new PacketFCEncryptionResponse());
stream = new PacketStream(new CipherInputStream(socket.getInputStream(),
EncryptionUtil.getCipher(false, shared)), new CipherOutputStream(socket.getOutputStream(), EncryptionUtil.getCipher(true, shared)));
thisState = State.LOGIN;
}
@ -129,7 +127,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo
{
Preconditions.checkState(thisState == State.LOGIN, "Not expecting LOGIN");
UserConnection userCon = new UserConnection(socket, this, in, out, handshake);
UserConnection userCon = new UserConnection(socket, this, stream, handshake);
String server = ProxyServer.getInstance().getReconnectHandler().getServer(userCon);
ServerInfo s = BungeeCord.getInstance().config.getServers().get(server);
userCon.connect(s, true);
@ -144,7 +142,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo
{
while (thisState != State.FINISHED)
{
byte[] buf = in.readPacket();
byte[] buf = stream.readPacket();
DefinedPacket packet = DefinedPacket.packet(buf);
packet.handle(this);
}
@ -161,7 +159,7 @@ public class InitialHandler extends PacketHandler implements Runnable, PendingCo
thisState = State.FINISHED;
try
{
out.write(new PacketFFKick(reason).getPacket());
stream.write(new PacketFFKick(reason));
} catch (IOException ioe)
{
} finally

View File

@ -1,6 +1,5 @@
package net.md_5.bungee;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Queue;
@ -18,9 +17,8 @@ import net.md_5.bungee.packet.Packet1Login;
import net.md_5.bungee.packet.Packet2Handshake;
import net.md_5.bungee.packet.PacketCDClientStatus;
import net.md_5.bungee.packet.PacketFAPluginMessage;
import net.md_5.bungee.packet.PacketFDEncryptionRequest;
import net.md_5.bungee.packet.PacketFFKick;
import net.md_5.bungee.packet.PacketInputStream;
import net.md_5.bungee.packet.PacketStream;
/**
* Class representing a connection from the proxy to the server; ie upstream.
@ -33,9 +31,9 @@ public class ServerConnection extends GenericConnection implements Server
public final Packet1Login loginPacket;
public Queue<DefinedPacket> packetQueue = new ConcurrentLinkedQueue<>();
public ServerConnection(Socket socket, ServerInfo info, PacketInputStream in, OutputStream out, Packet1Login loginPacket)
public ServerConnection(Socket socket, ServerInfo info, PacketStream stream, Packet1Login loginPacket)
{
super(socket, in, out);
super(socket, stream);
this.info = info;
this.loginPacket = loginPacket;
}
@ -48,30 +46,29 @@ public class ServerConnection extends GenericConnection implements Server
socket.connect(info.getAddress(), BungeeCord.getInstance().config.getTimeout());
BungeeCord.getInstance().setSocketOptions(socket);
PacketInputStream in = new PacketInputStream(socket.getInputStream());
OutputStream out = socket.getOutputStream();
PacketStream stream = new PacketStream(socket.getInputStream(), socket.getOutputStream());
out.write(handshake.getPacket());
out.write(new PacketCDClientStatus((byte) 0).getPacket());
in.readPacket();
stream.write(handshake);
stream.write(new PacketCDClientStatus((byte) 0));
stream.readPacket();
byte[] loginResponse = in.readPacket();
byte[] loginResponse = stream.readPacket();
if (Util.getId(loginResponse) == 0xFF)
{
throw new KickException("[Kicked] " + new PacketFFKick(loginResponse).message);
}
Packet1Login login = new Packet1Login(loginResponse);
ServerConnection server = new ServerConnection(socket, info, in, out, login);
ServerConnection server = new ServerConnection(socket, info, stream, login);
ServerConnectedEvent event = new ServerConnectedEvent(user, server);
ProxyServer.getInstance().getPluginManager().callEvent(event);
out.write(BungeeCord.getInstance().registerChannels().getPacket());
stream.write(BungeeCord.getInstance().registerChannels());
Queue<DefinedPacket> packetQueue = ((BungeeServerInfo) info).getPacketQueue();
while (!packetQueue.isEmpty())
{
out.write(packetQueue.poll().getPacket());
stream.write(packetQueue.poll());
}
return server;

View File

@ -54,9 +54,9 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
// Hack for connect timings
private ServerInfo nextServer;
public UserConnection(Socket socket, PendingConnection pendingConnection, PacketInputStream in, OutputStream out, Packet2Handshake handshake)
public UserConnection(Socket socket, PendingConnection pendingConnection, PacketStream stream, Packet2Handshake handshake)
{
super(socket, in, out);
super(socket, stream);
this.handshake = handshake;
this.pendingConnection = pendingConnection;
name = handshake.username;
@ -103,8 +103,8 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
if (server != null)
{
out.write(new Packet9Respawn((byte) 1, (byte) 0, (byte) 0, (short) 256, "DEFAULT").getPacket());
out.write(new Packet9Respawn((byte) -1, (byte) 0, (byte) 0, (short) 256, "DEFAULT").getPacket());
stream.write(new Packet9Respawn((byte) 1, (byte) 0, (byte) 0, (short) 256, "DEFAULT"));
stream.write(new Packet9Respawn((byte) -1, (byte) 0, (byte) 0, (short) 256, "DEFAULT"));
}
ServerConnection newServer = ServerConnection.connect(this, target, handshake, true);
@ -113,8 +113,8 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
// Once again, first connection
clientEntityId = newServer.loginPacket.entityId;
serverEntityId = newServer.loginPacket.entityId;
out.write(newServer.loginPacket.getPacket());
out.write(BungeeCord.getInstance().registerChannels().getPacket());
stream.write(newServer.loginPacket);
stream.write(BungeeCord.getInstance().registerChannels());
upBridge = new UpstreamBridge();
upBridge.start();
@ -133,7 +133,7 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
Packet1Login login = newServer.loginPacket;
serverEntityId = login.entityId;
out.write(new Packet9Respawn(login.dimension, login.difficulty, login.gameMode, (short) 256, login.levelType).getPacket());
stream.write(new Packet9Respawn(login.dimension, login.difficulty, login.gameMode, (short) 256, login.levelType));
}
// Reconnect process has finished, lets get the player moving again
@ -259,7 +259,7 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
{
try
{
byte[] packet = in.readPacket();
byte[] packet = stream.readPacket();
boolean sendPacket = true;
int id = Util.getId(packet);
@ -310,14 +310,14 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
DefinedPacket p = server.packetQueue.poll();
if (p != null)
{
server.out.write(p.getPacket());
server.stream.write(p);
}
}
EntityMap.rewrite(packet, clientEntityId, serverEntityId);
if (sendPacket && !server.socket.isClosed())
{
server.out.write(packet);
server.stream.write(packet);
}
} catch (IOException ex)
{
@ -346,7 +346,7 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
outer:
while (!reconnecting)
{
byte[] packet = server.in.readPacket();
byte[] packet = server.stream.readPacket();
int id = Util.getId(packet);
switch (id)
@ -502,12 +502,12 @@ public class UserConnection extends GenericConnection implements ProxiedPlayer
DefinedPacket p = packetQueue.poll();
if (p != null)
{
out.write(p.getPacket());
stream.write(p);
}
}
EntityMap.rewrite(packet, serverEntityId, clientEntityId);
out.write(packet);
stream.write(packet);
if (nextServer != null)
{

View File

@ -5,24 +5,44 @@ import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import lombok.Getter;
import lombok.Setter;
import net.md_5.mendax.datainput.DataInputPacketReader;
import org.bouncycastle.crypto.io.CipherInputStream;
/**
* A specialized input stream to parse packets using the Mojang packet
* definitions and then return them as a byte array.
*/
public class PacketInputStream implements AutoCloseable
public class PacketStream implements AutoCloseable
{
private final DataInputStream dataInput;
@Getter
private OutputStream out;
private final TrackingInputStream tracker;
private final byte[] buffer = new byte[1 << 18];
public PacketInputStream(InputStream in)
public PacketStream(InputStream in)
{
this(in, null);
}
public PacketStream(InputStream in, OutputStream out)
{
tracker = new TrackingInputStream(in);
dataInput = new DataInputStream(tracker);
this.out = out;
}
public void write(byte[] b) throws IOException
{
out.write(b);
}
public void write(DefinedPacket packet) throws IOException
{
out.write(packet.getPacket());
}
/**