前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka——分布式的消息队列

Kafka——分布式的消息队列

作者头像
时间静止不是简史
发布2020-07-24 16:51:43
1.3K0
发布2020-07-24 16:51:43
举报
文章被收录于专栏:Java探索之路

Kafka

  • 第一章 是什么
    • 一 Kafka简介
    • 二 概念理解
      • 总结
    • 三 kafka的特点
    • 四 kafka生产消息、存储消息、消费消息
    • 五 kafka的消息存储和生产消费模型
    • 六 kafka与其他消息队列对比
  • 第二章 安装
    • 一 集群安装
    • 二 使用命令
      • 基本命令
      • 查看zookeeper中topic相关信息
      • 删除kafka中的数据
      • 小技巧: 通过脚本启动Kafka
      • kafka的leader的均衡机制
      • kafka 0.11版本改变
  • 第三章Kafka整合flume
    • 整合步骤

第一章 是什么

一 Kafka简介

kafka是一个高吞吐的分布式消息队列系统。 特点: 生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。 消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。 官网:https://kafka.apache.org/

二 概念理解

Topics and Logs:

  • Topic即为每条发布到Kafka集群的消息都有一个类别,topic在Kafka中可以由多个消费者订阅、消费。
  • 每个topic包含一个或多个partition(分区),partition数量可以在创建topic时指定,每个分区日志中记录了该分区的数据以及索引信息。如下图:
  • Kafka只保证一个分区内的消息有序,不能保证一个主题的不同分区之间的消息有序。如果你想要保证所有的消息都绝对有序可以只为一个主题分配一个分区。
  • 分区会给每个消息记录分配一个顺序ID号(偏移量 /offset), 能够唯一地标识该分区中的每个记录。Kafka集群保留所有发布的记录,不管这个记录有没有被消费过,Kafka提供相应策略通过配置从而对旧数据处理。
  • 实际上,每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置。位移位置是由消费者控制,即、消费者可以通过修改偏移量读取任何位置的数据。

Distribution – 分布式

  • 日志的分区分布在Kafka群集中的服务器上,每个服务器处理数据并要求共享分区。每个分区都在可配置数量的服务器之间复制,以实现容错功能。
  • 每个分区都有一个充当“领导者”的服务器和零个或多个充当“跟随者”的服务器。领导者处理对分区的所有读写请求,而跟随者则被动地复制领导者。如果领导者失败,则跟随者之一将自动成为新领导者。每个服务器充当其某些分区的领导者,而充当其他分区的跟随者,因此群集中的负载得到了很好的平衡。

Producers – 生产者 生产者将数据发布到他们选择的主题。生产者负责选择要分配给主题中哪个分区的消息 可以以循环方式完成此操作,仅是为了平衡负载,也可以根据某些语义分区功能(例如基于消息中的某些键)进行此操作。

Consumers – 消费者 根据topic消费相应的消息

在较高级别上,Kafka提供以下保证:

  • 生产者发送到特定主题分区的消息将按其发送顺序附加。也就是说,如果消息M1与消息M2由同一生产者发送,并且首先发送M1,则M1的偏移量将小于M2,并在日志中更早出现。
  • 消费者实例按消息在日志中存储的顺序查看消息。
  • 对于复制因子为N的主题,我们最多可以容忍N-1个服务器故障,而不会丢失提交给日志的任何消息。

总结

  • producer:消息生存者
  • consumer:消息消费者
  • broker:kafka集群的server,负责处理消息读、写请求,存储消息
  • topic:消息队列/分类

