Compare commits
3 Commits
5b40c4aabb
...
46653f06ff
Author | SHA1 | Date | |
---|---|---|---|
46653f06ff | |||
2bc60df11c | |||
e4a5bf0eac |
@ -1,6 +1,5 @@
|
|||||||
package fr.pandacube.lib.ws.client;
|
package fr.pandacube.lib.ws.client;
|
||||||
|
|
||||||
import fr.pandacube.lib.util.Log;
|
|
||||||
import fr.pandacube.lib.util.ThrowableUtil;
|
import fr.pandacube.lib.util.ThrowableUtil;
|
||||||
import fr.pandacube.lib.ws.AbstractWS;
|
import fr.pandacube.lib.ws.AbstractWS;
|
||||||
|
|
||||||
@ -16,7 +15,6 @@ import java.util.concurrent.CompletionException;
|
|||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
// TODO implement auto-reconnect
|
|
||||||
/**
|
/**
|
||||||
* Minimal implementation of a Websocket client endpoint using the java.net.http Websocket API.
|
* Minimal implementation of a Websocket client endpoint using the java.net.http Websocket API.
|
||||||
*/
|
*/
|
||||||
@ -24,6 +22,7 @@ public abstract class AbstractClientWS implements AbstractWS {
|
|||||||
|
|
||||||
private final URI uri;
|
private final URI uri;
|
||||||
private boolean autoReconnect;
|
private boolean autoReconnect;
|
||||||
|
private boolean isConnecting;
|
||||||
private final AtomicReference<WebSocket> socket = new AtomicReference<>();
|
private final AtomicReference<WebSocket> socket = new AtomicReference<>();
|
||||||
|
|
||||||
|
|
||||||
@ -48,14 +47,27 @@ public abstract class AbstractClientWS implements AbstractWS {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
|
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
|
||||||
|
try {
|
||||||
AbstractClientWS.this.onClose(statusCode, reason);
|
AbstractClientWS.this.onClose(statusCode, reason);
|
||||||
return Listener.super.onClose(webSocket, statusCode, reason);
|
} finally {
|
||||||
|
synchronized (socket) {
|
||||||
|
socket.set(null);
|
||||||
|
reconnectIfNecessary();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(WebSocket webSocket, Throwable error) {
|
public void onError(WebSocket webSocket, Throwable error) {
|
||||||
|
try {
|
||||||
AbstractClientWS.this.onError(error);
|
AbstractClientWS.this.onError(error);
|
||||||
Listener.super.onError(webSocket, error);
|
} finally {
|
||||||
|
synchronized (socket) {
|
||||||
|
socket.set(null);
|
||||||
|
reconnectIfNecessary();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -69,21 +81,40 @@ public abstract class AbstractClientWS implements AbstractWS {
|
|||||||
public AbstractClientWS(String uri, boolean autoReconnect) throws URISyntaxException {
|
public AbstractClientWS(String uri, boolean autoReconnect) throws URISyntaxException {
|
||||||
this.uri = new URI(uri);
|
this.uri = new URI(uri);
|
||||||
this.autoReconnect = autoReconnect;
|
this.autoReconnect = autoReconnect;
|
||||||
if (autoReconnect) {
|
|
||||||
Log.warning("Websocket client auto-reconnect is not yet implemented.");
|
|
||||||
}
|
|
||||||
connect();
|
connect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void reconnectIfNecessary() {
|
||||||
|
synchronized (socket) {
|
||||||
|
if (autoReconnect && !isConnecting && socket.get() == null) {
|
||||||
|
connect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void connect() {
|
private void connect() {
|
||||||
synchronized (socket) {
|
synchronized (socket) {
|
||||||
socket.set(HttpClient.newHttpClient()
|
isConnecting = true;
|
||||||
|
HttpClient.newHttpClient()
|
||||||
.newWebSocketBuilder()
|
.newWebSocketBuilder()
|
||||||
.connectTimeout(Duration.ofSeconds(2))
|
.connectTimeout(Duration.ofSeconds(5))
|
||||||
.buildAsync(uri, receiveListener)
|
.buildAsync(uri, receiveListener)
|
||||||
.join()
|
.whenCompleteAsync((ws, ex) -> {
|
||||||
);
|
synchronized (socket) {
|
||||||
|
isConnecting = false;
|
||||||
|
if (ws != null) {
|
||||||
|
socket.set(ws);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ex instanceof IOException) {
|
||||||
|
reconnectIfNecessary();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
logError("Error connecting (not trying to reconnect even if asked): ", ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,7 +123,11 @@ public abstract class AbstractClientWS implements AbstractWS {
|
|||||||
public final void sendString(String message) throws IOException {
|
public final void sendString(String message) throws IOException {
|
||||||
try {
|
try {
|
||||||
synchronized (socket) {
|
synchronized (socket) {
|
||||||
socket.get().sendText(message, true).join();
|
WebSocket ws = socket.get();
|
||||||
|
if (ws != null)
|
||||||
|
ws.sendText(message, true).join();
|
||||||
|
else
|
||||||
|
throw new IOException("Connection is currently closed");
|
||||||
}
|
}
|
||||||
} catch (CompletionException ce) {
|
} catch (CompletionException ce) {
|
||||||
if (ce.getCause() instanceof IOException ioe)
|
if (ce.getCause() instanceof IOException ioe)
|
||||||
@ -110,7 +145,9 @@ public abstract class AbstractClientWS implements AbstractWS {
|
|||||||
public final void sendClose(int code, String reason) throws IOException {
|
public final void sendClose(int code, String reason) throws IOException {
|
||||||
synchronized (socket) {
|
synchronized (socket) {
|
||||||
autoReconnect = false; // if we ask for closing connection, dont reconnect automatically
|
autoReconnect = false; // if we ask for closing connection, dont reconnect automatically
|
||||||
socket.get().sendClose(code, reason).join();
|
WebSocket ws = socket.get();
|
||||||
|
if (ws != null)
|
||||||
|
ws.sendClose(code, reason).join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ import org.eclipse.jetty.websocket.api.Session;
|
|||||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -12,6 +13,8 @@ import java.time.Duration;
|
|||||||
*/
|
*/
|
||||||
public abstract class AbstractServerWS extends WebSocketAdapter implements AbstractWS {
|
public abstract class AbstractServerWS extends WebSocketAdapter implements AbstractWS {
|
||||||
|
|
||||||
|
private boolean isClosed = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void onWebSocketConnect(Session sess)
|
public final void onWebSocketConnect(Session sess)
|
||||||
{
|
{
|
||||||
@ -37,6 +40,8 @@ public abstract class AbstractServerWS extends WebSocketAdapter implements Abstr
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void onWebSocketError(Throwable cause) {
|
public final void onWebSocketError(Throwable cause) {
|
||||||
|
if (isClosed && cause instanceof ClosedChannelException)
|
||||||
|
return; // ignore because this exception is expected when we just sent a close packet.
|
||||||
onError(cause);
|
onError(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,6 +58,7 @@ public abstract class AbstractServerWS extends WebSocketAdapter implements Abstr
|
|||||||
@Override
|
@Override
|
||||||
public final void sendClose(int code, String reason) throws IOException {
|
public final void sendClose(int code, String reason) throws IOException {
|
||||||
getSession().close(code, reason);
|
getSession().close(code, reason);
|
||||||
|
isClosed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -32,17 +32,28 @@ public interface AbstractWS {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (payload instanceof ErrorPayload errorPayload) {
|
||||||
|
try {
|
||||||
|
onReceiveErrorPayload(errorPayload);
|
||||||
|
} catch(Exception e) {
|
||||||
|
logError("Error while handling received error payload", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
try {
|
try {
|
||||||
onReceivePayload(payload);
|
onReceivePayload(payload);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
trySendAsJson(new ErrorPayload("Error handling payload: " + t));
|
trySendAsJson(new ErrorPayload("Error while handling your payload: " + t));
|
||||||
if (t instanceof Exception)
|
if (t instanceof Exception)
|
||||||
logError("Error handling payload", t);
|
logError("Error while handling received payload", t);
|
||||||
else
|
else
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles the reception of binary data. The default implementation reject any binary data by sending an
|
* Handles the reception of binary data. The default implementation reject any binary data by sending an
|
||||||
* {@link ErrorPayload} to the remote endpoint.
|
* {@link ErrorPayload} to the remote endpoint.
|
||||||
@ -62,6 +73,15 @@ public interface AbstractWS {
|
|||||||
*/
|
*/
|
||||||
void onReceivePayload(Payload payload);
|
void onReceivePayload(Payload payload);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called on reception of a valid {@link ErrorPayload}.
|
||||||
|
* @param error the received {@link ErrorPayload}.
|
||||||
|
* @implNote default implementation will log the received error.
|
||||||
|
*/
|
||||||
|
default void onReceiveErrorPayload(ErrorPayload error) {
|
||||||
|
logError("Received the following error from the remote side: " + error.message, error.throwable);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called on reception of a websocket Close packet.
|
* Called on reception of a websocket Close packet.
|
||||||
* The connection is closed after this method call.
|
* The connection is closed after this method call.
|
||||||
|
@ -49,12 +49,12 @@ public class PayloadRegistry {
|
|||||||
public static Payload fromString(String message) {
|
public static Payload fromString(String message) {
|
||||||
String[] split = message.split(Pattern.quote(PAYLOAD_TYPE_SEPARATOR), 2);
|
String[] split = message.split(Pattern.quote(PAYLOAD_TYPE_SEPARATOR), 2);
|
||||||
if (split.length != 2) {
|
if (split.length != 2) {
|
||||||
throw new IllegalArgumentException("Malformed message: does not respect format '<type>" + PAYLOAD_TYPE_SEPARATOR + "<jsonObject>'.");
|
throw new IllegalArgumentException("Malformed message: does not respect format ‘<type>" + PAYLOAD_TYPE_SEPARATOR + "<jsonObject>’.");
|
||||||
}
|
}
|
||||||
|
|
||||||
Class<? extends Payload> detectedClass = payloadClasses.get(split[0]);
|
Class<? extends Payload> detectedClass = payloadClasses.get(split[0]);
|
||||||
if (detectedClass == null) {
|
if (detectedClass == null) {
|
||||||
throw new IllegalArgumentException("Unrecognized data type '" + split[0] + "'.");
|
throw new IllegalArgumentException("Unrecognized data type ‘" + split[0] + "’.");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
Loading…
Reference in New Issue
Block a user