Implements websocket client auto-reconnect

This commit is contained in:
Marc Baloup 2023-03-19 23:30:36 +01:00
parent 2bc60df11c
commit 46653f06ff

View File

@ -1,6 +1,5 @@
package fr.pandacube.lib.ws.client; package fr.pandacube.lib.ws.client;
import fr.pandacube.lib.util.Log;
import fr.pandacube.lib.util.ThrowableUtil; import fr.pandacube.lib.util.ThrowableUtil;
import fr.pandacube.lib.ws.AbstractWS; import fr.pandacube.lib.ws.AbstractWS;
@ -16,7 +15,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
// TODO implement auto-reconnect
/** /**
* Minimal implementation of a Websocket client endpoint using the java.net.http Websocket API. * Minimal implementation of a Websocket client endpoint using the java.net.http Websocket API.
*/ */
@ -24,6 +22,7 @@ public abstract class AbstractClientWS implements AbstractWS {
private final URI uri; private final URI uri;
private boolean autoReconnect; private boolean autoReconnect;
private boolean isConnecting;
private final AtomicReference<WebSocket> socket = new AtomicReference<>(); private final AtomicReference<WebSocket> socket = new AtomicReference<>();
@ -48,14 +47,27 @@ public abstract class AbstractClientWS implements AbstractWS {
@Override @Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) { public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
try {
AbstractClientWS.this.onClose(statusCode, reason); AbstractClientWS.this.onClose(statusCode, reason);
return Listener.super.onClose(webSocket, statusCode, reason); } finally {
synchronized (socket) {
socket.set(null);
reconnectIfNecessary();
}
}
return null;
} }
@Override @Override
public void onError(WebSocket webSocket, Throwable error) { public void onError(WebSocket webSocket, Throwable error) {
try {
AbstractClientWS.this.onError(error); AbstractClientWS.this.onError(error);
Listener.super.onError(webSocket, error); } finally {
synchronized (socket) {
socket.set(null);
reconnectIfNecessary();
}
}
} }
}; };
@ -69,21 +81,40 @@ public abstract class AbstractClientWS implements AbstractWS {
public AbstractClientWS(String uri, boolean autoReconnect) throws URISyntaxException { public AbstractClientWS(String uri, boolean autoReconnect) throws URISyntaxException {
this.uri = new URI(uri); this.uri = new URI(uri);
this.autoReconnect = autoReconnect; this.autoReconnect = autoReconnect;
if (autoReconnect) {
Log.warning("Websocket client auto-reconnect is not yet implemented.");
}
connect(); connect();
} }
private void reconnectIfNecessary() {
synchronized (socket) {
if (autoReconnect && !isConnecting && socket.get() == null) {
connect();
}
}
}
private void connect() { private void connect() {
synchronized (socket) { synchronized (socket) {
socket.set(HttpClient.newHttpClient() isConnecting = true;
HttpClient.newHttpClient()
.newWebSocketBuilder() .newWebSocketBuilder()
.connectTimeout(Duration.ofSeconds(2)) .connectTimeout(Duration.ofSeconds(5))
.buildAsync(uri, receiveListener) .buildAsync(uri, receiveListener)
.join() .whenCompleteAsync((ws, ex) -> {
); synchronized (socket) {
isConnecting = false;
if (ws != null) {
socket.set(ws);
}
}
if (ex instanceof IOException) {
reconnectIfNecessary();
}
else {
logError("Error connecting (not trying to reconnect even if asked): ", ex);
}
});
} }
} }
@ -92,7 +123,11 @@ public abstract class AbstractClientWS implements AbstractWS {
public final void sendString(String message) throws IOException { public final void sendString(String message) throws IOException {
try { try {
synchronized (socket) { synchronized (socket) {
socket.get().sendText(message, true).join(); WebSocket ws = socket.get();
if (ws != null)
ws.sendText(message, true).join();
else
throw new IOException("Connection is currently closed");
} }
} catch (CompletionException ce) { } catch (CompletionException ce) {
if (ce.getCause() instanceof IOException ioe) if (ce.getCause() instanceof IOException ioe)
@ -110,7 +145,9 @@ public abstract class AbstractClientWS implements AbstractWS {
public final void sendClose(int code, String reason) throws IOException { public final void sendClose(int code, String reason) throws IOException {
synchronized (socket) { synchronized (socket) {
autoReconnect = false; // if we ask for closing connection, dont reconnect automatically autoReconnect = false; // if we ask for closing connection, dont reconnect automatically
socket.get().sendClose(code, reason).join(); WebSocket ws = socket.get();
if (ws != null)
ws.sendClose(code, reason).join();
} }
} }