Amélioration de la librairie réseau

- Support de callback pour le retour de réponse de la part de
l'application interlocuteur (client -> serveur ou serveur -> client)
- ByteBuffer : un String peut être null
- ByteBuffer : une liste de ByteSerializable peut être null
- ByteBuffer : support des liste de String (peut être null aussi)
- TCPServer : on peut obtenir une liste des clients connectés
This commit is contained in:
Marc Baloup 2016-09-08 14:16:07 +02:00
parent 7c87b6c33a
commit dd530bfea9
5 changed files with 267 additions and 44 deletions

View File

@ -8,7 +8,13 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.commons.lang.builder.ToStringBuilder;
@ -18,7 +24,9 @@ import fr.pandacube.java.util.network.packet.Packet;
import fr.pandacube.java.util.network.packet.PacketClient;
import fr.pandacube.java.util.network.packet.PacketException;
import fr.pandacube.java.util.network.packet.PacketServer;
import fr.pandacube.java.util.network.packet.ResponseCallback;
import fr.pandacube.java.util.network.packet.packets.global.PacketServerException;
import javafx.util.Pair;
public class TCPClient extends Thread implements Closeable {
@ -28,6 +36,8 @@ public class TCPClient extends Thread implements Closeable {
private InputStream in;
private OutputStream out;
private Object outSynchronizer = new Object();
private List<Pair<Predicate<PacketServer>, ResponseCallback<PacketServer>>> callbacks = Collections.synchronizedList(new ArrayList<>());
private List<Pair<Predicate<PacketServer>, ResponseCallback<PacketServer>>> callbacksAvoidListener = Collections.synchronizedList(new ArrayList<>());
private AtomicBoolean isClosed = new AtomicBoolean(false);
@ -78,17 +88,23 @@ public class TCPClient extends Thread implements Closeable {
try {
listener.onServerException(this, ((PacketServerException)p).getExceptionString());
} catch (Exception e) {
Log.severe("Exception while calling TCPClientListener.onPacketReceive()", e);
Log.severe("Exception while calling TCPClientListener.onServerException()", e);
}
}
PacketServer ps = (PacketServer) p;
boolean callbackExecuted = executeCallbacks(ps, callbacksAvoidListener);
try {
listener.onPacketReceive(this, ps);
if (!callbackExecuted)
listener.onPacketReceive(this, ps);
} catch (Exception e) {
Log.severe("Exception while calling TCPClientListener.onPacketReceive()", e);
}
executeCallbacks(ps, callbacks);
} catch (Exception e) {
Log.severe("Exception while handling packet from server", e);
}
@ -99,6 +115,27 @@ public class TCPClient extends Thread implements Closeable {
}
close();
}
private boolean executeCallbacks(PacketServer ps, List<Pair<Predicate<PacketServer>, ResponseCallback<PacketServer>>> callbacks) {
boolean executedOne = false;
synchronized (callbacks) {
for(Iterator<Pair<Predicate<PacketServer>, ResponseCallback<PacketServer>>> it = callbacks.iterator(); it.hasNext();) {
Pair<Predicate<PacketServer>, ResponseCallback<PacketServer>> c = it.next();
try {
if (c.getKey().test(ps)) {
it.remove();
c.getValue().call(ps);
executedOne = true;
}
} catch (Exception e) {
Log.severe("Exception while executing callback", e);
}
}
}
return executedOne;
}
private void forceReadBytes(byte[] buff) throws IOException {
int pos = 0;
@ -116,6 +153,34 @@ public class TCPClient extends Thread implements Closeable {
}
}
public void sendAndGetResponse(PacketClient packet, Predicate<PacketServer> responseCondition, ResponseCallback<PacketServer> callback, boolean avoidListener) throws IOException {
Pair<Predicate<PacketServer>, ResponseCallback<PacketServer>> p = new Pair<>(responseCondition, callback);
if (avoidListener)
callbacksAvoidListener.add(p);
else
callbacks.add(p);
send(packet);
}
public PacketServer sendAndWaitForResponse(PacketClient packet, Predicate<PacketServer> responseCondition, boolean avoidListener) throws IOException, InterruptedException {
AtomicReference<PacketServer> psStorage = new AtomicReference<>(null);
synchronized (psStorage) {
sendAndGetResponse(packet, responseCondition, packetServer -> {
synchronized (psStorage) {
psStorage.set(packetServer);
psStorage.notifyAll();
}
}, true);
psStorage.wait();
return psStorage.get();
}
}
@Override
public void close() {
try {

View File

@ -23,6 +23,7 @@ import fr.pandacube.java.util.network.packet.packets.web.PacketServerWebResponse
* - web (-000----) 0x00 - 0x0F 0x80 - 0x8F (client is Apache, server is PandacubeCore master (PandacubeWeb))
* - spigot (-001----) 0x10 - 0x1F 0x90 - 0x9F (client is PandacubeSpigot, server is PandacubeCore master)
* - bungee (-010----) 0x20 - 0x2F 0xA0 - 0xAF (client is PandacubeBungee, server is PandacubeCore master)
* -coreslave(-011----) 0x30 - 0x3F 0xB0 - 0xBF (client is PandacubeCore slave, sv is PandacubeCore master)
* - global (-101----) 0x50 - 0x5F 0xD0 - 0xDF
*
* - reserved if not enough packet id in certain use case
@ -47,7 +48,7 @@ public abstract class Packet implements ByteSerializable {
ByteBuffer internal = new ByteBuffer(CHARSET).putObject(this);
byte[] data = Arrays.copyOfRange(internal.array(), 0, internal.getPosition());
return new ByteBuffer(5 + data.length, CHARSET).putByte(code).putInt(data.length).putBytes(data).array();
return new ByteBuffer(5 + data.length, CHARSET).putByte(code).putInt(data.length).putByteArray(data).array();
}
public static final Charset CHARSET = Pandacube.NETWORK_CHARSET;

View File

@ -0,0 +1,8 @@
package fr.pandacube.java.util.network.packet;
@FunctionalInterface
public interface ResponseCallback<T extends Packet> {
public void call(T packet);
}

View File

@ -47,10 +47,21 @@ public class ByteBuffer implements Cloneable {
/**
* @see java.nio.ByteBuffer#get(byte[])
*/
public byte[] getBytes(byte[] b) {
public byte[] getByteArray(byte[] b) {
buff.get(b);
return b;
}
/**
* Return the next byte array wich is preceded with his size as integer,
* or null if the founded size is negative.
* @return
*/
public byte[] getSizedByteArray() {
int size = getInt();
if (size < 0) return null;
return getByteArray(new byte[size]);
}
/**
* @see java.nio.ByteBuffer#getChar()
@ -106,11 +117,19 @@ public class ByteBuffer implements Cloneable {
/**
* @see java.nio.ByteBuffer#put(byte[])
*/
public ByteBuffer putBytes(byte[] b) {
public ByteBuffer putByteArray(byte[] b) {
askForBufferExtension(b.length * Byte.BYTES);
buff.put(b);
return this;
}
public ByteBuffer putSizedByteArray(byte[] b) {
if (b == null) {
return putInt(-1);
}
putInt(b.length);
return putByteArray(b);
}
/**
* @see java.nio.ByteBuffer#putChar(char)
@ -187,21 +206,31 @@ public class ByteBuffer implements Cloneable {
return buff.capacity();
}
/**
*
* @param s null String are supported
* @return
*/
public ByteBuffer putString(String s) {
byte[] charBytes = s.getBytes(charset);
putInt(charBytes.length);
putBytes(charBytes);
return this;
if (s == null) {
return putInt(-1);
}
return putSizedByteArray(s.getBytes(charset));
}
/**
* returned string can be null
* @return
*/
public String getString() {
return new String(getBytes(new byte[getInt()]), charset);
byte[] binaryString = getSizedByteArray();
return (binaryString == null) ? null : new String(binaryString, charset);
}
/**
* The objet will be serialized and the data put in the current buffer
*
* @param obj the object to serialize
* @param obj the object to serialize. Can't be null.
* @return the current buffer
*/
public ByteBuffer putObject(ByteSerializable obj) {
@ -211,12 +240,12 @@ public class ByteBuffer implements Cloneable {
/**
* Ask to object passed as argument to deserialize data in buffer and fill
* the object content
* the object content. ByteSerializable object are never null.
*
* @param <T>
* @param obj the objet to fill with his method
* {@link ByteSerializable#deserializeFromByteBuffer(ByteBuffer)}
* @return obj a reference to the same object
* @param clazz the class wich will be instanciated with his no-argument Constructor
* before filled by using {@link ByteSerializable#deserializeFromByteBuffer(ByteBuffer)}
* @return obj a reference to the filled object
*/
public <T extends ByteSerializable> T getObject(Class<T> clazz) {
try {
@ -224,25 +253,67 @@ public class ByteBuffer implements Cloneable {
obj.deserializeFromByteBuffer(this);
return obj;
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
throw new RuntimeException("A ByteSerializable must have a no-argument Constructor", e);
}
}
/**
*
* @param list The list itself can be null, but not the values.
* @return
*/
public ByteBuffer putListObject(List<ByteSerializable> list) {
if (list.stream().anyMatch(e -> e == null))
throw new IllegalArgumentException("List of object can't contains any null value");
putInt(list.size());
for (ByteSerializable obj : list)
putObject(obj);
return this;
}
/**
*
* @param list The list can be null, and any String can be null too.
* @return
*/
public ByteBuffer putListOfString(List<String> list) {
if (list == null) {
return putInt(-1);
}
putInt(list.size());
for (String str : list)
putString(str);
return this;
}
/**
*
* @param clazz
* @return Can be null. If not, there is no null element inside.
*/
public <T extends ByteSerializable> List<T> getListObject(Class<T> clazz) {
List<T> list = new ArrayList<T>();
int size = getInt();
if (size < 0)
return null;
List<T> list = new ArrayList<>();
for (int i = 0; i < size; i++)
list.add(getObject(clazz));
return list;
}
/**
* @return a List of String. The list can be null, and any element can be null too.
*/
public <T extends ByteSerializable> List<String> getListOfString() {
int size = getInt();
if (size < 0)
return null;
List<String> list = new ArrayList<>();
for (int i = 0; i < size; i++)
list.add(getString());
return list;
}
/**
* @see java.nio.ByteBuffer#array()
*/

View File

@ -10,12 +10,16 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.logging.Level;
import org.apache.commons.lang.builder.ToStringBuilder;
@ -26,8 +30,10 @@ import fr.pandacube.java.util.network.packet.Packet;
import fr.pandacube.java.util.network.packet.PacketClient;
import fr.pandacube.java.util.network.packet.PacketException;
import fr.pandacube.java.util.network.packet.PacketServer;
import fr.pandacube.java.util.network.packet.ResponseCallback;
import fr.pandacube.java.util.network.packet.bytebuffer.ByteBuffer;
import fr.pandacube.java.util.network.packet.packets.global.PacketServerException;
import javafx.util.Pair;
/**
*
@ -41,7 +47,7 @@ public class TCPServer extends Thread implements Closeable {
private TCPServerListener listener;
private String socketName;
private List<TCPServerClientConnection> clients = new ArrayList<>();
private List<TCPServerClientConnection> clients = Collections.synchronizedList(new ArrayList<>());
private AtomicBoolean isClosed = new AtomicBoolean(false);
@ -53,7 +59,7 @@ public class TCPServer extends Thread implements Closeable {
if (port <= 0 || port > 65535) throw new IllegalArgumentException("le numéro de port est invalide");
socket = new ServerSocket();
socket.setReceiveBufferSize(Pandacube.NETWORK_TCP_BUFFER_SIZE);
socket.setPerformancePreferences(0, 2, 1);
socket.setPerformancePreferences(0, 1, 0);
socket.bind(new InetSocketAddress(port));
listener = l;
try {
@ -95,6 +101,9 @@ public class TCPServer extends Thread implements Closeable {
private OutputStream out;
private SocketAddress address;
private TCPServerConnectionOutputThread outThread;
private List<Pair<Predicate<PacketClient>, ResponseCallback<PacketClient>>> callbacks = Collections.synchronizedList(new ArrayList<>());
private List<Pair<Predicate<PacketClient>, ResponseCallback<PacketClient>>> callbacksAvoidListener = Collections.synchronizedList(new ArrayList<>());
public TCPServerClientConnection(Socket s, int coId) throws IOException {
super("TCPSv " + socketName + " Conn#" + coId + " In");
@ -126,13 +135,26 @@ public class TCPServer extends Thread implements Closeable {
forceReadBytes(content);
byte[] packetData = new ByteBuffer(1 + 4 + size, Packet.CHARSET).putBytes(code).putBytes(sizeB)
.putBytes(content).array();
byte[] packetData = new ByteBuffer(1 + 4 + size, Packet.CHARSET).putByteArray(code).putByteArray(sizeB)
.putByteArray(content).array();
bandwidthCalculation.addPacket(this, true, packetData.length);
try {
interpreteReceivedMessage(this, packetData);
Packet p = Packet.constructPacket(packetData);
if (!(p instanceof PacketClient))
throw new PacketException(p.getClass().getCanonicalName() + " is not an instanceof PacketClient");
PacketClient pc = (PacketClient) p;
boolean oneCallbackExecuted = executeCallbacks(pc, callbacksAvoidListener);
if (!oneCallbackExecuted)
listener.onPacketReceive(TCPServer.this, this, pc);
executeCallbacks(pc, callbacks);
} catch (Exception e) {
Log.severe("Exception while handling packet. This exception will be sent to the client with PacketServerException packet.", e);
PacketServerException packet = new PacketServerException();
@ -148,10 +170,66 @@ public class TCPServer extends Thread implements Closeable {
close();
}
private boolean executeCallbacks(PacketClient pc, List<Pair<Predicate<PacketClient>, ResponseCallback<PacketClient>>> callbacks) {
boolean executedOne = false;
synchronized (callbacks) {
for(Iterator<Pair<Predicate<PacketClient>, ResponseCallback<PacketClient>>> it = callbacks.iterator(); it.hasNext();) {
Pair<Predicate<PacketClient>, ResponseCallback<PacketClient>> c = it.next();
try {
if (c.getKey().test(pc)) {
it.remove();
c.getValue().call(pc);
executedOne = true;
}
} catch (Exception e) {
Log.severe("Exception while executing callback", e);
}
}
}
return executedOne;
}
public void send(PacketServer p) {
outThread.addPacket(p);
}
public void sendAndGetResponse(PacketServer packet, Predicate<PacketClient> responseCondition, ResponseCallback<PacketClient> callback, boolean avoidListener) {
Pair<Predicate<PacketClient>, ResponseCallback<PacketClient>> p = new Pair<>(responseCondition, callback);
if (avoidListener)
callbacksAvoidListener.add(p);
else
callbacks.add(p);
send(packet);
}
/**
*
* @param packet the packet to send
* @param responseCondition {@link Predicate} that check each received packet to know which
* is the expected one as a response of the sended packet.
* @param avoidListener
* @param timeout
* @return
* @throws InterruptedException
*/
public PacketClient sendAndWaitForResponse(PacketServer packet, Predicate<PacketClient> responseCondition, boolean avoidListener, long timeout) throws InterruptedException {
AtomicReference<PacketClient> pcStorage = new AtomicReference<>(null);
synchronized (pcStorage) {
sendAndGetResponse(packet, responseCondition, packetClient -> {
synchronized (pcStorage) {
pcStorage.set(packetClient);
pcStorage.notifyAll();
}
}, true);
pcStorage.wait(timeout);
return pcStorage.get();
}
}
private void forceReadBytes(byte[] buff) throws IOException {
int pos = 0;
@ -173,17 +251,15 @@ public class TCPServer extends Thread implements Closeable {
clients.remove(this);
try {
Thread.sleep(200);
socket.close();
if (!Thread.currentThread().equals(outThread)) send(new PacketServer((byte) 0) {
@Override
public void serializeToByteBuffer(ByteBuffer buffer) {}
@Override
public void deserializeFromByteBuffer(ByteBuffer buffer) {}
@Override public void serializeToByteBuffer(ByteBuffer buffer) {}
@Override public void deserializeFromByteBuffer(ByteBuffer buffer) {}
});
// provoque une exception dans le thread de sortie, et la
// termine
} catch (IOException e) { }
} catch (Exception e) { }
}
private class TCPServerConnectionOutputThread extends Thread {
@ -205,10 +281,16 @@ public class TCPServer extends Thread implements Closeable {
PacketServer packet = packetQueue.poll(1, TimeUnit.SECONDS);
byte[] data;
if (packet != null) {
data = packet.getFullSerializedPacket();
bandwidthCalculation.addPacket(TCPServerClientConnection.this, false, data.length);
out.write(data);
out.flush();
try {
data = packet.getFullSerializedPacket();
bandwidthCalculation.addPacket(TCPServerClientConnection.this, false, data.length);
out.write(data);
out.flush();
} catch (IOException e) {
throw e;
} catch (Exception e) {
Log.severe("Can't send packet "+packet.getClass(), e);
}
}
}
@ -229,18 +311,6 @@ public class TCPServer extends Thread implements Closeable {
}
private void interpreteReceivedMessage(TCPServerClientConnection co, byte[] data) {
Packet p = Packet.constructPacket(data);
if (!(p instanceof PacketClient))
throw new PacketException(p.getClass().getCanonicalName() + " is not an instanceof PacketClient");
PacketClient pc = (PacketClient) p;
listener.onPacketReceive(this, co, pc);
}
@Override
public void close() {
try {
@ -263,6 +333,14 @@ public class TCPServer extends Thread implements Closeable {
return isClosed.get() || socket.isClosed();
}
public List<TCPServerClientConnection> getClients() {
synchronized (clients) {
return new ArrayList<>(clients);
}
}
@Override