三 kafka的特点

  • 系统的特点:生产者消费者模型,FIFO Partition内部是FIFO的,partition之间呢不是FIFO的,当然我们可以把topic设为一个partition,这样就是严格的FIFO。
  • 高性能:单节点支持上千个客户端,百MB/s吞吐,接近网卡的极限
  • 持久性:消息直接持久化在普通磁盘上且性能好 直接写到磁盘中去,就是直接append到磁盘里去,这样的好处是直接持久化,数据不会丢失,第二个好处是顺序写,然后消费数据也是顺序的读,所以持久化的同时还能保证顺序,比较好,因为磁盘顺序读比较好。
  • 分布式:数据副本冗余、流量负载均衡、可扩展 分布式,数据副本,也就是同一份数据可以到不同的broker上面去,也就是当一份数据,磁盘坏掉的时候,数据不会丢失,比如3个副本,就是在3个机器磁盘都坏掉的情况下数据才会丢,在大量使用情况下看这样是非常好的,负载均衡,可扩展,在线扩展,不需要停服务。
  • 很灵活:消息长时间持久化+Client维护消费状态 消费方式非常灵活,第一原因是消息持久化时间跨度比较长,一天或者一星期等,第二消费状态自己维护消费到哪个地方了可以自定义消费偏移量。

四 kafka生产消息、存储消息、消费消息

  • Kafka架构是由producer(消息生产者)、consumer(消息消费者)、borker(kafka集群的server,负责处理消息读、写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker)、topic(消息队列/分类相当于队列,里面有生产者和消费者模型)、zookeeper(元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息) 这些部分组成。
  • kafka里面的消息是有topic来组织的,简单的我们可以想象为一个队列,一个队列就是一个topic,然后它把每个topic又分为很多个partition,这个是为了做并行的,在每个partition内部消息强有序,相当于有序的队列,其中每个消息都有个序号offset,比如0到12,从前面读往后面写。一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition。这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,消息不经过内存缓冲,直接写入文件,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念。
  • producer ( 消息生产者 )自己决定往哪个partition里面去写,这里有一些的策略,譬如hash。consumer自己维护消费到哪个offset,每个consumer都有对应的group,group内是queue消费模型(各个consumer消费不同的partition,因此一个消息在group内只消费一次),group间是publish-subscribe消费模型,各个group各自独立消费,互不影响,因此一个消息在被每个group消费一次。

五 kafka的消息存储和生产消费模型

  • 一个topic分成多个partition
  • 每个partition内部消息强有序,其中的每个消息都有一个序号叫offset
  • 一个partition只对应一个broker,一个broker可以管多个partition
  • 消息直接写入文件,并不是存储在内存中
  • 根据时间策略(默认一周)删除,而不是消费完就删除
  • producer自己决定往哪个partition写消息,可以是轮询的负载均衡,或者是基于hash的partition策略
  • kafka里面的消息是有topic来组织的,简单的我们可以想象为一个队列,一个队列就是一个topic,然后它把每个topic又分为很多个partition,这个是为了做并行的,在每个partition里面是有序的,相当于有序的队列,其中每个消息都有个序号,比如0到12,从前面读往后面写。
  • 一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition。
  • 这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念
  • consumer自己维护消费到哪个offset
  • 每个consumer都有对应的group
  • group内是queue消费模型 各个consumer消费不同的partition 一个消息在group内只消费一次
  • 各个group各自独立消费,互不影响

六 kafka与其他消息队列对比

  • RabbitMQ:分布式,支持多种MQ协议,重量级
  • ActiveMQ:与RabbitMQ类似
  • ZeroMQ:以库的形式提供,使用复杂,无持久化
  • redis:单机、纯内存性好,持久化较差
  • kafka:分布式,较长时间持久化,高性能,轻量灵活

第二章 安装

一 集群安装

前提: 需要有zookeeper支持 因此本次将kafka集群安装在zookeeper集群所在节点node2,node3,node4

代码语言:javascript
复制
# 1. 下载压缩包(官网地址:http://kafka.apache.org/downloads.html).分享至底部
## 解压:
tar zxvf kafka_2.10-0.9.0.1.tgz -C /opt/sxt
mv kafka_2.10-0.9.0.1/  /opt/kafka


# 2.node2修改配置文件:config/server.properties
---------------------------------------------------
broker.id=0
zookeeper.connect=node2:2181,node3:2181,node4:2181
---------------------------------------------------
## 核心配置参数说明:
broker.id: broker集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器)
zookeeper.connect: zk集群地址列表

