前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[721]linux安装kafka

[721]linux安装kafka

作者头像
周小董
发布2020-01-13 17:40:13
2.8K0
发布2020-01-13 17:40:13
举报
文章被收录于专栏:python前行者
外网无法连接Kafka集群(报错:NoBrokersAvailable)

本地Consumer和Producer无法使用远程Kafka服务器的解决方法:

分别修改各台服务器Kafka配置文件server.properties, 在

#listeners=PLAINTEXT://:9092下添加如下一行: advertised.listeners=PLAINTEXT://x.x.x.x:9092 (x.x.x.x为服务器对外的IP)


首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用

安装zookeeper 不会安装请移步:点我快速进入安装zookeeper文章

一、安装kafka

1、手动下载 下载地址:http://kafka.apache.org/downloads

下载好了然后上传服务器

2、也可以用命令下载(直接下载到服务器哦!):

代码语言:javascript
复制
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz

二、解压kafka:

命令:tar -xzvf kafka_2.12-2.0.0.tgz(文件名可以更换哦!)

三、修改server.properties文件

在config目录下输入命令:vi server.properties

修改内容为:

代码语言:javascript
复制
broker.id=0 
port=9092 #端口号 
实际不用配host.name
#host.name=localhost #单机可直接用localhost
#host.name=内网ip #解决外网无法访问的问题,这里要用内网ip
log.dirs=/home/zerone/zxf/local/kafka/logs/server #日志存放路径可修改可不修改
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181 

  • broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可
  • listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:listeners=PLAINTEXT://192.168.180.128:9092。并确保服务器的9092端口能够访问
  • zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可

由于我修改了日志存放地址 我需要创建一个文件夹,直接在kafka目录下输入:mkdir -p logs/server

  • server.properties其它配置说明(broker.id和listeners每个节点都不相同)
代码语言:javascript
复制
#是否允许删除topic,默认false不能手动删除
delete.topic.enable=true
#当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=0
#当前kafka服务侦听的地址和端口,端口默认是9092
listeners = PLAINTEXT://192.168.100.21:9092
#这个是borker进行网络处理的线程数
num.network.threads=3
#这个是borker进行I/O处理的线程数
num.io.threads=8
#发送缓冲区buffer大小,数据不是一下子就发送的,先会存储到缓冲区到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
#消息日志存放的路径
log.dirs=/opt/module/kafka_2.11-1.1.0/logs
#默认的分区数,一个topic默认1个分区数
num.partitions=1
#每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
#默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
#每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000
#是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
#设置zookeeper的连接端口
zookeeper.connect=node21:2181,node22:2181,node23:2181
#设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=6000

四、修改zookeeper.properties文件

进入kafka目录下 config目录下输入命令:vi zookeeper.properties

修改内容为:

代码语言:javascript
复制
dataDir=/home/zerone/zxf/local/kafka/data/zookeeper  #zookeeper数据目录  (可以修改可以不修改)
dataLogDir=/home/zerone/zxf/local/kafka/logs/zookeeper #zookeeper日志目录 (可以修改可以不修改)
clientPort=2181 
maxClientCnxns=100 
tickTime=2000 
initLimit=10

由于我修改了默认的数据目录地址和日志目录需要创建文件夹

五、修改完之后就可以启动zookeeper和kafka了。

直接敲命令感觉有些low呀。弄一个脚本命令吧: 启动脚本:

进入kafka目录下 输入命令:vi kafkaStart.sh

添加内容为:

代码语言:javascript
复制
#!/bin/bash
#启动zookeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>& ./logs/zookeeper.log &
sleep 3  #默默等3秒后执行 
#启动kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties 1>& ./logs/kafka.log &

添加脚本执行权限:

在刚创建的脚本目录许下执行以下命令

代码语言:javascript
复制
chmod +x kafkastart.sh(启动脚本名称)

六、启动kafka

1、先启动zookeeper。

