首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >[721]linux安装kafka

[721]linux安装kafka

作者头像
周小董
发布于 2020-01-13 09:40:13
发布于 2020-01-13 09:40:13
2.9K00
代码可运行
举报
文章被收录于专栏:python前行者python前行者
运行总次数:0
代码可运行
外网无法连接Kafka集群(报错:NoBrokersAvailable)

本地Consumer和Producer无法使用远程Kafka服务器的解决方法:

分别修改各台服务器Kafka配置文件server.properties, 在

#listeners=PLAINTEXT://:9092下添加如下一行: advertised.listeners=PLAINTEXT://x.x.x.x:9092 (x.x.x.x为服务器对外的IP)


首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用

安装zookeeper 不会安装请移步:点我快速进入安装zookeeper文章

一、安装kafka

1、手动下载 下载地址:http://kafka.apache.org/downloads

下载好了然后上传服务器

2、也可以用命令下载(直接下载到服务器哦!):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz

二、解压kafka:

命令:tar -xzvf kafka_2.12-2.0.0.tgz(文件名可以更换哦!)

三、修改server.properties文件

在config目录下输入命令:vi server.properties

修改内容为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
broker.id=0 
port=9092 #端口号 
实际不用配host.name
#host.name=localhost #单机可直接用localhost
#host.name=内网ip #解决外网无法访问的问题,这里要用内网ip
log.dirs=/home/zerone/zxf/local/kafka/logs/server #日志存放路径可修改可不修改
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181 

  • broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可
  • listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:listeners=PLAINTEXT://192.168.180.128:9092。并确保服务器的9092端口能够访问
  • zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可

由于我修改了日志存放地址 我需要创建一个文件夹,直接在kafka目录下输入:mkdir -p logs/server

  • server.properties其它配置说明(broker.id和listeners每个节点都不相同)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#是否允许删除topic,默认false不能手动删除
delete.topic.enable=true
#当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=0
#当前kafka服务侦听的地址和端口,端口默认是9092
listeners = PLAINTEXT://192.168.100.21:9092
#这个是borker进行网络处理的线程数
num.network.threads=3
#这个是borker进行I/O处理的线程数
num.io.threads=8
#发送缓冲区buffer大小,数据不是一下子就发送的,先会存储到缓冲区到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
#消息日志存放的路径
log.dirs=/opt/module/kafka_2.11-1.1.0/logs
#默认的分区数,一个topic默认1个分区数
num.partitions=1
#每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
#默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
#每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000
#是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
#设置zookeeper的连接端口
zookeeper.connect=node21:2181,node22:2181,node23:2181
#设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=6000

四、修改zookeeper.properties文件

进入kafka目录下 config目录下输入命令:vi zookeeper.properties

修改内容为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataDir=/home/zerone/zxf/local/kafka/data/zookeeper  #zookeeper数据目录  (可以修改可以不修改)
dataLogDir=/home/zerone/zxf/local/kafka/logs/zookeeper #zookeeper日志目录 (可以修改可以不修改)
clientPort=2181 
maxClientCnxns=100 
tickTime=2000 
initLimit=10

由于我修改了默认的数据目录地址和日志目录需要创建文件夹

五、修改完之后就可以启动zookeeper和kafka了。

直接敲命令感觉有些low呀。弄一个脚本命令吧: 启动脚本:

进入kafka目录下 输入命令:vi kafkaStart.sh

添加内容为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#!/bin/bash
#启动zookeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>& ./logs/zookeeper.log &
sleep 3  #默默等3秒后执行 
#启动kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties 1>& ./logs/kafka.log &

添加脚本执行权限:

在刚创建的脚本目录许下执行以下命令

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
chmod +x kafkastart.sh(启动脚本名称)

六、启动kafka

1、先启动zookeeper。

启动命令:sh $zookeeper_home/bin/zkServer.sh start

2、启动kafka

在kafka目录下输入 启动脚本命令: ./kafkaStart.sh

七、创建topic 出现Created topid test 则创建成功

命令:在kafka 目录下bin目录下执行:

