[Android] Meet IoT MQTT

Posted by xiuyuantech on 2024-10-06

MQTT (Message Queuing Telemetry Transport,消息队列遥测传输) 是一种用于物联网 (IoT) 的 OASIS 标准消息传递的基于 TCP/IP 协议族的应用层协议。是用于物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极其轻量级的发布/订阅消息传输,非常适合以较小的代码占用空间和最小的网络带宽连接远程设备。如今,MQTT 已广泛应用于各个行业,例如汽车、制造、电信、石油和天然气等。

物联网相关协议

‌物联网常用的通信协议包括MQTT、CoAP、HTTP、Zigbee、LoRaWAN、NB-IoT等‌。这些协议各有其特点和适用场景,下面将详细介绍每种协议的基本信息和应用场景。

  • MQTT:是一种轻量级的、基于发布/订阅模式的消息传输协议。它特别适用于低带宽、高延迟或不稳定网络环境中的设备通信。MQTT的特点包括消息推送及时且高效、支持QoS(服务质量)等级以保证消息可靠传输,并且通过MQTT服务器(Broker)简化了设备间的通信架构‌。

  • CoAP:专为资源受限的物联网设备设计,基于UDP协议构建。它模仿HTTP的RESTful交互模型,但设计更为精简,适合低功耗、低内存的传感器网络。CoAP使用无状态请求响应机制,不维持长连接,但在需要实时数据推送时,可以通过观察者模式实现类似发布/订阅的效果‌。

  • HTTP:在物联网场景中,传统的HTTP协议通过结合REST(Representational State Transfer)架构风格,提供了一种易于实施且广泛应用的服务调用方式。虽然HTTP在功耗和带宽效率上不如MQTT和CoAP,但它具有广泛的兼容性和成熟的生态系统,在云端接口和设备管理中扮演重要角色‌。

  • Zigbee:是一种短距离、低功耗无线网络标准,主要应用于家庭自动化、工业控制等领域。它基于IEEE 802.15.4标准,支持自组网、多跳路由及安全性较高的无线通信。Zigbee联盟制定了统一的应用层规范,确保不同厂商设备间的互操作性‌。

  • LoRaWAN:它并非严格意义上的物联网通信协议,而是低功耗广域网络(LPWAN)的标准之一。它采用Chirp Spread Spectrum调制技术,提供远距离、低功耗的无线通信服务,特别适用于大规模部署传感器网络,如智慧城市、农业监控等场景‌。

  • NB-IoT:是3GPP标准化的蜂窝物联网技术,旨在利用现有LTE网络提供窄带物联网连接。它适用于对数据传输速率要求不高的场景,如智能水表、智能停车等‌。

本文是站在 Android 开发者角度主要介绍MQTT协议。

MQTT特点

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
  • 对负载内容屏蔽的消息传输。
  • 使用 TCP/IP 提供网络连接。
  • 有三种消息发布服务质量:
    qos为0:“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    qos为1:“至少一次”,确保消息到达,但消息重复可能会发生。这一级别可用于如下情况,你需要获得每一条消息,并且消息重复发送对你的使用场景无影响。
    qos为2:“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

MQTT协议和HTTP协议区别

特性MQTT 协议HTTP 协议
传输层TCPTCP 或 UDP
分发模型发布 - 订阅模型请求 - 响应模型
分发关系1 对 0/1/N1 对 1
数据安全使用 SSL/TLS不一定采用 HTTPS
加密应用层对有效载荷加密不在应用层加密
消息头大小较小较大

Android中MQTT的使用

在module的build.gradle文件中添加依赖

1
2
3
4
5
6
7
8
9
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

在 AndroidManifest.xml 添加所需限权、服务

1
2
3
4
5
6
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />

<service android:name="org.eclipse.paho.android.service.MqttService" /> <!--MqttService-->
<service android:name="com.dongyk.service.MyMqttService"/>

MQTT服务——MyMqttService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package com.demo.mqttandroidclient;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.support.annotation.RequiresApi;
import android.util.Log;
import android.widget.Toast;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
* Desc ${MQTT服务},Android中使用MQTT最主要的就是connect、publish、subscribe、disconnect这几个方法
*/

public class MyMqttService extends Service {

public final String TAG = MyMqttService.class.getSimpleName();
private static MqttAndroidClient mqttAndroidClient;
private MqttConnectOptions mMqttConnectOptions;
public String HOST = "tcp://192.168.0.102:61613";//服务器地址(协议+地址+端口号)
public String USERNAME = "admin";//用户名
public String PASSWORD = "password";//密码
public static String PUBLISH_TOPIC = "tourist_enter";//发布主题
public static String RESPONSE_TOPIC = "message_arrived";//响应主题
@RequiresApi(api = 26)
public String CLIENTID = Build.VERSION.SDK_INT >= Build.VERSION_CODES.O
? Build.getSerial() : Build.SERIAL;//客户端ID,一般以客户端唯一标识符表示,这里用设备序列号表示

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
return super.onStartCommand(intent, flags, startId);
}

