这是之前学习时候写的笔记,贴一下便于日常查阅。。。
官方文档:http://kafka.apache.org/quickstart
https://engineering.linkedin.com/kafka/
搭建参考: https://www.cnblogs.com/luotianshuai/p/5206662.html
理论资料参考 http://www.jasongj.com/tags/Kafka
本文大部分理论知识来自技术世界,原文链接:http://www.jasongj.com/2015/03/10/KafkaColumn1
环境:
CentOS7.3
kafka_2.11-1.0.0
zookeeper-3.4.9
3台主机:192.168.5.71、192.168.5.、192.168.5.73
编辑3台主机vim /etc/hosts 加上如下的3行:
192.168.5.71 node71
192.168.5.72 node72
192.168.5.73 node73
tar xf zookeeper-3.4.9.tar.gz -C /opt/
cd /opt
ln -s zookeeper-3.4.9 zk
cd zk/conf
vim zoo.cfg 内容如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zk
dataLogDir=/opt/zk
clientPort=2181
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=60000
autopurge.purgeInterval=24
autopurge.snapRetainCount=5
quorum.auth.enableSasl=false
quorum.cnxn.threads.size=20
server.1=node71:3181:4181
server.2=node72:3181:4181
server.3=node73:3181:4181
在3个节点分别创建myid,并启动zkserver:
node71上: echo '1' > /opt/zk/myid
node72上: echo '2' > /opt/zk/myid
node73上: echo '3' > /opt/zk/myid
在3个节点分别启动zkserver:
/opt/zk/bin/zkServer.sh start /opt/zk/conf/zoo.cfg
在3个节点观察集群主机的状态:
/opt/zk/bin/zkServer.sh status /opt/zk/conf/zoo.cfg
tar xf kafka_2.11-1.0.0.tgz -C /opt/
cd /opt/
ln -s kafka_2.11-1.0.0 kafka
每个主机的配置如下:
[root@node71 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 listeners=PLAINTEXT://:9092 #当前kafka对外提供服务的端口默认是9092 num.network.threads=3 # 这个是borker进行网络处理的线程数 num.io.threads=8 # 这个是borker进行I/O处理的线程数 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 log.dirs=/tmp/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 num.partitions=1 # 默认的分区数,一个topic默认1个分区数 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 # 默认消息的最大持久化时间,168小时,7天 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(默认168 ),到目录查看是否有过期的消息如果有,删除 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 replica.fetch.max.bytes=5242880 #取消息的最大直接数 zookeeper.connect=node71:2181,node72:2181,node73:2181 #设置zookeeper的连接端口 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node72 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=1 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node73 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=2 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
在3台机器上都执行启动Kafka集群:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
1、创建Topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 3 --topic mysql-order
/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 2 --topic mysql-coupons
#参数含义
--replication-factor 2 #复制两份
--partitions 2 #创建2个分区
--topic #主题
2、在一台服务器上创建一个发布者broker
/opt/kafka/bin/kafka-console-producer.sh --broker-list node72:9092 --topic mysql-order
然后,可以输入一些文字内容
3、在一台服务器上创建一个订阅者consumer
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper node72:2181 --from-beginning --topic mysql-order
4、列出全部的topic
/opt/kafka/bin/kafka-topics.sh --list --zookeeper node72:2181
5、查看指定的topic状态
[root@node72 /opt/kafka/config ]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper node72:2181 --topic mysql-coupons
cd /etc/zk/bin
./zkCli.sh -server node71:2181
ls /brokers
get /brokers/ids/2
get /brokers/topics/mysql-coupons/partitions/0 # 这里就不截图了
官方提到的关闭方式就是kill -9
ps aux|grep kafka|grep -v grep|awk '{print $2}'|xargs kill -9
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
Producer
负责发布消息到Kafka broker (push消息到broker)
Consumer
消费消息(从broker那里poll消息)。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。
作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
本节所有描述都是基于consumer hight level API而非low level API
每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费(这是为了实现传统message queue消息只被消费一次的语义),但是不同consumer group可以同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。
Linked的一种kafka部署方案:
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据
log.retention.hours=168 # 默认消息的最大持久化时间,168小时,7天
log.segment.bytes=1073741824 # partition大小超过1G时候,清理旧数据
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(默认168 ),到目录查看是否有过期的消息如果有,删除
这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
本节所讲述内容均基于Kafka consumer high level API
具体参考:http://developer.51cto.com/art/201501/464491.htm
Kafka从0.8开始提供partition级别的replication,replication的数量可在server.properties中配置。
kafka默认default.replication.factor = 2
该Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,consumer批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。
和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。
对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。二是follower必须能够及时将leader的writing复制过来,不能落后太多。
leader会track "in sync"的node list。如果一个follower宕机,或者落后太多,leader将把它从"in sync" list中移除。这里所描述的"落后太多" 指follower复制的消息落后于leader后的条数超过预定值,该值可在 server.properties中配置。
replica.lag.max.messages=4000
replica.lag.time.max.ms=10000
需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。
个人觉得,"in sync" list 机制,类似于MySQL中的semi-sync半同步。
一条消息只有被"in sync" list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要"in sync" list有一个或以上的flollower,一条被commit的消息就不会丢失。
这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用"in sync" list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在"in sync" list里)。
首先Kafka会将接收到的消息分区(partition),每个主题(topic)的消息有不同的分区。这样一方面消息的存储就不会受到单一服务器存储空间大小的限制,另一方面消息的处理也可以在多个服务器上并行。
其次为了保证高可用,每个分区都会有一定数量的副本(replica)。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。
但是,为了保证较高的处理效率,消息的读写都是在固定的一个副本上完成。这个副本就是所谓的Leader,而其他副本则是Follower。而Follower则会定期地到Leader上同步数据。
Leader选举
如果某个分区所在的服务器除了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。显然,只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。如果这个集合有增减,kafka会更新zookeeper上的记录。
如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个服务器不可用。
为什么不用少数服从多数的方法:
少数服从多数是一种比较常见的一致性算法和Leader选举法。它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;选择Leader时也是从超过半数的同步的副本中选择。这种算法需要较高的冗余度。譬如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。而kafka的ISR集合方法,分别只需要两个和三个副本。
如果所有的ISR副本都失败了怎么办:
此时有两种方法可选,一种是等待ISR集合中的副本复活,一种是选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中。这两种方法各有利弊,实际生产中按需选择。
如果要等待ISR副本复活,虽然可以保证一致性,但可能需要很长时间。而如果选择立即可用的副本,则很可能该副本并不一致。
例如下图:
1、zabbix监控
2、prometheus监控