注意:上面配的host.name是localhost,下面就用localhost,是内网ip就用内网ip

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  #test是topic名字可以随便换哦。

选项说明:

–topic 定义topic名

–replication-factor 定义副本数

–partitions 定义分区数

  • 查询创建的top主题 出现test则为正常

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./kafka-topics.sh --list --zookeeper localhost:2181
  • 删除创建的topic

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test #tsst为要删除的topic名称

八、测试生成者(test topic):

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./kafka-console-producer.sh --broker-list localhost:9092 --topic test(要启动生成者的topic名称)

九、测试消费者 启动另一个xsheel窗口 这样效果更明显哦!(test topic):

命令:在kafka 目录下bin目录下执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic crawl_data --from-beginning

–from-beginning:会把TestTopic主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

发送完消息消费者会变成感叹号哦!点击查看输入内容已经打印控制台证明消费成功。

十、到此结束kafka单机版本就集成了

我们来停止kafka网上有很多脚本 你们可以参考一下

关闭脚本:进入kafka目录下 输入命令:vi kafkaStop.sh

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#!/bin/bash
#关闭kafka
nohup ./bin/kafka-server-stop.sh ./config/server.properties 1>& ./logs/kafka.log &
sleep 3 #默默等3秒后执行 
#关闭zookeeper
nohup ./bin/zookeeper-server-stop.sh  ./config/zookeeper.properties 1>& ./logs/zookeeper.log &

关闭kafka: 在kafka目录下执行./kafkaStop.sh

  • 关闭zookeeper
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sh $zookeeper_home/bin/zkServer.sh stop 

但是他会报没有启动着zookeeper和kafka

我们ps-ef | grep kafka 一下你会发现还在启动着。

上面的方法我没成功。你们可以测试一下。用最粗暴的方法把。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ps -ef | grep kafka
kill -9  kafka进程号
ps -ef | grep zookeeper
kill -9  zookeeper进程号

注意:一定要先关闭kafka在关闭zookeeper !!! 如果先关闭zookeeper kafka会一直去连接zookeeper服务 进入死循环了。 如果进入死循环有两种解决办法:

1、重启服务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
shutdown -h 10          #计算机将于10分钟后关闭,且会显示在登录用户的当前屏幕中
shutdown -h now      #计算机会立刻关机
shutdown -h 22:22    #计算机会在这个时刻关机
shutdown -r now       #计算机会立刻重启
shutdown -r +10       #计算机会将于10分钟后重启
reboot                       #重启
halt                           #关机

2、新打开一个xsheel窗口 然后把kafka杀掉。


Kafka配置信息详解

Broker配置信息

属性

默认值

描述

broker.id

必填参数,broker的唯一标识

log.dirs

/tmp/kafka-logs

Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到当前存放partition最少的目录。

port

9092

BrokerServer接受客户端连接的端口号

zookeeper.connect

null

Zookeeper的连接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3。可以填一个或多个,为了提高可靠性,建议都填上。注意,此配置允许我们指定一个zookeeper路径来存放此kafka集群的所有数据,为了与其他应用集群区分开,建议在此配置中指定本集群存放目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消费者的参数要和此参数一致。

message.max.bytes

1000000

服务器可以接收到的最大的消息大小。注意此参数要和consumer的maximum.message.size大小一致,否则会因为生产者生产的消息太大导致消费者无法消费。

num.io.threads

8

服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量。

queued.max.requests

500

I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。

socket.send.buffer.bytes

100 * 1024

The SO_SNDBUFF buffer the server prefers for socket connections.

socket.receive.buffer.bytes

100 * 1024

The SO_RCVBUFF buffer the server prefers for socket connections.

socket.request.max.bytes

100 * 1024 * 1024

服务器允许请求的最大值, 用来防止内存溢出,其值应该小于 Java heap size.

num.partitions

1

默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议改为5

log.segment.bytes

1024 * 1024 * 1024

Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖。

log.roll.{ms,hours}

24 * 7 hours

新建segment文件的时间,此值可以被topic级别的参数覆盖。

log.retention.{ms,minutes,hours}

7 days

Kafka segment log的保存周期,保存周期超过此时间日志就会被删除。此参数可以被topic级别参数覆盖。数据量大时,建议减小此值。

