Kafka实战练习
Kafka实战练习
实验预计耗时:60分钟
1. 课程背景
1.1 课程目的
消息队列Kafka是一个分布式、高吞吐量、高可扩展性的消息系统,其也因为强大的分布式流数据处理能力,被广泛用于大数据数据传递场景。Kafka基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。Kafka具有数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合等场景。
通过此实验,您可以掌握Kafka基本功能的使用,掌握Kafka生产者与消费者的开发过程。
1.2 课前知识准备
学习本课程前,学员需要掌握以下前置知识:
1、能力基础
- Linux基本操作:掌握Linux远程登录、文件与目录管理、vim编辑器使用等。
- Java开发基础:掌握Java面向对象编程、Maven项目构建等。
2、相关技术
- Kafka:是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。消息队列 Kafka的架构图如下所示:
- 生产者Producer可能是网页活动产生的消息、服务日志等信息。生产者通过push模式将消息发布到 Cloud Kafka 的 Broker 集群。
- 集群通过ZooKeeper管理集群配置,进行leader选举,故障容错等。
- 消费者Consumer被划分为若干个Consumer Group。消费者通过pull模式从Broker中消费消息。
3、相关概念
- Kafka相关术语介绍:
- Broker:Kafka集群包含一个或多个服务器时,这种服务器被称为broker。
- Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
- Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。
- Producer:负责发布消息到Kafka broker。
- Consumer:消息消费者,向Kafka broker读取消息的客户端。
- Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
2. 实验环境
2.1 实验操作环境
本课程需要以下实验操作环境:
- 可以接入互联网的笔记本电脑或者台式机,本实验使用的本地计算机为Windows系统。
- 实验环境:本地计算机(具备Java开发环境+PuTTY)+腾讯云控制台。
- Java软件开发工具包JDK(版本:1.8)
- Maven(版本:3.5及以上)
- Eclipse或者IDEA,此实验采用IDEA作为开发工具
2.2 实验架构图
本实验将使用EMR三节点集群(Master节点和两个Core节点),使用PuTTY连接Master节点的方式访问集群。实验任务通过Presto和Hive的协同使用,让学员了解Presto的基本使用流程。实验架构图如下:
2.3 实验的数据规划表
资源名称 | 数据 | 说明 |
---|---|---|
腾讯云账号 | 账号:XXXXXXXX、密码:XXXXXXXX | 涉及产品如下:VPC、EMR |
PuTTY | 版本:0.73 |
3. 实验流程
实验共包含三个阶段的任务:
- 实验环境准备:需要在腾讯云云CVM上部署一个Kafka实例,步骤包括CVM实例购买,Java安装,ZooKeeper安装以及Kafka的安装与测试。
- 生产者与消费者代码开发:在体验过简单的Kafka基本功能后,我们将通过Java分别开发Producer和Consumer,用于在Kafka实例上运行,实现消息的生产与消费。
- 消息生产与消费:在Producer和Consumer打包成功后,我们将它们上传至云服务器,通过运行自己封装的Jar包实现Kafka基本功能的使用。
4. 实验步骤
任务1 实验环境准备
【任务目标】
在腾讯云通过云服务器CVM实例,快速安装Kafka,并在启动Kafka后,测试Kafka的基本功能。
【任务步骤】
1、云服务器购买
1.购买三台云服务器。登录腾讯云官网,进入云服务器主页:https://console.cloud.tencent.com/cvm/index
2.点击“新建“按钮购买云服务器
3.首先在选择机型页面,机型配置如下。
配置项 | 内容 |
---|---|
计费模式 | 按量计费或竞价实例 |
地域 | 广州 |
可用区 | 广州三区 |
网络 | 新建或使用已有VPC |
实例 | S5.MEDIUM4(标准型S5,2核4GB) |
镜像 | 公共镜像 CentOS 7.6 64位 |
系统盘 | 高性能云硬盘 50GB |
公网带宽 | 免费分配独立公网IP,按使用流量 |
确认无误后,点击下一步:设置主机。
4.设置主机页面,配置如下:
配置项 | 内容 |
---|---|
所属项目 | 默认项目 |
安全组 | 使用使用放通全部端口安全组 |
实例名称 | kafka |
登陆方式 | 设置密码(学员可根据自己习惯选择) |
注:如果没有新建安全组,可以通过安全组选项下方的“新建安全组”直接新建。
确认无误后,点击下一步:确认配置信息。
5.确认开通
服务器信息如下图所示,点击开通:
6.开通后,可以在云服务器实例列表中查看到实例信息。
2、安装Kafka环境
Kafka运行由Java语言编写,故运行依赖于JRE。Kafka依赖于ZooKpeer。因此本步骤安装Kafka之前,先要依次安装Java和ZooKeeper。
1.使用PuTTY登录CVM实例,安装Java开发环境
yum -y install java-1.8.0-openjdk.x86_64
测试是否安装成功:
java -version
2.安装ZooKeeper
切换到/opt目录下:
cd /opt
下载ZooKeeper压缩包:
wget https://course-public-resources-1252758970.cos.ap-chengdu.myqcloud.com/PracticalApplication/202001bigdata/9-kafka/apache-zookeeper-3.5.6-bin.tar.gz
解压缩文件到当前文件夹:
tar -xzvf apache-zookeeper-3.5.6-bin.tar.gz
目录重命名:
mv apache-zookeeper-3.5.6-bin zookeeper
3.配置ZooKeeper
复制默认配置:
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
配置zookeeper环境变量,首先打开profile文件;
vim /etc/profile
按i进入编辑模式,在文件末尾添加zookeeper环境变量;
#set zookeeper environment
export ZK_HOME=/opt/zookeeper
export PATH=$ZK_HOME/bin:$PATH
保存文件后,让该环境变量生效:
source /etc/profile
4.启动ZooKeeper:
/opt/zookeeper/bin/zkServer.sh start
全部启动后,查看启动结果,显示启动模式为:standalone;
/opt/zookeeper/bin/zkServer.sh status
5.安装Kafka
切换到/opt目录下:
cd /opt
下载Kafka:
wget https://course-public-resources-1252758970.cos.ap-chengdu.myqcloud.com/PracticalApplication/202001bigdata/9-kafka/kafka_2.11-2.4.0.tgz
解压并进入安装目录:
tar -xzf kafka_2.11-2.4.0.tgz
重命名文件夹名称:
mv kafka_2.11-2.4.0 kafka
启动Kafka服务器,使用默认配置;
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
3、测试Kafka基本功能
1.建立主题
让我们用一个分区和一个副本创建一个名为“ test”的主题:
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
现在,如果我们运行list topic命令,我们可以看到该主题:
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
2.发送一些消息
Kafka带有一个命令行客户端,它将获取输入,并将其作为消息发送到Kafka。默认情况下,每行将作为单独的消息发送。运行生产者,然后在控制台中键入一些消息以发送到服务器。
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
发送消息你想输入的一些消息内容,如:
This is a message
This is another message
Ctrl+C 结束消息的输入。
3.启动消费者
Kafka还有一个命令行消费者,它将消息转储到标准输出。
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
任务2 生产者与消费者代码开发
【任务目标】
使用IDEA创建Maven工程,开发Kafka生产者与消费者,并打包等待上传云服务器运行。
【任务步骤】
1、Idea中创建项目
1.打开IDEA后,点击Create New Project。
2.选择项目类型为Maven:
3.GroupId为:com.test;ArtifactId为:kafka_project;点击Next。
4.Project name为:kafka_project,点击Finish。
5.项目创建成功后配置Maven的pom.xml,内容参考如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>kafka_project</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>Consumer</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.test.bigdata.Consumer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
编辑pom.xml后,导入并开始下载依赖。
2、项目代码编写
1.在java目录下,创建包com.test.bigdata;
2.在创建好的包内创建生产者Producer类,并编写生产者代码:
package com.test.bigdata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
// 设置broker地址,请修改为CVM实例的内网IP
props.put("bootstrap.servers", "172.16.16.3:9092");
props.put("retries", 0);
// 配置key-value允许使用参数化类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<>(props);
//使用producer发送一条消息
producer.send(new ProducerRecord("test", "key1", "这是一条消息"));
System.out.println("发送成功");
producer.close();
}
}
注意:请自行修改bootstrap.servers的IP地址为您自己的实例IP。
3.编写消费者代码;
在com.test.bigdata包下创建Cosumer.java类,编写代码如下:
package com.test.bigdata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties props=new Properties();
// 设置broker地址,请修改为CVM实例的内网IP
props.put("bootstrap.servers","172.16.16.3:9092");
props.put("group.id","tp");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//创建一个消费者客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅主题
consumer.subscribe(Collections.singletonList("test"));
System.out.println("Subscribed to topic"+"test");
//循环消费消息
while(true){
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records){
System.out.println("receiver a message from consumer client:"+record.value());
}
}
}
}
注意:请自行修改bootstrap.servers的IP地址为您自己的实例IP。
3、项目代码打包
1.首先对Producer进行打包,注意pom.xml中的finalName为Producer,mainClass为com.test.bigdata.Producer;
2.双击Maven项目生命周期的package,项目打包后会在项目路径下的target目录下生成一个Producer.jar
3.接下来打包Consumer,需要先修改pom.xml中的finalName标签为Consumer,mainClass标签为com.test.bigdata.Consumer;
4.将项目目录下的target文件夹内的Producer.jar,Consumer.jar移动到D盘,等待上传云服务器。
任务3 消息生产与消费
【任务目标】
在云服务器Kafka实例上运行生产者和消费者的Jar包,实现消息的生产与消费。
【任务步骤】
1、jar包上传云服务器
1.使用mkdir命令在CVM中创建一个/test目录。
创建文件夹test;
mkdir /test
切换到test路径下;
cd /test
2.找到PuTTY的安装目录,在上方地址栏输入cmd并执行。
3.上传jar包
在弹出的黑窗口首先输入psftp,打开psftp工具用来传输文件;
psftp
接下来连接服务器,回车后需要输入用户名和密码;
open xxx.xxx.xxx.xxx
用于切换远程Linux 服务器上的目录;
cd /test/
lcd命令用于切换本地的路径;
lcd D:\
上传生产者jar包;
put Producer.jar
上传消费者jar包;
put Consumer.jar
上传需要一点时间,请耐心等待,命令使用可以参考下图:
2、执行消息生产与消费
1.执行消息生产
本步骤需同时打开两个PuTTY客户端链接实例,首先在第一个PuTTY上使用Java运行消费者,执行命令如下;
切换到/test目录:
cd /test
执行Consumer:
java -jar Consumer.jar
在打印Subscribed to topictest后,表示等待topic里的数据接收。
2.另启一个PuTTY客户端连接实例,执行生产者,执行命令如下:
切换到/test目录:
cd /test
执行Producer:
java -jar Producer.jar
3.再回看第一个PuTTY客户端,消费者此时已经接收到了信息,打印结果如下:
至此,您已经完成了本次实验的全部任务,相信您已经掌握了Kafka的基本编程与接本功能的使用。
5. 注意事项
如实验资源无需保留,请在实验结束后及时销毁,以免产生额外费用。
学员评价