2727
2828import org .apache .activemq .transport .mqtt .MQTTWireFormat ;
2929import org .apache .activemq .util .ByteSequence ;
30- import org .eclipse .jetty .ee9 . websocket .api .Session ;
31- import org .eclipse .jetty .ee9 . websocket .api .WebSocketAdapter ;
32- import org .eclipse .jetty .websocket .api .Session .Listener .AutoDemanding ;
30+ import org .eclipse .jetty .websocket .api .Callback ;
31+ import org .eclipse .jetty .websocket .api .Session ;
32+ import org .eclipse .jetty .websocket .api .Session .Listener .AbstractAutoDemanding ;
3333import org .fusesource .hawtbuf .UTF8Buffer ;
3434import org .fusesource .mqtt .codec .CONNACK ;
3535import org .fusesource .mqtt .codec .CONNECT ;
4949/**
5050 * Implements a simple WebSocket based MQTT Client that can be used for unit testing.
5151 */
52- public class MQTTWSConnection extends WebSocketAdapter implements AutoDemanding {
52+ public class MQTTWSConnection extends AbstractAutoDemanding implements Session . Listener . AutoDemanding {
5353
5454 private static final Logger LOG = LoggerFactory .getLogger (MQTTWSConnection .class );
5555
@@ -64,7 +64,6 @@ public class MQTTWSConnection extends WebSocketAdapter implements AutoDemanding
6464 private int closeCode = -1 ;
6565 private String closeMessage ;
6666
67- @ Override
6867 public boolean isConnected () {
6968 return getSession () != null ? getSession ().isOpen () : false ;
7069 }
@@ -181,15 +180,15 @@ public MQTTWSConnection setWritePartialFrames(boolean value) {
181180 //----- WebSocket callback handlers --------------------------------------//
182181
183182 @ Override
184- public void onWebSocketBinary (byte [] data , int offset , int length ) {
185- if (data ==null || length <= 0 ) {
183+ public void onWebSocketBinary (ByteBuffer payload , Callback callback ) {
184+ if (payload == null || ! payload . hasRemaining () ) {
186185 return ;
187186 }
188187
189188 MQTTFrame frame = null ;
190189
191190 try {
192- frame = (MQTTFrame )wireFormat .unmarshal (new ByteSequence (data , offset , length ));
191+ frame = (MQTTFrame )wireFormat .unmarshal (new ByteSequence (payload . array () ));
193192 } catch (IOException e ) {
194193 LOG .error ("Could not decode incoming MQTT Frame: {}" , e .getMessage ());
195194 getSession ().close ();
@@ -254,12 +253,12 @@ public void onWebSocketBinary(byte[] data, int offset, int length) {
254253
255254 private void sendBytes (ByteSequence payload ) throws IOException {
256255 if (!isWritePartialFrames ()) {
257- getRemote ().sendBytes (ByteBuffer .wrap (payload .data , payload .offset , payload .length ));
256+ getSession ().sendBinary (ByteBuffer .wrap (payload .data , payload .offset , payload .length ), null );
258257 } else {
259- getRemote ().sendBytes (ByteBuffer .wrap (
260- payload .data , payload .offset , payload .length / 2 ));
261- getRemote ().sendBytes (ByteBuffer .wrap (
262- payload .data , payload .offset + payload .length / 2 , payload .length / 2 ));
258+ getSession ().sendBinary (ByteBuffer .wrap (
259+ payload .data , payload .offset , payload .length / 2 ), null );
260+ getSession ().sendBinary (ByteBuffer .wrap (
261+ payload .data , payload .offset + payload .length / 2 , payload .length / 2 ), null );
263262 }
264263 }
265264
@@ -272,17 +271,14 @@ private void checkConnected() throws IOException {
272271 @ Override
273272 public void onWebSocketClose (int statusCode , String reason ) {
274273 LOG .trace ("MQTT WS Connection closed, code:{} message:{}" , statusCode , reason );
275-
276- getSession ().close (statusCode , reason );
277274 this .closeCode = statusCode ;
278275 this .closeMessage = reason ;
279-
280276 }
281277
282278 @ Override
283- public void onWebSocketConnect ( org . eclipse . jetty . ee9 . websocket . api . Session session ) {
284- super .onWebSocketConnect (session );
285- getSession () .setIdleTimeout (Duration .ZERO );
279+ public void onWebSocketOpen ( Session session ) {
280+ super .onWebSocketOpen (session );
281+ session .setIdleTimeout (Duration .ZERO );
286282 this .connectLatch .countDown ();
287283 }
288284}
0 commit comments