启动命令:sh $zookeeper_home/bin/zkServer.sh start

2、启动kafka

在kafka目录下输入 启动脚本命令: ./kafkaStart.sh

七、创建topic 出现Created topid test 则创建成功

命令:在kafka 目录下bin目录下执行:

注意:上面配的host.name是localhost,下面就用localhost,是内网ip就用内网ip

代码语言:javascript
复制
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  #test是topic名字可以随便换哦。

选项说明:

–topic 定义topic名

–replication-factor 定义副本数

–partitions 定义分区数

  • 查询创建的top主题 出现test则为正常

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
复制
./kafka-topics.sh --list --zookeeper localhost:2181
  • 删除创建的topic

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
复制
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test #tsst为要删除的topic名称

八、测试生成者(test topic):

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
复制
./kafka-console-producer.sh --broker-list localhost:9092 --topic test(要启动生成者的topic名称)

九、测试消费者 启动另一个xsheel窗口 这样效果更明显哦!(test topic):

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
复制
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic crawl_data --from-beginning

–from-beginning:会把TestTopic主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

发送完消息消费者会变成感叹号哦!点击查看输入内容已经打印控制台证明消费成功。

十、到此结束kafka单机版本就集成了

我们来停止kafka网上有很多脚本 你们可以参考一下

关闭脚本:进入kafka目录下 输入命令:vi kafkaStop.sh

代码语言:javascript
复制
#!/bin/bash
#关闭kafka
nohup ./bin/kafka-server-stop.sh ./config/server.properties 1>& ./logs/kafka.log &
sleep 3 #默默等3秒后执行 
#关闭zookeeper
nohup ./bin/zookeeper-server-stop.sh  ./config/zookeeper.properties 1>& ./logs/zookeeper.log &

关闭kafka: 在kafka目录下执行./kafkaStop.sh

  • 关闭zookeeper
代码语言:javascript
复制
sh $zookeeper_home/bin/zkServer.sh stop 

但是他会报没有启动着zookeeper和kafka

我们ps-ef | grep kafka 一下你会发现还在启动着。

上面的方法我没成功。你们可以测试一下。用最粗暴的方法把。

代码语言:javascript
复制
ps -ef | grep kafka
kill -9  kafka进程号
ps -ef | grep zookeeper
kill -9  zookeeper进程号

注意:一定要先关闭kafka在关闭zookeeper !!! 如果先关闭zookeeper kafka会一直去连接zookeeper服务 进入死循环了。 如果进入死循环有两种解决办法:

1、重启服务

代码语言:javascript
复制
shutdown -h 10          #计算机将于10分钟后关闭,且会显示在登录用户的当前屏幕中
shutdown -h now      #计算机会立刻关机
shutdown -h 22:22    #计算机会在这个时刻关机
shutdown -r now       #计算机会立刻重启
shutdown -r +10       #计算机会将于10分钟后重启
reboot                       #重启
halt                           #关机

2、新打开一个xsheel窗口 然后把kafka杀掉。


Kafka配置信息详解

Broker配置信息

属性

默认值

描述

broker.id

必填参数,broker的唯一标识

log.dirs

/tmp/kafka-logs

Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到当前存放partition最少的目录。

port

9092

BrokerServer接受客户端连接的端口号

zookeeper.connect

null

Zookeeper的连接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3。可以填一个或多个,为了提高可靠性,建议都填上。注意,此配置允许我们指定一个zookeeper路径来存放此kafka集群的所有数据,为了与其他应用集群区分开,建议在此配置中指定本集群存放目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消费者的参数要和此参数一致。

message.max.bytes

1000000

服务器可以接收到的最大的消息大小。注意此参数要和consumer的maximum.message.size大小一致,否则会因为生产者生产的消息太大导致消费者无法消费。

num.io.threads

8

服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量。

queued.max.requests

500

I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。

socket.send.buffer.bytes

100 * 1024

