Start work on netty overhaul

This commit is contained in:
md_5
2013-03-07 18:17:49 +11:00
parent 87e78bae7e
commit 0077af58d0
35 changed files with 519 additions and 386 deletions

View File

@@ -1,5 +1,15 @@
package net.md_5.bungee;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.timeout.ReadTimeoutHandler;
import net.md_5.bungee.config.Configuration;
import java.io.BufferedReader;
import java.io.File;
@@ -10,13 +20,10 @@ import java.net.Socket;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,6 +43,9 @@ import net.md_5.bungee.api.plugin.Plugin;
import net.md_5.bungee.api.plugin.PluginManager;
import net.md_5.bungee.command.*;
import net.md_5.bungee.config.YamlConfig;
import net.md_5.bungee.netty.ChannelBootstrapper;
import net.md_5.bungee.netty.HandlerBoss;
import net.md_5.bungee.netty.PacketDecoder;
import net.md_5.bungee.packet.DefinedPacket;
import net.md_5.bungee.packet.PacketFAPluginMessage;
@@ -64,7 +74,7 @@ public class BungeeCord extends ProxyServer
/**
* Thread pool.
*/
public final ExecutorService threadPool = Executors.newCachedThreadPool();
public final MultithreadEventLoopGroup eventLoops = new NioEventLoopGroup( 8, new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread - %1$d" ).build() );
/**
* locations.yml save thread.
*/
@@ -72,7 +82,7 @@ public class BungeeCord extends ProxyServer
/**
* Server socket listener.
*/
private Collection<ListenThread> listeners = new HashSet<>();
private Collection<Channel> listeners = new HashSet<>();
/**
* Fully qualified connections.
*/
@@ -149,6 +159,7 @@ public class BungeeCord extends ProxyServer
*
* @throws IOException
*/
@Override
public void start() throws IOException
{
File plugins = new File( "plugins" );
@@ -181,30 +192,26 @@ public class BungeeCord extends ProxyServer
{
for ( ListenerInfo info : config.getListeners() )
{
try
{
ListenThread listener = new ListenThread( info );
listener.start();
listeners.add( listener );
$().info( "Listening on " + info.getHost() );
} catch ( IOException ex )
{
$().log( Level.SEVERE, "Could not start listener " + info, ex );
}
Channel server = new ServerBootstrap()
.childHandler( ChannelBootstrapper.SERVER )
.localAddress( info.getHost() )
.group( eventLoops )
.bind().channel();
listeners.add( server );
$().info( "Listening on " + info.getHost() );
}
}
public void stopListeners()
{
for ( ListenThread listener : listeners )
for ( Channel listener : listeners )
{
$().log( Level.INFO, "Closing listen thread {0}", listener.socket );
$().log( Level.INFO, "Closing listener {0}", listener );
try
{
listener.interrupt();
listener.socket.close();
listener.join();
} catch ( InterruptedException | IOException ex )
listener.close().syncUninterruptibly();
} catch ( ChannelException ex )
{
$().severe( "Could not close listen thread" );
}
@@ -219,7 +226,6 @@ public class BungeeCord extends ProxyServer
stopListeners();
$().info( "Closing pending connections" );
threadPool.shutdown();
$().info( "Disconnecting " + connections.size() + " connections" );
for ( UserConnection user : connections.values() )
@@ -227,6 +233,9 @@ public class BungeeCord extends ProxyServer
user.disconnect( "Proxy restarting, brb." );
}
$().info( "Closing IO threads" );
eventLoops.shutdown();
$().info( "Saving reconnect locations" );
reconnectHandler.save();
saveThread.cancel();
@@ -241,20 +250,6 @@ public class BungeeCord extends ProxyServer
System.exit( 0 );
}
/**
* Miscellaneous method to set options on a socket based on those in the
* configuration.
*
* @param socket to set the options on
* @throws IOException when the underlying set methods thrown an exception
*/
public void setSocketOptions(Socket socket) throws IOException
{
socket.setSoTimeout( config.getTimeout() );
socket.setTrafficClass( 0x18 );
socket.setTcpNoDelay( true );
}
/**
* Broadcasts a packet to all clients that is connected to this instance.
*

View File

@@ -15,7 +15,7 @@ import net.md_5.bungee.packet.DefinedPacket;
import net.md_5.bungee.packet.PacketFAPluginMessage;
import net.md_5.bungee.packet.PacketFFKick;
import net.md_5.bungee.packet.PacketStream;
import net.md_5.mendax.PacketDefinitions;
import net.md_5.bungee.protocol.PacketDefinitions;
public class BungeeServerInfo extends ServerInfo
{

View File

@@ -1,5 +1,7 @@
package net.md_5.bungee;
import io.netty.buffer.ByteBuf;
/**
* Class to rewrite integers within packets.
*/
@@ -113,20 +115,20 @@ public class EntityMap
};
}
public static void rewrite(byte[] packet, int oldId, int newId)
public static void rewrite(ByteBuf packet, int oldId, int newId)
{
int packetId = Util.getId( packet );
int packetId = packet.getUnsignedShort( 0 );
if ( packetId == 0x1D )
{ // bulk entity
for ( int pos = 2; pos < packet.length; pos += 4 )
for ( int pos = 2; pos < packet.writerIndex(); pos += 4 )
{
int readId = readInt( packet, pos );
int readId = packet.getInt( pos );
if ( readId == oldId )
{
setInt( packet, pos, newId );
packet.setInt( pos, newId );
} else if ( readId == newId )
{
setInt( packet, pos, oldId );
packet.setInt( pos, oldId );
}
}
} else
@@ -136,29 +138,16 @@ public class EntityMap
{
for ( int pos : idArray )
{
int readId = readInt( packet, pos );
int readId = packet.getInt( pos );
if ( readId == oldId )
{
setInt( packet, pos, newId );
packet.setInt( pos, newId );
} else if ( readId == newId )
{
setInt( packet, pos, oldId );
packet.setInt( pos, oldId );
}
}
}
}
}
private static void setInt(byte[] buf, int pos, int i)
{
buf[pos] = (byte) ( i >> 24 );
buf[pos + 1] = (byte) ( i >> 16 );
buf[pos + 2] = (byte) ( i >> 8 );
buf[pos + 3] = (byte) i;
}
private static int readInt(byte[] buf, int pos)
{
return ( ( ( buf[pos] & 0xFF ) << 24 ) | ( ( buf[pos + 1] & 0xFF ) << 16 ) | ( ( buf[pos + 2] & 0xFF ) << 8 ) | buf[pos + 3] & 0xFF );
}
}

