前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark读写ES最佳实践

Spark读写ES最佳实践

原创
作者头像
沈小翊
发布2023-11-14 19:31:44
6150
发布2023-11-14 19:31:44
举报
文章被收录于专栏:大数据生态

本文介绍了Spark local模式下读写ES的2种方式

Spark RDD读写ES

Spark Streaming写入ES

环境准备

Elaticsearch-7.14.2

Spark-3.2.1

jdk-1.8

maven依赖

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.qcloud.abi</groupId>
    <artifactId>esspark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
                <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.12.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-30_2.12</artifactId>
            <version>7.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-network-common_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!--指定入口文件的位置-->
                            <mainClass>com.xx.TestMain</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Spark RDD读ES

代码语言:javascript
复制
public class ReadES {
    public static void main(String[] args) {
        SparkConf  conf = new SparkConf().setAppName("readEs").setMaster("local[2]")
                .set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
                .set("es.port", "9200")
                .set("es.net.http.auth.user", "elastic")
                .set("es.net.http.auth.pass", "passwd")
                .set("es.nodes.wan.only", "true")
                .set("es.nodes.discovery","false")
                .set("es.input.use.sliced.partitions","false")
                .set("es.resource", "spark_write")
                .set("es.scroll.size","500");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);

        for ( Map<String, Object> item : rdd.values().collect()) {
            System.out.println(item);
        }

        sc.stop();
    }

}

Spark读写ES还支持JSON格式

代码语言:javascript
复制
//直接读
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
//ES嵌套数据格式
{test={data=39.0, feature1=1.39, feature2=0.78, feature3=-0.83}}

//选择JSON格式
JavaPairRDD<String, String> rdd = JavaEsSpark.esJsonRDD(sc);
//JSON数据格式
{"test":{"data":50.0,"feature1":1.5,"feature2":1.0,"feature3":-0.5}}

Spark RDD写ES

代码语言:javascript
复制
public class SparkWriteEs {
    public static void main(String[] args) {
        //RDD方式写数据到ES
        SparkConf conf = new SparkConf().setAppName("my-app").setMaster("local[2]")
                .set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
                .set("es.port", "9200")
                .set("es.net.http.auth.user", "elastic")
                .set("es.net.http.auth.pass", "passwd")
                .set("es.nodes.wan.only", "true")
                .set("es.resource", "spark_write/_doc")
                .set("es.nodes.discovery","false")
                .set("es.input.use.sliced.partitions","false")
                .set("es.scroll.size","500");

        JavaSparkContext sc = new JavaSparkContext(conf);

        Map<String, ?> logs = ImmutableMap.of("yesyes", "255.255.255.254",
                "request", "POST /write/using_spark_rdd HTTP/1.1",
                "status", 200,"size", 802,
                "@timestamp", 895435190);

        List<Map<String, ?>> list = ImmutableList.of(logs);

        JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(list);

        JavaEsSpark.saveToEs(javaRDD, "spark_write/_doc");

        sc.stop();
    }
}

Spark Streaming消费kafka数据写入ES

代码语言:javascript
复制
public class RealTime_Data {
    public static void main(String[] args) throws Exception {
        	 String master = "local[2]";

        SparkConf conf = new SparkConf().setMaster(master).setAppName("StreamingTest")
                .set("spark.es.nodes", "43.139.24.126")//指定es地址
                .set("spark.es.port", "9200")
                .set("spark.es.nodes.wan.only","true");//指定es端口
        //指定5秒获取一次kafka数据
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.sparkContext().setLogLevel("WARN");
        String brokers = "43.139.24.126:9092";
        String groupId = "kafka";//消费者组id
        String topics = "test";//topic
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //取出1秒内的数据转成rddstream
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        //取出每条message中的value
        JavaDStream<String> lines = messages.map(record -> record.value());

        //拼成可以插入Elasticsearch的格式
        JavaDStream<String> out = lines.map(str -> "{\"test\":"+str+"}");

        //打印
        out.print();

        //写入Elasticsearch
        JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc");

        //启动streaming
        jssc.start();

        // 等待生产者发送数据
        jssc.awaitTermination();
        jssc.stop();
    }
}

也可以直接写入ES或者带上指定了数据结构的Map<String,String>

代码语言:javascript
复制
JavaEsSparkStreaming.saveToEs(JavaDStream , "<resource>");
JavaEsSparkStreaming.saveToEsWithMeta(JavaDStream, "spark/docs", Map<String,String>());

- 参数说明

参数

说明

es.nodes

Elasticsearch访问地址

es.port

ES访问端口号9200

es.net.http.auth.user

ES用户名

es.net.http.auth.pass

ES用户密码

es.nodes.wan.only

是否进行节点嗅探

es.nodes.discovery

是否禁用节点发现

es.index.auto.create

自动创建index开关

es.resource

指定要读写的index和type

es.mapping.names

表字段与Elasticsearch的索引字段名映射

es.input.use.sliced.partitions

是否开启slice分区

本地运行

打包

更换代码中公网ip为内网ip,选择maven assembly plugin进行打包,上传带依赖的jar包到EMR上,运行"ReadES"

代码语言:javascript
复制
su - hadoop
cd /usr/local/service/spark

./bin/spark-submit  --master yarn --executor-cores 1 --class "ReadES"  /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar

运行"SparkWriteEs"

代码语言:javascript
复制
./bin/spark-submit  --master yarn --executor-cores 1 --class "SparkWriteEs"  /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar

kibana上查询数据

代码语言:javascript
复制
GET SparkWriteEs/_search

问题总结

1. 打包项目后上传运行报错找不到类

代码语言:javascript
复制
Exception in thread "main" java.lang.NoClassDefFoundError: org/elasticsearch/spark/rdd/api/java/JavaEsSpark...

分析

显示缺少ESspark依赖,说明是因为打包没有带上依赖导致代码运行错误

解决方式

使用assembly打包,上传带依赖jar包

2. 客户端直接访问发生连接问题

代码语言:javascript
复制
Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'...

分析:

ES公网地址可直接访问,用户名密码参数有填写,'es.nodes.wan.only'参数填写没问题还是出现了与ES的连接问题,索引都没有创建,说明参数配置或者依赖包版本可能存在问题。

解决方式

ES.resource参数配置问题,未填写type 正确示例: "spark_write/_doc"

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本文介绍了Spark local模式下读写ES的2种方式
  • 环境准备
  • maven依赖
  • Spark RDD读ES
  • Spark RDD写ES
  • Spark Streaming消费kafka数据写入ES
  • - 参数说明
  • 本地运行
    • 打包
    • 问题总结
    相关产品与服务
    Elasticsearch Service
    腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档