The SO_SNDBUFF buffer the server prefers for socket connections.

socket.receive.buffer.bytes

100 * 1024

The SO_RCVBUFF buffer the server prefers for socket connections.

socket.request.max.bytes

100 * 1024 * 1024

服务器允许请求的最大值, 用来防止内存溢出,其值应该小于 Java heap size.

num.partitions

1

默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议改为5

log.segment.bytes

1024 * 1024 * 1024

Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖。

log.roll.{ms,hours}

24 * 7 hours

新建segment文件的时间,此值可以被topic级别的参数覆盖。

log.retention.{ms,minutes,hours}

7 days

Kafka segment log的保存周期,保存周期超过此时间日志就会被删除。此参数可以被topic级别参数覆盖。数据量大时,建议减小此值。

log.retention.bytes

-1

每个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每个partition而不是topic。此参数可以被log级别参数覆盖。

log.retention.check.interval.ms

5 minutes

删除策略的检查周期

auto.create.topics.enable

true

自动创建topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。

default.replication.factor

1

默认副本数量,建议改为2。

replica.lag.time.max.ms

10000

在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除。

replica.lag.max.messages

4000

如果replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除。

replica.socket.timeout.ms

30 * 1000

replica向leader发送请求的超时时间。

replica.socket.receive.buffer.bytes

64 * 1024

The socket receive buffer for network requests to the leader for replicating data.

replica.fetch.max.bytes

1024 * 1024

The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.

replica.fetch.wait.max.ms

500

The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.

num.replica.fetchers

1

Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.

fetch.purgatory.purge.interval.requests

1000

The purge interval (in number of requests) of the fetch request purgatory.

zookeeper.session.timeout.ms

6000

ZooKeeper session 超时时间。如果在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。 此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。

zookeeper.connection.timeout.ms

6000

客户端连接zookeeper的超时时间。

zookeeper.sync.time.ms

2000

H ZK follower落后 ZK leader的时间。

controlled.shutdown.enable

true

允许broker shutdown。如果启用,broker在关闭自己之前会把它上面的所有leaders转移到其它brokers上,建议启用,增加集群稳定性。

auto.leader.rebalance.enable

true

If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available.

leader.imbalance.per.broker.percentage

10

The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.

leader.imbalance.check.interval.seconds

300

The frequency with which to check for leader imbalance.

offset.metadata.max.bytes

4096

The maximum amount of metadata to allow clients to save with their offsets.

connections.max.idle.ms

600000

Idle connections timeout: the server socket processor threads close the connections that idle more than this.

num.recovery.threads.per.data.dir

1

The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.

unclean.leader.election.enable

true

Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.

delete.topic.enable

false

启用deletetopic参数,建议设置为true。

offsets.topic.num.partitions

50

The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).

offsets.topic.retention.minutes

1440

Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.

offsets.retention.check.interval.ms

600000

The frequency at which the offset manager checks for stale offsets.

offsets.topic.replication.factor

3

The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.

offsets.topic.segment.bytes

104857600

Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.

offsets.load.buffer.size

5242880

An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.

offsets.commit.required.acks

-1

The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.

offsets.commit.timeout.ms

5000

The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.

Producer配置信息

属性

默认值

描述

metadata.broker.list

启动时producer查询brokers的列表,可以是集群中所有brokers的一个子集。注意,这个参数只是用来获取topic的元信息用,producer会从元信息中挑选合适的broker并与之建立socket连接。格式是:host1:port1,host2:port2。

request.required.acks

0

参见3.2节介绍

request.timeout.ms

10000

Broker等待ack的超时时间,若等待时间超过此值,会返回客户端错误信息。

producer.type

sync

同步异步模式。async表示异步,sync表示同步。如果设置成异步模式,可以允许生产者以batch的形式push数据,这样会极大的提高broker性能,推荐设置为异步。

serializer.class

kafka.serializer.DefaultEncoder

