
ElasticSearch 6.x Study Notes: 28. Java API: Batch Document Operations

程裕强
Published 2022-05-06 19:21:09
Included in the column: 大数据学习笔记 (Big Data Study Notes)

1. Multi Get

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.1/java-docs-multi-get.html

Multi Get API: the multi get API lets you retrieve a list of documents by their index, type and id:

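package cn.hadron;

import cn.hadron.es.ESUtil;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.transport.TransportClient;

public class MultiGetDemo {
    public static void main(String[] args) throws Exception{
        // ESUtil is the author's own helper class that creates the TransportClient
        TransportClient client= ESUtil.getClient();
        MultiGetResponse mgResponse = client.prepareMultiGet()
                // documents 1 and 2 from index "index1", type "blog"
                .add("index1","blog","1","2")
                // documents 1, 2 and 2 again from index "my-index", type "persion";
                // the repeated id is fetched, and printed, twice
                .add("my-index","persion","1","2","2")
                .get();
        // one item response per requested document, in request order
        for(MultiGetItemResponse response:mgResponse){
            GetResponse rp=response.getResponse();
            // getResponse() is null if the item failed; isExists() is false if the document is missing
            if(rp!=null && rp.isExists()){
                System.out.println(rp.getSourceAsString());
            }
        }
    }
}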
Output:
{"id":"1","title":"装饰模式","content":"动态地扩展一个对象的功能","postdate":"2018-02-03 14:38:10","url":"csdn.net/79239072"}
{"id":"2","title":"单例模式解读","content":"枚举单例模式可以防反射攻击。","postdate":"2018-02-03 19:27:00","url":"csdn.net/79247746"}
{
  "name":"张三",
  "age":27,
  "gender":"男",
  "salary":15000,
  "dep":"bigdata"
}

{
  "name":"李四",
  "age":26,
  "gender":"女",
  "salary":15000,
  "dep":"bigdata"
}

{
  "name":"李四",
  "age":26,
  "gender":"女",
  "salary":15000,
  "dep":"bigdata"
}
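For comparison, the REST counterpart of this call is the `_mget` endpoint, whose body lists each requested document by `_index`, `_type` and `_id`. The minimal sketch below only builds that JSON body with plain JDK code, without an Elasticsearch client; `MgetBody` is a hypothetical helper, and it assumes the index, type and id values need no JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds the JSON body of a REST "_mget" request.
// Assumes index, type and id values contain no characters that need JSON escaping.
class MgetBody {
    private final List<String> docs = new ArrayList<>();

    // Mirrors prepareMultiGet().add(index, type, ids...): one "docs" entry per id.
    MgetBody add(String index, String type, String... ids) {
        for (String id : ids) {
            docs.add("{\"_index\":\"" + index + "\",\"_type\":\"" + type
                    + "\",\"_id\":\"" + id + "\"}");
        }
        return this;
    }

    // Wraps all entries in {"docs":[...]}.
    String build() {
        return "{\"docs\":[" + String.join(",", docs) + "]}";
    }
}
```

As in the Java example, a repeated id (here `2` in `my-index`) simply appears twice in `docs`, so that document comes back twice.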

2. Bulk Operations

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.1/java-docs-bulk.html

package cn.hadron;

import cn.hadron.es.ESUtil;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;

import java.util.Date;

import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class BulkDemo {
    public static void main(String[] args) throws Exception{
        TransportClient client= ESUtil.getClient();
        BulkRequestBuilder bulkRequest = client.prepareBulk();

        // queue an index request for twitter/tweet/1
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                        .endObject()
                )
        );
        // queue an index request for twitter/tweet/2
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                        .endObject()
                )
        );
        // execute all queued requests in a single bulk call
        BulkResponse bulkResponse = bulkRequest.get();
        System.out.println(bulkResponse.status());
        if (bulkResponse.hasFailures()) {
            // process failures by iterating through each bulk response item
            System.out.println("Some operations failed:");
            for (BulkItemResponse item : bulkResponse) {
                if (item.isFailed()) {
                    System.out.println(item.getIndex() + "/" + item.getType() + "/" + item.getId()
                            + ": " + item.getFailureMessage());
                }
            }
        }
    }
}
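The same requests can also be sent to the REST `_bulk` endpoint, whose body is newline-delimited JSON: one action/metadata line per operation, followed by a source line for `index` actions (a `delete` has none), and the body must end with a newline. A minimal sketch that assembles such a body as a string; `BulkBody` is a hypothetical helper, and the sources are assumed to be already-serialized JSON.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that builds a REST "_bulk" body (newline-delimited JSON).
// Assumes metadata values need no JSON escaping and sources are valid JSON.
class BulkBody {
    private final List<String> lines = new ArrayList<>();

    // "index" action: a metadata line followed by the document source line.
    BulkBody index(String index, String type, String id, String sourceJson) {
        lines.add("{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}");
        lines.add(sourceJson);
        return this;
    }

    // "delete" action: a metadata line only, no source line.
    BulkBody delete(String index, String type, String id) {
        lines.add("{\"delete\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}");
        return this;
    }

    // Every line, including the last one, is terminated by '\n'.
    String build() {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }
}
```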

3. Bulk Processor

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.1/java-docs-bulk-processor.html

The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.
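The count and size triggers can be illustrated with a plain JDK sketch. `MiniBulkBuffer` is hypothetical and deliberately simplified: it is single-threaded, flushes as soon as either the number of buffered actions or their accumulated UTF-8 size reaches a threshold, and omits the time-based flush, concurrent requests and retries that the real `BulkProcessor` also handles.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded illustration of BulkProcessor-style flushing.
class MiniBulkBuffer {
    private final int maxActions;               // plays the role of setBulkActions(...)
    private final long maxBytes;                // plays the role of setBulkSize(...)
    private final Consumer<List<String>> sink;  // receives each flushed batch
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    MiniBulkBuffer(int maxActions, long maxBytes, Consumer<List<String>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    void add(String action) {
        pending.add(action);
        pendingBytes += action.getBytes(StandardCharsets.UTF_8).length;
        // Flush as soon as either threshold is reached.
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    // Hands whatever is buffered to the sink, similar in spirit to bulkProcessor.flush().
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }
}
```

With `maxActions = 3`, adding seven actions produces two automatic flushes of three, and a final `flush()` sends the remaining one, much like flushing a `BulkProcessor` before closing it.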

package cn.hadron;
import cn.hadron.es.ESUtil;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import java.util.concurrent.TimeUnit;

public class BulkProcessorDemo {
    public static void main(String[] args) throws Exception{
        TransportClient client= ESUtil.getClient();
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,BulkRequest request) {
                        // called just before a bulk request is executed
                        System.out.println("Number of actions: "+request.numberOfActions());
                    }
                    @Override
                    public void afterBulk(long executionId,BulkRequest request,BulkResponse response) {
                        // called after a bulk request completes; individual items may still have failed
                        if(!response.hasFailures()) {
                            System.out.println("Bulk succeeded!");
                        }else {
                            System.out.println("Bulk had failures: " + response.buildFailureMessage());
                        }
                    }
                    @Override
                    public void afterBulk(long executionId,BulkRequest request,Throwable failure) {
                        // called when the whole bulk request failed with an exception
                        System.out.println(failure);
                    }
                })
                .setBulkActions(1000)// flush once 1000 requests have been added
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))// flush once the buffered requests reach 5 MB
                .setFlushInterval(TimeValue.timeValueSeconds(5))// flush every 5 seconds regardless of count or size
                .setConcurrentRequests(1)// allow 1 bulk request in flight while new ones accumulate (0 = synchronous)
                // backoff policy: first retry after 100 ms, delays grow exponentially, at most 3 retries
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        // Add your requests
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "1"));
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
        // flush the buffered requests and wait up to 10 minutes for in-flight bulks to finish;
        // close() alone does not wait when concurrent requests are enabled
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        // refresh the indices so the deletions become visible to search
        client.admin().indices().prepareRefresh().get();
        // Now you can start searching!
        client.prepareSearch().get();
    }
}
To verify, search the twitter index; both documents are gone:
GET twitter/_search
{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}
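`BackoffPolicy.exponentialBackoff(initialDelay, maxNumberOfRetries)` lets `BulkProcessor` retry bulk items that the cluster rejected for lack of resources, waiting longer before each retry, up to a fixed number of retries. The sketch below shows the general technique with a generic doubling schedule; Elasticsearch's own policy may grow its delays by a different formula, so the numbers here are illustrative only. `BackoffSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of exponential backoff.
// Not Elasticsearch's exact formula: here each delay simply doubles.
class BackoffSketch {
    // Wait (in ms) before each retry; exactly maxRetries entries.
    static List<Long> delays(long initialDelayMillis, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            result.add(delay);
            delay *= 2; // grow the delay exponentially
        }
        return result;
    }

    // Runs op; on a RuntimeException waits per the schedule and tries again,
    // rethrowing the last failure once all retries are used up.
    static <T> T retry(Supplier<T> op, long initialDelayMillis, int maxRetries) {
        List<Long> schedule = delays(initialDelayMillis, maxRetries);
        for (int attempt = 0; ; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                if (attempt >= schedule.size()) {
                    throw e; // retries exhausted
                }
                try {
                    Thread.sleep(schedule.get(attempt));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
    }
}
```

With the demo's values (100 ms, 3 retries) this sketch would wait 100, 200 and 400 ms, so an operation is attempted at most four times in total.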

4. Delete By Query

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.1/java-docs-delete-by-query.html

package cn.hadron;

import cn.hadron.es.ESUtil;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;

public class DeleteByQueryDemo {
    public static void main(String[] args){
        TransportClient client= ESUtil.getClient();
        BulkByScrollResponse response =DeleteByQueryAction.INSTANCE
                .newRequestBuilder(client)
                // delete documents whose title matches "模式" ("pattern")
                .filter(QueryBuilders.matchQuery("title", "模式"))
                .source("index1")// index to delete from
                .get();
        // number of deleted documents
        long deleted = response.getDeleted();
        System.out.println(deleted);
    }
}

Output:

no modules loaded
loaded plugin [org.elasticsearch.index.reindex.ReindexPlugin]
loaded plugin [org.elasticsearch.join.ParentJoinPlugin]
loaded plugin [org.elasticsearch.percolator.PercolatorPlugin]
loaded plugin [org.elasticsearch.script.mustache.MustachePlugin]
loaded plugin [org.elasticsearch.transport.Netty4Plugin]
2

Process finished with exit code 0
Searching index1 afterwards returns no hits:
GET index1/_search
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}
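Conceptually, delete by query first takes a snapshot of the matching documents and then deletes them batch by batch, issuing one bulk delete per scroll batch; that is why the response type is `BulkByScrollResponse`. The in-memory model below is a hypothetical illustration of that flow only: a plain substring test stands in for the `match` query (which really performs analyzed full-text matching), and versioning and conflict handling are not modeled. `DeleteByQuerySketch` is not part of any client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of delete by query: snapshot the matching ids,
// then delete them in fixed-size batches (one "bulk delete" per batch).
class DeleteByQuerySketch {
    long deleted; // analogous to BulkByScrollResponse.getDeleted()
    int batches;  // analogous to BulkByScrollResponse.getBatches()

    static DeleteByQuerySketch run(Map<String, String> titlesById,
                                   Predicate<String> matches, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        DeleteByQuerySketch result = new DeleteByQuerySketch();
        // 1. Snapshot the ids of all matching documents before deleting anything.
        List<String> hits = new ArrayList<>();
        for (Map.Entry<String, String> e : titlesById.entrySet()) {
            if (matches.test(e.getValue())) {
                hits.add(e.getKey());
            }
        }
        // 2. Walk the snapshot one batch at a time and "bulk delete" each batch.
        for (int from = 0; from < hits.size(); from += batchSize) {
            List<String> batch = hits.subList(from, Math.min(from + batchSize, hits.size()));
            for (String id : batch) {
                titlesById.remove(id);
            }
            result.deleted += batch.size();
            result.batches++;
        }
        return result;
    }
}
```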
This article was originally published on the author's personal site/blog on 2018-02-09.
