本教程介绍了Apache Kafka的核心概念及其在可靠性、可伸缩性、持久性和性能至关重要的环境中所扮演的角色。
我们将创建Kafka主题(类别队列),来处理数据管道中的大量数据,充当物联网(IoT)数据和Storm拓扑之间的连接。
目标
要了解分布式系统中的消息系统背后的概念消,以及如何使用它们来转移生产者(发布者,发送者)和消费者(订阅者,接收者)之间的消息。在此示例中,您将了解Kafka。
消息系统在客户端应用程序之间传输数据。一个应用程序生成数据,例如从嵌入在车辆上的传感器读取数据,另一个应用程序接收数据,对其进行处理以使其可视化以显示有关驾驶这些车辆的驾驶员驾驶行为的特征。如您所见,每个应用程序开发人员都可以专注于编写代码来分析数据,而不必担心如何共享数据。在这种情况下使用两种消息传递系统,即点对点和发布订阅。最常用的系统是发布订阅,但我们将同时介绍两者。
点对点是将消息传输到队列中
以上通用图的主要特征:
发布-订阅是传送到主题中的消息
如何将发布-订阅消息系统的工作?
Apache Kafka是一个基于发布-订阅的开源消息传递系统,负责将数据从一个应用程序传输到另一个应用程序。
总体而言,我们的数据管道如下所示:
NiFi生产者
生产者实现为Kafka Producer的NiFi处理器,从卡车传感器和交通信息生成连续的实时数据提要,这些信息分别发布到两个Kafka主题中。
要了解有关Kafka Producer API示例代码的更多信息,请访问开发Kafka Producers
Kafka集群
具有1个或多个主题,用于支持由Kafka代理管理的1个或多个类别的消息,这些消息可创建每个主题的副本(类别队列)以实现持久性。
Storm消费者
从Kafka Cluster读取消息,并将其发送到Apache Storm拓扑中进行处理。
要了解有关Kafka消费者API示例代码的更多信息,请访问开发Kafka消费者
可靠性
可扩展性
持久性
性能
环境设定
如果您安装了最新的Cloudera DataFlow(CDF)沙盒,则该演示已预先安装。
打开本地计算机上的终端,然后通过开箱即用”的方法访问沙箱。
在对数据执行Kafka操作之前,我们必须首先在Kafka中包含数据,因此让我们运行NiFi DataFlow应用程序。请参阅本模块中的步骤:在Trucking IoT Demo中运行NiFi,然后您就可以开始探索Kafka。
如果尚未通过Ambari打开Kafka组件,则将其打开。
NiFi模拟器会生成两种类型的数据:TruckData和TrafficData作为CSV字符串。数据上会进行一些预处理,以准备将其拆分并由NiFi的Kafka生产者发送给两个单独的Kafka主题:trucking_data_truck和trucking_data_traffic。
在终端上,我们可以看到已创建的两个Kafka主题:
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper localhost:2181
结果:
Output:
trucking_data_driverstats
trucking_data_joined
trucking_data_traffic
trucking_data_truck_enriched
由于生产者将消息保留在Kafka主题中,因此您可以通过编写以下命令在每个主题中看到它们:
查看Kafka的数据主题:trucking_data_truck_enriched:
/usr/hdf/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server sandbox-hdf.hortonworks.com:6667 --topic trucking_data_truck_enriched --from-beginning
查看Kafka的数据主题:trucking_data_traffic:
/usr/hdf/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server sandbox-hdf.hortonworks.com:6667 --topic trucking_data_traffic --from-beginning
如您所见,Kafka充当了一个健壮的队列,可以接收数据并将其传输到其他系统。
Note: You may notice the is data encoded in a format we cannot read, this format is necessary for Schema Registry. The reason we are using Schema Registry is because we need it for Stream Analytics Manager to pull data from Kafka.
现在我们已经了解了Kafka的功能,下面让我们探讨其不同的组件,定义Kafka流程时的构建基块以及使用它们的原因。
最初在构建此演示时,我们验证了Zookeeper是否正在运行,因为Kafka使用Zookeeper。如果Zookeeper已关闭,则我们从Cloudera Manager运行或打开该命令:
/usr/hdf/current/kafka-broker/bin/zookeeper-server-start.sh config/zookeeper.properties
然后,我们通过Ambari或命令启动Kafka Broker:
/usr/hdf/current/kafka-broker/bin/kafka-server-start.sh config/server.properties
如果要查看正在运行的守护程序,请键入 jps
Example of Output:
2306 drpc
932 AmbariServer
2469 core
2726 logviewer
3848 NiFiRegistry
5201 StreamlineApplication
3602 NiFi
3026 TlsToolkitMain
18194 Jps
1684 Kafka
3829 RunNiFiRegistry
2649 Supervisor
1530 RegistryApplication
4762 LogSearch
4987 LogFeeder
3581 RunNiFi
4383 jar
1375 QuorumPeerMain
我们使用以下命令创建了两个Kafka主题:trucking_data_truck_enriched和trucking_data_traffic:
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partitions 10 --topic trucking_data_truck_enriched
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partitions 10 --topic trucking_data_traffic
创建了两个Kafka主题,每个主题有10个分区,每个分区有一个分区。创建主题后,Kafka代理终端会发送一条通知,该通知可以在创建主题的日志中找到:“ /tmp/kafka-logs/”
在我们的演示中,我们利用称为Apache NiFi的数据流框架生成传感器卡车数据和在线交通数据,对其进行处理并集成Kafka的Producer API,因此NiFi可以将其流文件的内容转换为可以发送给Kafka的消息。
启动NiFi流程中的所有处理器(包括Kafka处理器),数据将保留在两个Kafka主题中。
在我们的演示中,我们利用称为Apache Storm的流处理框架来消耗来自Kafka的消息。Storm集成了Kafka的Consumer API,以从Kafka代理获取消息,然后执行复杂的处理并将数据发送到目的地以进行存储或可视化。
提交Storm拓扑,来自Kafka主题的消息将被拉入Storm。
恭喜你!现在,您将了解Kafka在演示应用程序中扮演的角色,如何创建Kafka主题以及如何使用Kafka的Producer API和Kafka的Consumer API在主题之间传输数据。在我们的演示中,我们向您展示了NiFi将Kafka的Producer API包装到其框架中,Storm对Kafka的Consumer API进行了同样的处理。
修改Kafka主题
如果您需要修改Kafka主题,请运行以下命令:
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-name --partitions X
您的主题名称帐户将有所不同,并且您要添加的分区数量也将有所不同。
如果需要删除Kafka主题,请运行以下命令:
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
原文链接:https://www.cloudera.com/tutorials/kafka-in-trucking-iot.html