序列号类,.默认序列化成 byte[] 。

key.serializer.class

Key的序列化类,默认同上。

partitioner.class

kafka.producer.DefaultPartitioner

Partition类,默认对key进行hash。

compression.codec

none

指定producer消息的压缩格式,可选参数为: “none”, “gzip” and “snappy”。关于压缩参见4.1节

compressed.topics

null

启用压缩的topic名称。若上面参数选择了一个压缩格式,那么压缩仅对本参数指定的topic有效,若本参数为空,则对所有topic有效。

message.send.max.retries

3

Producer发送失败时重试次数。若网络出现问题,可能会导致不断重试。

retry.backoff.ms

100

Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.

topic.metadata.refresh.interval.ms

600 * 1000

The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed

queue.buffering.max.ms

5000

启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1秒的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。

queue.buffering.max.messages

10000

采用异步模式时producer buffer 队列里最大缓存的消息数量,如果超过这个数值,producer就会阻塞或者丢掉消息。

queue.enqueue.timeout.ms

-1

当达到上面参数值时producer阻塞等待的时间。如果值设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉。若值设置为-1,producer会被阻塞,不会丢消息。

batch.num.messages

200

采用异步模式时,一个batch缓存的消息数量。达到这个数量值时producer才会发送消息。

send.buffer.bytes

100 * 1024

Socket write buffer size

client.id

“”

The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

Consumer配置信息

属性

默认值

描述

group.id

Consumer的组ID,相同goup.id的consumer属于同一个组。

zookeeper.connect

Consumer的zookeeper连接串,要和broker的配置一致。

consumer.id

null

如果不设置会自动生成。

socket.timeout.ms

30 * 1000

网络请求的socket超时时间。实际超时时间由max.fetch.wait + socket.timeout.ms 确定。

socket.receive.buffer.bytes

64 * 1024

The socket receive buffer for network requests.

fetch.message.max.bytes

1024 * 1024

查询topic-partition时允许的最大消息大小。consumer会为每个partition缓存此大小的消息到内存,因此,这个参数可以控制consumer的内存使用量。这个值应该至少比server允许的最大消息大小大,以免producer发送的消息大于consumer允许的消息。

num.consumer.fetchers

1

The number fetcher threads used to fetch data.

auto.commit.enable

true

如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper。当consumer失败重启之后将会使用此值作为新开始消费的值。

auto.commit.interval.ms

60 * 1000

Consumer提交offset值到zookeeper的周期。

queued.max.message.chunks

2

用来被consumer消费的message chunks 数量, 每个chunk可以缓存fetch.message.max.bytes大小的数据量。

auto.commit.interval.ms

60 * 1000

Consumer提交offset值到zookeeper的周期。

queued.max.message.chunks

2

用来被consumer消费的message chunks 数量, 每个chunk可以缓存fetch.message.max.bytes大小的数据量。

fetch.min.bytes

1

The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.

fetch.wait.max.ms

100

The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.

rebalance.backoff.ms

2000

Backoff time between retries during rebalance.

refresh.leader.backoff.ms

200

Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.

auto.offset.reset

largest

What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer

consumer.timeout.ms

-1

若在指定时间内没有消息消费,consumer将会抛出异常。

exclude.internal.topics

true

Whether messages from internal topics (such as offsets) should be exposed to the consumer.

zookeeper.session.timeout.ms

6000

ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.

zookeeper.connection.timeout.ms

6000

The max time that the client waits while establishing a connection to zookeeper.

zookeeper.sync.time.ms

2000

How far a ZK follower can be behind a ZK leader


kafka的配置

在kafka/config/目录下面有3个配置文件:

producer.properties 生产者配置

consumer.properties 消费者配置

server.properties kafka服务器的配置

kafka的配置分为 broker(server.properties)、producter(producer.properties)、consumer(consumer.properties)3个不同的配置

一 BROKER 的全局配置

最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。

