Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 优势:
术语
应用场景
环境准备 Kafka是用Scala语言开发的,运行在JVM上,在安装Kafka之前需要先安装JDK kafka依赖zookeeper,需要先安装zookeeper,下载地址
wgethttp://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-
3.4.9.tar.gz
tar-zxvfzookeeper-3.4.9.tar.gz
cdzookeeper-3.4.9
cpconf/zoo_sample.cfgconf/zoo.cfg
#启动zookeeper服务
bin/zkServer.shstart
#启动客户端
bin/zkCli.sh
下载kafka安装包 参考官网安装步骤
wgethttps://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
tar-zxvf kafka_2.11-1.1.1.tgz
cd kafka_2.11-1.1.1.tgz
启动kafka服务
bin/kafka-server-start.sh config/server.properties
server.properties是kafka核心配置文件(官网)
Property | Default | Description |
---|---|---|
broker.id | 0 | 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行 |
listeners | 9092 | server接受客户端连接的端口 |
zookeeper.connect | localhost:2181 | zooKeeper连接字符串的格式为hostname:port,此处hostname和port分别ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为hostname1:port1, hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样 |
min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 |
delete.topic.enable | false | 是否运行删除主题 |
创建主题
#创建分区数是1,副本数是1的主题为test的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
#查看命令帮助
bin/kafka-topics.sh
启动Producer发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动Consumer消费消息
# --group 指定消费组 --from-beginning 从头开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- group consumer1 --from-beginning
# 查看消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
集群配置 配置3个broker
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
#启动broker
bin/kafka-server-start.sh -daemon config/server-1.properties bin/kafka-server-start.sh -daemon config/server-2.properties
#创建一个 副本为3,分区为3的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 - -partitions 3 --topic my-replicated-topic
# 查看topic的情况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Java中kafka‐clients应用 Java中使用kafka,引入maven依赖
>
>org.apache.kafka>
>kafka-clients>
>1.1.1>
>
具体Java客户端学习源码地址:项目源码