log.retention.bytes

-1

每个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每个partition而不是topic。此参数可以被log级别参数覆盖。

log.retention.check.interval.ms

5 minutes

删除策略的检查周期

auto.create.topics.enable

true

自动创建topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。

default.replication.factor

1

默认副本数量,建议改为2。

replica.lag.time.max.ms

10000

在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除。

replica.lag.max.messages

4000

如果replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除。

replica.socket.timeout.ms

30 * 1000

replica向leader发送请求的超时时间。

replica.socket.receive.buffer.bytes

64 * 1024

The socket receive buffer for network requests to the leader for replicating data.

replica.fetch.max.bytes

1024 * 1024

The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.

replica.fetch.wait.max.ms

500

The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.

num.replica.fetchers

1

Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.

fetch.purgatory.purge.interval.requests

1000

The purge interval (in number of requests) of the fetch request purgatory.

zookeeper.session.timeout.ms

6000

ZooKeeper session 超时时间。如果在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。 此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。

zookeeper.connection.timeout.ms

6000

客户端连接zookeeper的超时时间。

zookeeper.sync.time.ms

2000

H ZK follower落后 ZK leader的时间。

controlled.shutdown.enable

true

允许broker shutdown。如果启用,broker在关闭自己之前会把它上面的所有leaders转移到其它brokers上,建议启用,增加集群稳定性。

auto.leader.rebalance.enable

true

If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available.

leader.imbalance.per.broker.percentage

10

The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.

leader.imbalance.check.interval.seconds

300

The frequency with which to check for leader imbalance.

offset.metadata.max.bytes

4096

The maximum amount of metadata to allow clients to save with their offsets.

connections.max.idle.ms

600000

Idle connections timeout: the server socket processor threads close the connections that idle more than this.

num.recovery.threads.per.data.dir

1

The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.

unclean.leader.election.enable

true

Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.

delete.topic.enable

false

启用deletetopic参数,建议设置为true。

offsets.topic.num.partitions

50

The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).

offsets.topic.retention.minutes

1440

Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.

offsets.retention.check.interval.ms

600000

The frequency at which the offset manager checks for stale offsets.

offsets.topic.replication.factor

3

The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.

offsets.topic.segment.bytes

104857600

Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.

offsets.load.buffer.size

5242880

An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.

offsets.commit.required.acks

-1

The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.

offsets.commit.timeout.ms

5000

The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.

Producer配置信息

属性

默认值

描述

metadata.broker.list

启动时producer查询brokers的列表,可以是集群中所有brokers的一个子集。注意,这个参数只是用来获取topic的元信息用,producer会从元信息中挑选合适的broker并与之建立socket连接。格式是:host1:port1,host2:port2。

request.required.acks

0

参见3.2节介绍

request.timeout.ms

10000

Broker等待ack的超时时间,若等待时间超过此值,会返回客户端错误信息。

producer.type

sync

同步异步模式。async表示异步,sync表示同步。如果设置成异步模式,可以允许生产者以batch的形式push数据,这样会极大的提高broker性能,推荐设置为异步。

serializer.class

kafka.serializer.DefaultEncoder

序列号类,.默认序列化成 byte[] 。

key.serializer.class

Key的序列化类,默认同上。

partitioner.class

kafka.producer.DefaultPartitioner

Partition类,默认对key进行hash。

compression.codec

none

指定producer消息的压缩格式,可选参数为: “none”, “gzip” and “snappy”。关于压缩参见4.1节

compressed.topics

null

启用压缩的topic名称。若上面参数选择了一个压缩格式,那么压缩仅对本参数指定的topic有效,若本参数为空,则对所有topic有效。

message.send.max.retries

3

Producer发送失败时重试次数。若网络出现问题,可能会导致不断重试。

retry.backoff.ms

100

Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.

topic.metadata.refresh.interval.ms

600 * 1000

The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed

queue.buffering.max.ms

5000

启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1秒的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。

queue.buffering.max.messages

10000

采用异步模式时producer buffer 队列里最大缓存的消息数量,如果超过这个数值,producer就会阻塞或者丢掉消息。

