前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka学习笔记

kafka学习笔记

作者头像
保持热爱奔赴山海
发布2019-09-17 14:12:30
5370
发布2019-09-17 14:12:30
举报
文章被收录于专栏:DevOps数据库相关

这是之前学习时候写的笔记,贴一下便于日常查阅。。。

官方文档:http://kafka.apache.org/quickstart

https://engineering.linkedin.com/kafka/

搭建参考: https://www.cnblogs.com/luotianshuai/p/5206662.html

理论资料参考 http://www.jasongj.com/tags/Kafka

本文大部分理论知识来自技术世界原文链接http://www.jasongj.com/2015/03/10/KafkaColumn1

环境:

CentOS7.3

kafka_2.11-1.0.0

zookeeper-3.4.9

3台主机:192.168.5.71、192.168.5.、192.168.5.73

配置hosts解析

编辑3台主机vim /etc/hosts 加上如下的3行:

192.168.5.71  node71

192.168.5.72  node72

192.168.5.73  node73

安装zk

tar xf zookeeper-3.4.9.tar.gz -C /opt/

cd /opt

ln -s zookeeper-3.4.9 zk

cd zk/conf

vim zoo.cfg  内容如下:

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/opt/zk

dataLogDir=/opt/zk

clientPort=2181

maxClientCnxns=60

minSessionTimeout=4000

maxSessionTimeout=60000

autopurge.purgeInterval=24

autopurge.snapRetainCount=5

quorum.auth.enableSasl=false

quorum.cnxn.threads.size=20

server.1=node71:3181:4181

server.2=node72:3181:4181

server.3=node73:3181:4181

在3个节点分别创建myid,并启动zkserver:

node71上: echo '1' > /opt/zk/myid

node72上: echo '2' > /opt/zk/myid

node73上: echo '3' > /opt/zk/myid

在3个节点分别启动zkserver:

/opt/zk/bin/zkServer.sh start /opt/zk/conf/zoo.cfg

在3个节点观察集群主机的状态:

/opt/zk/bin/zkServer.sh status /opt/zk/conf/zoo.cfg

安装kafka

tar xf kafka_2.11-1.0.0.tgz -C /opt/

cd /opt/

ln -s kafka_2.11-1.0.0 kafka

每个主机的配置如下:

[root@node71 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=0      #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 listeners=PLAINTEXT://:9092        #当前kafka对外提供服务的端口默认是9092 num.network.threads=3        # 这个是borker进行网络处理的线程数 num.io.threads=8              # 这个是borker进行I/O处理的线程数 socket.send.buffer.bytes=102400    #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400    #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600   #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 log.dirs=/tmp/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 num.partitions=1      # 默认的分区数,一个topic默认1个分区数 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168        # 默认消息的最大持久化时间,168小时,7天 log.segment.bytes=1073741824   log.retention.check.interval.ms=300000  #每隔300000毫秒去检查上面配置的log失效时间(默认168 ),到目录查看是否有过期的消息如果有,删除 message.max.byte=5242880        #消息保存的最大值5M default.replication.factor=2    #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 replica.fetch.max.bytes=5242880  #取消息的最大直接数 zookeeper.connect=node71:2181,node72:2181,node73:2181  #设置zookeeper的连接端口 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node72 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=1 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node73 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=2 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0

启动Kafka集群

在3台机器上都执行启动Kafka集群:

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

测试kafka的功能

1、创建Topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 3 --topic mysql-order

/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 2 --topic mysql-coupons

#参数含义

--replication-factor 2   #复制两份

--partitions 2    #创建2个分区

--topic #主题

2、在一台服务器上创建一个发布者broker

/opt/kafka/bin/kafka-console-producer.sh --broker-list node72:9092 --topic mysql-order

然后,可以输入一些文字内容

3、在一台服务器上创建一个订阅者consumer

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper node72:2181 --from-beginning --topic mysql-order

4、列出全部的topic

/opt/kafka/bin/kafka-topics.sh --list --zookeeper node72:2181

5、查看指定的topic状态

[root@node72 /opt/kafka/config ]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper node72:2181 --topic mysql-coupons

查看zk中kafka的状态:

cd /etc/zk/bin

./zkCli.sh -server node71:2181

ls /brokers

get /brokers/ids/2

get /brokers/topics/mysql-coupons/partitions/0    # 这里就不截图了

关闭Kafka集群

官方提到的关闭方式就是kill -9

ps aux|grep kafka|grep -v grep|awk '{print $2}'|xargs kill -9

kafka架构

Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)

