
Importing Vector Data into HBase/Accumulo with GeoWave

Original
GIS小甲鱼
Last modified: 2024-12-13 15:35:48

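Preface

I have recently been running experiments on geospatial and spatiotemporal big data. This article shows how to use the GeoWave framework to import vector data into NoSQL databases such as HBase or Accumulo.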

Software versions:

Hadoop: 2.10.2

Zookeeper: 3.6.4

GeoWave: 1.2.0

Accumulo: 1.9.3

HBase: 1.4.0

Java: 1.8

Preparation

Download geowave-hbase-1.2.0-apache.jar from the official GeoWave website and copy it into the lib folder of HBase. (For Accumulo, install the geowave-accumulo-1.2.0-apache-accumulo1.7.jar package instead.)

Code

1. Add the dependencies

<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-datastore-hbase</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-adapter-vector</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-format-vector</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-datastore-accumulo</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.6.10</version>
</dependency>
<dependency>
    <groupId>org.apache.accumulo</groupId>
    <artifactId>accumulo-core</artifactId>
    <version>1.9.3</version>
</dependency>

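2. Importing vector data into HBase

This article uses AIS data as the example. The input is a JSON file; the ingest code further down reads it line by line and parses each line as one JSON object.

First, write code that builds a SimpleFeatureTypeBuilder. It is used to create the SimpleFeatureType, which defines the geometry type of your vector data and the attribute fields it carries.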

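import java.util.Date;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.feature.simple.SimpleFeatureTypeBuilder;
import org.geotools.referencing.crs.DefaultGeographicCRS;
import org.locationtech.jts.geom.Point;

// Nothing in this method throws a checked exception, so no throws clause is needed
public static SimpleFeatureTypeBuilder getSimpleFeatureBuilder (String typeName) {

        // Create the SimpleFeatureTypeBuilder
        SimpleFeatureTypeBuilder featureTypeBuilder = new SimpleFeatureTypeBuilder();
        AttributeTypeBuilder attributeBuilder = new AttributeTypeBuilder();

        // Geometry attribute: a non-nullable Point named "the_geom"
        featureTypeBuilder.add(attributeBuilder.binding(Point.class).nillable(false).buildDescriptor("the_geom"));
        // Add the non-spatial attributes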
        featureTypeBuilder.add("uuid", String.class);
        featureTypeBuilder.add("mmsi", Integer.class);
        featureTypeBuilder.add("timestamp", Date.class);
        featureTypeBuilder.add("system_timestamp", Date.class);
        featureTypeBuilder.add("nav_status", Integer.class);
        featureTypeBuilder.add("rot", Double.class);
        featureTypeBuilder.add("sog", Double.class);
        featureTypeBuilder.add("pos_acc", Double.class);
        featureTypeBuilder.add("longitude", Double.class);
        featureTypeBuilder.add("latitude", Double.class);
        featureTypeBuilder.add("cog", Double.class);
        featureTypeBuilder.add("true_head", Double.class);
        featureTypeBuilder.add("eta", String.class);
        featureTypeBuilder.add("destid", Integer.class);
        featureTypeBuilder.add("dest", String.class);
        featureTypeBuilder.add("srcid", Integer.class);
        featureTypeBuilder.add("distance", Double.class);
        featureTypeBuilder.add("speed", Double.class);
        featureTypeBuilder.add("draught", Double.class);
        featureTypeBuilder.add("ship_type", Integer.class);
        featureTypeBuilder.setCRS(DefaultGeographicCRS.WGS84);
        featureTypeBuilder.setName(typeName);

        return featureTypeBuilder;
    }
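
The schema above binds each attribute to a Java type (for example, mmsi to Integer), while the ingest code passes in whatever was read from the JSON record, so any conversion happens implicitly. The following is a minimal sketch of doing that conversion explicitly, so that missing or malformed values are handled in one visible place. The class AisValueCoercion and its methods are hypothetical helpers written for this illustration; they are not part of GeoWave or GeoTools, and treating bad input as null is an assumption about how you want to handle it.

```java
/** Hypothetical helper for illustration; not part of GeoWave or GeoTools. */
final class AisValueCoercion {
    private AisValueCoercion() {}

    /**
     * Returns the value as an Integer, or null when it is missing, blank,
     * or not a valid integer. Numeric inputs are narrowed with intValue(),
     * which truncates any fractional part.
     */
    static Integer toInteger(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        String text = raw.toString().trim();
        if (text.isEmpty()) {
            return null;
        }
        try {
            return Integer.valueOf(text);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Returns the value as a Double, or null when it is missing, blank, or not numeric. */
    static Double toDouble(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof Number) {
            return ((Number) raw).doubleValue();
        }
        String text = raw.toString().trim();
        if (text.isEmpty()) {
            return null;
        }
        try {
            return Double.valueOf(text);
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

With helpers like these, a call such as pointFeatureBuilder.set("mmsi", AisValueCoercion.toInteger(jsonObject.get("mmsi"))) would hand the builder a value that already matches the declared binding.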

Next, ingest the data. The typeName parameter names the vector data type, and the indexName parameter names the index.

public static void IngestData(String typeName, String indexName) throws
            IOException, JSONException, ParseException {
        HBaseRequiredOptions hBaseRequiredOptions = new HBaseRequiredOptions();
        // ZooKeeper address (host:port)
        hBaseRequiredOptions.setZookeeper("localhost:2181");
        HBaseDataStore hbaseStore = (HBaseDataStore)
                    DataStoreFactory.createDataStore(hBaseRequiredOptions);
        // Path to the JSON data file
        String filePath = "D:\\轨迹数据\\5h.json";

        long startTimeStamp = System.currentTimeMillis();

        // Build the SimpleFeatureType that describes the AIS records
        SimpleFeatureTypeBuilder featureTypeBuilder = getSimpleFeatureBuilder(typeName);
        SimpleFeatureType pointType = featureTypeBuilder.buildFeatureType();

        // Create the SimpleFeatureBuilder
        SimpleFeatureBuilder pointFeatureBuilder = new SimpleFeatureBuilder(pointType);

        // Create an adapter for point type
        FeatureDataAdapter pointTypeAdapter = new FeatureDataAdapter(pointType);

        // Create the spatial-temporal index
        Index spatialTemporalIndex = new SpatialTemporalIndexBuilder()
                .setMaxDuplicates(-1)
                .setNumPartitions(3)
                .setPeriodicity(TemporalBinningStrategy.Unit.DAY)
                .setPartitionStrategy(IndexPluginOptions.PartitionStrategy.HASH)
                .setName(indexName).createIndex();

        //Add the point type to the data store in the spatial index
        hbaseStore.addType(pointTypeAdapter, spatialTemporalIndex);
        hbaseStore.getIndexStore().addIndex(spatialTemporalIndex);
        Writer<SimpleFeature> writer = hbaseStore.createWriter(pointTypeAdapter.getTypeName());

        // Read the JSON file line by line
        BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath));
        String line;

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

        while ((line = bufferedReader.readLine()) != null) {
            JSONObject jsonObject = JSONObject.parseObject(line);
            // Write some features to the data store
            GeometryFactory factory = new GeometryFactory();

            String uuId = jsonObject.get("uuid").toString();
            String mmsi = jsonObject.get("mmsi").toString();

            pointFeatureBuilder.set("the_geom", factory.createPoint(new Coordinate(Double.parseDouble(jsonObject.get("longitude").toString()),
                    Double.parseDouble(jsonObject.get("latitude").toString()))));

            pointFeatureBuilder.set("uuid", uuId);

            pointFeatureBuilder.set("mmsi", mmsi);

            String timestamp_str = jsonObject.get("timestamp").toString();
            pointFeatureBuilder.set("timestamp", sdf.parse(timestamp_str));

            String system_timestamp_str = jsonObject.get("system_timestamp").toString();
            pointFeatureBuilder.set("system_timestamp", sdf.parse(system_timestamp_str));

            pointFeatureBuilder.set("nav_status", jsonObject.get("nav_status"));
            pointFeatureBuilder.set("rot", jsonObject.get("rot"));
            pointFeatureBuilder.set("sog", jsonObject.get("sog"));
            pointFeatureBuilder.set("pos_acc", jsonObject.get("pos_acc"));
            pointFeatureBuilder.set("cog", jsonObject.get("cog"));
            pointFeatureBuilder.set("true_head", jsonObject.get("true_head"));
            pointFeatureBuilder.set("eta", jsonObject.get("eta"));
            pointFeatureBuilder.set("destid", jsonObject.get("destid"));
            pointFeatureBuilder.set("dest", jsonObject.get("dest"));
            pointFeatureBuilder.set("srcid", jsonObject.get("srcid"));
            pointFeatureBuilder.set("distance", jsonObject.get("distance"));
            pointFeatureBuilder.set("speed", jsonObject.get("speed"));
            pointFeatureBuilder.set("draught", jsonObject.get("draught"));
            pointFeatureBuilder.set("ship_type", jsonObject.get("ship_type"));
            // Feature ID: first three and last three characters of the UUID, then "-" and the ship's MMSI
            String id = uuId.substring(0, 3) + uuId.substring(uuId.length() -3) + "-" + mmsi;
            writer.write(pointFeatureBuilder.buildFeature(id));
        }
        // Flush and close the writer before stopping the timer so buffered writes are included
        writer.flush();
        writer.close();
        bufferedReader.close();
        long endTimeStamp = System.currentTimeMillis();
        System.out.println("ingest finished!!!!");
        System.out.println("Ingest time: " + (endTimeStamp - startTimeStamp) + " ms");
    }
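
Two per-record steps in the loop above are worth isolating: parsing the timestamp and building the feature ID. The following is a minimal sketch of both using only the Java standard library. The class AisRecordHelpers is a hypothetical name introduced here. Interpreting the timestamps as UTC is an assumption (the original code parses them with SimpleDateFormat in the JVM's default time zone), and falling back to the whole UUID when it is shorter than six characters is also an assumption, added so that substring cannot fail on short input.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/** Hypothetical helper for illustration; not part of GeoWave. */
final class AisRecordHelpers {
    // Same layout as the article's SimpleDateFormat pattern, e.g. 2024-01-31T08:15:00
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss");

    private AisRecordHelpers() {}

    /**
     * Parses a timestamp string and interprets it as UTC (an assumption).
     * DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat.
     */
    static Date parseUtcTimestamp(String text) {
        LocalDateTime local = LocalDateTime.parse(text, TIMESTAMP_FORMAT);
        return Date.from(local.toInstant(ZoneOffset.UTC));
    }

    /**
     * Builds the feature ID used in the article: the first three and last
     * three characters of the UUID, then "-" and the MMSI. UUIDs shorter
     * than six characters are used whole instead of being sliced.
     */
    static String featureId(String uuid, String mmsi) {
        String prefix = uuid.length() < 6
                ? uuid
                : uuid.substring(0, 3) + uuid.substring(uuid.length() - 3);
        return prefix + "-" + mmsi;
    }
}
```

Inside the loop, these could stand in for sdf.parse(...) and the inline substring expression. Note that an ID built this way is not guaranteed to be unique across records; whether that matters depends on the data.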

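3. Importing vector data into Accumulo

The code is essentially the same as above: replace HBaseDataStore with AccumuloDataStore, and the HBase connection options with their Accumulo counterparts.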
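
Because only the data store differs between the two backends, the file-reading loop can be kept independent of it. The following is a minimal sketch of that separation, using only the Java standard library. LineIngestLoop and ingestLines are hypothetical names introduced for this illustration; the sink callback stands in for the code that parses a line, builds the feature, and writes it with whichever writer the chosen store provides. Skipping blank lines is an added assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

/** Hypothetical helper for illustration; not part of GeoWave. */
final class LineIngestLoop {
    private LineIngestLoop() {}

    /**
     * Reads the input line by line, skips blank lines, and passes every
     * other line to the sink. The reader is closed when the loop ends,
     * whether normally or with an exception. Returns the number of lines
     * handed to the sink.
     */
    static long ingestLines(BufferedReader reader, Consumer<String> sink) {
        long count = 0;
        try (BufferedReader in = reader) {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // nothing to parse on an empty line
                }
                sink.accept(line);
                count++;
            }
        } catch (IOException e) {
            // Rethrow unchecked so callers can pass plain lambdas as the sink
            throw new UncheckedIOException(e);
        }
        return count;
    }
}
```

With this shape, the HBase and Accumulo variants would share the loop and differ only in how the data store and its writer are created before the call.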

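Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to request removal.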
