前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【kafka系列】centos7系统安装kafka

【kafka系列】centos7系统安装kafka

作者头像
沁溪源
发布2021-09-28 10:08:12
1.4K0
发布2021-09-28 10:08:12
举报
文章被收录于专栏:沁溪源

一、准备

确保服务器上已经搭建完成JDK,zookeeper服务;

如果未搭建完成,请移步参考以下文章:

安装zookeeper: https://blog.csdn.net/xuan_lu/article/details/120474451

安装JDK1.8:https://blog.csdn.net/xuan_lu/article/details/107297710

kafka官网下载:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.11-2.2.0.tgz

点击下载安装包

二、安装与配置

  • 解压缩tar -zxvf kafka_2.11-2.2.0.tgz
  • 重命名mv kafka_2.11-2.2.0 kafka
  • 创建**logs**文件,用于存储kafka日志: 在kafka安装目录下创建:mkdir kafka-logs /opt/software/kafka/kafka-logs
  • 修改**server.properties**配置文件
  1. 修改日志存储目录: log.dirs=/opt/software/kafka/kafka-logs
  2. 修改或者确定zookeeper地址和端口: zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000
  3. 修改broker.id broker.id=1

三、配置文件详解

代码语言:javascript
复制
#一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=1
 
#listeners=PLAINTEXT://:9092
 
#advertised.listeners=PLAINTEXT://your.host.name:9092
 
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SS
 
# broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
 
# broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads=8
 
#  socket的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes=102400
 
# socket的接收缓冲区 (SO_RCVBUF)
socket.receive.buffer.bytes=102400
 
# socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
socket.request.max.bytes=104857600
 
#kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/opt/software/kafka/kafka-logs 
# 每个topic的分区个数,更多的partition会产生更多的segment file
num.partitions=1
 
num.recovery.threads.per.data.dir=1
 
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
 
# 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
#log.flush.interval.messages=10000
 
# 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
#log.flush.interval.ms=1000
 
# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
log.retention.hours=168
 
#log.retention.bytes=1073741824
 
# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes=1073741824
 
# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=300000
 
# Zookeeper quorum设置。如果有多个使用逗号分割 例如 ip:prot,ip:prot,ip:prot
zookeeper.connect=localhost:2181
 
# 连接zk的超时时间
zookeeper.connection.timeout.ms=6000
 
# ZooKeeper集群中leader和follower之间的同步实际
group.initial.rebalance.delay.ms=0

四、运行

1.启动zookeeper服务

由于小编服务器上已经启动过zookeeper服务,故不需要重新执行启动命令;

如果服务器zookeeper服务未启动,则在kafka目录下执行以下命令:

使用安装包中的脚本启动单节点Zookeeper 实例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

注意:以下命令,小编均选择在kafka安装根目录下执行;

2.启动kafka服务

以下方式任选其一

  1. kafka目录启动 使用kafka-server-start.sh 启动kafka 服务: bin/kafka-server-start.sh config/server.properties 这种命令执行并不是后台进程运行,故使用以下命令 bin/kafka-server-start.sh -daemon config/server.properties
  2. 命令需要切换到kafka的bin目录下, ./kafka-server-start.sh -daemon /opt/software/kafka//config/server.properties

3.创建topic

使用kafka-topics.sh 创建单分区单副本的topic test:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4.查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

5.产生消息:kafka-console-producer.sh

使用kafka-console-producer.sh 发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

使用Ctrl+C退出生成消息;

6.消费消息:kafka-console-consumer.sh

使用kafka-console-consumer.sh 接收消息并在终端打印

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (这里用删除线标识,并不是代表命名错误,低版本仍然可以适用!!!)

这里需要注意上面这行命令试用于低版本的kafka,否则会报错,小编安装的kafka版本高,所以报错,故换成以下命令即可:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

7.查看描述 Topic 信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

说明

第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。

Leader: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。

Replicas: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。

Isr: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。

8.删除topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

参考资源:

1.在CentOS 7上安装Kafka

https://cloud.tencent.com/developer/article/1388439

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/09/25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、准备
  • 二、安装与配置
  • 三、配置文件详解
  • 四、运行
    • 1.启动zookeeper服务
      • 2.启动kafka服务
        • 3.创建topic
          • 4.查看topic
            • 5.产生消息:kafka-console-producer.sh
              • 6.消费消息:kafka-console-consumer.sh
                • 7.查看描述 Topic 信息
                  • 8.删除topic
                  相关产品与服务
                  云服务器
                  云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档