queue.enqueue.timeout.ms

-1

当达到上面参数值时producer阻塞等待的时间。如果值设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉。若值设置为-1,producer会被阻塞,不会丢消息。

batch.num.messages

200

采用异步模式时,一个batch缓存的消息数量。达到这个数量值时producer才会发送消息。

send.buffer.bytes

100 * 1024

Socket write buffer size

client.id

“”

The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

Consumer配置信息

属性

默认值

描述

group.id

Consumer的组ID,相同goup.id的consumer属于同一个组。

zookeeper.connect

Consumer的zookeeper连接串,要和broker的配置一致。

consumer.id

null

如果不设置会自动生成。

socket.timeout.ms

30 * 1000

网络请求的socket超时时间。实际超时时间由max.fetch.wait + socket.timeout.ms 确定。

socket.receive.buffer.bytes

64 * 1024

The socket receive buffer for network requests.

fetch.message.max.bytes

1024 * 1024

查询topic-partition时允许的最大消息大小。consumer会为每个partition缓存此大小的消息到内存,因此,这个参数可以控制consumer的内存使用量。这个值应该至少比server允许的最大消息大小大,以免producer发送的消息大于consumer允许的消息。

num.consumer.fetchers

1

The number fetcher threads used to fetch data.

auto.commit.enable

true

如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper。当consumer失败重启之后将会使用此值作为新开始消费的值。

auto.commit.interval.ms

60 * 1000

Consumer提交offset值到zookeeper的周期。

queued.max.message.chunks

2

用来被consumer消费的message chunks 数量, 每个chunk可以缓存fetch.message.max.bytes大小的数据量。

auto.commit.interval.ms

60 * 1000

Consumer提交offset值到zookeeper的周期。

queued.max.message.chunks

2

用来被consumer消费的message chunks 数量, 每个chunk可以缓存fetch.message.max.bytes大小的数据量。

fetch.min.bytes

1

The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.

fetch.wait.max.ms

100

The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.

rebalance.backoff.ms

2000

Backoff time between retries during rebalance.

refresh.leader.backoff.ms

200

Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.

auto.offset.reset

largest

What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer

consumer.timeout.ms

-1

若在指定时间内没有消息消费,consumer将会抛出异常。

exclude.internal.topics

true

Whether messages from internal topics (such as offsets) should be exposed to the consumer.

zookeeper.session.timeout.ms

6000

ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.

zookeeper.connection.timeout.ms

6000

The max time that the client waits while establishing a connection to zookeeper.

zookeeper.sync.time.ms

2000

How far a ZK follower can be behind a ZK leader


kafka的配置

在kafka/config/目录下面有3个配置文件:

producer.properties 生产者配置

consumer.properties 消费者配置

server.properties kafka服务器的配置

kafka的配置分为 broker(server.properties)、producter(producer.properties)、consumer(consumer.properties)3个不同的配置

一 BROKER 的全局配置

最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
------------------------------------------- 系统 相关 -------------------------------------------
##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id =1
 
##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs
 
##提供给客户端响应的端口
port =6667
 
##消息体的最大大小,单位是字节
message.max.bytes =1000000
 
## broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads =3
 
## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads =8
 
## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4
 
## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
queued.max.requests =500
 
##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name
 
## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究
advertised.host.name
 
## 广告地址端口,必须不同于port中的设置
advertised.port
 
## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes =100*1024
 
## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes =100*1024
 
## socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes =100*1024*1024
 
------------------------------------------- LOG 相关 -------------------------------------------
## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024
 
## 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
log.roll.hours =24*7
 
## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
 
## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.minutes=7days

指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1
 
## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes 。-1没有大小限制
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-1
 
## 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes
 
## 是否开启日志压缩
log.cleaner.enable=false
 
## 日志压缩运行的线程数
log.cleaner.threads =1
 
## 日志压缩时候处理的最大大小
log.cleaner.io.max.bytes.per.second=None
 
## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024
 
## 日志清理时候用到的IO块大小 一般不需要修改
log.cleaner.io.buffer.size=512*1024
 