View File

@@ -32,7 +32,7 @@ import net.md_5.bungee.packet.PacketFEPing;
import net.md_5.bungee.packet.PacketFFKick;
import net.md_5.bungee.packet.PacketHandler;
import net.md_5.bungee.packet.PacketStream;
import net.md_5.mendax.PacketDefinitions;
import net.md_5.bungee.protocol.PacketDefinitions;
public class InitialHandler extends PacketHandler implements Runnable, PendingConnection
{

View File

@@ -1,48 +0,0 @@
package net.md_5.bungee;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import static net.md_5.bungee.Logger.$;
import net.md_5.bungee.api.config.ListenerInfo;
/**
* Thread to listen and dispatch incoming connections to the proxy.
*/
public class ListenThread extends Thread
{
public final ServerSocket socket;
private final ListenerInfo info;
public ListenThread(ListenerInfo info) throws IOException
{
super( "Listen Thread - " + info );
this.info = info;
socket = new ServerSocket();
socket.bind( info.getHost() );
}
@Override
public void run()
{
while ( !isInterrupted() )
{
try
{
Socket client = socket.accept();
BungeeCord.getInstance().setSocketOptions( client );
$().info( client.getInetAddress() + " has connected" );
InitialHandler handler = new InitialHandler( client, info );
BungeeCord.getInstance().threadPool.submit( handler );
} catch ( SocketException ex )
{
ex.printStackTrace(); // Now people can see why their operating system is failing them and stop bitching at me!
} catch ( IOException ex )
{
ex.printStackTrace(); // TODO
}
}
}
}

View File

@@ -1,13 +1,16 @@
package net.md_5.bungee;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.Socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Queue;
import net.md_5.bungee.api.ChatColor;
import net.md_5.bungee.api.ProxyServer;
import net.md_5.bungee.api.config.ServerInfo;
import net.md_5.bungee.api.event.ServerConnectedEvent;
import net.md_5.bungee.netty.ChannelBootstrapper;
import net.md_5.bungee.packet.DefinedPacket;
import net.md_5.bungee.packet.Packet1Login;
import net.md_5.bungee.packet.PacketCDClientStatus;
@@ -39,6 +42,18 @@ public class ServerConnector extends PacketHandler
{
Preconditions.checkState( thisState == State.LOGIN, "Not exepcting LOGIN" );
loginPacket = login;
ServerConnection server = new ServerConnection( socket, info, stream, connector.loginPacket );
ServerConnectedEvent event = new ServerConnectedEvent( user, server );
ProxyServer.getInstance().getPluginManager().callEvent( event );
stream.write( BungeeCord.getInstance().registerChannels() );
Queue<DefinedPacket> packetQueue = ( (BungeeServerInfo) info ).getPacketQueue();
while ( !packetQueue.isEmpty() )
{
stream.write( packetQueue.poll() );
}
thisState = State.FINISHED;
}
@@ -55,63 +70,33 @@ public class ServerConnector extends PacketHandler
throw new KickException( kick.message );
}
public static ServerConnection connect(UserConnection user, ServerInfo info, boolean retry)
public static void connect(final UserConnection user, final ServerInfo info, final boolean retry)
{
Socket socket = null;
try
new Bootstrap()
.channel( NioSocketChannel.class )
.group( BungeeCord.getInstance().eventLoops )
.handler( ChannelBootstrapper.CLIENT )
.remoteAddress( info.getAddress() )
.connect().addListener( new ChannelFutureListener()
{
socket = new Socket();
socket.connect( info.getAddress(), BungeeCord.getInstance().config.getTimeout() );
BungeeCord.getInstance().setSocketOptions( socket );
PacketStream stream = new PacketStream( socket.getInputStream(), socket.getOutputStream(), user.stream.getProtocol() );
ServerConnector connector = new ServerConnector( stream );
stream.write( user.handshake );
stream.write( PacketCDClientStatus.CLIENT_LOGIN );
while ( connector.thisState != State.FINISHED )
@Override
public void operationComplete(ChannelFuture future) throws Exception
{
byte[] buf = stream.readPacket();
DefinedPacket packet = DefinedPacket.packet( buf );
packet.handle( connector );
}
ServerConnection server = new ServerConnection( socket, info, stream, connector.loginPacket );
ServerConnectedEvent event = new ServerConnectedEvent( user, server );
ProxyServer.getInstance().getPluginManager().callEvent( event );
stream.write( BungeeCord.getInstance().registerChannels() );
Queue<DefinedPacket> packetQueue = ( (BungeeServerInfo) info ).getPacketQueue();
while ( !packetQueue.isEmpty() )
{
stream.write( packetQueue.poll() );
}
return server;
} catch ( Exception ex )
{
if ( socket != null )
{
try
if ( future.isSuccess() )
{
socket.close();
} catch ( IOException ioe )
future.channel().write( user.handshake );
future.channel().write( PacketCDClientStatus.CLIENT_LOGIN );
} else
{
future.channel().close();
ServerInfo def = ProxyServer.getInstance().getServers().get( user.getPendingConnection().getListener().getDefaultServer() );
if ( retry && !info.equals( def ) )
{
user.sendMessage( ChatColor.RED + "Could not connect to target server, you have been moved to the default server" );
connect( user, def, false );
}
}
}
ServerInfo def = ProxyServer.getInstance().getServers().get( user.getPendingConnection().getListener().getDefaultServer() );
if ( retry && !info.equals( def ) )
{
user.sendMessage( ChatColor.RED + "Could not connect to target server, you have been moved to the default server" );
return connect( user, def, false );
} else
{
if ( ex instanceof RuntimeException )
{
throw (RuntimeException) ex;
}
throw new RuntimeException( "Could not connect to target server " + Util.exception( ex ) );
}
}
} ).channel();
}
}

