前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[707]Apache NiFi安装及简单使用

[707]Apache NiFi安装及简单使用

作者头像
周小董
发布2020-01-13 17:36:19
6.3K0
发布2020-01-13 17:36:19
举报
文章被收录于专栏:python前行者

NiFI介绍

NiFi是美国国家安全局开发并使用了8年的可视化数据集成产品,2014年NAS将其贡献给了Apache社区,2015年成为Apache顶级项目

NiFi(NiagaraFiles)是为了实现系统间数据流的自动化而构建的。虽然术语“数据流”用于各种上下文,但我们在此处使用它来表示系统之间的自动和管理信息流

一个易用、强大、可靠的数据处理与分发系统。基于Web图形界面,通过拖拽、连接、配置完成基于流程的编程,实现数据采集等功能

官网地址:http://nifi.apache.org/

文档:http://nifi.apache.org/docs.html

NiFi架构

image
image

linux NiFI安装

环境要求:a、需要Java 8或更高版本

b、支持的操作系统:Linux、Unix、Windows、Mac OS X

1、下载安装包

命令:wget -b http://mirror.bit.edu.cn/apache/nifi/1.8.0/nifi-1.8.0-bin.tar.gz

2、解压安装包、即可使用

命令:tar -zxvf nifi-1.8.0-bin.tar.gz

目录如下:

image
image

3、配置文件( nifi-1.8.0/conf/nifi.properties )、可以使用默认配置,根据自己情况进行修改

image
image

4、操作NIFI,启动的时候,比较慢,注意机器内存是否足够

后台启动命令:./bin/nifi.sh start

前端启动命令:./bin/nifi.sh run

关闭命令:./bin/nifi.sh stop

首次启动NiFi时,会创建以下文件和目录:

  • content_repository
  • database_repository
  • flowfile_repository
  • provenance_repository
  • work 目录
  • logs 目录
  • conf目录中,将创建flow.xml.gz文件

5、启动后,使用浏览器进行访问,地址:http://ip:8080/nifi

image
image

win NiFI安装

1、下载安装包

地址:http://mirror.bit.edu.cn/apache/nifi/

我下载的是nifi-1.10.0-bin.zip,文件好大,有1.2G。

下载完后解压进入bin目录运行run-nifi.bat:

image
image

启动成功,浏览器输入地址:http://localhost:8080/nifi/

image
image

NIFI简单使用

不理解NIFI是做什么的,看一个简单的例子(同步文件夹)吧,帮助理解

1、从工具栏中拖入一个Processor,在弹出面板中搜索GetFIle,然后确认

image
image
image
image

2、配置GetFile,设置结束关系、输入目录、保留源文件,其他设置可以不动,输入目录中有文件:file.txt(内容为abc)。

image
image
image
image
image
image

3、从工具栏中拖入一个Processor,在弹出面板中搜索PutFIle,然后确认,如第一步

4、配置PutFile,设置结束关系、输出目录,其他设置可以不动,输出目录为空文件夹

image
image
image
image

5、将GetFIle与PutFIle关联起来,从GetFIle中心点击,拖拉到PutFIle上

image
image
image
image

6、右键启动GetFIle与PutFIle,可以看到结果,输入目录中的文件同步到,输出目录中了

image
image

注意:操作过程中,注意错误排查

1、Processor上的警告

image
image

2、Processor上的错误

image
image

简单使用2

先来添加处理器

1.jpg
1.jpg

这里选择getfile处理器,它会获取本地磁盘数据,然后删除源文件

2.png
2.png

右键处理器->点configure,可以看到该处理器要填的属性,加粗的是必填项,只有必填项满足才能运行处理器

3.png
3.png

在input Directory处填目录名./data-in。他回去nifi安装目录找,我们同时也在nifi安装目录下建立data-in目录

再添加一个LogAttribute处理器做getfile处理器suucess后的下步操作。

4.png
4.png

然后点getfile的箭头移到logattritute上去。

5.png
5.png
6.png
6.png

可以看到连接的一些设置,FlowFile Expiration属性表示数据在通道里的过期时间,默认是0不过期,如果改成30sec,就代表数据如果在这个通道里停留30sec还没被下个处理器处理,就失效了。这样可以保存处理器是可用的,不会因为数据积压导致整个处理器不可用,适用于时效性有要求的处理。

Available Prioritizers是数据排序规则,比如我选先进先出规则,就把FirstInFirstOutPrioritizer拖动到下面的Selected Prioritizers输入框中。

还可以设置队列长度,大小,使系统具有恢复能力。

