kafka是一个高吞吐的分布式消息队列系统。 特点: 生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。 消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。 官网:https://kafka.apache.org/
Topics and Logs:
Distribution – 分布式
Producers – 生产者 生产者将数据发布到他们选择的主题。生产者负责选择要分配给主题中哪个分区的消息 可以以循环方式完成此操作,仅是为了平衡负载,也可以根据某些语义分区功能(例如基于消息中的某些键)进行此操作。
Consumers – 消费者 根据topic消费相应的消息
在较高级别上,Kafka提供以下保证:
前提: 需要有zookeeper支持 因此本次将kafka集群安装在zookeeper集群所在节点node2,node3,node4
# 1. 下载压缩包(官网地址:http://kafka.apache.org/downloads.html).分享至底部
## 解压:
tar zxvf kafka_2.10-0.9.0.1.tgz -C /opt/sxt
mv kafka_2.10-0.9.0.1/ /opt/kafka
# 2.node2修改配置文件:config/server.properties
---------------------------------------------------
broker.id=0
zookeeper.connect=node2:2181,node3:2181,node4:2181
---------------------------------------------------
## 核心配置参数说明:
broker.id: broker集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器)
zookeeper.connect: zk集群地址列表
# 3.配置环境变量, 作用是不需要进入软件的bin目录下, 即可进行软件的启停和相关命令的使用
vim /etc/profile (编辑内容如图1)
. /etc/profile
## 测试是否安装成功
kafka+tab+tab+tab (输入kafka,然后连续按三个tab键,出现图2说明安装成功)
# 4. 将当前node2服务器上的Kafka目录同步到其他node3、node4服务器上:
scp -r /opt/kafka/ node3:/opt
scp -r /opt/kafka/ node4:/opt
## 修改node3、node4上Kafka配置文件中的broker.id(分别在node2、3服务器上执行以下命令修改broker.id)
sed -i -e 's/broker.id=.*/broker.id=1/' /opt/kafka/config/server.properties
sed -i -e 's/broker.id=.*/broker.id=2/' /opt/kafka/config/server.properties
## 分别配置环境变量,并测试是否安装成功
图1
图2
# 任意目录下
## 启动Kafka (node2,3,4)
kafka-server-start.sh /opt/kafka/config/server.properties
## 查看帮助手册
kafka-topics.sh --help
## 创建topic,名为test
kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181 --create --replication-factor 2 --partitions 3 --topic test
## 查看创建topic列表:(图1)
kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181 --list
## 查看“test”topic描述:(图2)
###注意:ISR是检查数据的完整性有哪些个节点。
kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181 --describe --topic test
# 创建生产者:(图3)
kafka-console-producer.sh --broker-list node2:9092,node3:9092,node4:9092 --topic test
# 创建消费者:(图4)
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic test
# 查看帮助手册:
kafka-console-consumer.sh help
图1
图2
图3 启动后会将程序阻塞, 直接输入想要输入的数据即可
图4 从消费者显示可以看出kafka消息传递遵循 "单分区有序, 多分区无序"的规则 即: 在很多行数据并行传递(刚打开消费者程序)时, 使用了多个分区, 接收到的信息是无序的 后燃面再次在生产者程序追加数据时, 因为是实时传递使用了单个分区, 因此是有序的
启动zookeeper客户端:
./zkCli.sh
查看topic相关信息:
ls /brokers/topics/
查看消费者相关信息:
ls /consumers
① :在kafka集群中删除topic,当前topic被标记成删除。
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node2:2181 --delete --topic test
在每台broker节点上删除当前这个topic对应的真实数据。 ② :进入zookeeper客户端,删除topic信息
rmr /brokers/topics/test
③ :删除zookeeper中被标记为删除的topic信息
rmr /admin/delete_topics/test
以上方式比较笨重,可以直接在每台broker节点中的…/config/server.properties中配置属性:delete.topic.enable=true
,重启kafka集群即可实现删除topic时,自动清除topic信息。
在上面的启动方式中, 我们启动Kafka集群中每个节点时, 都是占用当前shell ( 即阻塞式界面 ). 我们可以通过编写脚本的方式来启动, 令集群中每个节点上的kafka都能够在后台启动, 方便操作节省资源
步骤:
# 1. 创建脚本, 在kafka软件的根目录下创建启动脚本, 名称为 startkafka.sh,内容如下
--------------------------------------------------------------------
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
--------------------------------------------------------------------
kafka-server-start.sh /opt/kafka/config/server.properties
# 2. 给当前文件授予可执行权限
chmod +x startkafka.sh
# 3. 分发到其他节点, 然后执行即可
## 分发
scp -r startkafka.sh 节点名称/ip:`pwd`
## 执行
./startkafka.sh
## 查看kafka是否启动成功( 下图 )
jps
flume作为kafka的数据提供方(生产者), kafka的 kafkaspout作为消息的消费者 flume的安装以及介绍
1.在node2安装flume后, 配置启动脚本 flume-kafka.conf
无需手工在kafka中传建 testflume这个topic, 因为在有数据传输后会自动创建这个主题
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node2:9092,node3:9092,node4:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.启动Zookeeper,启动Kafka集群, 启动flume. 运行该脚本, 命令如下(图1)
# 需要指定脚本编写的位置/opt/flume/conf/flume-kafka.conf
flume-ng agent -n a1 -c conf -f /opt/flume/conf/flume-kafka.conf -Dflume.root.logger=DEBUG,console
3.启动kafka的消费者端
# 事先不需要创建 testflume这个topic
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic testflume
4.启动测试代码(资料分享至底部)
需要在 client.init(“node2”, 41414);指定自己flume所在节点以及启动脚本指定的端口号
5. 查看kafka的消费者端是否会有消息输出( 图2 )
图1
图2