From e12cf486502559962701f6e26740d41db5c8226a Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Tue, 2 Sep 2025 21:29:35 -0400 Subject: [PATCH 1/4] websocket plugin --- avaje-jex-websockets/pom.xml | 19 + .../io/avaje/jex/websocket/JexWebSocket.java | 53 +++ .../io/avaje/jex/websocket/WebSocket.java | 47 +++ .../websocket/WebSocketExchangeHandler.java | 19 + .../avaje/jex/websocket/WebSocketFrame.java | 110 +++++ .../jex/websocket/WebSocketListener.java | 56 +++ .../avaje/jex/websocket/WebSocketPlugin.java | 28 ++ .../io/avaje/jex/websocket/WsContext.java | 185 +++++++++ .../jex/websocket/exception/CloseCode.java | 35 ++ .../exception/WebSocketException.java | 32 ++ .../websocket/internal/AbstractWebSocket.java | 262 ++++++++++++ .../jex/websocket/internal/CloseFrame.java | 44 ++ .../avaje/jex/websocket/internal/LICENSE.md | 14 + .../avaje/jex/websocket/internal/State.java | 42 ++ .../io/avaje/jex/websocket/internal/Util.java | 67 +++ .../avaje/jex/websocket/internal/WSFrame.java | 390 ++++++++++++++++++ .../websocket/internal/WebSocketHandler.java | 70 ++++ .../src/main/java/module-info.java | 23 ++ .../internal/EchoWebSocketHandler.java | 20 + .../internal/WebSocketClientUtil.java | 72 ++++ .../jex/websocket/internal/WebSocketTest.java | 76 ++++ .../java/io/avaje/jex/core/JdkContext.java | 9 + .../jex/core/json/JacksonJsonService.java | 16 +- .../avaje/jex/core/json/JsonbJsonService.java | 5 + .../main/java/io/avaje/jex/http/Context.java | 8 + .../java/io/avaje/jex/spi/JsonService.java | 11 +- pom.xml | 1 + 27 files changed, 1711 insertions(+), 3 deletions(-) create mode 100644 avaje-jex-websockets/pom.xml create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java create mode 100644 avaje-jex-websockets/src/main/java/module-info.java create mode 100644 avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java create mode 100644 avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java create mode 100644 avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java diff --git a/avaje-jex-websockets/pom.xml b/avaje-jex-websockets/pom.xml new file mode 100644 index 00000000..eaf440a2 --- /dev/null +++ b/avaje-jex-websockets/pom.xml @@ -0,0 +1,19 @@ + + 4.0.0 + + io.avaje + avaje-jex-parent + 3.3-RC4 + + avaje-jex-websockets + + io.avaje + avaje-jex + + + io.avaje + avaje-jex-test + test + + + \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java new file mode 100644 index 00000000..649dfa63 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java @@ -0,0 +1,53 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.internal.AbstractWebSocket; + +class JexWebSocket extends AbstractWebSocket { + + private final WebSocketListener listener; + private final Context ctx; + + JexWebSocket(Context ctx, WebSocketListener listener) { + super(ctx.exchange()); + this.listener = listener; + this.ctx = ctx; + } + + @Override + protected void onClose(CloseCode code, String reason, boolean initiatedByRemote) { + listener.onClose(new WsClose(ctx, this, code, reason, initiatedByRemote)); + } + + @Override + protected void onMessage(WebSocketFrame frame) { + switch (frame.opCode()) { + case TEXT -> listener.onMessage(new WsMessage(ctx, this, frame, frame.textPayload())); + case BINARY -> + listener.onBinaryMessage(new WsBinaryMessage(ctx, this, frame, frame.binaryPayload())); + default -> throw new IllegalArgumentException("Unexpected value: "); + } + } + + @Override + protected void onPong(WebSocketFrame pong) { + listener.onPong(new WsPong(ctx, this, pong)); + } + + @Override + protected void onOpen() { + listener.onOpen(new WsOpen(ctx, this)); + } + + @Override + protected void onError(Exception exception) { + listener.onError(new WsError(ctx, this, exception)); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java new file mode 100644 index 00000000..e8948c39 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java @@ -0,0 +1,47 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.websocket.exception.CloseCode; + +/** + * Represents a WebSocket connection, providing methods to interact with the connection. + * Allows sending and receiving messages, pinging, and closing the connection. + */ +public interface WebSocket { + + /** + * Checks if the WebSocket connection is open. + * + * @return true if the connection is open, false otherwise + */ + boolean isOpen(); + + /** + * Closes the WebSocket connection with the specified close code and reason. + * + * @param code the close code indicating the reason for closure + * @param reason the reason for closing the connection + * @param initiatedByRemote true if the close was initiated by the remote endpoint, false otherwise + */ + void close(CloseCode code, String reason, boolean initiatedByRemote); + + /** + * Sends a ping frame with the specified payload to the remote endpoint. + * + * @param payload the ping payload as a byte array + */ + void ping(byte[] payload); + + /** + * Sends a binary message to the remote endpoint. + * + * @param payload the binary payload as a byte array + */ + void send(byte[] payload); + + /** + * Sends a text message to the remote endpoint. + * + * @param payload the text payload as a string + */ + void send(String payload); +} \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java new file mode 100644 index 00000000..fd7e3b66 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java @@ -0,0 +1,19 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.internal.WebSocketHandler; + +class WebSocketExchangeHandler extends WebSocketHandler { + + private final WebSocketListener listener; + + WebSocketExchangeHandler(WebSocketListener listener) { + this.listener = listener; + } + + @Override + protected JexWebSocket openWebSocket(Context exchange) { + + return new JexWebSocket(exchange, listener); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java new file mode 100644 index 00000000..39b57f10 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java @@ -0,0 +1,110 @@ +package io.avaje.jex.websocket; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Represents a WebSocket frame as defined by RFC 6455. Provides access to frame payload, masking, + * opcode, and frame control information. + */ +public interface WebSocketFrame { + + /** + * Returns the binary payload of the frame. + * + * @return the binary payload as a byte array + */ + byte[] binaryPayload(); + + /** + * Returns the masking key used for the frame, if present. + * + * @return the masking key as a byte array, or null if not masked + */ + byte[] maskingKey(); + + /** + * Returns the opcode of the frame. + * + * @return the opcode + */ + OpCode opCode(); + + /** + * Returns the text payload of the frame, if applicable. + * + * @return the text payload, or null if not a text frame + */ + String textPayload(); + + /** + * Indicates if this frame is the final fragment in a message. + * + * @return true if final fragment, false otherwise + */ + boolean isFin(); + + /** + * Indicates if the frame is masked. + * + * @return true if masked, false otherwise + */ + boolean isMasked(); + + /** + * Writes the frame to the given output stream in WebSocket frame format. + * + * @param out the output stream to write to + * @throws IOException if an I/O error occurs + */ + void write(OutputStream out) throws IOException; + + /** WebSocket opcodes */ + public enum OpCode { + CONTINUATION(0), + TEXT(1), + BINARY(2), + CLOSE(8), + PING(9), + PONG(10); + + private final byte code; + private static final Map VALUES = + Arrays.stream(values()).collect(Collectors.toMap(OpCode::value, e -> e)); + + OpCode(int code) { + this.code = (byte) code; + } + + /** + * Finds the OpCode corresponding to the given byte value. + * + * @param value the opcode value + * @return the matching OpCode, or null if not found + */ + public static OpCode find(byte value) { + return VALUES.get(value); + } + + /** + * Returns the byte value of this opcode. + * + * @return the opcode value + */ + public byte value() { + return this.code; + } + + /** + * Indicates if this opcode is a control frame (close, ping, pong). + * + * @return true if control frame, false otherwise + */ + public boolean isControlFrame() { + return this == CLOSE || this == PING || this == PONG; + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java new file mode 100644 index 00000000..9b6870f7 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java @@ -0,0 +1,56 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; + +/** + * Holds the different WebSocket handlers for a specific {@link WsHandlerEntry} or the WebSocket + * log. + */ +public interface WebSocketListener { + /** + * Called when a binary message is received. + * + * @param binaryPayload the binary payload + */ + default void onBinaryMessage(WsBinaryMessage binaryPayload) {} + + /** + * Called when the websocket is closed. + * + * @param wsClose the close context + */ + default void onClose(WsClose wsClose) {} + + /** + * Called when a text message is received. + * + * @param message the text message + */ + default void onMessage(WsMessage message) {} + + /** + * Called when the websocket is opened. + * + * @param wsOpenContext the open context + */ + default void onOpen(WsOpen wsOpen) {} + + /** + * Called when a pong is received. + * + * @param pong the pong + */ + default void onPong(WsPong wsPong) {} + + /** + * Called when an error occurs. + * + * @param wsError the error + */ + default void onError(WsError wsError) {} +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java new file mode 100644 index 00000000..105feabf --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -0,0 +1,28 @@ +package io.avaje.jex.websocket; + +import java.util.ArrayList; +import java.util.List; + +import io.avaje.jex.Jex; +import io.avaje.jex.Routing; +import io.avaje.jex.security.Role; +import io.avaje.jex.spi.JexPlugin; + +public class WebSocketPlugin implements JexPlugin { + + private final List handlers = new ArrayList<>(); + + public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { + handlers.add(r -> r.get(path, new WebSocketExchangeHandler(listener), roles)); + return this; + } + + @Override + public void apply(Jex jex) { + jex.routing().addAll(handlers); + } + + public static WebSocketPlugin create() { + return new WebSocketPlugin(); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java new file mode 100644 index 00000000..52f22a42 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java @@ -0,0 +1,185 @@ +package io.avaje.jex.websocket; + +import java.lang.reflect.Type; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.exception.CloseCode; + +/** The context for a WebSocket event */ +public abstract sealed class WsContext { + + protected final Context ctx; + private final WebSocket ws; + + protected WsContext(Context ctx, WebSocket ws) { + this.ctx = ctx; + this.ws = ws; + } + + /** + * Serializes object to a JSON-string using the registered JsonMapper and sends it over the socket + */ + public void send(Object message) { + ws.send(ctx.jsonService().toJsonString(message)); + } + + /** Sends a String over the socket */ + public void send(String message) { + ws.send(message); + } + + /** Sends a byte[] over the socket */ + public void send(byte[] message) { + ws.send(message); + } + + /** Sends a ping over the socket */ + public void sendPing() { + sendPing(null); + } + + /** Sends a ping over the socket */ + public void sendPing(byte[] applicationData) { + ws.ping(applicationData != null ? applicationData : new byte[0]); + } + + /** + * Return the request Context. + * + * @return the request + */ + public Context ctx() { + return ctx; + } + + /** + * Return the Websocket. + * + * @return the request + */ + public WebSocket ws() { + return ws; + } + + /** Close the session */ + public void closeSession() { + ws.close(CloseCode.NORMAL_CLOSURE, "cya", false); + } + + /** Close the session with a CloseCode */ + public void closeSession(CloseCode code) { + ws.close(code, "", false); + } + + /** Close the session with a code and reason */ + public void closeSession(CloseCode code, String reason) { + ws.close(code, reason, false); + } + + public static final class WsOpen extends WsContext { + WsOpen(Context ctx, WebSocket ws) { + super(ctx, ws); + } + } + + public static final class WsPong extends WsMessageCtx { + WsPong(Context ctx, WebSocket ws, WebSocketFrame wsFrame) { + super(ctx, ws, wsFrame); + } + } + + public static final class WsError extends WsContext { + private final Exception error; + + WsError(Context ctx, WebSocket ws, Exception error) { + super(ctx, ws); + this.error = error; + } + + /** Get the Throwable error that occurred */ + public Exception error() { + return error; + } + } + + public static final class WsClose extends WsContext { + private final CloseCode closeCode; + private final String reason; + private final boolean initiatedByRemote; + + WsClose( + Context ctx, WebSocket ws, CloseCode closeCode, String reason, boolean initiatedByRemote) { + super(ctx, ws); + this.closeCode = closeCode; + this.reason = reason; + this.initiatedByRemote = initiatedByRemote; + } + + /** The int status for why connection was closed */ + public CloseCode closeCode() { + return closeCode; + } + + /** The reason for the close */ + public String reason() { + return reason; + } + + /** True if the close was initiated by the remote endpoint */ + public boolean initiatedByRemote() { + return initiatedByRemote; + } + } + + public abstract static sealed class WsMessageCtx extends WsContext { + private final WebSocketFrame wsFrame; + + WsMessageCtx(Context ctx, WebSocket ws, WebSocketFrame wsFrame) { + super(ctx, ws); + this.wsFrame = wsFrame; + } + + /** Get the underlying frame */ + public WebSocketFrame wsFrame() { + return wsFrame; + } + } + + public static final class WsBinaryMessage extends WsMessageCtx { + private final byte[] data; + + WsBinaryMessage(Context ctx, WebSocket ws, WebSocketFrame wsFrame, byte[] data) { + super(ctx, ws, wsFrame); + this.data = data; + } + + /** Get the binary data of the message */ + public byte[] data() { + return data; + } + } + + public static final class WsMessage extends WsMessageCtx { + private final String message; + + WsMessage(Context ctx, WebSocket ws, WebSocketFrame frame, String message) { + super(ctx, ws, frame); + this.message = message; + } + + /** Receive a string message from the client */ + public String message() { + return message; + } + + /** Receive a message from the client as a class */ + public T messageAsClass(Type type) { + return ctx.jsonService().fromJson(type, message); + } + + /** See Also: messageAsClass(Type) */ + public T messageAsClass(Class clazz) { + return messageAsClass((Type) clazz); + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java new file mode 100644 index 00000000..4b92320e --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java @@ -0,0 +1,35 @@ +package io.avaje.jex.websocket.exception; + +public enum CloseCode { + NORMAL_CLOSURE(1000), + GOING_AWAY(1001), + PROTOCOL_ERROR(1002), + UNSUPPORTED_DATA(1003), + NO_STATUS_RCVD(1005), + ABNORMAL_CLOSURE(1006), + INVALID_FRAME_PAYLOAD_DATA(1007), + POLICY_VIOLATION(1008), + MESSAGE_TOO_BIG(1009), + MANDATORY_EXT(1010), + INTERNAL_SERVER_ERROR(1011), + TLS_HANDSHAKE(1015); + + public static CloseCode find(int value) { + for (CloseCode code : values()) { + if (code.getValue() == value) { + return code; + } + } + return null; + } + + private final int code; + + CloseCode(int code) { + this.code = code; + } + + public int getValue() { + return this.code; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java new file mode 100644 index 00000000..28002f20 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java @@ -0,0 +1,32 @@ +package io.avaje.jex.websocket.exception; + +public class WebSocketException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private final CloseCode code; + + private final String reason; + + public WebSocketException(CloseCode code, String reason) { + this(code, reason, null); + } + + public WebSocketException(CloseCode code, String reason, Exception cause) { + super(code + ": " + reason, cause); + this.code = code; + this.reason = reason; + } + + public WebSocketException(Exception cause) { + this(CloseCode.INTERNAL_SERVER_ERROR, cause.toString(), cause); + } + + public CloseCode getCode() { + return this.code; + } + + public String getReason() { + return this.reason; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java new file mode 100644 index 00000000..b84b0b5b --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java @@ -0,0 +1,262 @@ +package io.avaje.jex.websocket.internal; + +import static java.lang.System.Logger.Level.DEBUG; +import static java.lang.System.Logger.Level.INFO; +import static java.lang.System.Logger.Level.TRACE; + +import java.io.EOFException; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.sun.net.httpserver.HttpExchange; + +import io.avaje.jex.websocket.WebSocket; +import io.avaje.jex.websocket.WebSocketFrame; +import io.avaje.jex.websocket.WebSocketFrame.OpCode; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.exception.WebSocketException; + +public abstract class AbstractWebSocket implements WebSocket { + + private final List continuousFrames = new LinkedList<>(); + + private OpCode continuousOpCode = null; + private final InputStream in; + private Lock lock = new ReentrantLock(); + protected final System.Logger log = System.getLogger("io.avaje.jex.websocket"); + private final OutputStream out; + private volatile State state = State.UNCONNECTED; + private final URI uri; + + protected AbstractWebSocket(HttpExchange exchange) { + this.uri = exchange.getRequestURI(); + log.log(INFO, "connecting websocket {0}", uri); + this.state = State.CONNECTING; + this.in = exchange.getRequestBody(); + this.out = exchange.getResponseBody(); + } + + @Override + public void close(CloseCode code, String reason, boolean initiatedByRemote) { + log.log(INFO, "closing websocket {0}", uri); + + var oldState = this.state; + this.state = State.CLOSING; + if (oldState == State.OPEN) { + sendFrame(new CloseFrame(code, reason)); + } else { + doClose(code, reason, initiatedByRemote); + } + } + + void doClose(CloseCode code, String reason, boolean initiatedByRemote) { + if (this.state == State.CLOSED) { + return; + } + try (in; out) { + // just close the streams + } catch (IOException expected) { + // Expected + } + this.state = State.CLOSED; + onClose(code, reason, initiatedByRemote); + } + + private void handleCloseFrame(WebSocketFrame frame) { + var code = CloseCode.NORMAL_CLOSURE; + var reason = ""; + if (frame instanceof CloseFrame cf) { + code = cf.getCloseCode(); + reason = cf.getCloseReason(); + } + log.log( + TRACE, + "handleCloseFrame: {0}, code={1}, reason={2}, state {3}", + uri, + code, + reason, + this.state); + if (this.state == State.CLOSING) { + // Answer for my requested close + doClose(code, reason, false); + } else { + close(code, reason, true); + } + } + + private void handleFrameFragment(WebSocketFrame frame) { + if (frame.opCode() != OpCode.CONTINUATION) { + // First + if (this.continuousOpCode != null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Previous continuous frame sequence not completed."); + } + this.continuousOpCode = frame.opCode(); + this.continuousFrames.clear(); + this.continuousFrames.add(frame); + } else if (frame.isFin()) { + // Last + if (this.continuousOpCode == null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence was not started."); + } + this.continuousFrames.add(frame); + onMessage(new WSFrame(this.continuousOpCode, this.continuousFrames)); + this.continuousOpCode = null; + this.continuousFrames.clear(); + } else if (this.continuousOpCode == null) { + // Unexpected + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence was not started."); + } else { + // Intermediate + this.continuousFrames.add(frame); + } + } + + private void handleWebsocketFrame(WebSocketFrame frame) { + onFrameReceived(frame); + if (frame.opCode() == OpCode.CLOSE) { + handleCloseFrame(frame); + } else if (frame.opCode() == OpCode.PING) { + sendFrame(new WSFrame(OpCode.PONG, true, frame.binaryPayload())); + } else if (frame.opCode() == OpCode.PONG) { + onPong(frame); + } else if (!frame.isFin() || frame.opCode() == OpCode.CONTINUATION) { + handleFrameFragment(frame); + } else if (this.continuousOpCode != null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence not completed."); + } else if (frame.opCode() == OpCode.TEXT || frame.opCode() == OpCode.BINARY) { + onMessage(frame); + } else { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Non control or continuous frame expected."); + } + } + + @Override + public boolean isOpen() { + return state == State.OPEN; + } + + protected void onFrameReceived(WebSocketFrame frame) { + log.log(TRACE, "frame received: {0}", frame); + } + + /** + * Debug method. Do not Override unless for debug purposes!
+ * This method is called before actually sending the frame. + * + * @param frame The sent WebSocket Frame. + */ + protected void onFrameSent(WebSocketFrame frame) { + log.log(TRACE, "frame sent: {0}", frame); + } + + protected abstract void onClose(CloseCode code, String reason, boolean initiatedByRemote); + + protected abstract void onError(Exception exception); + + protected abstract void onMessage(WebSocketFrame message) throws WebSocketException; + + protected abstract void onOpen() throws WebSocketException; + + protected abstract void onPong(WebSocketFrame pong) throws WebSocketException; + + @Override + public void ping(byte[] payload) { + sendFrame(new WSFrame(OpCode.PING, true, payload)); + } + + void readWebsocket() { + try { + state = State.OPEN; + log.log(DEBUG, "websocket open {0}", uri); + onOpen(); + while (this.state == State.OPEN) { + handleWebsocketFrame(WSFrame.read(in)); + } + } catch (EOFException e) { + log.log(TRACE, "exception on websocket", e); + onError(e); + doClose(CloseCode.ABNORMAL_CLOSURE, e.toString(), false); + } catch (Exception e) { + onError(e); + if (e instanceof WebSocketException wse) { + doClose(wse.getCode(), wse.getReason(), false); + } else { + doClose(CloseCode.ABNORMAL_CLOSURE, e.toString(), false); + } + } finally { + doClose( + CloseCode.INTERNAL_SERVER_ERROR, + "Handler terminated without closing the connection.", + false); + log.log(TRACE, "readWebsocket() exiting {0}", uri); + } + } + + @Override + public void send(byte[] payload) { + sendFrame(new WSFrame(OpCode.BINARY, true, payload)); + } + + @Override + public void send(String payload) { + sendFrame(new WSFrame(OpCode.TEXT, true, payload)); + } + + public void sendFrame(WebSocketFrame frame) { + lock.lock(); + try { + onFrameSent(frame); + frame.write(this.out); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + lock.unlock(); + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java new file mode 100644 index 00000000..a7dadec0 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java @@ -0,0 +1,44 @@ +package io.avaje.jex.websocket.internal; + +import io.avaje.jex.websocket.exception.CloseCode; + +public final class CloseFrame extends WSFrame { + + private static byte[] generatePayload(CloseCode code, String closeReason) { + if (code != null) { + var reasonBytes = text2Binary(closeReason); + var payload = new byte[reasonBytes.length + 2]; + payload[0] = (byte) (code.getValue() >> 8 & 0xFF); + payload[1] = (byte) (code.getValue() & 0xFF); + System.arraycopy(reasonBytes, 0, payload, 2, reasonBytes.length); + return payload; + } + return new byte[0]; + } + + private CloseCode closeCode; + + private String closeReason; + + public CloseFrame(CloseCode code, String closeReason) { + super(OpCode.CLOSE, true, generatePayload(code, closeReason)); + } + + CloseFrame(WSFrame wrap) { + super(wrap); + assert wrap.opCode() == OpCode.CLOSE; + if (wrap.binaryPayload().length >= 2) { + this.closeCode = + CloseCode.find((wrap.binaryPayload()[0] & 0xFF) << 8 | wrap.binaryPayload()[1] & 0xFF); + this.closeReason = binary2Text(binaryPayload(), 2, binaryPayload().length - 2); + } + } + + public CloseCode getCloseCode() { + return this.closeCode; + } + + public String getCloseReason() { + return this.closeReason; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md new file mode 100644 index 00000000..4b21b270 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md @@ -0,0 +1,14 @@ +Most of the code in the websockets package is covered under the following license: + +Copyright (c) 2012-2013 by Paul S. Hawke, 2001,2005-2013 by Jarno Elonen, 2010 by Konstantinos Togias +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +* Neither the name of the NanoHttpd organization nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java new file mode 100644 index 00000000..8b9d7160 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java @@ -0,0 +1,42 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +public enum State { + UNCONNECTED, + CONNECTING, + OPEN, + CLOSING, + CLOSED +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java new file mode 100644 index 00000000..6133b464 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java @@ -0,0 +1,67 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +class Util { + + public static final String HEADER_UPGRADE = "Upgrade"; + + public static final String HEADER_UPGRADE_VALUE = "websocket"; + + public static final String HEADER_CONNECTION = "Connection"; + + public static final String HEADER_WEBSOCKET_VERSION = "sec-websocket-version"; + + public static final String HEADER_WEBSOCKET_VERSION_VALUE = "13"; + + public static final String HEADER_WEBSOCKET_KEY = "sec-websocket-key"; + + public static final String HEADER_WEBSOCKET_ACCEPT = "sec-websocket-accept"; + + public static final String HEADER_WEBSOCKET_PROTOCOL = "sec-websocket-protocol"; + + private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + public static String makeAcceptKey(String key) throws NoSuchAlgorithmException { + var md = MessageDigest.getInstance("SHA-1"); + var text = key + Util.WEBSOCKET_KEY_MAGIC; + md.update(text.getBytes(), 0, text.length()); + var sha1hash = md.digest(); + return Base64.getEncoder().encodeToString(sha1hash); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java new file mode 100644 index 00000000..2b3218a3 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java @@ -0,0 +1,390 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import io.avaje.jex.websocket.WebSocketFrame; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.exception.WebSocketException; + +public sealed class WSFrame implements WebSocketFrame permits CloseFrame { + + static final Charset TEXT_CHARSET = StandardCharsets.UTF_8; + + static String binary2Text(byte[] payload) { + return new String(payload, WSFrame.TEXT_CHARSET); + } + + static String binary2Text(byte[] payload, int offset, int length) { + return new String(payload, offset, length, WSFrame.TEXT_CHARSET); + } + + private static int checkedRead(int read) throws IOException { + if (read < 0) { + throw new EOFException(); + } + return read; + } + + static WSFrame read(InputStream in) throws IOException { + var head = (byte) checkedRead(in.read()); + var fin = (head & 0x80) != 0; + var opCode = OpCode.find((byte) (head & 0x0F)); + if ((head & 0x70) != 0) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "The reserved bits (" + Integer.toBinaryString(head & 0x70) + ") must be 0."); + } + if (opCode == null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Received frame with reserved/unknown opcode " + (head & 0x0F) + "."); + } + if (opCode.isControlFrame() && !fin) { + throw new WebSocketException(CloseCode.PROTOCOL_ERROR, "Fragmented control frame."); + } + + var frame = new WSFrame(opCode, fin); + frame.readPayloadInfo(in); + frame.readPayload(in); + if (frame.opCode() == OpCode.CLOSE) { + return new CloseFrame(frame); + } + return frame; + } + + static byte[] text2Binary(String payload) { + return payload.getBytes(WSFrame.TEXT_CHARSET); + } + + private OpCode opCode; + + private boolean fin; + + private byte[] maskingKey; + + private byte[] payload; + + // --------------------------------GETTERS--------------------------------- + + private int payloadLength; + + private String payloadString; + + private WSFrame(OpCode opCode, boolean fin) { + setOpCode(opCode); + setFin(fin); + } + + public WSFrame(OpCode opCode, boolean fin, byte[] payload) { + this(opCode, fin, payload, null); + } + + public WSFrame(OpCode opCode, boolean fin, byte[] payload, byte[] maskingKey) { + this(opCode, fin); + setMaskingKey(maskingKey); + setBinaryPayload(payload); + } + + public WSFrame(OpCode opCode, boolean fin, String payload) { + this(opCode, fin, payload, null); + } + + public WSFrame(OpCode opCode, boolean fin, String payload, byte[] maskingKey) { + this(opCode, fin); + setMaskingKey(maskingKey); + setTextPayload(payload); + } + + public WSFrame(OpCode opCode, List fragments) throws WebSocketException { + setOpCode(opCode); + setFin(true); + + var length = 0L; + for (var inter : fragments) { + length += inter.binaryPayload().length; + } + if (length < 0 || length > Integer.MAX_VALUE) { + throw new WebSocketException( + CloseCode.MESSAGE_TOO_BIG, "Max frame length has been exceeded."); + } + this.payloadLength = (int) length; + var payload = new byte[this.payloadLength]; + var offset = 0; + for (var inter : fragments) { + System.arraycopy(inter.binaryPayload(), 0, payload, offset, inter.binaryPayload().length); + offset += inter.binaryPayload().length; + } + setBinaryPayload(payload); + } + + public WSFrame(WSFrame clone) { + setOpCode(clone.opCode()); + setFin(clone.isFin()); + setBinaryPayload(clone.binaryPayload()); + setMaskingKey(clone.maskingKey()); + } + + @Override + public byte[] binaryPayload() { + return this.payload; + } + + @Override + public byte[] maskingKey() { + return this.maskingKey; + } + + @Override + public OpCode opCode() { + return this.opCode; + } + + // --------------------------------SERIALIZATION--------------------------- + + @Override + public String textPayload() { + if (this.payloadString == null) { + this.payloadString = binary2Text(binaryPayload()); + } + return this.payloadString; + } + + @Override + public boolean isFin() { + return this.fin; + } + + @Override + public boolean isMasked() { + return this.maskingKey != null && this.maskingKey.length == 4; + } + + private String payloadToString() { + if (this.payload == null) { + return "null"; + } + final var sb = new StringBuilder(); + sb.append('[').append(this.payload.length).append("b] "); + if (opCode() == OpCode.TEXT) { + var text = textPayload(); + if (text.length() > 100) { + sb.append(text.substring(0, 100)).append("..."); + } else { + sb.append(text); + } + } else { + sb.append("0x"); + for (var i = 0; i < Math.min(this.payload.length, 50); ++i) { + sb.append(Integer.toHexString(this.payload[i] & 0xFF)); + } + if (this.payload.length > 50) { + sb.append("..."); + } + } + return sb.toString(); + } + + private void readPayload(InputStream in) throws IOException { + this.payload = new byte[this.payloadLength]; + var read = 0; + while (read < this.payloadLength) { + read += checkedRead(in.read(this.payload, read, this.payloadLength - read)); + } + + if (isMasked()) { + for (var i = 0; i < this.payload.length; i++) { + this.payload[i] ^= this.maskingKey[i % 4]; + } + } + + // Test for Unicode errors + if (opCode() == OpCode.TEXT) { + this.payloadString = binary2Text(binaryPayload()); + } + } + + // --------------------------------ENCODING-------------------------------- + + private void readPayloadInfo(InputStream in) throws IOException { + var b = (byte) checkedRead(in.read()); + var masked = (b & 0x80) != 0; + + this.payloadLength = (byte) (0x7F & b); + if (this.payloadLength == 126) { + // checkedRead must return int for this to work + this.payloadLength = (checkedRead(in.read()) << 8 | checkedRead(in.read())) & 0xFFFF; + if (this.payloadLength < 126) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Invalid data frame 2byte length. (not using minimal length encoding)"); + } + } else if (this.payloadLength == 127) { + var length = + (long) checkedRead(in.read()) << 56 + | (long) checkedRead(in.read()) << 48 + | (long) checkedRead(in.read()) << 40 + | (long) checkedRead(in.read()) << 32 + | checkedRead(in.read()) << 24 + | checkedRead(in.read()) << 16 + | checkedRead(in.read()) << 8 + | checkedRead(in.read()); + if (length < 65536) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Invalid data frame 4byte length. (not using minimal length encoding)"); + } + if (length < 0 || length > Integer.MAX_VALUE) { + throw new WebSocketException( + CloseCode.MESSAGE_TOO_BIG, "Max frame length has been exceeded."); + } + this.payloadLength = (int) length; + } + + if (this.opCode.isControlFrame()) { + if (this.payloadLength > 125) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Control frame with payload length > 125 bytes."); + } + if (this.opCode == OpCode.CLOSE && this.payloadLength == 1) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Received close frame with payload len 1."); + } + } + + if (masked) { + this.maskingKey = new byte[4]; + var read = 0; + while (read < this.maskingKey.length) { + read += checkedRead(in.read(this.maskingKey, read, this.maskingKey.length - read)); + } + } + } + + void setBinaryPayload(byte[] payload) { + this.payload = payload; + this.payloadLength = payload.length; + this.payloadString = null; + } + + void setFin(boolean fin) { + this.fin = fin; + } + + void setMaskingKey(byte[] maskingKey) { + if (maskingKey != null && maskingKey.length != 4) { + throw new IllegalArgumentException( + "MaskingKey " + Arrays.toString(maskingKey) + " hasn't length 4"); + } + this.maskingKey = maskingKey; + } + + void setOpCode(OpCode opcode) { + this.opCode = opcode; + } + + void setTextPayload(String payload) { + this.payload = text2Binary(payload); + this.payloadLength = payload.length(); + this.payloadString = payload; + } + + // --------------------------------CONSTANTS------------------------------- + + void setUnmasked() { + setMaskingKey(null); + } + + @Override + public String toString() { + final var sb = new StringBuilder("WS["); + sb.append(opCode()); + sb.append(", ").append(isFin() ? "fin" : "inter"); + sb.append(", ").append(isMasked() ? "masked" : "unmasked"); + sb.append(", ").append(payloadToString()); + sb.append(']'); + return sb.toString(); + } + + // ------------------------------------------------------------------------ + + @Override + public void write(OutputStream out) throws IOException { + byte header = 0; + if (this.fin) { + header |= 0x80; + } + header |= this.opCode.value() & 0x0F; + out.write(header); + + this.payloadLength = binaryPayload().length; + if (this.payloadLength <= 125) { + out.write(isMasked() ? 0x80 | (byte) this.payloadLength : (byte) this.payloadLength); + } else { + if (this.payloadLength <= 0xFFFF) { + out.write(isMasked() ? 0xFE : 126); + } else { + out.write(isMasked() ? 0xFF : 127); + out.write(this.payloadLength >>> 56 & 0); // integer only + // contains + // 31 bit + out.write(this.payloadLength >>> 48 & 0); + out.write(this.payloadLength >>> 40 & 0); + out.write(this.payloadLength >>> 32 & 0); + out.write(this.payloadLength >>> 24); + out.write(this.payloadLength >>> 16); + } + out.write(this.payloadLength >>> 8); + out.write(this.payloadLength); + } + + if (isMasked()) { + out.write(this.maskingKey); + for (var i = 0; i < this.payloadLength; i++) { + out.write(binaryPayload()[i] ^ this.maskingKey[i % 4]); + } + } else { + out.write(binaryPayload()); + } + out.flush(); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java new file mode 100644 index 00000000..de4ca60b --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java @@ -0,0 +1,70 @@ +package io.avaje.jex.websocket.internal; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; + +import com.sun.net.httpserver.Headers; + +import io.avaje.jex.http.BadRequestException; +import io.avaje.jex.http.Context; +import io.avaje.jex.http.ExchangeHandler; +import io.avaje.jex.http.HttpResponseException; +import io.avaje.jex.http.HttpStatus; +import io.avaje.jex.http.InternalServerErrorException; + +public abstract class WebSocketHandler implements ExchangeHandler { + + @Override + public void handle(Context ctx) throws IOException { + var headers = ctx.requestHeaders(); + + if (!isWebsocketRequested(headers)) { + throw new HttpResponseException(HttpStatus.UPGRADE_REQUIRED_426, "Not a websocket request"); + } + + if (!Util.HEADER_WEBSOCKET_VERSION_VALUE.equalsIgnoreCase( + headers.getFirst(Util.HEADER_WEBSOCKET_VERSION)) + || !headers.containsKey(Util.HEADER_WEBSOCKET_KEY)) { + throw new BadRequestException( + "Invalid Websocket-Version " + headers.getFirst(Util.HEADER_WEBSOCKET_VERSION)); + } + + var webSocket = openWebSocket(ctx); + + try { + ctx.header( + Util.HEADER_WEBSOCKET_ACCEPT, + Util.makeAcceptKey(headers.getFirst(Util.HEADER_WEBSOCKET_KEY))); + } catch (NoSuchAlgorithmException e) { + throw new InternalServerErrorException( + "The SHA-1 Algorithm required for websockets is not available on the server."); + } + + if (headers.containsKey(Util.HEADER_WEBSOCKET_PROTOCOL)) { + ctx.header( + Util.HEADER_WEBSOCKET_PROTOCOL, + headers.getFirst(Util.HEADER_WEBSOCKET_PROTOCOL).split(",")[0]); + } + + ctx.header(Util.HEADER_UPGRADE, Util.HEADER_UPGRADE_VALUE); + ctx.header(Util.HEADER_CONNECTION, Util.HEADER_UPGRADE); + ctx.writeEmpty(101); + + // this won't return until websocket is closed + webSocket.readWebsocket(); + } + + private static boolean isWebsocketRequested(Headers headers) { + // check if Upgrade connection + var values = headers.get(Util.HEADER_CONNECTION); + if (values == null + || values.stream().filter(Util.HEADER_UPGRADE::equalsIgnoreCase).findAny().isEmpty()) { + return false; + } + // check for proper upgrade type + var upgrade = headers.getFirst(Util.HEADER_UPGRADE); + return Util.HEADER_UPGRADE_VALUE.equalsIgnoreCase(upgrade); + } + + protected abstract AbstractWebSocket openWebSocket(Context exchange); +} diff --git a/avaje-jex-websockets/src/main/java/module-info.java b/avaje-jex-websockets/src/main/java/module-info.java new file mode 100644 index 00000000..598e7a5a --- /dev/null +++ b/avaje-jex-websockets/src/main/java/module-info.java @@ -0,0 +1,23 @@ +/** + * Defines the Static Content API for serving static resources with Jex - see {@link io.avaje.jex.staticcontent.StaticContent}. + * + *
{@code
+ * var staticContent = StaticContentService.createCP("/public").httpPath("/").directoryIndex("index.html");
+ * final Jex.Server app = Jex.create()
+ *   .plugin(staticContent)
+ *   .port(8080)
+ *   .start();
+ *
+ * app.shutdown();
+ *
+ * }
+ */ +module io.avaje.jex.websocket { + + exports io.avaje.jex.websocket; + exports io.avaje.jex.websocket.exception; + + requires transitive io.avaje.jex; + requires static java.logging; + +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java new file mode 100644 index 00000000..ca048aae --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java @@ -0,0 +1,20 @@ +package io.avaje.jex.websocket.internal; + +import io.avaje.jex.websocket.WebSocketListener; +import io.avaje.jex.websocket.WsContext.WsMessage; + +public class EchoWebSocketHandler implements WebSocketListener { + + private StringBuilder sb = new StringBuilder(); + + @Override + public void onMessage(WsMessage message) { + sb.append(message.message()); + if (message.wsFrame().isFin()) { + String msg = sb.toString(); + sb = new StringBuilder(); + message.send(msg); + } + message.closeSession(); + } +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java new file mode 100644 index 00000000..cbc1b50b --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java @@ -0,0 +1,72 @@ +package io.avaje.jex.websocket.internal; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class WebSocketClientUtil { + + public static WebSocket createWSC( + int port, String path, final Consumer onTextCallback, Runnable onCloseCallback) + throws InterruptedException { + + HttpClient client = HttpClient.newHttpClient(); + + CountDownLatch waitForOpen = new CountDownLatch(1); + + CompletableFuture future = + client + .newWebSocketBuilder() + .buildAsync( + URI.create("ws://localhost:" + port + path), + new WebSocket.Listener() { + StringBuilder text = new StringBuilder(); + + @Override + public CompletionStage onText( + WebSocket webSocket, CharSequence data, boolean last) { + text.append(data); + if (last) { + onTextCallback.accept(text.toString()); + text = new StringBuilder(); + } + webSocket.request(1); + + return null; + } + + @Override + public CompletionStage onClose( + WebSocket webSocket, int statusCode, String reason) { + if (onCloseCallback != null) { + onCloseCallback.run(); + } + return null; + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + if (onCloseCallback != null) { + onCloseCallback.run(); + } + } + + @Override + public void onOpen(WebSocket webSocket) { + waitForOpen.countDown(); + webSocket.request(1); + } + }); + + WebSocket ws = future.join(); + if (!waitForOpen.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("websocket did not open"); + } + return ws; + } +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java new file mode 100644 index 00000000..e488e78d --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java @@ -0,0 +1,76 @@ +package io.avaje.jex.websocket.internal; + +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.ConsoleHandler; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.avaje.jex.Jex; +import io.avaje.jex.test.TestPair; +import io.avaje.jex.websocket.WebSocketPlugin; + +public class WebSocketTest { + + static { + // System.setProperty("jdk.httpclient.HttpClient.log", "all"); + // System.setProperty("jdk.internal.httpclient.websocket.debug", "true"); + } + + private static final String path = "/ws"; + + TestPair server; + + @BeforeEach + public void setUp() throws IOException { + + var jex = Jex.create(); + + WebSocketPlugin p = WebSocketPlugin.create(); + p.ws(path, new EchoWebSocketHandler()); + jex.plugin(p); + server = TestPair.create(jex); + + Logger logger = Logger.getLogger(WebSocketTest.class.getName()); + ConsoleHandler ch = new ConsoleHandler(); + logger.setLevel(Level.ALL); + ch.setLevel(Level.ALL); + logger.addHandler(ch); + } + + @AfterEach + public void tearDown() { + server.shutdown(); + } + + @Test + public void testEcho() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + var client = + WebSocketClientUtil.createWSC( + server.port(), + path, + s -> { + if ("a_message".equals(s)) { + latch.countDown(); + } else { + fail("received wrong message"); + } + }, + null); + + client.sendText("a_message", true); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("did not receive message"); + } + System.err.println("closing client"); + } +} diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java b/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java index 0aeae922..32375b0d 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java @@ -541,6 +541,15 @@ public void write(String content) { write(content.getBytes(StandardCharsets.UTF_8)); } + @Override + public void writeEmpty(int statusCode) { + try { + exchange.sendResponseHeaders(statusCode, -1); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public JsonService jsonService() { return mgr.jsonService(); diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java b/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java index ac8ee4b5..dac3e48e 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java @@ -24,7 +24,8 @@ public final class JacksonJsonService implements JsonService { /** Create with defaults for Jackson */ public JacksonJsonService() { - this.mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + this.mapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } /** Create with a Jackson instance that might have custom configuration. */ @@ -53,7 +54,8 @@ public T fromJson(Type type, byte[] data) { } private JavaType javaType(Type type) { - return javaTypes.computeIfAbsent(type.getTypeName(), k -> mapper.getTypeFactory().constructType(type)); + return javaTypes.computeIfAbsent( + type.getTypeName(), k -> mapper.getTypeFactory().constructType(type)); } @Override @@ -101,4 +103,14 @@ private void write(Iterator iterator, final JsonGenerator generator) { throw new UncheckedIOException(e); } } + + @Override + public T fromJson(Type type, String data) { + try { + final var javaType = javaType(type); + return mapper.readValue(data, javaType); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java b/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java index e07f4bc3..ac6dab76 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java @@ -61,4 +61,9 @@ public void toJsonStream(Iterator iterator, OutputStream os) { } } } + + @Override + public T fromJson(Type type, String data) { + return jsonb.type(type).fromJson(data); + } } diff --git a/avaje-jex/src/main/java/io/avaje/jex/http/Context.java b/avaje-jex/src/main/java/io/avaje/jex/http/Context.java index 5644b9a7..6ecb1e44 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/http/Context.java +++ b/avaje-jex/src/main/java/io/avaje/jex/http/Context.java @@ -531,6 +531,14 @@ default String userAgent() { return header(Constants.USER_AGENT); } + /** Writes Nothing. */ + void writeEmpty(int statusCode); + + /** Writes Nothing. */ + default void writeEmpty(HttpStatus statusCode) { + writeEmpty(statusCode.status()); + } + /** * Writes the given bytes directly to the response. * diff --git a/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java b/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java index bab6d9f0..2325bd2d 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java @@ -35,10 +35,19 @@ public non-sealed interface JsonService extends JexExtension { String toJsonString(Object bean); /** - * Deserializes a json input stream and deserializes it into a Java object of the specified type. + * **Read a Java Object from JSON string** + * + *

Deserializes a Java object from a JSON string * * @param type the Type object of the desired type * @param is the input stream containing the JSON data + * @return the serialized JSON string + */ + T fromJson(Type type, String string); + + /** + * Deserializes a json input stream and deserializes it into a Java object of the specified type. + * * @return the deserialized object */ T fromJson(Type type, InputStream is); diff --git a/pom.xml b/pom.xml index 6cca2ec0..89b7fb8b 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,7 @@ avaje-jex-static-content avaje-jex-test avaje-jex-ssl + avaje-jex-websockets From afc15aef313501a59e5187a014819696d27c6881 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Thu, 4 Sep 2025 01:36:37 -0400 Subject: [PATCH 2/4] will of D --- .../jex/websocket/{JexWebSocket.java => DWebSocket.java} | 4 ++-- ...bSocketExchangeHandler.java => DWebSocketHandler.java} | 8 ++++---- .../main/java/io/avaje/jex/websocket/WebSocketPlugin.java | 2 +- .../main/java/io/avaje/jex/websocket/internal/Util.java | 8 -------- .../jex/websocket/internal/EchoWebSocketHandler.java | 8 +++++++- .../avaje/jex/websocket/internal/WebSocketClientUtil.java | 1 + 6 files changed, 15 insertions(+), 16 deletions(-) rename avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/{JexWebSocket.java => DWebSocket.java} (93%) rename avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/{WebSocketExchangeHandler.java => DWebSocketHandler.java} (50%) diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java similarity index 93% rename from avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java rename to avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java index 649dfa63..f66b3f1d 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java @@ -10,12 +10,12 @@ import io.avaje.jex.websocket.exception.CloseCode; import io.avaje.jex.websocket.internal.AbstractWebSocket; -class JexWebSocket extends AbstractWebSocket { +class DWebSocket extends AbstractWebSocket { private final WebSocketListener listener; private final Context ctx; - JexWebSocket(Context ctx, WebSocketListener listener) { + DWebSocket(Context ctx, WebSocketListener listener) { super(ctx.exchange()); this.listener = listener; this.ctx = ctx; diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java similarity index 50% rename from avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java rename to avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java index fd7e3b66..4a05476a 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java @@ -3,17 +3,17 @@ import io.avaje.jex.http.Context; import io.avaje.jex.websocket.internal.WebSocketHandler; -class WebSocketExchangeHandler extends WebSocketHandler { +class DWebSocketHandler extends WebSocketHandler { private final WebSocketListener listener; - WebSocketExchangeHandler(WebSocketListener listener) { + DWebSocketHandler(WebSocketListener listener) { this.listener = listener; } @Override - protected JexWebSocket openWebSocket(Context exchange) { + protected DWebSocket openWebSocket(Context exchange) { - return new JexWebSocket(exchange, listener); + return new DWebSocket(exchange, listener); } } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java index 105feabf..f8dd6512 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -13,7 +13,7 @@ public class WebSocketPlugin implements JexPlugin { private final List handlers = new ArrayList<>(); public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { - handlers.add(r -> r.get(path, new WebSocketExchangeHandler(listener), roles)); + handlers.add(r -> r.get(path, new DWebSocketHandler(listener), roles)); return this; } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java index 6133b464..a5a168d2 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java @@ -40,21 +40,13 @@ class Util { public static final String HEADER_UPGRADE = "Upgrade"; - public static final String HEADER_UPGRADE_VALUE = "websocket"; - public static final String HEADER_CONNECTION = "Connection"; - public static final String HEADER_WEBSOCKET_VERSION = "sec-websocket-version"; - public static final String HEADER_WEBSOCKET_VERSION_VALUE = "13"; - public static final String HEADER_WEBSOCKET_KEY = "sec-websocket-key"; - public static final String HEADER_WEBSOCKET_ACCEPT = "sec-websocket-accept"; - public static final String HEADER_WEBSOCKET_PROTOCOL = "sec-websocket-protocol"; - private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; public static String makeAcceptKey(String key) throws NoSuchAlgorithmException { diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java index ca048aae..62d166d0 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java @@ -1,6 +1,7 @@ package io.avaje.jex.websocket.internal; import io.avaje.jex.websocket.WebSocketListener; +import io.avaje.jex.websocket.WsContext.WsError; import io.avaje.jex.websocket.WsContext.WsMessage; public class EchoWebSocketHandler implements WebSocketListener { @@ -15,6 +16,11 @@ public void onMessage(WsMessage message) { sb = new StringBuilder(); message.send(msg); } - message.closeSession(); + // message.closeSession(); + } + + @Override + public void onError(WsError wsError) { + wsError.error().printStackTrace(); } } diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java index cbc1b50b..f1c5fbe7 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java @@ -54,6 +54,7 @@ public void onError(WebSocket webSocket, Throwable error) { if (onCloseCallback != null) { onCloseCallback.run(); } + error.printStackTrace(); } @Override From 96cf358514aa02e3f55ec34b51ce60b2faa6e652 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Thu, 4 Sep 2025 15:25:22 -0400 Subject: [PATCH 3/4] builder --- .../avaje/jex/websocket/ListenerBuilder.java | 132 ++++++++++++++++++ .../jex/websocket/WebSocketListener.java | 66 +++++++++ .../avaje/jex/websocket/WebSocketPlugin.java | 12 +- .../io/avaje/jex/websocket/WsContext.java | 9 ++ .../internal/EchoWebSocketHandler.java | 3 +- .../jex/websocket/internal/WebSocketTest.java | 3 +- 6 files changed, 219 insertions(+), 6 deletions(-) create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java new file mode 100644 index 00000000..8e37750f --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java @@ -0,0 +1,132 @@ +package io.avaje.jex.websocket; + +import java.util.function.Consumer; + +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; + +/** A builder for creating a {@link WebSocketListener} with specific event handlers. */ +class ListenerBuilder implements WebSocketListener.Builder { + private Consumer onOpen; + private Consumer onMessage; + private Consumer onBinaryMessage; + private Consumer onClose; + private Consumer onPong; + private Consumer onError; + + /** + * Set the handler for the WebSocket open event. + * + * @param handler Consumer for {@link WsOpen} + * @return this builder + */ + @Override + public ListenerBuilder onOpen(Consumer handler) { + this.onOpen = handler; + return this; + } + + /** + * Set the handler for the WebSocket text message event. + * + * @param handler Consumer for {@link WsMessage} + * @return this builder + */ + @Override + public ListenerBuilder onMessage(Consumer handler) { + this.onMessage = handler; + return this; + } + + /** + * Set the handler for the WebSocket binary message event. + * + * @param handler Consumer for {@link WsBinaryMessage} + * @return this builder + */ + @Override + public ListenerBuilder onBinaryMessage(Consumer handler) { + this.onBinaryMessage = handler; + return this; + } + + /** + * Set the handler for the WebSocket close event. + * + * @param handler Consumer for {@link WsClose} + * @return this builder + */ + @Override + public ListenerBuilder onClose(Consumer handler) { + this.onClose = handler; + return this; + } + + /** + * Set the handler for the WebSocket pong event. + * + * @param handler Consumer for {@link WsPong} + * @return this builder + */ + @Override + public ListenerBuilder onPong(Consumer handler) { + this.onPong = handler; + return this; + } + + /** + * Set the handler for the WebSocket error event. + * + * @param handler Consumer for {@link WsError} + * @return this builder + */ + @Override + public ListenerBuilder onError(Consumer handler) { + this.onError = handler; + return this; + } + + /** + * Build a {@link WebSocketListener} implementation using the configured handlers. + * + * @return a new {@link WebSocketListener} instance + */ + @Override + public WebSocketListener build() { + return new WebSocketListener() { + @Override + public void onOpen(WsOpen wsOpen) { + if (onOpen != null) onOpen.accept(wsOpen); + } + + @Override + public void onMessage(WsMessage message) { + if (onMessage != null) onMessage.accept(message); + } + + @Override + public void onBinaryMessage(WsBinaryMessage binaryPayload) { + if (onBinaryMessage != null) onBinaryMessage.accept(binaryPayload); + } + + @Override + public void onClose(WsClose wsClose) { + if (onClose != null) onClose.accept(wsClose); + } + + @Override + public void onPong(WsPong wsPong) { + if (onPong != null) onPong.accept(wsPong); + } + + @Override + public void onError(WsError wsError) { + if (onError != null) onError.accept(wsError); + } + }; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java index 9b6870f7..c11d0dd4 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java @@ -1,5 +1,7 @@ package io.avaje.jex.websocket; +import java.util.function.Consumer; + import io.avaje.jex.websocket.WsContext.WsBinaryMessage; import io.avaje.jex.websocket.WsContext.WsClose; import io.avaje.jex.websocket.WsContext.WsError; @@ -12,6 +14,16 @@ * log. */ public interface WebSocketListener { + + /** + * Create a builder for a WebSocketListener. + * + * @return the builder + */ + static Builder builder() { + return new ListenerBuilder(); + } + /** * Called when a binary message is received. * @@ -53,4 +65,58 @@ default void onPong(WsPong wsPong) {} * @param wsError the error */ default void onError(WsError wsError) {} + + interface Builder { + + /** + * Set the handler for the WebSocket open event. + * + * @param handler Consumer for {@link WsOpen} + * @return this builder + */ + Builder onOpen(Consumer handler); + + /** + * Set the handler for the WebSocket text message event. + * + * @param handler Consumer for {@link WsMessage} + * @return this builder + */ + Builder onMessage(Consumer handler); + + /** + * Set the handler for the WebSocket binary message event. + * + * @param handler Consumer for {@link WsBinaryMessage} + * @return this builder + */ + Builder onBinaryMessage(Consumer handler); + + /** + * Set the handler for the WebSocket close event. + * + * @param handler Consumer for {@link WsClose} + * @return this builder + */ + Builder onClose(Consumer handler); + + /** + * Set the handler for the WebSocket pong event. + * + * @param handler Consumer for {@link WsPong} + * @return this builder + */ + Builder onPong(Consumer handler); + + /** + * Set the handler for the WebSocket error event. + * + * @param handler Consumer for {@link WsError} + * @return this builder + */ + Builder onError(Consumer handler); + + /** Build the WebSocketListener. */ + WebSocketListener build(); + } } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java index f8dd6512..06070a78 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -2,15 +2,23 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import io.avaje.jex.Jex; -import io.avaje.jex.Routing; +import io.avaje.jex.Routing.HttpService; import io.avaje.jex.security.Role; import io.avaje.jex.spi.JexPlugin; +import io.avaje.jex.websocket.WebSocketListener.Builder; public class WebSocketPlugin implements JexPlugin { - private final List handlers = new ArrayList<>(); + private final List handlers = new ArrayList<>(); + + public WebSocketPlugin ws(String path, Consumer consumer, Role... roles) { + var builder = WebSocketListener.builder(); + consumer.accept(builder); + return ws(path, builder.build(), roles); + } public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { handlers.add(r -> r.get(path, new DWebSocketHandler(listener), roles)); diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java index 52f22a42..319eb3e9 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java @@ -143,6 +143,15 @@ public abstract static sealed class WsMessageCtx extends WsContext { public WebSocketFrame wsFrame() { return wsFrame; } + + /** + * Indicates if this frame is the final fragment in a message. + * + * @return true if final fragment, false otherwise + */ + public boolean isFin() { + return wsFrame.isFin(); + } } public static final class WsBinaryMessage extends WsMessageCtx { diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java index 62d166d0..4679ab25 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java @@ -11,12 +11,11 @@ public class EchoWebSocketHandler implements WebSocketListener { @Override public void onMessage(WsMessage message) { sb.append(message.message()); - if (message.wsFrame().isFin()) { + if (message.isFin()) { String msg = sb.toString(); sb = new StringBuilder(); message.send(msg); } - // message.closeSession(); } @Override diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java index e488e78d..bdaa034f 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java @@ -2,7 +2,6 @@ import static org.junit.jupiter.api.Assertions.fail; -import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.ConsoleHandler; @@ -29,7 +28,7 @@ public class WebSocketTest { TestPair server; @BeforeEach - public void setUp() throws IOException { + void setUp() { var jex = Jex.create(); From 214bb03545d17f6e55a370bd96b638ff3801fe74 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Sun, 14 Sep 2025 22:32:00 -0400 Subject: [PATCH 4/4] remove write from interface --- .../java/io/avaje/jex/websocket/WebSocketFrame.java | 10 ---------- .../jex/websocket/internal/AbstractWebSocket.java | 2 +- .../java/io/avaje/jex/websocket/internal/WSFrame.java | 4 +--- 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java index 39b57f10..84bed8e3 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java @@ -1,7 +1,5 @@ package io.avaje.jex.websocket; -import java.io.IOException; -import java.io.OutputStream; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; @@ -54,14 +52,6 @@ public interface WebSocketFrame { */ boolean isMasked(); - /** - * Writes the frame to the given output stream in WebSocket frame format. - * - * @param out the output stream to write to - * @throws IOException if an I/O error occurs - */ - void write(OutputStream out) throws IOException; - /** WebSocket opcodes */ public enum OpCode { CONTINUATION(0), diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java index b84b0b5b..85e0cf77 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java @@ -248,7 +248,7 @@ public void send(String payload) { sendFrame(new WSFrame(OpCode.TEXT, true, payload)); } - public void sendFrame(WebSocketFrame frame) { + public void sendFrame(WSFrame frame) { lock.lock(); try { onFrameSent(frame); diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java index 2b3218a3..f41d4134 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java @@ -346,9 +346,7 @@ public String toString() { } // ------------------------------------------------------------------------ - - @Override - public void write(OutputStream out) throws IOException { + void write(OutputStream out) throws IOException { byte header = 0; if (this.fin) { header |= 0x80;