## 日志清理中hash表的扩大因子 一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
 
## 检查是否处罚日志清理的间隔
log.cleaner.backoff.ms =15000
 
## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.min.cleanable.ratio=0.5
 
## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day
 
## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.size.max.bytes =10*1024*1024
 
## 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.interval.bytes =4096
 
## log文件"sync"到磁盘之前累积的消息条数
## 因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
## 所以此参数的设置,需要在"数据可靠性""性能"之间做必要的权衡.
## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
## 物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=None
 
## 检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000
 
## 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
## 达到阀值,也将触发.
log.flush.interval.ms = None
 
## 文件在索引中清除后保留的时间 一般不需要去修改
log.delete.delay.ms =60000
 
## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
 
------------------------------------------- TOPIC 相关 -------------------------------------------
## 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable =true
 
## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =1
 
## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions =1
 
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
 
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
## partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms =30000
 
## partition leader与replicas数据同步时,消息的队列尺寸
controller.message.queue.size=10
 
## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.time.max.ms =10000
 
## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
## 到其他follower中.
## 在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.lag.max.messages =4000
 
##follower与leader之间的socket超时时间
replica.socket.timeout.ms=30*1000
 
## leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=64*1024
 
## replicas每次获取数据的最大大小
replica.fetch.max.bytes =1024*1024
 
## replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms =500
 
## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes =1
 
## leader 进行复制的线程数,增大这个数值会增加follower的IO
num.replica.fetchers=1
 
## 每个replica检查是否将最高水位进行固化的频率
replica.high.watermark.checkpoint.interval.ms =5000
 
## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.enable =false
 
## 控制器关闭的尝试次数
controlled.shutdown.max.retries =3
 
## 每次关闭尝试的时间间隔
controlled.shutdown.retry.backoff.ms =5000
 
## 是否自动平衡broker之间的分配策略
auto.leader.rebalance.enable =false
 
## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.per.broker.percentage =10
 
## 检查leader是否不平衡的时间间隔
leader.imbalance.check.interval.seconds =300
 
## 客户端保留offset信息的最大空间大小
offset.metadata.max.bytes
 
----------------------------------ZooKeeper 相关----------------------------------
##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181
 
## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.session.timeout.ms=6000
 
## ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms =6000
 
## ZooKeeper集群中leader和follower之间的同步实际那
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
 
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
 
删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes

二 CONSUMER 配置

最为核心的配置是group.id、zookeeper.connect

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要
 group.id
 
## 消费者的ID,若是没有设置的话,会自增
 consumer.id
 
## 一个用于跟踪调查的ID ,最好同group.id相同
 client.id = group id value
 
## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置
 zookeeper.connect=localhost:2182
 
## zookeeper的心跳超时时间,超过这个时间就认为是dead消费者
 zookeeper.session.timeout.ms =6000
 
## zookeeper的等待连接时间
 zookeeper.connection.timeout.ms =6000
 
## zookeeper的follower同leader的同步时间
 zookeeper.sync.time.ms =2000
 
## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常
 auto.offset.reset = largest
 
## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.
 socket.timeout.ms=30*1000
 
## socket的接受缓存空间大小
 socket.receive.buffer.bytes=64*1024
 
##从每个分区获取的消息大小限制
 fetch.message.max.bytes =1024*1024
 
## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset
 auto.commit.enable =true
 
## 自动提交的时间间隔
 auto.commit.interval.ms =60*1000
 
## 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值
 queued.max.message.chunks =10
 
## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
## 此值用于控制,注册节点的重试次数.
 rebalance.max.retries =4
 
## 每次再平衡的时间间隔
 rebalance.backoff.ms =2000
 
## 每次重新选举leader的时间
 refresh.leader.backoff.ms
 
## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求
 fetch.min.bytes =1
 
## 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间
 fetch.wait.max.ms =100
 
## 指定时间内没有消息到达就抛出异常,一般不需要改
 consumer.timeout.ms = -1

三 PRODUCER 的配置

比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
 metadata.broker.list
 
##消息的确认模式
 ##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
 ##1:发送消息,并会等待leader 收到确认后,一定的可靠性
 ## -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
 request.required.acks =0
 