右击LogAttribute->configura->settings

勾上Automatically Terminate Relationships多选框。表示数据流到此为止。

准备run:

我先在data-in放了一个log文件,然后右键getFile->start

7.png
7.png

看上图,可以看到getFile读取到我的日志文件152K并写到队列里面,因为我LogAttribute还没启动,所以数据还没出队。

现在启动LogAttribute,数据流到LogAttribute,终止。

NiFi 组件

1.FlowFile

FlowFile代表每个被系统处理的数据对象。每个FlowFile由两部分组成:属性和内容。内容是数据本身,属性是与数据相关的key-value键值对,用于描述数据

2.Processor

Processor可以用来创建、发送、接受、转换、路由、分割、合并、处理 FlowFiles。Processor可以访问给定的FlowFile

3.Connection

提供Processors之间的连接,作为Processors之间的缓冲队列。用来定义Processors之间的执行关系,并允许不同Processors之间以不同的速度进行交互

4.Process Group

一个特定集合的Processors与他们之间的连接关系形成一个ProcessGroup

5.Controller Service

6.Reporting Task

Reporting Task是一种后台运行的组件,可将Metrics指标、监控信息、内部NiFi状态发送到外部

7.Funnel

漏斗是一个NiFi组件,用于将来自多个连接的数据组合成单个连接。

有哪些处理器可用

1.数据转换

  • CompressContent:压缩或解压缩内容
  • ConvertCharacterSet:用于将一种字符编码集转换成另一种
  • EncryptContent:加密或解密内容
  • ReplaceText:使用正则表达式修改文本内容
  • TransformXml:将XSLT转换应用于XML内容
  • JoltTransformJSON:应用JOLT规范来转换JSON内容

2.路由和调解

  • ControlRate:限制数据流量
  • DetectDuplicate:根据一些用户定义的条件监视重复的FlowFiles。经常与HashContent一起使用
  • DistributeLoad:通过用户定义的规则,把某些数据发到特定的Relationship,实现负载均衡
  • MonitorActivity:在指定的时间内,没有任何数据通过流即发送通知,也可选择在数据流恢复时发送通知
  • RouteOnAttribute:根据FlowFile包含的属性,路由FlowFile
  • ScanAttribute:扫描FlowFile的属性,看是否有匹配的属性
  • RouteOnContent:通过FlowFile内容 路由FlowFile
  • ScanContent:扫描FlowFile的内容,看是否有匹配的内容
  • ValidateXml:针对XML模式验证XML内容; 基于用户定义的XML模式,检查FlowFile是否有效。

3.数据库访问

  • ConvertJSONToSQL:将JSON文档转换为SQL INSERT或UPDATE命令,然后将其传递给PutSQL处理器
  • ExecuteSQL:执行用户定义的SQL SELECT命令,将结果写入Avro格式的FlowFile
  • PutSQL:通过执行FlowFile内容定义的SQL DDM语句来更新数据库
  • SelectHiveQL:针对Apache Hive数据库执行用户定义的HiveQL SELECT命令,将结果以Avro或CSV格式写入FlowFile
  • PutHiveQL:通过执行由FlowFile的内容定义的HiveQL DDM语句来更新Hive数据库

4.属性提取

  • EvaluateJsonPath:用户提供JSONPath表达式(与用于XML解析/提取的XPath类似),然后根据JSON内容评估这些表达式,以替换FlowFile内容或将该值提取到用户命名的属性中。
  • EvaluateXPath:用户提供XPath表达式,然后根据XML内容评估这些表达式,以替换FlowFile内容,或将该值提取到用户命名的属性中。
  • EvaluateXQuery:用户提供XQuery查询,然后根据XML内容评估此查询,以替换FlowFile内容或将该值提取到用户命名的属性中。
  • ExtractText:用户提供一个或多个正则表达式,然后根据FlowFile的文本内容进行评估,然后将提取的值作为用户命名的属性添加。
  • HashAttribute:对用户定义的现有属性列表的并置执行散列函数。
  • HashContent:对FlowFile的内容执行散列函数,并将哈希值作为属性添加。
  • IdentifyMimeType:评估FlowFile的内容,以便确定FlowFile封装的文件类型。该处理器能够检测许多不同的MIME类型,例如图像,文字处理器文档,文本和压缩格式等
  • UpdateAttribute:更新Attribute

