Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。Kafka集群不仅具有高度可扩展性和容错性,而且与其他消息代理(如ActiveMQ和RabbitMQ)相比,它还具有更高的吞吐量。虽然它通常用作发布/订阅消息传递系统,但许多组织也将其用于日志聚合,因为它为已发布的消息提供持久存储。
发布/订阅消息传递系统允许一个或多个生成器发布消息,而不考虑消费者的数量或他们将如何处理消息。将自动通知已订阅的客户端有关更新和新消息的创建。与客户端定期轮询以确定新消息是否可用的系统相比,此系统更高效且可扩展。
在本教程中,您将在Ubuntu 18.04上安装和使用Apache Kafka 1.1.0。
要继续,您将需要:
由于Kafka可以通过网络处理请求,因此您应该为其创建专用用户。如果Kafka服务器受到损害,这可以最大限度地减少对Ubuntu计算机的损害。我们将在此步骤中创建一个专用的kafka用户,但是您应该创建一个不同的非root用户,以便在完成Kafka设置后在此服务器上执行其他任务。
以非root sudo用户身份登录,使用以下useradd
命令创建名为kafka的用户:
sudo useradd kafka -m
该-m
标志确保将为用户创建主目录。此/home/kafka
主目录将充当我们的工作区目录,用于执行以下部分中的命令。
使用passwd
来为以下设置密码:
sudo passwd kafka
使用该命令将kafka用户添加到sudo
组中adduser
,以便它具有安装Kafka依赖项所需的权限:
sudo adduser kafka sudo
您的kafka用户现已准备就绪。使用以下su
方式登录此帐户:
su -l kafka
现在我们已经创建了特定于Kafka的用户,我们可以继续下载和解压缩Kafka二进制文件。
让我们将kafka二进制文件下载并解压缩到我们kafka用户主目录中的专用文件夹中。
首先,在/home/kafka
中创建一个名为Downloads
的目录存储您的下载:
mkdir ~/Downloads
使用curl
下载卡夫卡的二进制文件:
curl "http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz" -o ~/Downloads/kafka.tgz
创建一个名为kafka
的目录并更改为此目录。这将是Kafka安装的基本目录:
mkdir ~/kafka && cd ~/kafka
使用以下tar
命令提取下载的存档:
tar -xvzf ~/Downloads/kafka.tgz --strip 1
我们指定--strip 1
标志以确保存档的内容~/kafka/
本身被提取而不是在其内部的另一个目录(例如~/kafka/kafka_2.12-1.1.0/
)中提取。
现在我们已经成功下载并解压缩了二进制文件,我们可以继续配置Kafka以允许删除主题。
Kafka的默认行为将不允许我们删除可以发布消息的主题,类别,组或订阅源名称。要修改它,让我们编辑配置文件。
Kafka的配置选项在server.properties
中指定。用nano
或其他你喜欢的编辑器打开这个文件:
nano ~/kafka/config/server.properties
让我们添加一个允许我们删除Kafka主题的设置。将以下内容添加到文件的底部:
delete.topic.enable = true
保存文件,然后退出nano
。现在我们已经配置了Kafka,我们可以继续创建systemd单元文件,以便在启动时运行并启用它。
在本节中,我们将为Kafka服务创建systemd单元文件。这将帮助我们执行常见的服务操作,例如以与其他Linux服务一致的方式启动,停止和重新启动Kafka。
Zookeeper是Kafka用于管理其集群状态和配置的服务。它通常在许多分布式系统中用作不可或缺的组件。如果您想了解更多信息,请访问官方Zookeeper文档。
为zookeeper
创建单位文件:
sudo nano /etc/systemd/system/zookeeper.service
在文件中输入以下单位定义:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
该[Unit]
部分指定Zookeeper在启动之前需要网络并且文件系统准备就绪。
该[Service]
部分指定systemd应使用zookeeper-server-start.sh
和zookeeper-server-stop.sh
shell文件来启动和停止服务。它还指定Zookeeper如果异常退出则应自动重启。
接下来,为以下kafka
内容创建systemd服务文件:
sudo nano /etc/systemd/system/kafka.service
在文件中输入以下单位定义:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
该[Unit]
部分指定此单元文件依赖于zookeeper.service
。这将确保zookeeper
在kafa
服务启动时自动启动。
该[Service]
部分指定systemd应使用kafka-server-start.sh
和kafka-server-stop.sh
shell文件来启动和停止服务。它还指定如果Kafka异常退出则应自动重启。
现在已经定义了单元,使用以下命令启动Kafka:
sudo systemctl start kafka
要确保服务器已成功启动,请检查设备的日志日志kafka
:
journalctl -u kafka
您应该看到类似于以下内容的输出:
Jul 17 18:38:59 kafka-ubuntu systemd[1]: Started kafka.service.
您现在有一个Kafka服务器侦听端口9092
。
虽然我们已启动该kafka
服务,但如果我们要重新启动服务器,它将不会自动启动。要kafka
在服务器启动时启用,请运行:
sudo systemctl enable kafka
现在我们已经启动并启用了服务,让我们检查安装。
让我们发布并使用“Hello World”消息以确保Kafka服务器正常运行。在Kafka中发布消息需要:
首先,通过键入以下命名创建一个TutorialTopic
主题:
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您可以使用该kafka-console-producer.sh
脚本从命令行创建生成器。它期望Kafka服务器的主机名,端口和主题名称作为参数。
键入以下内容将字符串发布"Hello, World"
到TutorialTopic
主题:
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
接下来,您可以使用该kafka-console-consumer.sh
脚本创建Kafka使用者。它期望ZooKeeper服务器的主机名和端口,以及主题名称作为参数。
以下命令使用来自TutorialTopic
的消息。请注意--from-beginning
标志的使用,该标志允许消费在消费者启动之前发布的消息:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
如果没有配置问题,您应该在终端中看到Hello, World
:
Hello, World
该脚本将继续运行,等待更多消息发布到该主题。随意打开一个新的终端并启动一个生产者发布更多的消息。您应该能够在消费者的输出中看到它们。
完成测试后,按CTRL+C
以停止使用者脚本。现在我们已经测试了安装,让我们继续安装KafkaT。
KafkaT是Airbnb的一款工具,可让您更轻松地查看有关Kafka群集的详细信息,并从命令行执行某些管理任务。因为它是一个Ruby gem,所以你需要Ruby才能使用它。您还需要该build-essential
软件包才能构建其依赖的其他gem。使用apt
安装:
sudo apt install ruby ruby-dev build-essential
您现在可以使用gem命令安装KafkaT:
sudo gem install kafkat
KafkaT使用.kafkatcfg
配置文件来确定Kafka服务器的安装和日志目录。它还应该有一个条目将KafkaT指向您的ZooKeeper实例。
创建一个名为.kafkatcfg
的新文件:
nano ~/.kafkatcfg
添加以下行以指定有关Kafka服务器和Zookeeper实例的必需信息:
{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}
您现在可以使用KafkaT了。首先,您可以使用它来查看有关所有Kafka分区的详细信息:
kafkat partitions
您将看到以下输出:
Topic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]
...
...
你会看到TutorialTopic
,以及__consumer_offsets
这是用于存储客户端相关信息所使用卡夫卡的内部话题。您可以放心地忽略以__consumer_offsets
开头的行。
要了解有关KafkaT的更多信息,请参阅其GitHub存储库。
如果要使用更多Ubuntu 18.04计算机创建多代理群集,则应在每台新计算机上重复步骤1,步骤4和步骤5。此外,您应该在server.properties
文件中为每个更改:
broker.id
的属性的值,使其在整个群集中是唯一的。此属性唯一标识集群中的每个服务器,并且可以将任何字符串作为其值。例如,"server1"
,"server2"
等。zookeeper.connect
的属性的值,以便所有节点都指向同一个ZooKeeper实例。此属性指定Zookeeper实例的地址并遵循<HOSTNAME/IP_ADDRESS>:<PORT>
格式。例如,"203.0.113.0:2181"``"203.0.113.1:2181"
等等。如果要为群集设置多个ZooKeeper实例,则每个节点上zookeeper.connect
属性的值应该是一个相同的逗号分隔字符串,其中列出了所有ZooKeeper实例的IP地址和端口号。
现在所有安装都已完成,您可以删除kafka用户的管理员权限。在执行此操作之前,请注销并以任何其他非root sudo用户身份重新登录。如果您仍在运行相同的shell会话,那么只需键入exit
即可启动本教程。
从sudo组中删除kafka用户:
sudo deluser kafka sudo
要进一步提高Kafka服务器的安全性,请使用该命令锁定kafka用户的密码passwd
。这可以确保没有人可以使用此帐户直接登录服务器:
sudo passwd kafka -l
此时,只有root或sudo用户可以以kafka
的身份通过键入以下命令登录:
sudo su - kafka
将来,如果要解锁,请使用passwd
以下的-u
选项:
sudo passwd kafka -u
您现在已成功限制kafka用户的管理员权限。
您现在可以在Ubuntu服务器上安全地运行Apache Kafka。您可以使用Kafka客户端(可用于大多数编程语言)创建Kafka生产者和使用者,从而在项目中使用它。要了解有关Kafka的更多信息,您还可以查阅其文档。
更多Ubuntu教程请前往腾讯云+社区学习更多知识。
参考文献:《 How To Install Apache Kafka on Ubuntu 18.04》
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。