最近开发的机器人操作系统 ROS 基于 Android,在里面做一些深度定制,其中运动控制与 Server 的交互需要双向通道,经过权衡和讨论我们最终选用 MQTT 作为长连接通信方案。
MQTT 的全称为 Message Queue Telemetry Transport(消息队列遥测传输协议),是 ISO 标准(ISO/IEC PRF 20922)下基于客户端-服务器的消息发布/订阅传输协议,目的是为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。目前在 Iot 应用广泛,主要有以下优点:
总结下来就是:简单易用开放。
为了满足低电量消耗和低网络带宽的需求,MQTT 协议在设计之初就包含了以下一些特点:
MQTT 的通信是通过发布/订阅的方式来实现的,订阅和发布又是基于主题(Topic)的。发布方和订阅方不直接进行连接,而是用到了一个中间方,它们是通过这种方式来进行解耦。两端通信的主要流程如下:
MQTT 还支持离线消息,发布方在发布消息时并不需要订阅方也连接到 Broker,只要订阅方之前订阅过相应主题,那么它在连接到 Broker 之后就可以收到发布方在它离线期间发布的消息。
MQTT 协议数据包的消息格式为:固定头|可变头|消息体。
具体的结构说明可以参考官方文档,这里截图两个 MQTT 协议传输时数据内容:
在我们了解 MQTT 开源源码或者自己实现 MQTT 协议的库时再着重研究协议具体内容。
我们可以使用 mosquitto 快速搭建 mqtt server。在 ubuntu 上直接执行sudo apt-get install mosquitto
,在 Mac 上执行sudo brew install mosquitto
即可。
具体配置操作以 ubuntu 为例:
sudo service mosquitto statussudo service mosquitto startsudo service mosquitto stop
conf 配置文件 mosquitto.conf:
pid_file /var/run/mosquitto.pid
# 消息持久存储
persistence true
persistence_location /var/lib/mosquitto/
# 日志文件
log_dest file /var/log/mosquitto/mosquitto.log
# 其他配置
include_dir /etc/mosquitto/conf.d
# 禁止匿名访问
allow_anonymous false
# 认证配置
password_file /etc/mosquitto/pwfile
# 权限配置
acl_file /etc/mosquitto/aclfile
mosquitto -c /etc/mosquitto/mosquitto.conf -d
安装 mosquitto 配套客户端文件:
sudo apt-get install mosquitto-clients
mosquitto-client 包含 pub 和 sub 两个命令工具,对应参数说明:
mosquitto_pub 命令参数说明:
mosquitto_sub 命令参数说明:
示例:
mosquitto_pub -h localhost -p 1883 -t "demo/1" -m "test"
mosquitto_sub -h localhost -p 1883 -t "demo/1"
直接运行上面命令既可完成发布与订阅操作。
上面是 mosquitto 提供的现成的 PC 命令工具,我们要在 Android 平台上跑通客户端需要用 Java 和 C++实现一套,eclipse 也提供了 C++/C 以及 Java 版本的库,我们直接编译到 Android 平台即可。已 C 库为例:
下载https://github.com/eclipse/paho.mqtt.c源码,创建 AndroidStudio 工程后将下载的源码拷贝到 cpp 目录下,编写 CMakelist.txt 脚本:
make_minimum_required(VERSION 3.4.1)
set(CMAKE_INSTALL_PREFIX "${CMAKE_BINARY_DIR}" CACHE PATH "Installation directory" FORCE)
message(STATUS "CMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}")
project(mqtt)
SET(VERSION 0.0.1)
add_definitions(-w)
#file(READ version.major PAHO_VERSION_MAJOR)
#file(READ version.minor PAHO_VERSION_MINOR)
#file(READ version.patch PAHO_VERSION_PATCH)
SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
## build options
SET(PAHO_WITH_SSL FALSE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ")
SET(PAHO_BUILD_SHARED TRUE CACHE BOOL "Build shared library")
SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)")
SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
SET(PAHO_ENABLE_TESTING TRUE CACHE BOOL "Build tests and run")
SET(PAHO_ENABLE_CPACK TRUE CACHE BOOL "Enable CPack")
SET(PAHO_HIGH_PERFORMANCE FALSE CACHE BOOL "Disable tracing and heap tracking")
SET(PAHO_USE_SELECT FALSE CACHE BOOL "Revert to select system call instead of poll")
IF (PAHO_HIGH_PERFORMANCE)
ADD_DEFINITIONS(-DHIGH_PERFORMANCE=1)
ENDIF()
IF (PAHO_USE_SELECT)
ADD_DEFINITIONS(-DUSE_SELECT=1)
ENDIF()
IF (NOT PAHO_BUILD_SHARED AND NOT PAHO_BUILD_STATIC)
MESSAGE(FATAL_ERROR "You must set either PAHO_BUILD_SHARED, PAHO_BUILD_STATIC, or both")
ENDIF()
SET(ROOT ${CMAKE_SOURCE_DIR})
SET(BUILD_PATH "${CMAKE_BINARY_DIR}")
MESSAGE(STATUS "current project name is ${PROJECTNAME}")
SET(CMAKE_BUILD_TYPE "DEBUG")
#SET(CMAKE_BUILD_TYPE "RELEASE")
set(SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/main/cpp/paho.mqtt.c/src)
set(JNI_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/main/cpp)
find_library(log-lib log)
include_directories(${SOURCE_DIR})
include_directories(${JNI_SOURCE_DIR})
set(common_src
${SOURCE_DIR}/MQTTTime.c
${SOURCE_DIR}/MQTTProtocolClient.c
${SOURCE_DIR}/Clients.c
${SOURCE_DIR}/utf-8.c
${SOURCE_DIR}/MQTTPacket.c
${SOURCE_DIR}/MQTTPacketOut.c
${SOURCE_DIR}/Messages.c
${SOURCE_DIR}/Tree.c
${SOURCE_DIR}/Socket.c
${SOURCE_DIR}/Log.c
${SOURCE_DIR}/MQTTPersistence.c
${SOURCE_DIR}/Thread.c
${SOURCE_DIR}/MQTTProtocolOut.c
${SOURCE_DIR}/MQTTPersistenceDefault.c
${SOURCE_DIR}/SocketBuffer.c
${SOURCE_DIR}/LinkedList.c
${SOURCE_DIR}/MQTTProperties.c
${SOURCE_DIR}/MQTTReasonCodes.c
${SOURCE_DIR}/Base64.c
${SOURCE_DIR}/SHA1.c
${SOURCE_DIR}/WebSocket.c
${SOURCE_DIR}/Proxy.c
${SOURCE_DIR}/MQTTClient.c
${JNI_SOURCE_DIR}/JNI_OnLoad.cc
${JNI_SOURCE_DIR}/jni_utils.cc
${JNI_SOURCE_DIR}/keutil.cc
)
IF (NOT PAHO_HIGH_PERFORMANCE)
SET(common_src ${common_src}
${SOURCE_DIR}/StackTrace.c
${SOURCE_DIR}/Heap.c
)
ENDIF()
SET(LIBS_SYSTEM c dl)
set(SELF_LIB_NAME kemqtt)
add_library(${SELF_LIB_NAME} SHARED ${common_src})
target_link_libraries(${SELF_LIB_NAME}
${log-lib}
)
封装 JNI 接口进行 Pub:
static jlong
_start(JNIEnv *env, jclass cls) {
LOGI("start...");
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
conn_opts.username = "A1_TEST_TOKEN";
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
LOGI("Failed to create client, return code %d\n", rc);
return -1;
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
LOGI("Failed to connect, return code %d\n", rc);
return -2;
}
pubmsg.payload = (void *)PAYLOAD;
pubmsg.payloadlen = (int)strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
{
LOGI("Failed to publish message, return code %d\n", rc);
return -3;
}
LOGI("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
LOGI("Message with delivery token %d delivered\n", token);
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
LOGI("Failed to disconnect, return code %d\n", rc);
MQTTClient_destroy(&client);
return (jlong) 0;
}
这里直接改写官方 demo,最开始怎么也连不上,连接错误返回-1,最后发现是测试机好久没用,没有联网了,浪费了好长时间去查代码问题;网络修复后又遇到返回错误 5 的问题,是因为 server 开启了用户验证,所以需要配置 username,配置好后成功完成了发布。
本文介绍了 MQTT 和 MQTT 协议的数据包结构,并且介绍了 MQTT 开发环境的搭建,都比较粗浅,后续文章深入分析 MQTT 协议内容以及 MQTT Java 和 C 版本代码实现细节,从代码角度分析 MQTT 协议的优点,并完成实现 MQTT 的 Android 版本交互以及与 Protbuf 的结合等。
领取专属 10元无门槛券
私享最新 技术干货