5.系统交互

  • ExecuteProcess:运行用户定义的Operating System命令。进程的StdOut被重定向,使得写入StdOut的内容成为出站FlowFile的内容。该处理器是源处理器 - 其输出预计将生成一个新的FlowFile,并且系统调用预期不会接收输入。为了向进程提供输入,请使用ExecuteStreamCommand处理器。
  • ExecuteStreamCommand:运行用户定义的Operating System命令。FlowFile的内容可选地流式传输到进程的StdIn。写入StdOut的内容成为hte出站FlowFile的内容。该处理器不能使用源处理器 - 它必须被馈送进入FlowFiles才能执行其工作。要使用源处理器执行相同类型的功能,请参阅ExecuteProcess Processor。

6.数据接入

  • GetFile:将文件的内容从本地磁盘(或网络连接的磁盘)流入NiFi。
  • GetFTP:通过FTP将远程文件的内容下载到NiFi中。
  • GetSFTP:通过SFTP将远程文件的内容下载到NiFi中。
  • GetJMSQueue:从JMS队列中下载消息,并根据JMS消息的内容创建一个FlowFile。也可以将JMS属性复制为属性。
  • GetJMSTopic:从JMS主题下载消息,并根据JMS消息的内容创建一个FlowFile。也可以将JMS属性复制为属性。此处理器支持持久和非持久订阅。
  • GetHTTP:将基于HTTP或HTTPS的远程URL的内容下载到NiFi中。处理器将记住ETag和Last-Modified Date,以确保数据不会持续摄取。
  • ListenHTTP:启动HTTP(或HTTPS)服务器并监听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200响应码。
  • ListenUDP:侦听传入的UDP数据包,并为每个数据包或每包数据包创建一个FlowFile(取决于配置),并将FlowFile发送到成功关系。
  • GetHDFS:在HDFS中监视用户指定的目录。每当一个新的文件进入HDFS,它被复制到NiFi中。该处理器仅在主节点上运行,如果在群集中运行。为了从HDFS中复制数据并保持原样,或者从集群中的多个节点流出数据,请参阅ListHDFS处理器。
  • ListHDFS / FetchHDFS:ListHDFS监视HDFS中用户指定的目录,并发出一个FlowFile,其中包含遇到的每个文件的文件名。然后它通过分布式缓存通过整个NiFi集群来保持此状态。然后,这些FlowFiles可以跨群集扇出,并发送到FetchHDFS处理器,该处理器负责获取这些文件的实际内容,并发出包含从HDFS获取的内容的FlowFiles。
  • FetchS3Object:从Amazon Web Services(AWS)简单存储服务(S3)中获取对象的内容。出站FlowFile包含从S3接收的内容。
  • GetKafka:从Apache Kafka获取消息,专门用于0.8.x版本。消息可以作为每个消息的FlowFile发出,或者可以使用用户指定的分隔符进行批处理。
  • GetMongo:对MongoDB执行用户指定的查询,并将内容写入新的FlowFile。
  • GetTwitter:允许用户注册一个过滤器来收听Twitter”garden hose” 或Enterprise endpoint,为收到的每个推文创建一个FlowFile。

7.数据出口/发送数据

  • PutEmail:向配置的收件人发送电子邮件。FlowFile的内容可选择作为附件发送。
  • PutFile:将 FlowFile的内容写入本地(或网络连接)文件系统上的目录。
  • PutFTP:将 FlowFile的内容复制到远程FTP服务器。
  • PutSFTP:将 FlowFile的内容复制到远程SFTP服务器。
  • PutJMS:将 FlowFile的内容作为JMS消息发送到JMS代理,可选择基于属性添加JMS属性。
  • PutSQL:作为SQL DDL语句(INSERT,UPDATE或DELETE)执行 FlowFile的内容。FlowFile的内容必须是有效的SQL语句。可以使用属性作为参数,以便FlowFile的内容可以参数化SQL语句,以避免SQL注入攻击。
  • PutKafka:将一个FlowFile的内容作为消息传递给Apache Kafka,专门用于0.8.x版本。FlowFile可以作为单个消息发送,或者可以指定分隔符,例如新行,以便为单个FlowFile发送许多消息。
  • PutMongo:将 FlowFile的内容作为INSERT或UPDATE发送到Mongo。

