
MQTT over WebSockets using Netty?

I want to use MQTT over WebSockets. Setting up WebSockets in Netty is quite easy:

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("handler", new WebSocketServerHandler());

I've found an MQTT broker (moquette) that is based on Netty:

NettyMQTTHandler handler = new NettyMQTTHandler();
ServerBootstrap b = new ServerBootstrap();
b.group(m_bossGroup, m_workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         //pipeline.addFirst("metrics", new BytesMetricsHandler(m_metricsCollector));
         pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
         pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimoutHandler());
         //pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
         pipeline.addLast("decoder", new MQTTDecoder());
         pipeline.addLast("encoder", new MQTTEncoder());
         pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
         pipeline.addLast("handler", handler);
     }
 })
 .option(ChannelOption.SO_BACKLOG, 128)
 .option(ChannelOption.SO_REUSEADDR, true)
 .childOption(ChannelOption.SO_KEEPALIVE, true);

So in theory I should be able to send MQTT over WebSockets, but I have no idea whether this is possible with Netty. Does anyone have any clues or ideas on how to do that? Should I use MessageToMessageCodec and BinaryWebSocketFrame?

Cheers!

asked Mar 16 '14 13:03 by radzio




1 Answer

Let me assume your MQTTDecoder consumes ByteBufs and produces MQTT message objects, and that MQTTEncoder does the opposite, which is usually the case.

The ByteBufs your codec works with are then not WebSocket messages; they need to become the payloads of WebSocket frames. I would insert the following handlers into the pipeline:

  • A MessageToMessageDecoder that transforms a WebSocket frame into a ByteBuf so that MQTTDecoder can consume it. The transformation should be very simple: just take the content of the WebSocket frame. Since MQTT is a binary protocol, binary frames are the appropriate frame type rather than text frames.
  • A MessageToMessageEncoder that transforms a ByteBuf into a binary WebSocket frame so that Netty's WebSocketFrameEncoder can consume it. The transformation should also be very simple: just wrap the ByteBuf encoded by MQTTEncoder in a WebSocket frame object.
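
The two handlers described above could be sketched roughly as follows. This is a minimal sketch, assuming Netty 4.x and binary frames only; the class names are taken from the pipeline listing in this answer, and the reference-counting details (the `retain()` calls) are my reading of how `MessageToMessageDecoder` and `MessageToMessageEncoder` release their input, not something the answer states.

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;

import java.util.List;

// Inbound: unwrap the payload of a binary WebSocket frame so that the
// MQTT decoder further down the pipeline sees plain bytes.
class WebSocketFrameToByteBufDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {
    @Override
    protected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame frame, List<Object> out) {
        // The base class releases the frame once decode() returns, so the
        // content is retained before it is handed to the next handler.
        out.add(frame.content().retain());
    }
}

// Outbound: wrap the bytes produced by the MQTT encoder in a binary
// WebSocket frame so that Netty's WebSocket frame encoder can write it.
class ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
        // The base class releases buf after encode() returns; the frame
        // keeps its own reference through retain().
        out.add(new BinaryWebSocketFrame(buf.retain()));
    }
}
```

This sketch ignores text and continuation frames. If clients may send fragmented messages, placing Netty's `WebSocketFrameAggregator` in front of the decoder is one option to consider.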

The resulting pipeline will look like the following:

  1. HttpResponseEncoder
  2. HttpRequestDecoder
  3. HttpObjectAggregator(65536)
  4. WebSocketServerProtocolHandler("/your-websocket-endpoint-path")
  5. WebSocketFrameToByteBufDecoder extends MessageToMessageDecoder
  6. ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder
  7. MQTTEncoder
  8. MQTTDecoder
  9. MessageMetricsHandler
  10. handler
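
The listing above could be assembled with a small helper like the one below. This is only a sketch, assuming Netty 4.x: the `MqttWebSocketPipeline` class, the `configure` method, and the handler names passed to `addLast` are hypothetical, and the MQTT-side handlers (the two frame converters, `MQTTEncoder`, `MQTTDecoder`, `MessageMetricsHandler`, and the shared handler) are passed in as parameters because they come from your own code and from moquette.

```java
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

// Hypothetical helper that adds the handlers in the order listed above.
final class MqttWebSocketPipeline {
    private MqttWebSocketPipeline() {}

    static void configure(ChannelPipeline p, String websocketPath,
                          ChannelHandler wsToBytes, ChannelHandler bytesToWs,
                          ChannelHandler mqttEncoder, ChannelHandler mqttDecoder,
                          ChannelHandler metrics, ChannelHandler handler) {
        // HTTP handlers are only needed for the WebSocket upgrade request.
        p.addLast("httpEncoder", new HttpResponseEncoder());
        p.addLast("httpDecoder", new HttpRequestDecoder());
        p.addLast("aggregator", new HttpObjectAggregator(65536));
        // Performs the handshake. MQTT clients usually offer the "mqtt"
        // subprotocol; the (path, subprotocols) constructor can advertise it.
        p.addLast("webSocketHandler", new WebSocketServerProtocolHandler(websocketPath));
        // Bridge between WebSocket frames and the raw bytes the MQTT codec expects.
        p.addLast("ws2bytebufDecoder", wsToBytes);
        p.addLast("bytebuf2wsEncoder", bytesToWs);
        // Handlers from the original (plain TCP) broker pipeline.
        p.addLast("encoder", mqttEncoder);
        p.addLast("decoder", mqttDecoder);
        p.addLast("metrics", metrics);
        p.addLast("handler", handler);
    }
}
```

It would be called from `initChannel`, for example `MqttWebSocketPipeline.configure(ch.pipeline(), "/your-websocket-endpoint-path", ...)`, with fresh per-connection instances of every handler that is not marked `@Sharable`.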

WebSocketServerProtocolHandler will perform the necessary handshake with your WebSocket client and insert WebSocketFrameEncoder and WebSocketFrameDecoder right before WebSocketFrameToByteBufDecoder. After a successful handshake, the resulting pipeline will look like the following:

  1. WebSocketFrameEncoder
  2. WebSocketFrameDecoder
  3. WebSocketFrameToByteBufDecoder extends MessageToMessageDecoder
  4. ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder
  5. MQTTEncoder
  6. MQTTDecoder
  7. MessageMetricsHandler
  8. handler

answered Oct 27 '22 00:10 by trustin