代码语言:javascript
复制
------------------------------------------- 系统 相关 -------------------------------------------
##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id =1
 
##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs
 
##提供给客户端响应的端口
port =6667
 
##消息体的最大大小,单位是字节
message.max.bytes =1000000
 
## broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads =3
 
## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads =8
 
## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4
 
## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
queued.max.requests =500
 
##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name
 
## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究
advertised.host.name
 
## 广告地址端口,必须不同于port中的设置
advertised.port
 
## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes =100*1024
 
## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes =100*1024
 
## socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes =100*1024*1024
 
------------------------------------------- LOG 相关 -------------------------------------------
## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024
 
## 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
log.roll.hours =24*7
 
## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
 
## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.minutes=7days

指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1
 
## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes 。-1没有大小限制
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-1
 
## 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes
 
## 是否开启日志压缩
log.cleaner.enable=false
 
## 日志压缩运行的线程数
log.cleaner.threads =1
 
## 日志压缩时候处理的最大大小
log.cleaner.io.max.bytes.per.second=None
 
## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024
 
## 日志清理时候用到的IO块大小 一般不需要修改
log.cleaner.io.buffer.size=512*1024
 
## 日志清理中hash表的扩大因子 一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
 
## 检查是否处罚日志清理的间隔
log.cleaner.backoff.ms =15000
 
## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.min.cleanable.ratio=0.5
 
## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day
 
## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.size.max.bytes =10*1024*1024
 
## 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.interval.bytes =4096
 
## log文件"sync"到磁盘之前累积的消息条数
## 因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
## 物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=None
 
## 检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000
 
## 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
## 达到阀值,也将触发.
log.flush.interval.ms = None
 
## 文件在索引中清除后保留的时间 一般不需要去修改
log.delete.delay.ms =60000
 
## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
 
------------------------------------------- TOPIC 相关 -------------------------------------------
## 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable =true
 
## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =1
 
## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions =1
 
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
 
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
## partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms =30000
 
## partition leader与replicas数据同步时,消息的队列尺寸
controller.message.queue.size=10
 
## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.time.max.ms =10000
 
## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
## 到其他follower中.
## 在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.lag.max.messages =4000
 
##follower与leader之间的socket超时时间
replica.socket.timeout.ms=30*1000
 
## leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=64*1024
 
## replicas每次获取数据的最大大小
replica.fetch.max.bytes =1024*1024
 
## replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms =500
 
## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes =1
 
## leader 进行复制的线程数,增大这个数值会增加follower的IO
num.replica.fetchers=1
 
## 每个replica检查是否将最高水位进行固化的频率
replica.high.watermark.checkpoint.interval.ms =5000
 
## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.enable =false
 
## 控制器关闭的尝试次数
controlled.shutdown.max.retries =3
 
## 每次关闭尝试的时间间隔
controlled.shutdown.retry.backoff.ms =5000
 
## 是否自动平衡broker之间的分配策略
auto.leader.rebalance.enable =false
 
## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.per.broker.percentage =10
 
## 检查leader是否不平衡的时间间隔
leader.imbalance.check.interval.seconds =300
 
## 客户端保留offset信息的最大空间大小
offset.metadata.max.bytes
 
----------------------------------ZooKeeper 相关----------------------------------
##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181
 
## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.session.timeout.ms=6000
 
## ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms =6000
 
## ZooKeeper集群中leader和follower之间的同步实际那
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
 
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
 
删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes

二 CONSUMER 配置

最为核心的配置是group.id、zookeeper.connect

代码语言:javascript
复制
## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要
 group.id
 
## 消费者的ID,若是没有设置的话,会自增
 consumer.id
 
## 一个用于跟踪调查的ID ,最好同group.id相同
 client.id = group id value
 
## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置
 zookeeper.connect=localhost:2182
 
## zookeeper的心跳超时时间,超过这个时间就认为是dead消费者
 zookeeper.session.timeout.ms =6000
 