Partition

parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件

Producer

负责发布消息到Kafka broker (push消息到broker)

Consumer

消费消息(从broker那里poll消息)。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。

Push vs. Pull的区别

作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。

push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。

而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

kafka消息的消费&留存

本节所有描述都是基于consumer hight level API而非low level API

每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费(这是为了实现传统message queue消息只被消费一次的语义),但是不同consumer group可以同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。

Linked的一种kafka部署方案:

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据

log.retention.hours=168        # 默认消息的最大持久化时间,168小时,7天

log.segment.bytes=1073741824     # partition大小超过1G时候,清理旧数据

log.retention.check.interval.ms=300000  #每隔300000毫秒去检查上面配置的log失效时间(默认168 ),到目录查看是否有过期的消息如果有,删除

这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

kafka customer rebalance

本节所讲述内容均基于Kafka consumer high level API

具体参考:http://developer.51cto.com/art/201501/464491.htm

kafka的数据复制

Kafka从0.8开始提供partition级别的replication,replication的数量可在server.properties中配置。

kafka默认default.replication.factor = 2

该Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,consumer批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。

和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。

对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。二是follower必须能够及时将leader的writing复制过来,不能落后太多。

leader会track "in sync"的node list。如果一个follower宕机,或者落后太多,leader将把它从"in sync" list中移除。这里所描述的"落后太多" 指follower复制的消息落后于leader后的条数超过预定值,该值可在 server.properties中配置。

replica.lag.max.messages=4000

replica.lag.time.max.ms=10000

需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。

个人觉得,"in sync" list 机制,类似于MySQL中的semi-sync半同步。

一条消息只有被"in sync" list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要"in sync" list有一个或以上的flollower,一条被commit的消息就不会丢失。

这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用"in sync" list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在"in sync" list里)。

Kafka的选举机制

首先Kafka会将接收到的消息分区(partition),每个主题(topic)的消息有不同的分区。这样一方面消息的存储就不会受到单一服务器存储空间大小的限制,另一方面消息的处理也可以在多个服务器上并行。

其次为了保证高可用,每个分区都会有一定数量的副本(replica)。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。 

但是,为了保证较高的处理效率,消息的读写都是在固定的一个副本上完成。这个副本就是所谓的Leader,而其他副本则是Follower。而Follower则会定期地到Leader上同步数据。

Leader选举

如果某个分区所在的服务器除了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。显然,只有那些跟Leader保持同步的Follower才应该被选作新的Leader。

Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。如果这个集合有增减,kafka会更新zookeeper上的记录。

如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。

显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个服务器不可用。

为什么不用少数服从多数的方法:

少数服从多数是一种比较常见的一致性算法和Leader选举法。它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;选择Leader时也是从超过半数的同步的副本中选择。这种算法需要较高的冗余度。譬如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。而kafka的ISR集合方法,分别只需要两个和三个副本。

如果所有的ISR副本都失败了怎么办:

  此时有两种方法可选,一种是等待ISR集合中的副本复活,一种是选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中。这两种方法各有利弊,实际生产中按需选择。

  如果要等待ISR副本复活,虽然可以保证一致性,但可能需要很长时间。而如果选择立即可用的副本,则很可能该副本并不一致。

例如下图:

Kafka的监控

1、zabbix监控

2、prometheus监控

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/01/31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 配置hosts解析
  • 安装zk
  • 安装kafka
  • 启动Kafka集群
  • 测试kafka的功能
  • 查看zk中kafka的状态:
  • 关闭Kafka集群
  • kafka架构
  • Push vs. Pull的区别
  • kafka消息的消费&留存
  • kafka customer rebalance
  • kafka的数据复制
  • Kafka的选举机制
  • Kafka的监控
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档