前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中

用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中

作者头像
gzq大数据
发布2020-11-26 16:39:53
9620
发布2020-11-26 16:39:53
举报
文章被收录于专栏:大数据那些事

有一段时间没好好写博客了,因为一直在做一个比较小型的工程项目,也常常用在企业里,就是将流式数据处理收集,再将这些流式数据进行一些计算以后再保存在mysql上,这是一套比较完整的流程,并且可以从数据库中的数据再导入到hadoop上,再在hadoop上进行离线较慢的mapreduce计算,这是我后面要进行的项目。

项目准备环境

(1)zookeeper:

(2)spark

(3)kafka

(4)mysql

(5)navicat

(6)三台虚拟机

(7)jdk

(8)intellij IDEA

(9)虚拟机vmware

虚拟机分别配置

虚拟机

安装环境

node01

kafka zookeeper jdk 192.168.19.110

node02

kafka zookeeper jdk spark 192.168.19.111

node03

kafka zookeeper jdk mysql 192.168.19.112

具体的虚拟机的细节配置就不多说了,肯定是要关闭防火墙的。

开始实行

(1)分别在三台主机上开启zookeeper(zookeeper的集群配置可以看我这篇博客zookeeper的安装和使用

(2)分别在三台主机上开启kafka

(3)开启产生消息队列命令(前提创建好topic:spark(我这里是spark话题))

(4)在node3上开启mysql

在mysql地下创建bigdata数据库,进入数据库后新建wordcount表,创建相应字段即可

(5)将写好的代码打成jar包:

写代码时是要写scala语言,所以要加载好相应的插件:

代码语言:javascript
复制
package com.gzq.spark

import java.sql.DriverManager
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level,Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}




/**
 * @Auther: gzq
 * @Date: 2020/11/23 - 11 - 23 - 22:37 
 * @Description:
 */
object Sparkstream_kafka202020 {
  Logger.getLogger("org").setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_stream")

    val ssc : StreamingContext = new StreamingContext(conf,Seconds(3))

    val kafkaParams: Map[String,Object] =  Map[String,Object](

      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark2020",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("spark"), kafkaParams)
    )
    kafkaDStream
      .map(_.value())
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()
    kafkaDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
        System.out.println(record)
        // wordcount里的record.value()一定要加双引号这样才能是字符串类型
          val sql = "insert into wordcount(word, wordcount) values(" + '"'+ record.value() + '"' + "," + record.offset() +");"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })



    ssc.start()
    ssc.awaitTermination()
  }
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://192.168.19.112:3306/bigdata", "root", "123456")
  }


}

maven依赖(可以根据自己的版本修改)

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gzq.spark2020</groupId>
    <artifactId>spark2020</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
             <dependency>
                 <groupId>mysql</groupId>
                 <artifactId>mysql-connector-java</artifactId>
                 <version>5.1.1</version>
             </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

    </dependencies>



</project>

点击进去

选择自己的main

接下来apply ok

再点击

随后点击build即可:

输出在out目录下

将jar包上传到node02(有spark,直接本地运行)

输入上面的3条内容,可以看见node02上的输出:

查看数据库也输出了:

ps:踩过的坑

(1):

这行sql语句一定要注意。

因为我的word列定义的是varchar类型,所以必须传入的是字符串类型,lang.String,所以要在record.value()两侧加入双引号。

(2):

为什么我打jar包时没有用maven,是因为maven打出来jar包没有我写的主函数,所以在用spark执行时它会报错说找不到main函数的入口,找不到类,后来发现需要在pom文件中做相关的配置:

代码语言:javascript
复制
    <build>
        <finalName>WordCount</finalName>
        <plugins>
            <plugin>
            <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.gzq.spark._01.WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

(3):

在开启kafka时我发现开一会它就自动关闭,查看日志文件后发现我的kafka-logs文件出了问题,所以我将三台主机这个文件夹下的所有文件全部删除重启kafka成功

(4):

因为我的zookeeper是多集群模式,所以它的选举机制是必须要开启半数以上,所以开启zookeeper时要都开启,如果只开启了其中一台也会启动不起来。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/11/24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 项目准备环境
  • 开始实行
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档