8.分割和聚合

  • SplitText:SplitText采用单个FlowFile,其内容为文本,并根据配置的行数将其拆分为1个或更多个FlowFiles。例如,处理器可以配置为将FlowFile拆分成许多FlowFiles,每个FlowFiles只有1行。
  • SplitJson:允许用户将由数组或许多子对象组成的JSON对象拆分为每个JSON元素的FlowFile。
  • SplitXml:允许用户将XML消息拆分成许多FlowFiles,每个FlowFiles都包含原始的段。当通过“包装”元素连接几个XML元素时,通常使用这种方法。然后,该处理器允许将这些元素分割成单独的XML元素。
  • UnpackContent:解压缩不同类型的归档格式,如ZIP和TAR。存档中的每个文件随后作为单个FlowFile传输。
  • MergeContent:该处理器负责将许多FlowFiles合并到一个FlowFile中。FlowFiles可以通过将其内容与可选的页眉,页脚和分隔符连接起来,或者通过指定ZIP或TAR等存档格式来合并。FlowFiles可以根据一个共同的属性进行合并,如果被其他Splitting进程拆分,则可以进行“碎片整理”。每个bin的最小和最大大小都是基于元素数量或FlowFiles内容的总大小进行用户指定的,并且可以分配可选的超时,以便FlowFiles只会等待其指定的时间。
  • SegmentContent:根据一些配置的数据大小,将FlowFile分段到潜在的许多较小的FlowFiles中。拆分不是针对任何分隔符而是基于字节偏移来执行的。这是在传送FlowFiles之前使用的,以便通过并行发送许多不同的片段来提供更低的延迟。另一方面,这些FlowFiles可以由MergeContent处理器使用碎片整理模式进行重新组合。
  • SplitContent:将单个FlowFile拆分为潜在的许多FlowFiles,类似于SegmentContent。但是,对于SplitContent,分割不是在任意字节边界上执行,而是指定要分割内容的字符串。

9.HTTP

  • GetHTTP:将基于HTTP或HTTPS的远程URL的内容下载到NiFi中。处理器将记住ETag和Last-Modified Date,以确保数据不会持续摄取。
  • ListenHTTP:启动HTTP(或HTTPS)服务器并监听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200个响应。
  • InvokeHTTP:执行由用户配置的HTTP请求。该处理器比GetHTTP和PostHTTP更加通用,但需要更多的配置。该处理器不能用作源处理器,并且必须具有传入的FlowFiles才能被触发以执行其任务。
  • PostHTTP:执行HTTP POST请求,发送FlowFile的内容作为消息的正文。这通常与ListenHTTP一起使用,以便在不能使用Site to Site的情况下(例如,当节点不能直接访问,但能够通过HTTP进行通信时)在两个不同的NiFi实例之间传输数据)。注意:HTTP可用作站点到站点运输协议除了现有的RAW Socket传输。它还支持HTTP代理。推荐使用HTTP Site to Site,因为它具有更高的可扩展性,并且可以使用输入/输出端口提供双向数据传输,具有更好的用户认证和授权。
  • HandleHttpRequest / HandleHttpResponse:HandleHttpRequest处理器是一个源处理器,它与ListenHTTP类似地启动嵌入式HTTP(S)服务器。但是,它不会向客户端发送响应。相反,FlowFile与HTTP请求的主体一起发送,作为其作为属性的所有典型Servlet参数,标头等的内容和属性。HandleHttpResponse可以在FlowFile处理完成后将响应发送回客户端。这些处理器总是被期望彼此结合使用,并允许用户在NiFi内直观地创建Web服务。

10.亚马逊网络服务

  • FetchS3Object:获取存储在Amazon Simple Storage Service(S3)中的对象的内容。然后将从S3检索的内容写入FlowFile的内容。
  • PutS3Object:使用配置的凭据,密钥和存储桶名称将 FlowFile的内容写入到Amazon S3对象。
  • PutSNS:将 FlowFile的内容作为通知发送到Amazon Simple Notification Service(SNS)。
  • GetSQS:从Amazon Simple Queuing Service(SQS)中提取消息,并将消息的内容写入FlowFile的内容。
  • PutSQS:将 FlowFile的内容作为消息发送到Amazon Simple Queuing Service(SQS)。
  • DeleteSQS:从亚马逊简单排队服务(SQS)中删除一条消息。这可以与GetSQS一起使用,以便从SQS接收消息,对其执行一些处理,然后只有在成功完成处理后才从队列中删除该对象。

参考:https://www.cnblogs.com/h–d/p/10079418.html https://blog.csdn.net/u011596455/article/details/86557781 https://blog.csdn.net/u011870280/article/details/80351123

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/11/27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • NiFI介绍
  • NiFi架构
  • linux NiFI安装
  • win NiFI安装
  • NIFI简单使用
  • 简单使用2
  • NiFi 组件
  • 有哪些处理器可用
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档