View File

@@ -27,18 +27,6 @@ public class Util
return new InetSocketAddress( split[0], port );
}
/**
* Gets the value of the first unsigned byte of the specified array. Useful
* for getting the id of a packet array .
*
* @param b the array to read from
* @return the unsigned value of the first byte
*/
public static int getId(byte[] b)
{
return b[0] & 0xFF;
}
/**
* Normalizes a config path by prefix upper case letters with '_' and
* turning them to lowercase.

View File

@@ -0,0 +1,49 @@
package net.md_5.bungee.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.lang.reflect.Constructor;
import java.util.concurrent.TimeUnit;
import net.md_5.bungee.BungeeCord;
import net.md_5.bungee.InitialHandler;
import net.md_5.bungee.ServerConnector;
import net.md_5.bungee.packet.PacketHandler;
import net.md_5.bungee.protocol.PacketDefinitions;
public class ChannelBootstrapper extends ChannelInitializer<Channel>
{
public static ChannelBootstrapper CLIENT = new ChannelBootstrapper( InitialHandler.class );
public static ChannelBootstrapper SERVER = new ChannelBootstrapper( ServerConnector.class );
private final Constructor<? extends PacketHandler> initial;
private ChannelBootstrapper(Class<? extends PacketHandler> initialHandler)
{
try
{
this.initial = initialHandler.getDeclaredConstructor();
} catch ( NoSuchMethodException ex )
{
throw new ExceptionInInitializerError( ex );
}
}
@Override
protected void initChannel(Channel ch) throws Exception
{
try
{
ch.config().setOption( ChannelOption.IP_TOS, 0x18 );
} catch ( ChannelException ex )
{
// IP_TOS is not supported (Windows XP / Windows Server 2003)
}
ch.pipeline().addLast( "timer", new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) );
ch.pipeline().addLast( "decoder", new PacketDecoder( PacketDefinitions.VANILLA_PROTOCOL ) );
ch.pipeline().addLast( "handler", new HandlerBoss( initial.newInstance() ) );
}
}

View File

@@ -0,0 +1,76 @@
package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToByteCodec;
import javax.crypto.Cipher;
import javax.crypto.ShortBufferException;
/**
* This class is a complete solution for encrypting and decoding bytes in a
* Netty stream. It takes two {@link BufferedBlockCipher} instances, used for
* encryption and decryption respectively.
*/
public class CipherCodec extends ByteToByteCodec
{
private Cipher encrypt;
private Cipher decrypt;
private ByteBuf heapOut;
public CipherCodec(Cipher encrypt, Cipher decrypt)
{
this.encrypt = encrypt;
this.decrypt = decrypt;
}
@Override
public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception
{
if ( heapOut == null )
{
heapOut = ctx.alloc().heapBuffer();
}
cipher( encrypt, in, heapOut );
out.writeBytes( heapOut );
heapOut.discardSomeReadBytes();
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception
{
cipher( decrypt, in, out );
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception
{
super.freeInboundBuffer( ctx );
decrypt = null;
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception
{
super.freeOutboundBuffer( ctx );
if ( heapOut != null )
{
heapOut.release();
heapOut = null;
}
encrypt = null;
}
private void cipher(Cipher cipher, ByteBuf in, ByteBuf out) throws ShortBufferException
{
int available = in.readableBytes();
int outputSize = cipher.getOutputSize( available );
if ( out.capacity() < outputSize )
{
out.capacity( outputSize );
}
int processed = cipher.update( in.array(), in.arrayOffset() + in.readerIndex(), available, out.array(), out.arrayOffset() + out.writerIndex() );
in.readerIndex( in.readerIndex() + processed );
out.writerIndex( out.writerIndex() + processed );
}
}

View File

@@ -0,0 +1,45 @@
package net.md_5.bungee.netty;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import net.md_5.bungee.packet.DefinedPacket;
import net.md_5.bungee.packet.PacketHandler;
public class HandlerBoss extends ChannelInboundMessageHandlerAdapter<ByteBuf>
{
private PacketHandler handler;
HandlerBoss(PacketHandler handler)
{
Preconditions.checkArgument( handler != null, "handler" );
this.handler = handler;
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception
{
if ( ctx.channel().isActive() )
{
DefinedPacket packet = DefinedPacket.packet( msg );
if ( packet != null )
{
handler.handle( packet );
} else
{
handler.handle( msg );
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
if ( ctx.channel().isActive() )
{
ctx.close();
}
}
}

View File

@@ -0,0 +1,25 @@
package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import net.md_5.bungee.protocol.netty.PacketReader;
@AllArgsConstructor
public class PacketDecoder extends ReplayingDecoder<ByteBuf>
{
@Getter
@Setter
private int protocol;
@Override
protected ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
{
PacketReader.readPacket( in, protocol );
return in.copy();
}
}

View File

@@ -3,6 +3,7 @@ package net.md_5.bungee.packet;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
@@ -196,9 +197,9 @@ public abstract class DefinedPacket implements DataOutput
public abstract void handle(PacketHandler handler) throws Exception;
private static Class<? extends DefinedPacket>[] classes = new Class[ 256 ];
public static DefinedPacket packet(byte[] buf)
public static DefinedPacket packet(ByteBuf buf)
{
int id = Util.getId( buf );
int id = buf.getUnsignedShort( 0);
Class<? extends DefinedPacket> clazz = classes[id];
DefinedPacket ret = null;
if ( clazz != null )
@@ -213,6 +214,8 @@ public abstract class DefinedPacket implements DataOutput
} catch ( IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException ex )
{
}
} else {
return null;
}
Preconditions.checkState( ret != null, "Don't know how to deal with packet ID %s", Util.hex( id ) );

View File

@@ -1,11 +1,23 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
public abstract class PacketHandler
{
private void nop(DefinedPacket packet)
private void nop(Object msg)
{
throw new UnsupportedOperationException( "No handler defined for packet " + packet.getClass() );
throw new UnsupportedOperationException( "No handler defined for packet " + msg.getClass() );
}
public void handle(ByteBuf buf) throws Exception
{
nop( buf );
}
public void handle(DefinedPacket packet) throws Exception
{
nop( packet );
}
public void handle(Packet0KeepAlive alive) throws Exception

View File

@@ -1,101 +0,0 @@
package net.md_5.bungee.packet;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import lombok.Getter;
import lombok.Setter;
import net.md_5.mendax.datainput.DataInputPacketReader;
/**
* A specialized input stream to parse packets using the Mojang packet
* definitions and then return them as a byte array.
*/
public class PacketStream implements AutoCloseable
{
private final DataInputStream dataInput;
@Getter
private OutputStream out;
@Getter
@Setter
private int protocol;
private final TrackingInputStream tracker;
private final byte[] buffer = new byte[ 1 << 18 ];
public PacketStream(InputStream in, int protocol)
{
this( in, null, protocol );
}
public PacketStream(InputStream in, OutputStream out, int protocol)
{
tracker = new TrackingInputStream( in );
dataInput = new DataInputStream( tracker );
this.out = out;
this.protocol = protocol;
}
public void write(byte[] b) throws IOException
{
out.write( b );
}
public void write(DefinedPacket packet) throws IOException
{
out.write( packet.getPacket() );
}
/**
* Read an entire packet from the stream and return it as a byte array.
*
* @return the read packet
* @throws IOException when the underlying input stream throws an exception
*/
public byte[] readPacket() throws IOException
{
tracker.out.reset();
DataInputPacketReader.readPacket( dataInput, buffer, protocol );
return tracker.out.toByteArray();
}
@Override
public void close() throws Exception
{
dataInput.close();
}
/**
* Input stream which will wrap another stream and copy all bytes read to a
* {@link ByteArrayOutputStream}.
*/
private class TrackingInputStream extends FilterInputStream
{
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
public TrackingInputStream(InputStream in)
{
super( in );
}
@Override
public int read() throws IOException
{
int ret = in.read();
out.write( ret );
return ret;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
int ret = in.read( b, off, len );
out.write( b, off, ret );
return ret;
}
}
}