## 消息发送的最长等待时间
 request.timeout.ms =10000
 
## socket的缓存大小
 send.buffer.bytes=100*1024
 
## key的序列化方式,若是没有设置,同serializer.class
 key.serializer.class
 
## 分区的策略,默认是取模
 partitioner.class=kafka.producer.DefaultPartitioner
 
## 消息的压缩模式,默认是none,可以有gzip和snappy
 compression.codec = none
 
## 可以针对默写特定的topic进行压缩
 compressed.topics=null
 
## 消息发送失败后的重试次数
 message.send.max.retries =3
 
## 每次失败后的间隔时间
 retry.backoff.ms =100
 
## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
 topic.metadata.refresh.interval.ms =600*1000
 
## 用户随意指定,但是不能重复,主要用于跟踪记录消息
 client.id=""
 
------------------------------------------- 消息模式 相关 -------------------------------------------
 ## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送
 producer.type=sync
 
## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送
 queue.buffering.max.ms =5000
 
## 异步的模式下 最长等待的消息数
 queue.buffering.max.messages =10000
 
## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃
 queue.enqueue.timeout.ms = -1
 
## 异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制
 batch.num.messages=200
 
## 消息体的系列化处理类 ,转化为字节流进行传输
 serializer.class= kafka.serializer.DefaultEncoder

参考 :https://blog.csdn.net/weixin_39984161/article/details/91971731

https://www.cnblogs.com/frankdeng/p/9403883.html

https://www.cnblogs.com/zhangtianyuan/p/7687156.html

