Nouvelle librairie réseau (TCP)

- remplacera NetworkAPI dans le futur
- permet une connexion persistante entre les applications/processus :
évite ouverture/fermeture répétitif des connexions TCP
- basé sur la librairie réseau de
https://github.com/marcbal/SpaceInvaderTP
This commit is contained in:
Marc Baloup 2016-07-04 17:13:24 +02:00
parent 33bbf6457f
commit a25f294ffa
11 changed files with 944 additions and 0 deletions

View File

@ -0,0 +1,166 @@
package fr.pandacube.java.util.network.client;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import fr.pandacube.java.PandacubeUtil;
import fr.pandacube.java.util.network.packet.Packet;
import fr.pandacube.java.util.network.packet.PacketClient;
import fr.pandacube.java.util.network.packet.PacketException;
import fr.pandacube.java.util.network.packet.PacketServer;
public class TCPClient extends Thread implements Closeable {
private Socket socket;
private SocketAddress addr;
private TCPClientListener listener;
private InputStream in;
private OutputStream out;
private Object outSynchronizer = new Object();
private AtomicBoolean isClosed = new AtomicBoolean(false);
public TCPClient(InetSocketAddress a, String connName, TCPClientListener l) throws IOException {
super("TCPCl "+connName);
if (a == null || l == null)
throw new IllegalArgumentException("les arguments ne peuvent pas être null");
socket = new Socket();
socket.setReceiveBufferSize(PandacubeUtil.NETWORK_TCP_BUFFER_SIZE);
socket.setSendBufferSize(PandacubeUtil.NETWORK_TCP_BUFFER_SIZE);
socket.setSoTimeout(PandacubeUtil.NETWORK_TIMEOUT);
socket.connect(a);
addr = a;
listener = l;
listener.onConnect(this);
}
@Override
public void run() {
try {
byte[] code = new byte[1];
while(!socket.isClosed() && in.read(code) != -1) {
byte[] sizeB = new byte[4];
if (in.read(sizeB) != 4)
throw new IOException("Socket "+addr+" fermé");
int size = ByteBuffer.wrap(sizeB).getInt();
byte[] content = new byte[size];
forceReadBytes(content);
byte[] packetData = ByteBuffer.allocate(1+4+size).put(code).put(sizeB).put(content).array();
try {
if (listener == null)
throw new InvalidServerMessage("Le serveur ne peut actuellement pas prendre en charge de nouvelles requêtes. Les listeners n'ont pas encore été définis");
Packet p = Packet.constructPacket(packetData);
if (!(p instanceof PacketServer))
throw new InvalidServerMessage("Le type de packet reçu n'est pas un packet attendu : "+p.getClass().getCanonicalName());
PacketServer ps = (PacketServer) p;
listener.onPacketReceive(this, ps);
} catch (PacketException|InvalidServerMessage e) {
PandacubeUtil.getMasterLogger().log(Level.SEVERE, "Message du serveur mal formé", e);
} catch (Exception e) {
PandacubeUtil.getMasterLogger().log(Level.SEVERE, "Erreur lors de la prise en charge du message par le serveur", e);
}
}
} catch (SocketTimeoutException e) {
System.err.println("Le serveur a prit trop de temps à répondre");
} catch (Exception e) {
e.printStackTrace();
}
close();
}
private void forceReadBytes(byte[] buff) throws IOException {
int pos = 0;
do {
int nbR = in.read(buff, pos, buff.length-pos);
if (nbR == -1)
throw new IOException("Can't read required amount of byte");
pos += nbR;
} while (pos < buff.length);
}
public void send(PacketClient packet) throws IOException {
synchronized (outSynchronizer) {
out.write(packet.getFullSerializedPacket());
out.flush();
}
}
@Override
public void close() {
try {
synchronized (outSynchronizer) {
if (isClosed.get())
return;
socket.close();
isClosed.set(true);
listener.onDisconnect(this);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendSilently(PacketClient packet) {
try {
send(packet);
} catch (IOException e) { }
}
public SocketAddress getServerAddress() {
return addr;
}
public boolean isClosed() {
return isClosed.get() || socket.isClosed();
}
public static class InvalidServerMessage extends RuntimeException {
private static final long serialVersionUID = 1L;
public InvalidServerMessage(String message) {
super(message);
}
}
}

View File

@ -0,0 +1,13 @@
package fr.pandacube.java.util.network.client;
import fr.pandacube.java.util.network.packet.PacketServer;
public interface TCPClientListener {
public void onConnect(TCPClient connection);
public void onPacketReceive(TCPClient connection, PacketServer packet);
public void onDisconnect(TCPClient connection);
}

View File

@ -0,0 +1,84 @@
package fr.pandacube.java.util.network.packet;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import fr.pandacube.java.PandacubeUtil;
import fr.pandacube.java.util.network.packet.bytebuffer.ByteBuffer;
import fr.pandacube.java.util.network.packet.bytebuffer.ByteSerializable;
public abstract class Packet implements ByteSerializable {
private final byte code;
public Packet(byte c) {
code = c;
}
public byte getCode() { return code; }
public byte[] getFullSerializedPacket() {
ByteBuffer internal = new ByteBuffer(CHARSET).putObject(this);
byte[] data = Arrays.copyOfRange(internal.array(), 0, internal.getPosition());
return new ByteBuffer(5+data.length, CHARSET).putByte(code).putInt(data.length).putBytes(data).array();
}
public static final Charset CHARSET = PandacubeUtil.NETWORK_CHARSET;
private static Map<Byte, Class<? extends Packet>> packetTypes = new HashMap<Byte, Class<? extends Packet>>();
public static Packet constructPacket(byte[] data) {
if (!packetTypes.containsKey(data[0]))
throw new PacketException("l'identifiant du packet ne correspond à aucun type de packet : "+data[0]);
try {
Packet p = packetTypes.get(data[0]).newInstance();
ByteBuffer dataBuffer = new ByteBuffer(Arrays.copyOfRange(data, 5, data.length), CHARSET);
p.deserializeFromByteBuffer(dataBuffer);
return p;
} catch (Exception e) {
e.printStackTrace();
throw new PacketException("erreur lors de la construction du packet");
}
}
private static <T extends Packet> void addPacket(Class<T> packetClass) {
try {
Packet p = (Packet)packetClass.newInstance();
packetTypes.put(p.code, packetClass);
} catch (Exception e) {
e.printStackTrace();
}
}
static {
/*
* Ajout des types de packets (client + serveur)
*/
// addPacket(PacketToto.class);
}
}

View File

@ -0,0 +1,14 @@
package fr.pandacube.java.util.network.packet;
/**
* On attend d'un instance de {@link PacketClient} qu'il soit envoyé depuis
* une connexion Client vers une connexion Serveur (d'un point de vue TCP)
*/
public abstract class PacketClient extends Packet {
public PacketClient(byte c) {
super(c);
}
}

View File

@ -0,0 +1,17 @@
package fr.pandacube.java.util.network.packet;
public class PacketException extends RuntimeException {
private static final long serialVersionUID = 1L;
public PacketException(String m) {
super(m);
}
public PacketException(String m, Throwable t) {
super(m, t);
}
public PacketException(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,13 @@
package fr.pandacube.java.util.network.packet;
/**
* On attend d'un instance de {@link PacketServer} qu'il soit envoyé depuis
* une connexion Serveur vers une connexion Client (d'un point de vue TCP)
*/
public abstract class PacketServer extends Packet {
public PacketServer(byte c) {
super(c);
}
}

View File

@ -0,0 +1,254 @@
package fr.pandacube.java.util.network.packet.bytebuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class ByteBuffer implements Cloneable {
private java.nio.ByteBuffer buff;
private Charset charset;
public ByteBuffer(Charset c) {
this(16, c);
}
public ByteBuffer(int initSize, Charset c) {
buff = java.nio.ByteBuffer.allocate(initSize);
charset = c;
}
public ByteBuffer(byte[] data, Charset c) {
buff = java.nio.ByteBuffer.wrap(Arrays.copyOf(data, data.length));
charset = c;
}
private void askForBufferExtension(int needed) {
if (buff.remaining() >= needed) return;
java.nio.ByteBuffer newBuff = java.nio.ByteBuffer.wrap(Arrays.copyOf(buff.array(), buff.array().length * 2));
newBuff.position(buff.position());
buff = newBuff;
}
@Override
public ByteBuffer clone() {
return new ByteBuffer(Arrays.copyOf(buff.array(), buff.array().length), charset);
}
/**
* @see java.nio.ByteBuffer#get()
*/
public byte getByte() {
return buff.get();
}
/**
* @see java.nio.ByteBuffer#get(byte[])
*/
public byte[] getBytes(byte[] b) {
buff.get(b);
return b;
}
/**
* @see java.nio.ByteBuffer#getChar()
*/
public char getChar() {
return buff.getChar();
}
/**
* @see java.nio.ByteBuffer#getShort()
*/
public short getShort() {
return buff.getShort();
}
/**
* @see java.nio.ByteBuffer#getInt()
*/
public int getInt() {
return buff.getInt();
}
/**
* @see java.nio.ByteBuffer#getLong()
*/
public long getLong() {
return buff.getLong();
}
/**
* @see java.nio.ByteBuffer#getFloat()
*/
public float getFloat() {
return buff.getFloat();
}
/**
* @see java.nio.ByteBuffer#getDouble()
*/
public double getDouble() {
return buff.getDouble();
}
/**
* @see java.nio.ByteBuffer#put(byte)
*/
public ByteBuffer putByte(byte b) {
askForBufferExtension(Byte.BYTES);
buff.put(b);
return this;
}
/**
* @see java.nio.ByteBuffer#put(byte[])
*/
public ByteBuffer putBytes(byte[] b) {
askForBufferExtension(b.length*Byte.BYTES);
buff.put(b);
return this;
}
/**
* @see java.nio.ByteBuffer#putChar(char)
*/
public ByteBuffer putChar(char value) {
askForBufferExtension(Character.BYTES);
buff.putChar(value);
return this;
}
/**
* @see java.nio.ByteBuffer#putShort(short)
*/
public ByteBuffer putShort(short value) {
askForBufferExtension(Short.BYTES);
buff.putShort(value);
return this;
}
/**
* @see java.nio.ByteBuffer#putInt(int)
*/
public ByteBuffer putInt(int value) {
askForBufferExtension(Integer.BYTES);
buff.putInt(value);
return this;
}
/**
* @see java.nio.ByteBuffer#putLong(long)
*/
public ByteBuffer putLong(long value) {
askForBufferExtension(Long.BYTES);
buff.putLong(value);
return this;
}
/**
* @see java.nio.ByteBuffer#putFloat(float)
*/
public ByteBuffer putFloat(float value) {
askForBufferExtension(Float.BYTES);
buff.putFloat(value);
return this;
}
/**
* @see java.nio.ByteBuffer#putDouble(double)
*/
public ByteBuffer putDouble(double value) {
askForBufferExtension(Double.BYTES);
buff.putDouble(value);
return this;
}
/**
* @see java.nio.ByteBuffer#position()
*/
public int getPosition() {
return buff.position();
}
/**
* @see java.nio.ByteBuffer#position(int)
*/
public void setPosition(int p) {
buff.position(p);
}
/**
* @see java.nio.ByteBuffer#capacity()
*/
public int capacity() {
return buff.capacity();
}
public ByteBuffer putString(String s) {
byte[] charBytes = s.getBytes(charset);
putInt(charBytes.length);
putBytes(charBytes);
return this;
}
public String getString() {
return new String(getBytes(new byte[getInt()]), charset);
}
/**
* The objet will be serialized and the data put in the current buffer
* @param obj the object to serialize
* @return the current buffer
*/
public ByteBuffer putObject(ByteSerializable obj) {
obj.serializeToByteBuffer(this);
return this;
}
/**
* Ask to object passed as argument to deserialize data in buffer and fill the object content
* @param <T>
* @param obj the objet to fill with his method {@link ByteSerializable#deserializeFromByteBuffer(ByteBuffer)}
* @return obj a reference to the same object
*/
public <T extends ByteSerializable> T getObject(Class<T> clazz) {
try {
T obj = clazz.newInstance();
obj.deserializeFromByteBuffer(this);
return obj;
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
public ByteBuffer putListObject(List<ByteSerializable> list) {
putInt(list.size());
for (ByteSerializable obj : list)
putObject(obj);
return this;
}
public <T extends ByteSerializable> List<T> getListObject(Class<T> clazz) {
List<T> list = new ArrayList<T>();
int size = getInt();
for (int i=0; i<size; i++) {
list.add(getObject(clazz));
}
return list;
}
/**
* @see java.nio.ByteBuffer#array()
*/
public byte[] array() {
return buff.array();
}
}

View File

@ -0,0 +1,17 @@
package fr.pandacube.java.util.network.packet.bytebuffer;
/**
* Cette interface permet à un {@link ByteBuffer} de sérialiser sous forme de données binaire
* les attributs de la classe courante.<br/>
* <br/>
* Les classes concrètes implémentant cette interface doivent avoir un constructeur vide, utilisé
* lors de la désérialisation
*
*/
public interface ByteSerializable {
public void serializeToByteBuffer(ByteBuffer buffer);
public void deserializeFromByteBuffer(ByteBuffer buffer);
}

View File

@ -0,0 +1,66 @@
package fr.pandacube.java.util.network.server;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import fr.pandacube.java.util.network.server.TCPServer.TCPServerClientConnection;
public class BandwidthCalculation {
private List<PacketStat> packetHistory = new LinkedList<PacketStat>();
public synchronized void addPacket(TCPServerClientConnection co, boolean in, long size) {
packetHistory.add(new PacketStat(co, in, size));
}
/**
* Get the instant bandwith in byte/s
* @param input true if getting input bw, false if getting output, null if getting input + output
* @param co
* @return
*/
public synchronized long getBandWidth(Boolean input, TCPServerClientConnection co) {
long currentTime = System.currentTimeMillis();
Iterator<PacketStat> it = packetHistory.iterator();
long sum = 0;
while(it.hasNext()) {
PacketStat el = it.next();
if (el.time < currentTime - 1000) {
it.remove();
continue;
}
if (input != null && el.input != input.booleanValue())
continue;
if (co != null && !co.equals(el.connection))
continue;
sum += el.packetSize;
}
return sum;
}
private class PacketStat {
public final long time;
public final long packetSize;
public final boolean input;
public final TCPServerClientConnection connection;
public PacketStat(TCPServerClientConnection co, boolean input, long size) {
time = System.currentTimeMillis();
packetSize = size;
this.input = input;
connection = co;
}
}
}

View File

@ -0,0 +1,282 @@
package fr.pandacube.java.util.network.server;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import fr.pandacube.java.PandacubeUtil;
import fr.pandacube.java.util.network.packet.Packet;
import fr.pandacube.java.util.network.packet.PacketClient;
import fr.pandacube.java.util.network.packet.PacketServer;
import fr.pandacube.java.util.network.packet.bytebuffer.ByteBuffer;
/**
*
* @author Marc Baloup
*
*/
public class TCPServer extends Thread implements Closeable {
private static AtomicInteger connectionCounterId = new AtomicInteger(0);
private ServerSocket socket;
private TCPServerListener listener;
private String socketName;
private List<TCPServerClientConnection> clients = new ArrayList<>();
private AtomicBoolean isClosed = new AtomicBoolean(false);
public final BandwidthCalculation bandwidthCalculation = new BandwidthCalculation();
public TCPServer(int port, String sckName, TCPServerListener l) throws IOException {
super("TCPSv "+sckName);
if (port <= 0 || port > 65535)
throw new IllegalArgumentException("le numéro de port est invalide");
socket = new ServerSocket();
socket.setReceiveBufferSize(PandacubeUtil.NETWORK_TCP_BUFFER_SIZE);
socket.setPerformancePreferences(0, 2, 1);
socket.bind(new InetSocketAddress(port));
listener = l;
listener.onSocketOpen(this);
socketName = sckName;
}
@Override
public void run() {
try {
while(true) {
Socket socketClient = socket.accept();
socketClient.setSendBufferSize(PandacubeUtil.NETWORK_TCP_BUFFER_SIZE);
socketClient.setSoTimeout(PandacubeUtil.NETWORK_TIMEOUT);
try {
TCPServerClientConnection co = new TCPServerClientConnection(socketClient, connectionCounterId.getAndIncrement());
clients.add(co);
listener.onClientConnect(this, co);
co.start();
} catch(IOException e) {
PandacubeUtil.getMasterLogger().log(Level.SEVERE, "Connexion impossible avec "+socketClient.getInetAddress());
}
}
} catch (Exception e) {
PandacubeUtil.getMasterLogger().log(Level.WARNING, "Plus aucune connexion ne peux être acceptée", e);
}
}
public class TCPServerClientConnection extends Thread {
private Socket socket;
private InputStream in;
private OutputStream out;
private SocketAddress address;
private TCPServerConnectionOutputThread outThread;
public TCPServerClientConnection(Socket s, int coId) throws IOException {
super("TCPSv "+socketName+" Conn#"+coId+" In");
socket = s;
in = socket.getInputStream();
out = socket.getOutputStream();
address = new InetSocketAddress(socket.getInetAddress(), socket.getPort());
listener.onClientConnect(TCPServer.this, this);
outThread = new TCPServerConnectionOutputThread(coId);
outThread.start();
}
@Override
public void run() {
try {
byte[] code = new byte[1];
while(!socket.isClosed() && in.read(code) != -1) {
byte[] sizeB = new byte[4];
if (in.read(sizeB) != 4)
throw new IOException("Socket "+address+" fermé");
int size = new ByteBuffer(sizeB, Packet.CHARSET).getInt();
byte[] content = new byte[size];
forceReadBytes(content);
byte[] packetData = new ByteBuffer(1+4+size, Packet.CHARSET).putBytes(code).putBytes(sizeB).putBytes(content).array();
bandwidthCalculation.addPacket(this, true, packetData.length);
try {
interpreteReceivedMessage(this, packetData);
} catch (InvalidClientMessage e) {
PandacubeUtil.getMasterLogger().log(Level.SEVERE, "Erreur protocole de : ", e);
} catch (Exception e) {
PandacubeUtil.getMasterLogger().log(Level.SEVERE, "Erreur lors de la prise en charge du message par le serveur", e);
e.printStackTrace();
}
}
} catch (Exception e) {
PandacubeUtil.getMasterLogger().log(Level.SEVERE, "Fermeture de la connexion de "+address, e);
}
close();
}
public void send(PacketServer p) {
outThread.addPacket(p);
}
private void forceReadBytes(byte[] buff) throws IOException {
int pos = 0;
do {
int nbR = in.read(buff, pos, buff.length-pos);
if (nbR == -1)
throw new IOException("Can't read required amount of byte");
pos += nbR;
} while (pos < buff.length);
}
public void close() {
if (socket.isClosed()) return;
listener.onClientDisconnect(TCPServer.this, this);
clients.remove(this);
try {
socket.close();
if (!Thread.currentThread().equals(outThread))
send(new PacketServer((byte)0){
@Override
public void serializeToByteBuffer( ByteBuffer buffer) {}
@Override
public void deserializeFromByteBuffer( ByteBuffer buffer) {}
});
// provoque une exception dans le thread de sortie, et la termine
} catch (IOException e) {
e.printStackTrace();
}
}
private class TCPServerConnectionOutputThread extends Thread {
private BlockingQueue<PacketServer> packetQueue = new LinkedBlockingDeque<PacketServer>();
public TCPServerConnectionOutputThread(int coId) {
super("TCPSv "+socketName+" Conn#"+coId+" Out");
}
private void addPacket(PacketServer packet) {
packetQueue.add(packet);
}
@Override
public void run() {
try {
while (!socket.isClosed()) {
PacketServer packet = packetQueue.poll(1, TimeUnit.SECONDS);
byte[] data;
if (packet != null) {
data = packet.getFullSerializedPacket();
bandwidthCalculation.addPacket(TCPServerClientConnection.this, false, data.length);
out.write(data);
out.flush();
}
}
} catch (InterruptedException e) {
} catch (IOException e) { }
close();
}
}
}
private void interpreteReceivedMessage(TCPServerClientConnection co, byte[] data) {
Packet p = Packet.constructPacket(data);
if (!(p instanceof PacketClient))
throw new InvalidClientMessage("Le type de packet reçu n'est pas un packet attendu : "+p.getClass().getCanonicalName());
PacketClient pc = (PacketClient) p;
listener.onPacketReceive(this, co, pc);
}
@Override
public void close() {
try {
if (isClosed.get()) return;
clients.forEach(el -> el.close());
socket.close();
isClosed.set(true);
listener.onSocketClose(this);
} catch (IOException e) { }
}
public boolean isClosed() {
return isClosed.get() || socket.isClosed();
}
public static class InvalidClientMessage extends RuntimeException {
private static final long serialVersionUID = 1L;
public InvalidClientMessage(String message) {
super(message);
}
}
}

View File

@ -0,0 +1,18 @@
package fr.pandacube.java.util.network.server;
import fr.pandacube.java.util.network.packet.PacketClient;
import fr.pandacube.java.util.network.server.TCPServer.TCPServerClientConnection;
public interface TCPServerListener {
public void onSocketOpen(TCPServer svConnection);
public void onClientConnect(TCPServer svConnection, TCPServerClientConnection clientConnection);
public void onPacketReceive(TCPServer svConnection, TCPServerClientConnection clientConnection, PacketClient packet);
public void onClientDisconnect(TCPServer svConnection, TCPServerClientConnection clientConnection);
public void onSocketClose(TCPServer svConnection);
}