# 3.配置环境变量, 作用是不需要进入软件的bin目录下, 即可进行软件的启停和相关命令的使用
vim /etc/profile (编辑内容如图1)
. /etc/profile
## 测试是否安装成功
kafka+tab+tab+tab (输入kafka,然后连续按三个tab键,出现图2说明安装成功)

# 4. 将当前node2服务器上的Kafka目录同步到其他node3、node4服务器上:
scp -r /opt/kafka/ node3:/opt
scp -r /opt/kafka/ node4:/opt
## 修改node3、node4上Kafka配置文件中的broker.id(分别在node2、3服务器上执行以下命令修改broker.id)
sed -i -e 's/broker.id=.*/broker.id=1/' /opt/kafka/config/server.properties
sed -i -e 's/broker.id=.*/broker.id=2/' /opt/kafka/config/server.properties
## 分别配置环境变量,并测试是否安装成功

图1

图2


二 使用命令

基本命令

代码语言:javascript
复制
# 任意目录下
## 启动Kafka (node2,3,4)
kafka-server-start.sh /opt/kafka/config/server.properties

## 查看帮助手册
kafka-topics.sh --help

## 创建topic,名为test
kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181 --create --replication-factor 2 --partitions 3 --topic test

## 查看创建topic列表:(图1)
kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181 --list

## 查看“test”topic描述:(图2)
###注意:ISR是检查数据的完整性有哪些个节点。
kafka-topics.sh  --zookeeper node2:2181,node3:2181,node4:2181 --describe --topic test

# 创建生产者:(图3)
kafka-console-producer.sh --broker-list node2:9092,node3:9092,node4:9092 --topic test
# 创建消费者:(图4)
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic test
# 查看帮助手册:
kafka-console-consumer.sh help

图1

图2

图3 启动后会将程序阻塞, 直接输入想要输入的数据即可

图4 从消费者显示可以看出kafka消息传递遵循 "单分区有序, 多分区无序"的规则 即: 在很多行数据并行传递(刚打开消费者程序)时, 使用了多个分区, 接收到的信息是无序的 后燃面再次在生产者程序追加数据时, 因为是实时传递使用了单个分区, 因此是有序的

查看zookeeper中topic相关信息

代码语言:javascript
复制
启动zookeeper客户端:
./zkCli.sh
查看topic相关信息:
ls /brokers/topics/
查看消费者相关信息:
ls /consumers

删除kafka中的数据

① :在kafka集群中删除topic,当前topic被标记成删除。

代码语言:javascript
复制
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node2:2181 --delete --topic test

在每台broker节点上删除当前这个topic对应的真实数据。 ② :进入zookeeper客户端,删除topic信息

代码语言:javascript
复制
rmr /brokers/topics/test

③ :删除zookeeper中被标记为删除的topic信息

代码语言:javascript
复制
rmr /admin/delete_topics/test

以上方式比较笨重,可以直接在每台broker节点中的…/config/server.properties中配置属性:delete.topic.enable=true,重启kafka集群即可实现删除topic时,自动清除topic信息。

小技巧: 通过脚本启动Kafka

在上面的启动方式中, 我们启动Kafka集群中每个节点时, 都是占用当前shell ( 即阻塞式界面 ). 我们可以通过编写脚本的方式来启动, 令集群中每个节点上的kafka都能够在后台启动, 方便操作节省资源

步骤:

代码语言:javascript
复制
# 1. 创建脚本, 在kafka软件的根目录下创建启动脚本, 名称为 startkafka.sh,内容如下
--------------------------------------------------------------------
nohup bin/kafka-server-start.sh   config/server.properties > kafka.log 2>&1 &
--------------------------------------------------------------------
kafka-server-start.sh /opt/kafka/config/server.properties

# 2. 给当前文件授予可执行权限
chmod +x  startkafka.sh