https://blog.csdn.net/luanpeng825485697/article/details/81036028

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/12/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
哪类人适合当产品经理?
01.24-2018更新 //增加:5.0简化版建议,删除7.0杂谈
天青色
2018/02/01
8570
想做产品经理,先从写一篇PRD开始吧
一、什么是PRD? PRD为Product Requirement Document的简称,其中文翻译为:产品需求文档。该文档是产品项目由“概念化”阶段进入到“图纸化”阶段的最主要的一个文档。当然,这
BestSDK
2018/02/28
2K0
想做产品经理,先从写一篇PRD开始吧
7、产品需求文档(PRD)的写作方法
1、理解并掌握PRD文档 -写作思路 -写作方法 -写作格式 2、什么是PRD文档 – PRD文档向上是对MRD内容的继承与发展,向下则是要把MRD文档里面的各种理论要求技术化,向研发部门与设计部门说明产品的的功能和性能要求。 – PRD文档是产品文档中最底层最细致的文档,所以写作的时候,需要细致耐心。 3、再谈BRD/MRD/PRD文档的区别与用途 3.1 BRD -这么做有好处,并说明好处在哪里 – 唐僧出发前,参见唐皇(老板),告诉唐皇西去取经的重要意义与大兴佛法的好处,唐皇答应,并发放免签护照(授权
陈树义
2018/04/13
3.5K0
7、产品需求文档(PRD)的写作方法
人人都是产品经理 : 如何写出一份优秀的 PRD ? 精于心简于形 !
一个好的prd框架结构应该至少包含以下内容:产品简介、产品概览、产品架构、产品原型、非功能性需求,如下图:
一个会写诗的程序员
2019/07/18
3K0
人人都是产品经理 : 如何写出一份优秀的 PRD ? 精于心简于形 !
如何从零开始解读什么叫产品经理
主题就是解析产品经理,一起了解下产品经理这个岗位,这个职业所要担负,所要解决的责任和问题, 岗位的了解,岗位的认知。也是对这个产品经理感兴趣或者想成为一个产品经理,或者已经是产品经理了想提高自己的能力。 (一)什么是产品经理 ① 身边常发生的事情 你是干啥工作来着?我是产品经理。 都当经理 其实产品经理不是经理,其实也没办法说,涉及的技术比较多。 这个岗位其实不被大众所理解,是做技术的吗? ② 什么是产品经理 画原型的,写文档的,项目经理的另一个称谓(催别人干活的), 背锅侠(项目出问题找他,技术出问题找
IT架构圈
2021/03/17
1.2K0
写给想要转行做产品经理的迷茫孩子们
近年来,随着中国互联网行业的深耕发展,产品经理在整体市场中的需求大坡度上升。然而,在大规模需求下,从业者的聚堆扎根也越发明显。在各行业信息化的大趋势下,不管专业方向思维快慢,都想舔舐时代甜头,最终,转行做产品的越来越多,切入点也越发难找。
齿轮易创说互联网
2018/09/30
4K0
写给想要转行做产品经理的迷茫孩子们
万恶的产品经理是推动程序员技术进步的不竭动力
万恶的 PM 是推动程序员技术不断进步的不竭动力。产品汪,你不仅仅是一只可爱的狗,你更是一只藏獒,因为我们程序员都是一群饥饿到边缘的草原狼。—题记 PM 与猿猿入行需知 在 IT 界,猿猿们一入行,就注定要与 PM 势不两立、死磕到底,犹如两大格格不入的家族,不斗牛就万幸了,千万别想和谐,谁低头谁就输了,但是这篇文章讲的可不是万恶的 PM 和温顺的猿猿们最后谁驯服谁的故事。俗话说:冤家宜解不宜结,小编以前每天在去公司的路上就会常常幻想和 PM 死磕的情景,将于 PM 吵架斗嘴作为枯燥无聊的工作中的一点乐
非著名程序员
2018/02/09
6650
产品经理面试题整理
凡事“预则立,不预则废”。即使你有丰富的产品管理和产品开发经验,在面试那种紧张的环境下,要面试好也不是一件容易的事情,因为在那种环境下,你要对面试官提出的问题快速反应,并临时组织语言,如果你没有经常训练这种能力,想回答好是非常不容易的。如果你经常背一些产品经理的面试题,那你回答的时候就流畅多了,下面将一些常见的产品经理面试题整理下来。
全栈程序员站长
2022/09/13
5.7K0
产品经理面试题整理
3年产品经理,从5k到30k,我是这样成长的(上)
近两年,我发现产品经理这个岗位的热度始终居高不下,特别在每年的校招中就能窥见一二,可谓是应届毕业生期望度最高的职位。
奔跑的小鹿
2022/05/26
3410
3年产品经理,从5k到30k,我是这样成长的(上)
4、产品经理专业技能之BRD/MRD/PRD文档撰写
1、Microsoft Visio 2007 -流程图利器 -信息结构图利器 2、Axure6.5 -简易流程图 -原型利器 注意:Axure虽好,但它只是工具,不要陷入超级写实中去 3、Balsamiq Mockups 原型图草图利器 -可以快速构建你想要的东西 -很好的支持移动性能 -元素丰富 -不容易干扰UI设计 4、MindManager2012 -思维导图 -收集,归纳,整理想法与思路 红、蓝、绿  分别代表 思考慎重、确定、头脑风暴 5、最牛利器 2B一下 + 本子 6、产品经理的三大文档?
陈树义
2018/04/13
1.4K0
2、产品经理深入浅出系列——产品经理的职责
==================  1、市场调研分析  ================= 1.1 了解市场需求 -与潜在用户交流尝试能否获取需求 -明确目标用户群体与特征(年历、收入、性别、心理特征) -直接与面对客户的一线同事交流,如销售、技术等 -获取,分析,评估用户的需求   -> 需求收集   -> 需求的分析评估: 是否迫切、是否强烈、是否高频 1.2 分析竞争状况 -市场报告 -行业文章 -使用竞争对手的产品 1.3 自身资源与满足用户需求的匹配程度 -技术资源   -> 是否存在
陈树义
2018/04/13
9410
从能力模型来讲数据产品经理的成长指南
数据产品经理是伴随着大数据、人工智能的火热,新兴起的热门岗位,也是集数据、产品、运营、技术等各方向知识能力的综合岗位。今天通过能力模型为大家分享数据产品经理的成长指南,与同行专家一起交流心得,与想转行和进阶成长的同学分享一些经验。
数据社
2020/07/02
1.3K0
从能力模型来讲数据产品经理的成长指南
产品落地:项目管理避坑指南
经历了一个失败的项目,一个不是非常复杂的后台管理模块从需求到上线历时近2个月,且上线后仅是能用,很多功能未实现,效果非常差。痛定思痛,复盘该项目过程存在的问题,提供前车之鉴。详细的项目管理方法论和流程很多文章都有介绍,就不作赘述。
数据干饭人
2022/07/01
5520
产品落地:项目管理避坑指南
回归本质,产品经理怎么干
先说一下我们产品经理这个职业,发现这个职业是一个比较特殊的职业。第一个有意思的,干产品经理的,没有所谓的专业对口,什么专业的人都有?理工科工程师半路转的,文科生转的都有,看起来好像没有专业的要求。就是我们常常说的同事,这个工作没有什么门槛,只要你有机会,只要别人给你机会,你就可以转身成为一名产品经理!入门就是这么容易,这么简单。不知道哪个大师说过一句话说过一句话,这个世界上越没有门槛的事情,其实门槛是最高的,产品经理就是这么一个岗位,看起来没有门槛,其实就是最高的门槛。产品经理的关键不在于你是产品经理,而在于你做的产品的结果怎么样?但是要做得好,其实比较难的,从事产品经理的人千千万万,在这个行业里有影响里的,就那么几个人,产品经理就是这么一个奇葩的岗位。
脑袋长草
2020/12/23
7980
招聘视角,看数据产品经理求职面试技巧
近几年负责数据产品团队,经历团队人员的变迁,进行过几百+简历的筛选,近百场社招、校招面试。金三银四的求职/招聘季接近尾声,想把自己招聘数据产品经理的过程进行总结,分享给想找数据产品经理工作的求职者。
数据干饭人
2022/07/01
5840
招聘视角,看数据产品经理求职面试技巧
数据产品经理技能树自查对照表
随着数字化转型进程的加快,数据方面的人才需求越来越旺盛。互联网早期,人人都是产品经理。数字化时代,不管会不会人人都是数据产品经理,但的确越来越多人会把数据产品经理作为求职方向,或者开始转型做数据产品经理。对于新从业者,最大的困惑就是目标很明确,但是不知道如何下手准备。这里列举了数据产品经理的技能项,希望可以为更多求职者或者从业者提供明确的方向,可以针对性地对自己的能力维度进行训练提升,成为“多边形”数据产品经理。
数据干饭人
2022/12/05
5040
数据产品经理技能树自查对照表
10分钟带你了解项目经理和产品经理
在公司的组织结构中会有这么两个职位:项目经理(Project Manager)和产品经理(Product Manager),简称PM。
互联网老辛
2020/05/19
2.9K0
10分钟带你了解项目经理和产品经理
浅谈产品经理和项目经理
产品经理——靠想。产品经理是做正确的事,其所领导的产品是否符合市场的需求,是否能给公司带来利润的。
奔跑的小鹿
2019/01/25
9850
浅谈产品经理和项目经理
原来好产品经理每天工作的时间是这样分配的
要想成为一名优秀的产品经理,需要了解一些具像的问题。不如,先从了解产品经理的一天开始。通过了解产品经理的日常工作,或许可以解决以下问题。
物流IT圈
2020/06/24
1.2K0
原来好产品经理每天工作的时间是这样分配的
写给数据产品经理新人的职场生存指南
最近收到很多新人数据产品经理朋友的咨询,他们大多数都已经对数据产品经理的岗位是什么,需要什么能力有了基础的认知,但是真正到了要去学习和提升的时候,面对铺天盖地的文章内容,以及质量参差不齐价格却都普遍不低的视频课程,却不知道从何着手。所以,今天想结合自己的工作历程,聊一聊数据产品经理如何在职场中生存下来。
数据干饭人
2023/03/03
5650
写给数据产品经理新人的职场生存指南
推荐阅读
相关推荐
哪类人适合当产品经理?
更多 >
交个朋友
加入HAI高性能应用服务器交流群
探索HAI应用新境界 共享实践心得
加入[数据] 腾讯云技术交流站
获取数据实战干货 共享技术经验心得
加入数据技术工作实战群
获取实战干货 交流技术经验
换一批
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档