@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}

/**
* 开启服务
*/
public static void startService(Context mContext) {
mContext.startService(new Intent(mContext, MyMqttService.class));
}

/**
* 发布 (模拟其他客户端发布消息)
*
* @param message 消息
*/
public static void publish(String message) {
String topic = PUBLISH_TOPIC;
Integer qos = 2;
Boolean retained = false;
try {
//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}

/**
* 响应 (收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等)
*
* @param message 消息
*/
public void response(String message) {
String topic = RESPONSE_TOPIC;
Integer qos = 2;
Boolean retained = false;
try {
//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}

/**
* 初始化
*/
private void init() {
String serverURI = HOST; //服务器地址(协议+地址+端口号)
mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
mMqttConnectOptions = new MqttConnectOptions();
mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存
mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒
mMqttConnectOptions.setUserName(USERNAME); //设置用户名
mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码

// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + CLIENTID + "\"}";
String topic = PUBLISH_TOPIC;
Integer qos = 2;
Boolean retained = false;
if ((!message.equals("")) || (!topic.equals(""))) {
// 最后的遗嘱
try {
mMqttConnectOptions.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
}
if (doConnect) {
doClientConnection();
}
}

/**
* 连接MQTT服务器
*/
private void doClientConnection() {
if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
try {
mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}

/**
* 判断网络是否连接
*/
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "当前网络名称:" + name);
return true;
} else {
Log.i(TAG, "没有可用网络");
/*没有可用网络的时候,延迟3秒再尝试重连*/
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
doClientConnection();
}
}, 3000);
return false;
}
}

//MQTT是否连接成功的监听
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "连接成功 ");
try {
mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2);//订阅主题,参数:主题、服务质量
} catch (MqttException e) {
e.printStackTrace();
}
}

@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
Log.i(TAG, "连接失败 ");
doClientConnection();//连接失败,重连(可关闭服务器进行模拟)
}
};

//订阅主题的回调
private MqttCallback mqttCallback = new MqttCallback() {

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息: " + new String(message.getPayload()));
//收到消息,这里弹出Toast表示。如果需要更新UI,可以使用广播或者EventBus进行发送
Toast.makeText(getApplicationContext(), "messageArrived: " + new String(message.getPayload()), Toast.LENGTH_LONG).show();
//收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等
response("message arrived");
}

@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {

}

@Override
public void connectionLost(Throwable arg0) {
Log.i(TAG, "连接断开 ");
doClientConnection();//连接断开,重连
}
};

@Override
public void onDestroy() {
try {
mqttAndroidClient.disconnect(); //断开连接
} catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
}

在MainActivity中开启服务

1
2
3
4
5
6
7
8
9
public class MainActivity extends AppCompatActivity {

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
MyMqttService.startService(this); //开启服务
}
}

需要注意的是,如果你的项目是基于AndroidX开发,那么还需要额外添加下localbroadcastmanager的依赖,并且在gradle.properties文件中添加android.enableJetifier=true配置,这是因为org.eclipse.paho.android.service中引用了v4的localbroadcastmanager。

性能优化

  • 网络传输优化‌
    ‌启用MQTT 5.0‌:支持 ‌会话恢复‌(Session Reconnection)和 ‌原因码‌(Reason Code),减少重复握手开销。
    ‌消息压缩‌:对JSON负载使用GZIP压缩(数据量降低50%~70%)。

  • 设备端资源优化‌
    ‌心跳间隔调优‌:根据网络稳定性设置Keep Alive时间(建议30~120秒)。
    ‌遗嘱消息精简‌:仅传递关键状态(如offline),避免大报文占用带宽。

  • 云端扩展性设计
    ‌共享订阅(Shared Subscription)‌:在MQTT 5.0中实现负载均衡,支持百万级设备并发。
    ‌水平扩展Broker‌:使用EMQX集群或AWS IoT Core自动扩缩容。

安全实践

  • 传输层安全‌
    强制使用 TLS加密‌(端口8883),禁用明文通信(1883)。
    设备端预置CA证书,防止中间人攻击。

  • 权限控制
    ACL规则‌:限制设备只能发布/订阅特定主题(如device/${deviceId}/cmd)。
    ‌动态令牌‌:使用JWT令牌短期授权,定期刷新(如每小时更新)。

  • 固件安全
    OTA升级签名校验‌:使用RSA/ECC签名验证固件合法性。
    ‌安全启动(Secure Boot)‌:ESP32等硬件支持防止未授权固件运行。

典型问题与解决方案

问题场景 解决方案
设备频繁断线重连 优化Keep Alive时间,检查网络丢包率
高QoS导致消息堆积 降级为QoS 0或1,增加Broker处理线程
主题设计混乱导致性能下降 按功能分层命名(如region/device/type
设备证书泄露 启用证书吊销列表(CRL),实时拉取黑名单