# 3. 分发到其他节点, 然后执行即可
## 分发
scp -r startkafka.sh 节点名称/ip:`pwd`
## 执行
./startkafka.sh 
## 查看kafka是否启动成功( 下图 )
jps

kafka的leader的均衡机制

  • 当一个broker停止或者crashes时,所有本来将它作为leader的分区将会把leader转移到其他broker上去,极端情况下,会导致同一个leader管理多个分区,导致负载不均衡,同时当这个broker重启时,如果这个broker不再是任何分区的leader,kafka的client也不会从这个broker来读取消息,从而导致资源的浪费。
  • kafka中有一个被称为优先副本(preferred replicas)的概念。如果一个分区有3个副本,且这3个副本的优先级别分别为0,1,2,根据优先副本的概念,0会作为leader 。当0节点的broker挂掉时,会启动1这个节点broker当做leader。当0节点的broker再次启动后,会自动恢复为此partition的leader。不会导致负载不均衡和资源浪费,这就是leader的均衡机制。 在配置文件conf/ server.properties中配置开启(默认就是开启): auto.leader.rebalance.enable true

kafka 0.11版本改变

  • kafka 0.8.2版本消费者offset存储在zookeeper中,对于zookeeper而言每次写操作代价是很昂贵的,而且zookeeper集群是不能扩展写能力。
  • kafka 0.11版本默认使用新的消费者api ,消费者offset会更新到一个kafka自带的topic【__consumer_offsets】中。以消费者组groupid 为单位,可以查询每个组的消费topic情况: #查看所有消费者组 ./kafka-consumer-groups.sh --bootstrap-server c7node1:9092,c7node2:9092,c7node3:9092 --list #查看消费者消费的offset位置信息 ./kafka-consumer-groups.sh --bootstrap-server c7node1:9092,c7node2:9092,c7node3:9092 --describe --group MyGroupId #重置消费者组的消费offset信息 ,--reset-offsets –all-topics 所有offset。--to-earliest 最小位置。 # --execute 执行 ./kafka-consumer-groups.sh --bootstrap-server c7node1:9092,c7node2:9092,c7node3:9092 --group MyGroupId --reset-offsets --all-topics --to-earliest --execute

第三章Kafka整合flume

flume作为kafka的数据提供方(生产者), kafka的 kafkaspout作为消息的消费者 flume的安装以及介绍

整合步骤

1.在node2安装flume后, 配置启动脚本 flume-kafka.conf

无需手工在kafka中传建 testflume这个topic, 因为在有数据传输后会自动创建这个主题

代码语言:javascript
复制
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node2:9092,node3:9092,node4:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动Zookeeper,启动Kafka集群, 启动flume. 运行该脚本, 命令如下(图1)

代码语言:javascript
复制
# 需要指定脚本编写的位置/opt/flume/conf/flume-kafka.conf
flume-ng agent -n a1 -c conf -f /opt/flume/conf/flume-kafka.conf -Dflume.root.logger=DEBUG,console

3.启动kafka的消费者端

代码语言:javascript
复制
# 事先不需要创建 testflume这个topic
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic testflume

4.启动测试代码(资料分享至底部)

需要在 client.init(“node2”, 41414);指定自己flume所在节点以及启动脚本指定的端口号

5. 查看kafka的消费者端是否会有消息输出( 图2 )

图1

图2

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/12/05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka
  • 第一章 是什么
    • 一 Kafka简介
      • 二 概念理解
        • 总结
      • 三 kafka的特点
        • 四 kafka生产消息、存储消息、消费消息
          • 五 kafka的消息存储和生产消费模型
            • 六 kafka与其他消息队列对比
            • 第二章 安装
              • 一 集群安装
                • 二 使用命令
                  • 基本命令
                  • 查看zookeeper中topic相关信息
                  • 删除kafka中的数据
                  • 小技巧: 通过脚本启动Kafka
                  • kafka的leader的均衡机制
                  • kafka 0.11版本改变
              • 第三章Kafka整合flume
                • 整合步骤
                相关产品与服务
                消息队列 CMQ 版
                消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档