From dd530bfea955b5ba3c034df50940ddf3e63097c5 Mon Sep 17 00:00:00 2001 From: Marc Baloup Date: Thu, 8 Sep 2016 14:16:07 +0200 Subject: [PATCH] =?UTF-8?q?Am=C3=A9lioration=20de=20la=20librairie=20r?= =?UTF-8?q?=C3=A9seau?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Support de callback pour le retour de réponse de la part de l'application interlocuteur (client -> serveur ou serveur -> client) - ByteBuffer : un String peut être null - ByteBuffer : une liste de ByteSerializable peut être null - ByteBuffer : support des liste de String (peut être null aussi) - TCPServer : on peut obtenir une liste des clients connectés --- .../java/util/network/client/TCPClient.java | 69 ++++++++- .../java/util/network/packet/Packet.java | 3 +- .../util/network/packet/ResponseCallback.java | 8 ++ .../network/packet/bytebuffer/ByteBuffer.java | 99 +++++++++++-- .../java/util/network/server/TCPServer.java | 132 ++++++++++++++---- 5 files changed, 267 insertions(+), 44 deletions(-) create mode 100644 src/fr/pandacube/java/util/network/packet/ResponseCallback.java diff --git a/src/fr/pandacube/java/util/network/client/TCPClient.java b/src/fr/pandacube/java/util/network/client/TCPClient.java index fd7e2ef..ad2a385 100644 --- a/src/fr/pandacube/java/util/network/client/TCPClient.java +++ b/src/fr/pandacube/java/util/network/client/TCPClient.java @@ -8,7 +8,13 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import org.apache.commons.lang.builder.ToStringBuilder; @@ -18,7 +24,9 @@ import fr.pandacube.java.util.network.packet.Packet; import fr.pandacube.java.util.network.packet.PacketClient; import fr.pandacube.java.util.network.packet.PacketException; import fr.pandacube.java.util.network.packet.PacketServer; +import fr.pandacube.java.util.network.packet.ResponseCallback; import fr.pandacube.java.util.network.packet.packets.global.PacketServerException; +import javafx.util.Pair; public class TCPClient extends Thread implements Closeable { @@ -28,6 +36,8 @@ public class TCPClient extends Thread implements Closeable { private InputStream in; private OutputStream out; private Object outSynchronizer = new Object(); + private List, ResponseCallback>> callbacks = Collections.synchronizedList(new ArrayList<>()); + private List, ResponseCallback>> callbacksAvoidListener = Collections.synchronizedList(new ArrayList<>()); private AtomicBoolean isClosed = new AtomicBoolean(false); @@ -78,17 +88,23 @@ public class TCPClient extends Thread implements Closeable { try { listener.onServerException(this, ((PacketServerException)p).getExceptionString()); } catch (Exception e) { - Log.severe("Exception while calling TCPClientListener.onPacketReceive()", e); + Log.severe("Exception while calling TCPClientListener.onServerException()", e); } } PacketServer ps = (PacketServer) p; + boolean callbackExecuted = executeCallbacks(ps, callbacksAvoidListener); + try { - listener.onPacketReceive(this, ps); + if (!callbackExecuted) + listener.onPacketReceive(this, ps); } catch (Exception e) { Log.severe("Exception while calling TCPClientListener.onPacketReceive()", e); } + + executeCallbacks(ps, callbacks); + } catch (Exception e) { Log.severe("Exception while handling packet from server", e); } @@ -99,6 +115,27 @@ public class TCPClient extends Thread implements Closeable { } close(); } + + + private boolean executeCallbacks(PacketServer ps, List, ResponseCallback>> callbacks) { + boolean executedOne = false; + synchronized (callbacks) { + for(Iterator, ResponseCallback>> it = callbacks.iterator(); it.hasNext();) { + Pair, ResponseCallback> c = it.next(); + try { + if (c.getKey().test(ps)) { + it.remove(); + c.getValue().call(ps); + executedOne = true; + } + } catch (Exception e) { + Log.severe("Exception while executing callback", e); + } + } + } + return executedOne; + } + private void forceReadBytes(byte[] buff) throws IOException { int pos = 0; @@ -116,6 +153,34 @@ public class TCPClient extends Thread implements Closeable { } } + + public void sendAndGetResponse(PacketClient packet, Predicate responseCondition, ResponseCallback callback, boolean avoidListener) throws IOException { + Pair, ResponseCallback> p = new Pair<>(responseCondition, callback); + if (avoidListener) + callbacksAvoidListener.add(p); + else + callbacks.add(p); + send(packet); + } + + + public PacketServer sendAndWaitForResponse(PacketClient packet, Predicate responseCondition, boolean avoidListener) throws IOException, InterruptedException { + AtomicReference psStorage = new AtomicReference<>(null); + synchronized (psStorage) { + sendAndGetResponse(packet, responseCondition, packetServer -> { + synchronized (psStorage) { + psStorage.set(packetServer); + psStorage.notifyAll(); + } + }, true); + + psStorage.wait(); + return psStorage.get(); + } + } + + + @Override public void close() { try { diff --git a/src/fr/pandacube/java/util/network/packet/Packet.java b/src/fr/pandacube/java/util/network/packet/Packet.java index c125c71..35f8399 100644 --- a/src/fr/pandacube/java/util/network/packet/Packet.java +++ b/src/fr/pandacube/java/util/network/packet/Packet.java @@ -23,6 +23,7 @@ import fr.pandacube.java.util.network.packet.packets.web.PacketServerWebResponse * - web (-000----) 0x00 - 0x0F 0x80 - 0x8F (client is Apache, server is PandacubeCore master (PandacubeWeb)) * - spigot (-001----) 0x10 - 0x1F 0x90 - 0x9F (client is PandacubeSpigot, server is PandacubeCore master) * - bungee (-010----) 0x20 - 0x2F 0xA0 - 0xAF (client is PandacubeBungee, server is PandacubeCore master) + * -coreslave(-011----) 0x30 - 0x3F 0xB0 - 0xBF (client is PandacubeCore slave, sv is PandacubeCore master) * - global (-101----) 0x50 - 0x5F 0xD0 - 0xDF * * - reserved if not enough packet id in certain use case @@ -47,7 +48,7 @@ public abstract class Packet implements ByteSerializable { ByteBuffer internal = new ByteBuffer(CHARSET).putObject(this); byte[] data = Arrays.copyOfRange(internal.array(), 0, internal.getPosition()); - return new ByteBuffer(5 + data.length, CHARSET).putByte(code).putInt(data.length).putBytes(data).array(); + return new ByteBuffer(5 + data.length, CHARSET).putByte(code).putInt(data.length).putByteArray(data).array(); } public static final Charset CHARSET = Pandacube.NETWORK_CHARSET; diff --git a/src/fr/pandacube/java/util/network/packet/ResponseCallback.java b/src/fr/pandacube/java/util/network/packet/ResponseCallback.java new file mode 100644 index 0000000..a6a669e --- /dev/null +++ b/src/fr/pandacube/java/util/network/packet/ResponseCallback.java @@ -0,0 +1,8 @@ +package fr.pandacube.java.util.network.packet; + +@FunctionalInterface +public interface ResponseCallback { + + public void call(T packet); + +} diff --git a/src/fr/pandacube/java/util/network/packet/bytebuffer/ByteBuffer.java b/src/fr/pandacube/java/util/network/packet/bytebuffer/ByteBuffer.java index cb7cb87..37ca99d 100644 --- a/src/fr/pandacube/java/util/network/packet/bytebuffer/ByteBuffer.java +++ b/src/fr/pandacube/java/util/network/packet/bytebuffer/ByteBuffer.java @@ -47,10 +47,21 @@ public class ByteBuffer implements Cloneable { /** * @see java.nio.ByteBuffer#get(byte[]) */ - public byte[] getBytes(byte[] b) { + public byte[] getByteArray(byte[] b) { buff.get(b); return b; } + + /** + * Return the next byte array wich is preceded with his size as integer, + * or null if the founded size is negative. + * @return + */ + public byte[] getSizedByteArray() { + int size = getInt(); + if (size < 0) return null; + return getByteArray(new byte[size]); + } /** * @see java.nio.ByteBuffer#getChar() @@ -106,11 +117,19 @@ public class ByteBuffer implements Cloneable { /** * @see java.nio.ByteBuffer#put(byte[]) */ - public ByteBuffer putBytes(byte[] b) { + public ByteBuffer putByteArray(byte[] b) { askForBufferExtension(b.length * Byte.BYTES); buff.put(b); return this; } + + public ByteBuffer putSizedByteArray(byte[] b) { + if (b == null) { + return putInt(-1); + } + putInt(b.length); + return putByteArray(b); + } /** * @see java.nio.ByteBuffer#putChar(char) @@ -187,21 +206,31 @@ public class ByteBuffer implements Cloneable { return buff.capacity(); } + /** + * + * @param s null String are supported + * @return + */ public ByteBuffer putString(String s) { - byte[] charBytes = s.getBytes(charset); - putInt(charBytes.length); - putBytes(charBytes); - return this; + if (s == null) { + return putInt(-1); + } + return putSizedByteArray(s.getBytes(charset)); } + /** + * returned string can be null + * @return + */ public String getString() { - return new String(getBytes(new byte[getInt()]), charset); + byte[] binaryString = getSizedByteArray(); + return (binaryString == null) ? null : new String(binaryString, charset); } /** * The objet will be serialized and the data put in the current buffer * - * @param obj the object to serialize + * @param obj the object to serialize. Can't be null. * @return the current buffer */ public ByteBuffer putObject(ByteSerializable obj) { @@ -211,12 +240,12 @@ public class ByteBuffer implements Cloneable { /** * Ask to object passed as argument to deserialize data in buffer and fill - * the object content + * the object content. ByteSerializable object are never null. * * @param - * @param obj the objet to fill with his method - * {@link ByteSerializable#deserializeFromByteBuffer(ByteBuffer)} - * @return obj a reference to the same object + * @param clazz the class wich will be instanciated with his no-argument Constructor + * before filled by using {@link ByteSerializable#deserializeFromByteBuffer(ByteBuffer)} + * @return obj a reference to the filled object */ public T getObject(Class clazz) { try { @@ -224,25 +253,67 @@ public class ByteBuffer implements Cloneable { obj.deserializeFromByteBuffer(this); return obj; } catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeException(e); + throw new RuntimeException("A ByteSerializable must have a no-argument Constructor", e); } } + /** + * + * @param list The list itself can be null, but not the values. + * @return + */ public ByteBuffer putListObject(List list) { + if (list.stream().anyMatch(e -> e == null)) + throw new IllegalArgumentException("List of object can't contains any null value"); putInt(list.size()); for (ByteSerializable obj : list) putObject(obj); return this; } + /** + * + * @param list The list can be null, and any String can be null too. + * @return + */ + public ByteBuffer putListOfString(List list) { + if (list == null) { + return putInt(-1); + } + putInt(list.size()); + for (String str : list) + putString(str); + return this; + } + + /** + * + * @param clazz + * @return Can be null. If not, there is no null element inside. + */ public List getListObject(Class clazz) { - List list = new ArrayList(); int size = getInt(); + if (size < 0) + return null; + List list = new ArrayList<>(); for (int i = 0; i < size; i++) list.add(getObject(clazz)); return list; } + /** + * @return a List of String. The list can be null, and any element can be null too. + */ + public List getListOfString() { + int size = getInt(); + if (size < 0) + return null; + List list = new ArrayList<>(); + for (int i = 0; i < size; i++) + list.add(getString()); + return list; + } + /** * @see java.nio.ByteBuffer#array() */ diff --git a/src/fr/pandacube/java/util/network/server/TCPServer.java b/src/fr/pandacube/java/util/network/server/TCPServer.java index cf31780..b0178c3 100644 --- a/src/fr/pandacube/java/util/network/server/TCPServer.java +++ b/src/fr/pandacube/java/util/network/server/TCPServer.java @@ -10,12 +10,16 @@ import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.logging.Level; import org.apache.commons.lang.builder.ToStringBuilder; @@ -26,8 +30,10 @@ import fr.pandacube.java.util.network.packet.Packet; import fr.pandacube.java.util.network.packet.PacketClient; import fr.pandacube.java.util.network.packet.PacketException; import fr.pandacube.java.util.network.packet.PacketServer; +import fr.pandacube.java.util.network.packet.ResponseCallback; import fr.pandacube.java.util.network.packet.bytebuffer.ByteBuffer; import fr.pandacube.java.util.network.packet.packets.global.PacketServerException; +import javafx.util.Pair; /** * @@ -41,7 +47,7 @@ public class TCPServer extends Thread implements Closeable { private TCPServerListener listener; private String socketName; - private List clients = new ArrayList<>(); + private List clients = Collections.synchronizedList(new ArrayList<>()); private AtomicBoolean isClosed = new AtomicBoolean(false); @@ -53,7 +59,7 @@ public class TCPServer extends Thread implements Closeable { if (port <= 0 || port > 65535) throw new IllegalArgumentException("le numéro de port est invalide"); socket = new ServerSocket(); socket.setReceiveBufferSize(Pandacube.NETWORK_TCP_BUFFER_SIZE); - socket.setPerformancePreferences(0, 2, 1); + socket.setPerformancePreferences(0, 1, 0); socket.bind(new InetSocketAddress(port)); listener = l; try { @@ -95,6 +101,9 @@ public class TCPServer extends Thread implements Closeable { private OutputStream out; private SocketAddress address; private TCPServerConnectionOutputThread outThread; + private List, ResponseCallback>> callbacks = Collections.synchronizedList(new ArrayList<>()); + private List, ResponseCallback>> callbacksAvoidListener = Collections.synchronizedList(new ArrayList<>()); + public TCPServerClientConnection(Socket s, int coId) throws IOException { super("TCPSv " + socketName + " Conn#" + coId + " In"); @@ -126,13 +135,26 @@ public class TCPServer extends Thread implements Closeable { forceReadBytes(content); - byte[] packetData = new ByteBuffer(1 + 4 + size, Packet.CHARSET).putBytes(code).putBytes(sizeB) - .putBytes(content).array(); + byte[] packetData = new ByteBuffer(1 + 4 + size, Packet.CHARSET).putByteArray(code).putByteArray(sizeB) + .putByteArray(content).array(); bandwidthCalculation.addPacket(this, true, packetData.length); try { - interpreteReceivedMessage(this, packetData); + Packet p = Packet.constructPacket(packetData); + + if (!(p instanceof PacketClient)) + throw new PacketException(p.getClass().getCanonicalName() + " is not an instanceof PacketClient"); + + PacketClient pc = (PacketClient) p; + + boolean oneCallbackExecuted = executeCallbacks(pc, callbacksAvoidListener); + + if (!oneCallbackExecuted) + listener.onPacketReceive(TCPServer.this, this, pc); + + executeCallbacks(pc, callbacks); + } catch (Exception e) { Log.severe("Exception while handling packet. This exception will be sent to the client with PacketServerException packet.", e); PacketServerException packet = new PacketServerException(); @@ -148,10 +170,66 @@ public class TCPServer extends Thread implements Closeable { close(); } + + + private boolean executeCallbacks(PacketClient pc, List, ResponseCallback>> callbacks) { + boolean executedOne = false; + synchronized (callbacks) { + for(Iterator, ResponseCallback>> it = callbacks.iterator(); it.hasNext();) { + Pair, ResponseCallback> c = it.next(); + try { + if (c.getKey().test(pc)) { + it.remove(); + c.getValue().call(pc); + executedOne = true; + } + } catch (Exception e) { + Log.severe("Exception while executing callback", e); + } + } + } + return executedOne; + } public void send(PacketServer p) { outThread.addPacket(p); } + + public void sendAndGetResponse(PacketServer packet, Predicate responseCondition, ResponseCallback callback, boolean avoidListener) { + Pair, ResponseCallback> p = new Pair<>(responseCondition, callback); + if (avoidListener) + callbacksAvoidListener.add(p); + else + callbacks.add(p); + send(packet); + } + + + + /** + * + * @param packet the packet to send + * @param responseCondition {@link Predicate} that check each received packet to know which + * is the expected one as a response of the sended packet. + * @param avoidListener + * @param timeout + * @return + * @throws InterruptedException + */ + public PacketClient sendAndWaitForResponse(PacketServer packet, Predicate responseCondition, boolean avoidListener, long timeout) throws InterruptedException { + AtomicReference pcStorage = new AtomicReference<>(null); + synchronized (pcStorage) { + sendAndGetResponse(packet, responseCondition, packetClient -> { + synchronized (pcStorage) { + pcStorage.set(packetClient); + pcStorage.notifyAll(); + } + }, true); + + pcStorage.wait(timeout); + return pcStorage.get(); + } + } private void forceReadBytes(byte[] buff) throws IOException { int pos = 0; @@ -173,17 +251,15 @@ public class TCPServer extends Thread implements Closeable { clients.remove(this); try { + Thread.sleep(200); socket.close(); if (!Thread.currentThread().equals(outThread)) send(new PacketServer((byte) 0) { - @Override - public void serializeToByteBuffer(ByteBuffer buffer) {} - - @Override - public void deserializeFromByteBuffer(ByteBuffer buffer) {} + @Override public void serializeToByteBuffer(ByteBuffer buffer) {} + @Override public void deserializeFromByteBuffer(ByteBuffer buffer) {} }); // provoque une exception dans le thread de sortie, et la // termine - } catch (IOException e) { } + } catch (Exception e) { } } private class TCPServerConnectionOutputThread extends Thread { @@ -205,10 +281,16 @@ public class TCPServer extends Thread implements Closeable { PacketServer packet = packetQueue.poll(1, TimeUnit.SECONDS); byte[] data; if (packet != null) { - data = packet.getFullSerializedPacket(); - bandwidthCalculation.addPacket(TCPServerClientConnection.this, false, data.length); - out.write(data); - out.flush(); + try { + data = packet.getFullSerializedPacket(); + bandwidthCalculation.addPacket(TCPServerClientConnection.this, false, data.length); + out.write(data); + out.flush(); + } catch (IOException e) { + throw e; + } catch (Exception e) { + Log.severe("Can't send packet "+packet.getClass(), e); + } } } @@ -229,18 +311,6 @@ public class TCPServer extends Thread implements Closeable { } - private void interpreteReceivedMessage(TCPServerClientConnection co, byte[] data) { - - Packet p = Packet.constructPacket(data); - - if (!(p instanceof PacketClient)) - throw new PacketException(p.getClass().getCanonicalName() + " is not an instanceof PacketClient"); - - PacketClient pc = (PacketClient) p; - - listener.onPacketReceive(this, co, pc); - } - @Override public void close() { try { @@ -263,6 +333,14 @@ public class TCPServer extends Thread implements Closeable { return isClosed.get() || socket.isClosed(); } + + + public List getClients() { + synchronized (clients) { + return new ArrayList<>(clients); + } + } + @Override