-
-
Notifications
You must be signed in to change notification settings - Fork 15.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(example-mqtt): new MQTT heartBeat broker and client examples (#9336
) Motivation: Recently I'm going to build MQTT broker and client based on Netty. I had MQTT encoder and decoder founded, while no basic examples. So I'm going to share my simple heartBeat MQTT broker and client as an example. Modification: New MQTT heartBeat example under io.netty.example/mqtt/heartBeat/. Result: Client would send CONNECT and PINGREQ(heartBeat message). - CONNECT: once channel active - PINGREQ: once IdleStateEvent triggered, which is 20 seconds in this example Client would discard all messages it received. MQTT broker could handle CONNECT, PINGREQ and DISCONNECT messages. - CONNECT: send CONNACK back - PINGREQ: send PINGRESP back - DISCONNECT: close the channel Broker would close the channel if 2 heartBeat lost, which set to 45 seconds in this example.
- Loading branch information
1 parent
7fc355a
commit b02ee11
Showing
5 changed files
with
298 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
64 changes: 64 additions & 0 deletions
64
example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBroker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
/* | ||
* Copyright 2019 The Netty Project | ||
* | ||
* The Netty Project licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package io.netty.example.mqtt.heartBeat; | ||
|
||
import io.netty.bootstrap.ServerBootstrap; | ||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelInitializer; | ||
import io.netty.channel.ChannelOption; | ||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.channel.nio.NioEventLoopGroup; | ||
import io.netty.channel.socket.SocketChannel; | ||
import io.netty.channel.socket.nio.NioServerSocketChannel; | ||
import io.netty.handler.codec.mqtt.MqttDecoder; | ||
import io.netty.handler.codec.mqtt.MqttEncoder; | ||
import io.netty.handler.timeout.IdleStateHandler; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
public final class MqttHeartBeatBroker { | ||
|
||
private MqttHeartBeatBroker() { | ||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
EventLoopGroup bossGroup = new NioEventLoopGroup(1); | ||
EventLoopGroup workerGroup = new NioEventLoopGroup(); | ||
|
||
try { | ||
ServerBootstrap b = new ServerBootstrap(); | ||
b.group(bossGroup, workerGroup); | ||
b.option(ChannelOption.SO_BACKLOG, 1024); | ||
b.channel(NioServerSocketChannel.class); | ||
b.childHandler(new ChannelInitializer<SocketChannel>() { | ||
protected void initChannel(SocketChannel ch) throws Exception { | ||
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE); | ||
ch.pipeline().addLast("decoder", new MqttDecoder()); | ||
ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(45, 0, 0, TimeUnit.SECONDS)); | ||
ch.pipeline().addLast("handler", MqttHeartBeatBrokerHandler.INSTANCE); | ||
} | ||
}); | ||
|
||
ChannelFuture f = b.bind(1883).sync(); | ||
System.out.println("Broker initiated..."); | ||
|
||
f.channel().closeFuture().sync(); | ||
} finally { | ||
workerGroup.shutdownGracefully(); | ||
bossGroup.shutdownGracefully(); | ||
} | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBrokerHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright 2019 The Netty Project | ||
* | ||
* The Netty Project licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package io.netty.example.mqtt.heartBeat; | ||
|
||
import io.netty.channel.ChannelHandler.Sharable; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.ChannelInboundHandlerAdapter; | ||
import io.netty.handler.codec.mqtt.MqttConnAckMessage; | ||
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; | ||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; | ||
import io.netty.handler.codec.mqtt.MqttFixedHeader; | ||
import io.netty.handler.codec.mqtt.MqttMessage; | ||
import io.netty.handler.codec.mqtt.MqttMessageType; | ||
import io.netty.handler.codec.mqtt.MqttQoS; | ||
import io.netty.handler.timeout.IdleState; | ||
import io.netty.handler.timeout.IdleStateEvent; | ||
import io.netty.util.ReferenceCountUtil; | ||
|
||
@Sharable | ||
public final class MqttHeartBeatBrokerHandler extends ChannelInboundHandlerAdapter { | ||
|
||
public static final MqttHeartBeatBrokerHandler INSTANCE = new MqttHeartBeatBrokerHandler(); | ||
|
||
private MqttHeartBeatBrokerHandler() { | ||
} | ||
|
||
@Override | ||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | ||
MqttMessage mqttMessage = (MqttMessage) msg; | ||
System.out.println("Received MQTT message: " + mqttMessage); | ||
switch (mqttMessage.fixedHeader().messageType()) { | ||
case CONNECT: | ||
MqttFixedHeader connackFixedHeader = | ||
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); | ||
MqttConnAckVariableHeader mqttConnAckVariableHeader = | ||
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false); | ||
MqttConnAckMessage connack = new MqttConnAckMessage(connackFixedHeader, mqttConnAckVariableHeader); | ||
ctx.writeAndFlush(connack); | ||
break; | ||
case PINGREQ: | ||
MqttFixedHeader pingreqFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, | ||
MqttQoS.AT_MOST_ONCE, false, 0); | ||
MqttMessage pingResp = new MqttMessage(pingreqFixedHeader); | ||
ctx.writeAndFlush(pingResp); | ||
break; | ||
case DISCONNECT: | ||
ctx.close(); | ||
break; | ||
default: | ||
System.out.println("Unexpected message type: " + mqttMessage.fixedHeader().messageType()); | ||
ReferenceCountUtil.release(msg); | ||
ctx.close(); | ||
} | ||
} | ||
|
||
@Override | ||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | ||
System.out.println("Channel heartBeat lost"); | ||
if (evt instanceof IdleStateEvent && IdleState.READER_IDLE == ((IdleStateEvent) evt).state()) { | ||
ctx.close(); | ||
} | ||
} | ||
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | ||
cause.printStackTrace(); | ||
ctx.close(); | ||
} | ||
} |
64 changes: 64 additions & 0 deletions
64
example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
/* | ||
* Copyright 2019 The Netty Project | ||
* | ||
* The Netty Project licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package io.netty.example.mqtt.heartBeat; | ||
|
||
import io.netty.bootstrap.Bootstrap; | ||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelInitializer; | ||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.channel.nio.NioEventLoopGroup; | ||
import io.netty.channel.socket.SocketChannel; | ||
import io.netty.channel.socket.nio.NioSocketChannel; | ||
import io.netty.handler.codec.mqtt.MqttDecoder; | ||
import io.netty.handler.codec.mqtt.MqttEncoder; | ||
import io.netty.handler.timeout.IdleStateHandler; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
public final class MqttHeartBeatClient { | ||
private MqttHeartBeatClient() { | ||
} | ||
|
||
private static final String HOST = System.getProperty("host", "127.0.0.1"); | ||
private static final int PORT = Integer.parseInt(System.getProperty("port", "1883")); | ||
private static final String CLIENT_ID = System.getProperty("clientId", "guestClient"); | ||
private static final String USER_NAME = System.getProperty("userName", "guest"); | ||
private static final String PASSWORD = System.getProperty("password", "guest"); | ||
|
||
public static void main(String[] args) throws Exception { | ||
EventLoopGroup workerGroup = new NioEventLoopGroup(); | ||
|
||
try { | ||
Bootstrap b = new Bootstrap(); | ||
b.group(workerGroup); | ||
b.channel(NioSocketChannel.class); | ||
b.handler(new ChannelInitializer<SocketChannel>() { | ||
protected void initChannel(SocketChannel ch) throws Exception { | ||
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE); | ||
ch.pipeline().addLast("decoder", new MqttDecoder()); | ||
ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS)); | ||
ch.pipeline().addLast("handler", new MqttHeartBeatClientHandler(CLIENT_ID, USER_NAME, PASSWORD)); | ||
} | ||
}); | ||
|
||
ChannelFuture f = b.connect(HOST, PORT).sync(); | ||
System.out.println("Client connected"); | ||
f.channel().closeFuture().sync(); | ||
} finally { | ||
workerGroup.shutdownGracefully(); | ||
} | ||
} | ||
} |
83 changes: 83 additions & 0 deletions
83
example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright 2019 The Netty Project | ||
* | ||
* The Netty Project licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package io.netty.example.mqtt.heartBeat; | ||
|
||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.ChannelInboundHandlerAdapter; | ||
import io.netty.handler.codec.mqtt.MqttConnectMessage; | ||
import io.netty.handler.codec.mqtt.MqttConnectPayload; | ||
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; | ||
import io.netty.handler.codec.mqtt.MqttFixedHeader; | ||
import io.netty.handler.codec.mqtt.MqttMessage; | ||
import io.netty.handler.codec.mqtt.MqttMessageType; | ||
import io.netty.handler.codec.mqtt.MqttQoS; | ||
import io.netty.handler.timeout.IdleStateEvent; | ||
import io.netty.util.ReferenceCountUtil; | ||
|
||
public class MqttHeartBeatClientHandler extends ChannelInboundHandlerAdapter { | ||
|
||
private static final String PROTOCOL_NAME_MQTT_3_1_1 = "MQTT"; | ||
private static final int PROTOCOL_VERSION_MQTT_3_1_1 = 4; | ||
|
||
private final String clientId; | ||
private final String userName; | ||
private final byte[] password; | ||
|
||
public MqttHeartBeatClientHandler(String clientId, String userName, String password) { | ||
this.clientId = clientId; | ||
this.userName = userName; | ||
this.password = password.getBytes(); | ||
} | ||
|
||
@Override | ||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | ||
// discard all messages | ||
ReferenceCountUtil.release(msg); | ||
} | ||
|
||
@Override | ||
public void channelActive(ChannelHandlerContext ctx) throws Exception { | ||
MqttFixedHeader connectFixedHeader = | ||
new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); | ||
MqttConnectVariableHeader connectVariableHeader = | ||
new MqttConnectVariableHeader(PROTOCOL_NAME_MQTT_3_1_1, PROTOCOL_VERSION_MQTT_3_1_1, true, true, false, | ||
0, false, false, 20); | ||
MqttConnectPayload connectPayload = new MqttConnectPayload(clientId, null, null, userName, password); | ||
MqttConnectMessage connectMessage = | ||
new MqttConnectMessage(connectFixedHeader, connectVariableHeader, connectPayload); | ||
ctx.writeAndFlush(connectMessage); | ||
System.out.println("Sent CONNECT"); | ||
} | ||
|
||
@Override | ||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | ||
if (evt instanceof IdleStateEvent) { | ||
MqttFixedHeader pingreqFixedHeader = | ||
new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0); | ||
MqttMessage pingreqMessage = new MqttMessage(pingreqFixedHeader); | ||
ctx.writeAndFlush(pingreqMessage); | ||
System.out.println("Sent PINGREQ"); | ||
} else { | ||
super.userEventTriggered(ctx, evt); | ||
} | ||
} | ||
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | ||
cause.printStackTrace(); | ||
ctx.close(); | ||
} | ||
} |