Compare commits

..

No commits in common. "46653f06ff349fc9e0217ed033965eba788625c1" and "5b40c4aabb7c44fbaa69a28cfff267b511a493bc" have entirely different histories.

4 changed files with 25 additions and 88 deletions

View File

@ -1,5 +1,6 @@
package fr.pandacube.lib.ws.client; package fr.pandacube.lib.ws.client;
import fr.pandacube.lib.util.Log;
import fr.pandacube.lib.util.ThrowableUtil; import fr.pandacube.lib.util.ThrowableUtil;
import fr.pandacube.lib.ws.AbstractWS; import fr.pandacube.lib.ws.AbstractWS;
@ -15,6 +16,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
// TODO implement auto-reconnect
/** /**
* Minimal implementation of a Websocket client endpoint using the java.net.http Websocket API. * Minimal implementation of a Websocket client endpoint using the java.net.http Websocket API.
*/ */
@ -22,7 +24,6 @@ public abstract class AbstractClientWS implements AbstractWS {
private final URI uri; private final URI uri;
private boolean autoReconnect; private boolean autoReconnect;
private boolean isConnecting;
private final AtomicReference<WebSocket> socket = new AtomicReference<>(); private final AtomicReference<WebSocket> socket = new AtomicReference<>();
@ -47,27 +48,14 @@ public abstract class AbstractClientWS implements AbstractWS {
@Override @Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) { public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
try { AbstractClientWS.this.onClose(statusCode, reason);
AbstractClientWS.this.onClose(statusCode, reason); return Listener.super.onClose(webSocket, statusCode, reason);
} finally {
synchronized (socket) {
socket.set(null);
reconnectIfNecessary();
}
}
return null;
} }
@Override @Override
public void onError(WebSocket webSocket, Throwable error) { public void onError(WebSocket webSocket, Throwable error) {
try { AbstractClientWS.this.onError(error);
AbstractClientWS.this.onError(error); Listener.super.onError(webSocket, error);
} finally {
synchronized (socket) {
socket.set(null);
reconnectIfNecessary();
}
}
} }
}; };
@ -81,40 +69,21 @@ public abstract class AbstractClientWS implements AbstractWS {
public AbstractClientWS(String uri, boolean autoReconnect) throws URISyntaxException { public AbstractClientWS(String uri, boolean autoReconnect) throws URISyntaxException {
this.uri = new URI(uri); this.uri = new URI(uri);
this.autoReconnect = autoReconnect; this.autoReconnect = autoReconnect;
connect(); if (autoReconnect) {
} Log.warning("Websocket client auto-reconnect is not yet implemented.");
private void reconnectIfNecessary() {
synchronized (socket) {
if (autoReconnect && !isConnecting && socket.get() == null) {
connect();
}
} }
connect();
} }
private void connect() { private void connect() {
synchronized (socket) { synchronized (socket) {
isConnecting = true; socket.set(HttpClient.newHttpClient()
HttpClient.newHttpClient()
.newWebSocketBuilder() .newWebSocketBuilder()
.connectTimeout(Duration.ofSeconds(5)) .connectTimeout(Duration.ofSeconds(2))
.buildAsync(uri, receiveListener) .buildAsync(uri, receiveListener)
.whenCompleteAsync((ws, ex) -> { .join()
synchronized (socket) { );
isConnecting = false;
if (ws != null) {
socket.set(ws);
}
}
if (ex instanceof IOException) {
reconnectIfNecessary();
}
else {
logError("Error connecting (not trying to reconnect even if asked): ", ex);
}
});
} }
} }
@ -123,11 +92,7 @@ public abstract class AbstractClientWS implements AbstractWS {
public final void sendString(String message) throws IOException { public final void sendString(String message) throws IOException {
try { try {
synchronized (socket) { synchronized (socket) {
WebSocket ws = socket.get(); socket.get().sendText(message, true).join();
if (ws != null)
ws.sendText(message, true).join();
else
throw new IOException("Connection is currently closed");
} }
} catch (CompletionException ce) { } catch (CompletionException ce) {
if (ce.getCause() instanceof IOException ioe) if (ce.getCause() instanceof IOException ioe)
@ -145,9 +110,7 @@ public abstract class AbstractClientWS implements AbstractWS {
public final void sendClose(int code, String reason) throws IOException { public final void sendClose(int code, String reason) throws IOException {
synchronized (socket) { synchronized (socket) {
autoReconnect = false; // if we ask for closing connection, dont reconnect automatically autoReconnect = false; // if we ask for closing connection, dont reconnect automatically
WebSocket ws = socket.get(); socket.get().sendClose(code, reason).join();
if (ws != null)
ws.sendClose(code, reason).join();
} }
} }

View File

@ -5,15 +5,12 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.time.Duration; import java.time.Duration;
/** /**
* Minimal implementation of a Websocket server endpoint using the Jetty Websocket API. * Minimal implementation of a Websocket server endpoint using the Jetty Websocket API.
*/ */
public abstract class AbstractServerWS extends WebSocketAdapter implements AbstractWS { public abstract class AbstractServerWS extends WebSocketAdapter implements AbstractWS {
private boolean isClosed = false;
@Override @Override
public final void onWebSocketConnect(Session sess) public final void onWebSocketConnect(Session sess)
@ -40,8 +37,6 @@ public abstract class AbstractServerWS extends WebSocketAdapter implements Abstr
@Override @Override
public final void onWebSocketError(Throwable cause) { public final void onWebSocketError(Throwable cause) {
if (isClosed && cause instanceof ClosedChannelException)
return; // ignore because this exception is expected when we just sent a close packet.
onError(cause); onError(cause);
} }
@ -58,7 +53,6 @@ public abstract class AbstractServerWS extends WebSocketAdapter implements Abstr
@Override @Override
public final void sendClose(int code, String reason) throws IOException { public final void sendClose(int code, String reason) throws IOException {
getSession().close(code, reason); getSession().close(code, reason);
isClosed = true;
} }
@Override @Override

View File

@ -32,26 +32,15 @@ public interface AbstractWS {
return; return;
} }
if (payload instanceof ErrorPayload errorPayload) { try {
try { onReceivePayload(payload);
onReceiveErrorPayload(errorPayload); } catch (Throwable t) {
} catch(Exception e) { trySendAsJson(new ErrorPayload("Error handling payload: " + t));
logError("Error while handling received error payload", e); if (t instanceof Exception)
} logError("Error handling payload", t);
else
throw t;
} }
else {
try {
onReceivePayload(payload);
} catch (Throwable t) {
trySendAsJson(new ErrorPayload("Error while handling your payload: " + t));
if (t instanceof Exception)
logError("Error while handling received payload", t);
else
throw t;
}
}
} }
/** /**
@ -73,15 +62,6 @@ public interface AbstractWS {
*/ */
void onReceivePayload(Payload payload); void onReceivePayload(Payload payload);
/**
* Called on reception of a valid {@link ErrorPayload}.
* @param error the received {@link ErrorPayload}.
* @implNote default implementation will log the received error.
*/
default void onReceiveErrorPayload(ErrorPayload error) {
logError("Received the following error from the remote side: " + error.message, error.throwable);
}
/** /**
* Called on reception of a websocket Close packet. * Called on reception of a websocket Close packet.
* The connection is closed after this method call. * The connection is closed after this method call.

View File

@ -49,12 +49,12 @@ public class PayloadRegistry {
public static Payload fromString(String message) { public static Payload fromString(String message) {
String[] split = message.split(Pattern.quote(PAYLOAD_TYPE_SEPARATOR), 2); String[] split = message.split(Pattern.quote(PAYLOAD_TYPE_SEPARATOR), 2);
if (split.length != 2) { if (split.length != 2) {
throw new IllegalArgumentException("Malformed message: does not respect format <type>" + PAYLOAD_TYPE_SEPARATOR + "<jsonObject>."); throw new IllegalArgumentException("Malformed message: does not respect format '<type>" + PAYLOAD_TYPE_SEPARATOR + "<jsonObject>'.");
} }
Class<? extends Payload> detectedClass = payloadClasses.get(split[0]); Class<? extends Payload> detectedClass = payloadClasses.get(split[0]);
if (detectedClass == null) { if (detectedClass == null) {
throw new IllegalArgumentException("Unrecognized data type " + split[0] + "."); throw new IllegalArgumentException("Unrecognized data type '" + split[0] + "'.");
} }
try { try {