diff --git a/pandalib-ws-client/src/main/java/fr/pandacube/lib/ws/client/AbstractClientWS.java b/pandalib-ws-client/src/main/java/fr/pandacube/lib/ws/client/AbstractClientWS.java index c1e7172..e39853d 100644 --- a/pandalib-ws-client/src/main/java/fr/pandacube/lib/ws/client/AbstractClientWS.java +++ b/pandalib-ws-client/src/main/java/fr/pandacube/lib/ws/client/AbstractClientWS.java @@ -1,6 +1,5 @@ package fr.pandacube.lib.ws.client; -import fr.pandacube.lib.util.Log; import fr.pandacube.lib.util.ThrowableUtil; import fr.pandacube.lib.ws.AbstractWS; @@ -16,7 +15,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicReference; -// TODO implement auto-reconnect /** * Minimal implementation of a Websocket client endpoint using the java.net.http Websocket API. */ @@ -24,6 +22,7 @@ public abstract class AbstractClientWS implements AbstractWS { private final URI uri; private boolean autoReconnect; + private boolean isConnecting; private final AtomicReference socket = new AtomicReference<>(); @@ -48,14 +47,27 @@ public abstract class AbstractClientWS implements AbstractWS { @Override public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { - AbstractClientWS.this.onClose(statusCode, reason); - return Listener.super.onClose(webSocket, statusCode, reason); + try { + AbstractClientWS.this.onClose(statusCode, reason); + } finally { + synchronized (socket) { + socket.set(null); + reconnectIfNecessary(); + } + } + return null; } @Override public void onError(WebSocket webSocket, Throwable error) { - AbstractClientWS.this.onError(error); - Listener.super.onError(webSocket, error); + try { + AbstractClientWS.this.onError(error); + } finally { + synchronized (socket) { + socket.set(null); + reconnectIfNecessary(); + } + } } }; @@ -69,21 +81,40 @@ public abstract class AbstractClientWS implements AbstractWS { public AbstractClientWS(String uri, boolean autoReconnect) throws URISyntaxException { this.uri = new URI(uri); this.autoReconnect = autoReconnect; - if (autoReconnect) { - Log.warning("Websocket client auto-reconnect is not yet implemented."); - } connect(); } + private void reconnectIfNecessary() { + synchronized (socket) { + if (autoReconnect && !isConnecting && socket.get() == null) { + connect(); + } + } + } + + private void connect() { synchronized (socket) { - socket.set(HttpClient.newHttpClient() + isConnecting = true; + HttpClient.newHttpClient() .newWebSocketBuilder() - .connectTimeout(Duration.ofSeconds(2)) + .connectTimeout(Duration.ofSeconds(5)) .buildAsync(uri, receiveListener) - .join() - ); + .whenCompleteAsync((ws, ex) -> { + synchronized (socket) { + isConnecting = false; + if (ws != null) { + socket.set(ws); + } + } + if (ex instanceof IOException) { + reconnectIfNecessary(); + } + else { + logError("Error connecting (not trying to reconnect even if asked): ", ex); + } + }); } } @@ -92,7 +123,11 @@ public abstract class AbstractClientWS implements AbstractWS { public final void sendString(String message) throws IOException { try { synchronized (socket) { - socket.get().sendText(message, true).join(); + WebSocket ws = socket.get(); + if (ws != null) + ws.sendText(message, true).join(); + else + throw new IOException("Connection is currently closed"); } } catch (CompletionException ce) { if (ce.getCause() instanceof IOException ioe) @@ -110,7 +145,9 @@ public abstract class AbstractClientWS implements AbstractWS { public final void sendClose(int code, String reason) throws IOException { synchronized (socket) { autoReconnect = false; // if we ask for closing connection, dont reconnect automatically - socket.get().sendClose(code, reason).join(); + WebSocket ws = socket.get(); + if (ws != null) + ws.sendClose(code, reason).join(); } }