前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >物联网数据库 IoTDB —— 从协议到数据

物联网数据库 IoTDB —— 从协议到数据

作者头像
麒思妙想
发布2021-06-15 16:08:30
1.5K0
发布2021-06-15 16:08:30
举报
文章被收录于专栏:麒思妙想

在这个系列之前的文章里,我们介绍了Iotdb的LSM,以及Iot中的最佳实践,这次我们看看如何将mqtt和Iotdb整合起来。下面我们开始:

iotdb in docker

首先,做一个测试环境,我现在越发喜欢docker 和 WSL 了,除了吃点硬盘,内存和CPU资源以外,没有什么缺点了......

run in docker

直接把该开的端口都打开,只是测试环境,我就没再挂目录。

docker run -d -p 6667:6667 -p 31999:31999 -p 8181:8181 -p 5555:5555 -p 1883:1883 apache/iotdb

等待一会,执行 docker ps 查看是否成功了

代码语言:javascript
复制
➜  ~ docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED        STATUS        PORTS                                                                                                                                                                                                                       NAMES
ad9b18f8bff3   apache/iotdb   "/iotdb/sbin/start-s…"   2 hours ago    Up 2 hours    0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 0.0.0.0:6667->6667/tcp, :::6667->6667/tcp, 0.0.0.0:8181->8181/tcp, :::

初步的iotdb in docker 环境,我们就搞好了。接下来,开启mqtt服务。

开启 Mqtt 服务

进入iotdb的docker docker exec -it ad9b18f8bff3 /bin/bash

编辑配置文件vi iotdb/conf/iotdb-engine.properties

开启服务,根据自己的需要,配置ip和端口等。

代码语言:javascript
复制
####################
### MQTT Broker Configuration
####################

# whether to enable the mqtt service.
enable_mqtt_service=false   # 修改成 true , 代表开启 mqtt服务

# the mqtt service binding host.
mqtt_host=0.0.0.0 # ip

# the mqtt service binding port.
mqtt_port=1883  # 端口

# the handler pool size for handing the mqtt messages.
mqtt_handler_pool_size=1

# the mqtt message payload formatter.
mqtt_payload_formatter=json  # 数据格式

# max length of mqtt message in byte
mqtt_max_message_size=1048576

重启服务,如果不会,就重启docker镜像。

iotdb 基础操作

  • 启动服务: sbin/start-client.sh root@ad9b18f8bff3:/iotdb/sbin# ./start-cli.sh --------------------- Starting IoTDB Cli --------------------- _____ _________ ______ ______ |_ _| | _ _ ||_ _ `.|_ _ \ | | .--.|_/ | | \_| | | `. \ | |_) | | | / .'`\ \ | | | | | | | __'. _| |_| \__. | _| |_ _| |_.' /_| |__) | |_____|'.__.' |_____| |______.'|_______/ version 0.11.1 IoTDB> login successfully
  • 退出CLI: quitexit
  • 停止服务:$sbin/stop-server.sh
  • 设置一个存储组到IOTDB,名为root : IoTDB> SET STORAGE GROUP TO root
  • 查看当前IOTDB的存储组 : IoTDB> SHOW STORAGE GROUP IoTDB> SHOW STORAGE GROUP +-------------+ |storage group| +-------------+ | root.test| +-------------+ Total line number = 1 It costs 0.127s
  • 查看系统中存在的所有时间序列 :IoTDB> SHOW TIMESERIES IoTDB> show timeseries +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+ | timeseries|alias|storage group|dataType|encoding|compression|tags|attributes| +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+ |root.test.wf01.wt01.temperature| null| root.test| FLOAT| GORILLA| SNAPPY|null| null| | root.test.wf01.wt01.status| null| root.test| BOOLEAN| RLE| SNAPPY|null| null| | root.test.wf01.wt01.hardware| null| root.test| TEXT| PLAIN| SNAPPY|null| null| +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+ Total line number = 3 It costs 0.009s
  • 查看系统中存在的特定时间序列: SHOW TIMESERIES root.test.wf01.wt01.status IoTDB> SHOW TIMESERIES root.test.wf01.wt01.status +--------------------------+-----+-------------+--------+--------+-----------+----+----------+ | timeseries|alias|storage group|dataType|encoding|compression|tags|attributes| +--------------------------+-----+-------------+--------+--------+-----------+----+----------+ |root.test.wf01.wt01.status| null| root.test| BOOLEAN| RLE| SNAPPY|null| null| +--------------------------+-----+-------------+--------+--------+-----------+----+----------+ Total line number = 1 It costs 0.003s
  • 插入数据 INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71) IoTDB> INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71) Msg: The statement is executed successfully.
  • 查看数据: select * from root.test; IoTDB> select * from root.test; +------------------------+-------------------------------+--------------------------+----------------------------+ | Time|root.test.wf01.wt01.temperature|root.test.wf01.wt01.status|root.test.wf01.wt01.hardware| +------------------------+-------------------------------+--------------------------+----------------------------+ |2021-01-20T02:00:00.000Z| 21.2| true| hello| +------------------------+-------------------------------+--------------------------+----------------------------+ Total line number = 1 It costs 0.077s
  • 查看设备:show devices IoTDB> show devices +-------------------+ | devices| +-------------------+ |root.test.wf01.wt01| +-------------------+ Total line number = 1 It costs 0.002s