## zookeeper的等待连接时间
 zookeeper.connection.timeout.ms =6000
 
## zookeeper的follower同leader的同步时间
 zookeeper.sync.time.ms =2000
 
## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常
 auto.offset.reset = largest
 
## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.
 socket.timeout.ms=30*1000
 
## socket的接受缓存空间大小
 socket.receive.buffer.bytes=64*1024
 
##从每个分区获取的消息大小限制
 fetch.message.max.bytes =1024*1024
 
## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset
 auto.commit.enable =true
 
## 自动提交的时间间隔
 auto.commit.interval.ms =60*1000
 
## 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值
 queued.max.message.chunks =10
 
## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
## 此值用于控制,注册节点的重试次数.
 rebalance.max.retries =4
 
## 每次再平衡的时间间隔
 rebalance.backoff.ms =2000
 
## 每次重新选举leader的时间
 refresh.leader.backoff.ms
 
## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求
 fetch.min.bytes =1
 
## 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间
 fetch.wait.max.ms =100
 
## 指定时间内没有消息到达就抛出异常,一般不需要改
 consumer.timeout.ms = -1

三 PRODUCER 的配置

比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class

代码语言:javascript
复制
## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
 metadata.broker.list
 
##消息的确认模式
 ##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
 ##1:发送消息,并会等待leader 收到确认后,一定的可靠性
 ## -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
 request.required.acks =0
 
## 消息发送的最长等待时间
 request.timeout.ms =10000
 
## socket的缓存大小
 send.buffer.bytes=100*1024
 
## key的序列化方式,若是没有设置,同serializer.class
 key.serializer.class
 
## 分区的策略,默认是取模
 partitioner.class=kafka.producer.DefaultPartitioner
 
## 消息的压缩模式,默认是none,可以有gzip和snappy
 compression.codec = none
 
## 可以针对默写特定的topic进行压缩
 compressed.topics=null
 
## 消息发送失败后的重试次数
 message.send.max.retries =3
 
## 每次失败后的间隔时间
 retry.backoff.ms =100
 
## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
 topic.metadata.refresh.interval.ms =600*1000
 
## 用户随意指定,但是不能重复,主要用于跟踪记录消息
 client.id=""
 
------------------------------------------- 消息模式 相关 -------------------------------------------
 ## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送
 producer.type=sync
 
## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送
 queue.buffering.max.ms =5000
 
## 异步的模式下 最长等待的消息数
 queue.buffering.max.messages =10000
 
## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃
 queue.enqueue.timeout.ms = -1
 
## 异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制
 batch.num.messages=200
 
## 消息体的系列化处理类 ,转化为字节流进行传输
 serializer.class= kafka.serializer.DefaultEncoder

参考 :https://blog.csdn.net/weixin_39984161/article/details/91971731

https://www.cnblogs.com/frankdeng/p/9403883.html

https://www.cnblogs.com/zhangtianyuan/p/7687156.html

https://blog.csdn.net/luanpeng825485697/article/details/81036028

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/12/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 外网无法连接Kafka集群(报错:NoBrokersAvailable)
  • 一、安装kafka
  • 二、解压kafka:
  • 三、修改server.properties文件
  • 四、修改zookeeper.properties文件
  • 五、修改完之后就可以启动zookeeper和kafka了。
  • 六、启动kafka
  • 七、创建topic 出现Created topid test 则创建成功
  • 八、测试生成者(test topic):
  • 九、测试消费者 启动另一个xsheel窗口 这样效果更明显哦!(test topic):
  • 十、到此结束kafka单机版本就集成了
  • Kafka配置信息详解
    • Broker配置信息
      • Producer配置信息
        • Consumer配置信息
        • kafka的配置
          • 一 BROKER 的全局配置
            • 二 CONSUMER 配置
              • 三 PRODUCER 的配置
              相关产品与服务
              文件存储
              文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档