mqtt to iotdb

代码

构建一个实体对象,用于存储

代码语言:javascript
复制
package wang.datahub.iotdb;

import com.google.gson.Gson;
import java.util.List;

public class IotdbVO {
    private String device;
    private long timestamp = System.currentTimeMillis();
    private List<String> measurements;
    private List<Object> values;

    public String getDevice() {
        return device;
    }

    public void setDevice(String device) {
        this.device = device;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public List<String> getMeasurements() {
        return measurements;
    }

    public void setMeasurements(List<String> measurements) {
        this.measurements = measurements;
    }

    public List<Object> getValues() {
        return values;
    }

    public void setValues(List<Object> values) {
        this.values = values;
    }

    public String toJson(){
        Gson g = new Gson();
        String jsonData = g.toJson(this);
        return jsonData;
    }


    @Override
    public String toString() {
        return "IotdbVO{" +
                "device='" + device + '\'' +
                ", timestamp=" + timestamp +
                ", measurements=" + measurements +
                ", values=" + values +
                '}';
    }
}

使用祖传的代码来模拟数据发射到iotdb,这里直接将mqtt的主机和端口,配置到前文所修改的iotdb的mqtt服务上,就大功告成了。

代码语言:javascript
复制
package wang.datahub.iotdb;

import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class EmmitToIotdb {

    public static void main(String[] args) {
        String[] hardwares = new String[]{
                "a1",
                "b1",
                "b2",
                "c3",
                "d1",
                "f5"
        };
        int count = 1000;
               
        for(int i = 0; i < count ;i++){
            IotdbVO iotdbVO = new IotdbVO();
            iotdbVO.setDevice("root.test.wf01.wt01");
            List<String> measurements = new ArrayList<>();
            List<Object> values = new ArrayList<>();
            measurements.add("temperature");
            measurements.add("status");
            measurements.add("hardware");

            Random r = new Random();
            values.add(r.nextInt(40));
            values.add(r.nextBoolean());
            values.add(hardwares[r.nextInt(hardwares.length)]);

            iotdbVO.setMeasurements(measurements);
            iotdbVO.setValues(values);
            emmitToIotdb(iotdbVO);
        }
    }

    public static void emmitToIotdb(IotdbVO content){
        try {
            MQTT mqtt = new MQTT();
            mqtt.setHost("127.0.0.1", 1883);
            mqtt.setUserName("root");
            mqtt.setPassword("root");

            BlockingConnection connection = mqtt.blockingConnection();
            connection.connect();

            String payload = content.toJson();
            connection.publish(content.getDevice(),payload.getBytes(), QoS.AT_LEAST_ONCE,false);

            connection.disconnect();
        } catch (Exception e){
            e.printStackTrace();
        }

    }


}

执行结果

iotdb,功能还是相当强大的,也非常有意思,希望本篇文章对你有所帮助,也非常欢迎您来与我交流。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 麒思妙想 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • iotdb in docker
    • run in docker
      • 开启 Mqtt 服务
        • iotdb 基础操作
        • mqtt to iotdb
          • 代码
            • 执行结果
            相关产品与服务
            容器镜像服务
            容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档