1. 初识NiFi
1.1. 概述
NiFi 最早是美国国家安全局内部使用的工具,用来投递海量的传感器数据.后来由 apache 基金会开源。NiFi基本设计理念:Flow Based Programming ,
1.2. 核心概念
•
FlowFile
FlowFile表示在系统中移动的每个对象,FlowFile由两部分组成:
o content 内容,即数据本身
o attributes 属性,每条数据带上的属性信息.以键值对的形式. •
FlowFile Processor
o FlowFile处理器,由它完成对数据的实际处理工作.包括但不限于对数据内容和属性的加载,路
由,转换,输出等.
o 处理器最灵活之处在于处理器可以读写FlowFile的属性信息,并且用自带的领域特定语言(DS
L)对属性进行编程.
• Connection
o 由Connections把各个处理器链接起来,从而形成数据处理流程的有向无环图(DAG图).也称
数据流, NiFi 中的 Flow.
o Connection 同时充当处理器间的队列,并且队列的属性高度可配置. o 这些队列可以配置优先级,可以在设置阈值,可以实现反压。 •
Flow Controller
o 流控制器对用户不可见的.它充当维护处理器如何连接和管理所有处理器所使用的线程及其分
配的重要角色。
o Flow Controller充当促进处理器之间FlowFiles交换的代理。
• Process Group
o 为了方便管理,把一组特定的处理器及其连接组成的 Flow 放到一个处理组中去,可以通过输
入端口接收数据并通过输出端口发送数据。
o 以这种方式,处理组可以通过组合其他组来创建全新组,形成更加复杂的DAG图( Flow
流)。
1.3. 关键特性
•
Flow 流高度可管理
• 保证交付
NiFi的一个核心理念是即使在非常高的规模下,保证交付也是必须的。这是通过有效使用专门的持久化的预写日志(WAL)和内容存储库来实现的。它们的设计可以实现非常高的事务处理速率,有效的负载分散,写入时复制以及发挥传统磁盘读/写的优势。
• 背压和数据缓冲机制
NiFi支持缓冲所有队列数据,以及在这些队列达到指定限制时提供背压的能力,或者在数据达到指定时间时使数据过期失效。
• 可配置优先级的队列
NiFi允许设置一个或多个优先级策略,用于如何从队列中检索数据。默认是先进先出,但有时候应该先拉取最新的数据,最大的数据或其他一些自定义方案。
• Flow 流可配置特定的QoS(延迟v吞吐量,容量损失等)
在 Flow 流中有一些点是很关键的,且不能容忍丢失.或者有时候必须在几秒钟内处理和交付它。NiFi 可以对这些问题进行细粒度的特定配置。
•
易于使用
• 可视化的控制和命令
得益于强大的 web 操作界面.无论多么复杂的数据流都能在 web 界面上直观的呈现.整个数据处理流程,包括设计,控制,反馈和监控都可在web界面完成,一步到位.任何更改都能在界面上立马生效,完全不要部署的过程.对于整个数据流,更可以对中间某个处理器进行单独变更,实时生效.
• 数据流模板
对于设计好的数据流处理流,可以保存为模板来进行复用.模板可以导出成xml文件,导入到其他 NiFi 中进行多处使用.
• 数据溯源
flowfile 流过Flow 流时,NiFi会自动记录,索引并提供可用的起源数据,包括导入,导出,转换等。这些信息对于故障排除,优化等很有用处.
• 对历史数据进行细粒度的恢复
NiFi的内容存储库旨在充当历史记录的滚动缓冲区。数据仅在内容存储库过期时或存储空间不足时才会被删除。这与数据起源能力相结合,提供了非常精细的操作功能.包括对数据历史中的某一个点的点击查看内容,下载内容,处理回放等功能.所有数据都可以回溯到它生命周期中很早的某一点.
•
安全机制
• • • •
系统内部安全
Flow 流中的流动的数据都可以进行加密传输 用户使用安全
支持用户认证和不同级别的用户授权(可读,管理数据流,系统管理) 多租户授权
可扩展的架构设计
• 可扩展组件
NiFi 的核心设计就是扩展. 它的 processors, Controller Services, Reporting Tasks, Prioritizers, and Customer User Interfaces 都是 可扩展的.
• •
隔离的类加载器
自定义的类加载器保证了扩展的组件简单的依赖关系. 点到点的通信协议
NiFi实例之间的通信协议是NiFi 点到点(S2S)协议。S2S可以轻松,高效,安全地将数据从一个NiFi实例传输到另一个实例。NiFi 客户端 的 库也可以轻松在其他应用程序使用,以通过S2S来与NiFi 实例进行通信。S2S中支持基于套接字的协议和HTTP(S)协议作为底层传输协议,使得可以将代理服务器嵌入到S2S通信中。
• 灵活的扩容模型
• •
更多的NiFi 实例
可以搭建 NiFi 集群,也可以不组成集群,多台机器使用 点到点 协议来协作. 更大的并发数量 直接修改处理器的并发数
1.4. 架构
•
Web Server
web服务器的提供基于http的命令和控制API。
• Flow Controller
• 流量控制器是操作的大脑。它为扩展程序提供运行所需的线程,并管理扩展程序何时接收执行资源的时间表。
• Processor 处理组件
• Extensions 扩展组件
• FlowFile Repository
通过FlowFile Respository可跟踪Flow中处于活动状态的FlowFile的状态。存储库的实现是插
件式的,默认是位于指定磁盘分区上的持久性预写日志。
• Content Repository
Content Repository 作为FlowFile的存储库,实现是插件式的,默认是一种相当简单的机制,该
机制将数据块存储在文件系统中。可以指定多个文件系统存储位置,以便使用不同的物理分区以减少任何单个卷上的争用。
• Provenance Repository
Provenance Repository是存储所有来源事件数据的地方。存储库实现是插件式的,默认实现是
使用一个或多个物理磁盘卷。在每个位置内,事件数据都被索引并可以搜索。
集群
从NiFi 1.0版本开始,采用了零主群集的范例。NiFi群集中的每个节点都对数据执行相同的任务,但是每个节点都对不同的数据集进行操作。通过ZooKeeper选择一个节点作为集群协调器,并且故障转移由ZooKeeper自动处理。所有群集节点均向群集协调器报告心跳和状态信息。群集协调器负责断开和连接节点。此外,每个群集都有一个主节点,该节点也由ZooKeeper选择。作为DataFlow管理者,您
可以通过任何节点的用户界面(UI)与NiFi群集进行交互。您所做的任何更改都将复制到群集中的所有节点,从而允许多个入口点。
2. 源代码浅析
2.1.1. 总体结构
•
nifi-api
就是 nifi 的应用程序接口,里面就是定义了整个工程用到的接口,注解,抽象类和枚举等基本的接口和信息.
• nifi-assembl
负责 nifi 的成品装配, 工程打包最后形成的可供部署的压缩包就在这个工程里的 target 目录内.
• nifi-bootstarp
负责 nifi 这个 jvm 应用程序的启动相关事宜
• nifi-commons
nifi 诸多特性,比如data-provenance,expression-language,s2s 传输 的实现就在这里,同时也是 nifi 的工具类集合
• nifi-docker
nifi 的 docker 应用相关
• nifi-docs
nifi 的文档 实现相关
• nifi-external
nifi 内部元信息和外部交换,主要用于集群模式下
• nifi-framework-api
这就是nifi 核心框架层的api,也就是架构图中的 Flow Controller 那一层,注意这里只是各种接口信息定义,不是实现.
• nifi-maven-archetypes
这里只是为了生成两个 maven archetype,一个是 nifi 自定义处理器的脚手架,一个是 nifi 自定义服务的脚手架.这些脚手架在 maven 的中央仓库都有提供.
• nifi-mock
用于 nifi 的 mock 测试
• nifi-nar-bundles
nifi java 工具箱就是这里了.整个 nifi 里面大部分的 maven 工程都是这个工程的子工程.在这个工程里面,一个 bundle 就是一个工具,也对应着上面架构图里的 Extension
• nifi-toolkit
这里面是 nifi 的命令行工具的实现.nifi 也提供了比较丰富的命令行指令.
2.1.2. Nifi程序入口
在nifi-bpootstrap模块内有一个org.apache.nifi.bootstrap.RunNifi的类,该类的main()方法即为Nifi的启动入口方法。
接着看 start 方法,里面做了很多前期的准备性工作,主要是加载 bootstrap.conf 里配置的属性,以及在里面构建另外一个 java cmd 命令:
final ProcessBuilder builder = new ProcessBuilder(); ……
builder.command(cmd) ……
Process process = builder.start(); ……
所以这个 start 方法是启动了另外一个 java 进程,这个进程才是真正的 NiFi runtime。 通过代码跟踪或查看日志,可见,这个cmd命令类似如下:
/opt/jdk1.8.0_131/bin/java
-classpath /opt/nifi-1.7.1/./conf:/opt/nifi-1.7.1/./lib/logback-core-1.2.3.jar:/opt/nifi-1.7.1/./lib/jetty-schemas-3.1.jar:/opt/nifi-1.7.1/./lib/logback-classic-1.2.3.jar:/opt/nifi-1.7.1/./lib/jul-to-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/jcl-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/nifi-properties-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-runtime-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-framework-api-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-nar-utils-1.7.1.jar:/opt/nifi-1.7.1/./lib/javax.servlet-api-3.1.0.jar:/opt/nifi-1.7.1/./lib/log4j-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/slf4j-api-1.7.25.jar:/opt/nifi-1.7.1/./lib/nifi-api-1.7.1.jar
-Dorg.apache.jasper.compiler.disablejsr199=true -Xmx3g -Xms3g
-Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.egd=file:/dev/urandom -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true
-Djava.awt.headless=true -XX:+UseG1GC
-Djava.protocol.handler.pkgs=sun.net.www.protocol -Duser.timezone=Asia/Shanghai
-Dnifi.properties.file.path=/opt/nifi-1.7.1/./conf/nifi.properties -Dnifi.bootstrap.listen.port=56653 -Dapp=NiFi
-Dorg.apache.nifi.bootstrap.config.log.dir=/opt/nifi-1.7.1/logs org.apache.nifi.NiFi
可以清晰的看到,命令中实际执行的是java类org.apache.nifi.NiFi的main方法。
2.1.3. Nifi启动初始化
这个org.apache.nifi.NiFi类在以下模块中: nifi-nar-bundles
+-- nifi-framework-bundle
+--- nifi-framework
+--- nifi-runtime
Nifi-framework模块就是nifi框架的核心代码 Org.apache.nifi.NiFi.main()方法如下:
/** * Main entry point of the application. * * @param args things which are ignored */ public static void main(String[] args) { LOGGER.info(\"Launching NiFi...\"); try {
NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args); new NiFi(properties); } catch (final Throwable t) {
LOGGER.error(\"Failure to launch NiFi due to \" + t, t); } }
Main()方法调用了NiFi的构造方法:
public NiFi(final NiFiProperties properties)
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
this(properties, ClassLoader.getSystemClassLoader()); }
public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
第二个构造方法是实际上的构造方法,里面进行了大量初始化操作,以下是非常关键的部分:
// expand the nars
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
// load the extensions classloaders
NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
narClassLoaders.init(rootClassLoader,
properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
这部分初始化了NiFi的Java扩展工具箱,这些工具从Nifi的用户来说就是在NiFi安装目录的lib目录下的各个*.nar包,这个些nar包实际就是NiFi增加了特定额外信息的jar包集合的压缩,本质上还是jar包。以下是我们解压开的一个nar包,结构如下:
那么回到NiFi的构造方法内,首先是解压这些nar包,并在代码内用ExtensionMapping对象描述,代码如下:
// expand the nars
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
然后加载并初始化这些类加载器:
// load the extensions classloaders
NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
narClassLoaders.init(rootClassLoader,
properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
// load the framework classloader
final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader(); if (frameworkClassLoader == null) {
throw new IllegalStateException(\"Unable to find the framework NAR ClassLoader.\"); }
final Set 在 NiFi 的官方介绍中,有两处它的特性介绍是扩展和类加载隔离,这里我们可以对它这两个特性的实现一探究竟了.它为每一个 nar 包构造了一个独立的自定义的类加载器: NarClassLoader public class NarClassLoader extends URLClassLoader { private static final Logger LOGGER = LoggerFactory.getLogger(NarClassLoader.class); private static final FileFilter JAR_FILTER = new FileFilter() { @Override public boolean accept(File pathname) { final String nameToTest = pathname.getName().toLowerCase(); return nameToTest.endsWith(\".jar\") && pathname.isFile(); } }; 目前基本清晰, NiFi 的 扩展性是由自定义的压缩文件 nar 包 和 自定义的类加载器来提供的. 接着往下看: // load the server from the framework classloader Thread.currentThread().setContextClassLoader(frameworkClassLoader); Class> jettyServer = Class.forName(\"org.apache.nifi.web.server.JettyServer\", true, frameworkClassLoader); Constructor> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class); final long startTime = System.nanoTime(); nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles); nifiServer.setExtensionMapping(extensionMapping); nifiServer.setBundles(systemBundle, narBundles); 回想架构图VM 的最上层是 web server , 这个 web server 就是在这里被加载了,这是一个 jetty server ,继续往下看: if (shutdown) { LOGGER.info(\"NiFi has been shutdown via NiFi Bootstrap. Will not start Controller\"); } else { nifiServer.start(); if (bootstrapListener != null) { bootstrapListener.sendStartedStatus(true); } final long duration = System.nanoTime() - startTime; LOGGER.info(\"Controller initialization took \" + duration + \" nanoseconds \" + \"(\" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + \" seconds).\"); } start 这个 nifiServer ,这个 NiFi 对象的构造方法这里就全部走完了. 2.1.4. NiFi-Web NiFiServer.start()的方法内,代码跳转到nifi-framework下的一个子模块nifi-web内了。 2.1.5. nifi-jetty 与Web相关的代码都在这个模块了,包括Server和界面相关的代码,上面提到的NiFiServer的实现类JettyServer就在子模块nifi-jetty内了。 接着看 JettyServer 这个类,上面的 NiFi 构造方法里面最后是先实例化了这个 JettyServer ,然后调用了 start 方法.先看它的构造方法,只看注释,找到了核心方法: // load wars from the bundle final Handler warHandlers = loadInitialWars(bundles); 可以看到,其实就是把 war 包加载进来了,这些 war 包就是 nifi-web 下面的子工程,有几个子工程的 pom 文件中配置的就是 接着看这个 start 方法: 第一句就是 ExtensionManager.discoverExtensions(systemBundle, bundles); 就是这里把所有的扩展类加载进 JVM 了, 看到看到 ExtensionManager 的注释,这个注释就说明了一切 Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs). 这个 ExtensionManager 在加载类的时候,用到了java 的一种比较高级的机制, java SPI(service provider interface),这种机制在很多框架中比如 spring 中大量使用 final ServiceLoader> serviceLoader = ServiceLoader.load(entry.getKey(), bundle.getClassLoader()); 这个机制解释了为什么写自定义的处理器的时候要在 /resources /META-INF/services 目录下面写上配置文件.在自定义处理开发的时候,一定要注意写这个配置文件,否则类是加载不进来的 接着 start 这个 jetty server,接着往下看,只看注释,可以看到,大致就是做了 server context 以及 filter 的注入工作了: // ensure the appropriate wars deployed successfully before injecting the NiFi context and security filters // this must be done after starting the server (and ensuring there were no start up failures) 2.1.6. nifi-web-api 基本到这里, NiFi 的实例化和初始化流程基本就有个大致了解了.我们可以接着再进一步,看到 nifi-web-api 这个工程,这个工程其实就是 nifi 的 restful 接口工程,nifi 的所有 restful 接口都是这里实现的,包括处理器的新增,处理器的连接以及处理器的 start 等 在里面随便打开一个以 resource 结尾的类: 这里我们可看到rest接口注解,接着打开 resources 文件夹,看到了 nifi-web-api-context.xml 文件 : 原来这是一个 spring 的 web 工程.然后找到一个关键的 configuration 类: NiFi 实例内所有的对象都是通过 spring ioc 注入的. 2.1.7. 小结 现在为止,从开发角度对 NiFi 就有了一个基本的认识了,它是一个 JVM 应用,它通过独立的类加载器加载类,使用 spring ioc 注入和管理对象.从以上的分析,我们了解到了 NiFi 的扩展性特性的大致实现,也了解了架构图最上面的一部分源码.至于它其他诸多特性的源码和实现,则需要花更多的时间研究 nifi-framework-core 工程了. 3. 开发指南(译) 开发指南主要是介绍NiFi的扩展开发方式。 3.1. NiFi组件 NiFi提供了几个扩展点,以便开发人员能够扩展NiFi功能来满足特定需求。以下列表对最常见的扩展点进行了概要描述: 3.1.1. Processor(处理器) Processor接口是NiFi对FlowFile,以及其属性和内容的访问机制 。Procesor是NiFi DataFlow的基本组成部分。此接口用于完成以下所有任务: • • • Create FlowFiles/创建FlowFiles Read FlowFile content/读取FlowFile内容 Write FlowFile content/写入FlowFile内容 • • • • • • • Read FlowFile attributes/读取FlowFile属性 Update FlowFile attributes/更新FlowFile属性 Ingest data/提取(采集)数据 Egress data/输出数据 Route data/路由数据 Extract data/抽取数据 Modify data/修改数据 3.1.2. ReportingTask ReportingTask接口允许将度量标准,监视信息和内部NiFi状态发布到外部端点(例如日志文件,电子邮件和远程Web服务)。 3.1.3. ControllerService ControllerService在单个JVM中跨Processor,提供其他ControllerService和 ReportingTasks等的共享状态和功能。例如,我们可通过在ControllerService中一次性加载非常大的数据集,并将其共享给其它需要的processor,而不需要各个Processor自己加载数据集。 3.1.4. FlowFilePrioritizer FlowFilePrioritizer可以对队列中的FlowFile进行优先级排序,以便可以按对特定期望顺序处理FlowFiles。 3.1.5. AuthorityProvider AuthorityProvider负责确定应授予给定用户哪些特权和角色(如果有)。 3.2. Processor API Processor在NiFi内是使用最广泛的组件,是唯一可以创建,删除,修改或检查FlowFiles(数据和属性)的组件。 所有Processor都使用Java的ServiceLoader机制(JSL)加载和实例化。所以所有处理器都必须遵守以下规则: 处理器必须具有默认构造函数。 处理器的JAR文件必须在META-INF / services目录中包含 org.apache.nifi.processor.Processor的文本文件,其中每一行都包含处理器的完全限定的类名。 尽管我们可以直接实现Processor接口来实现一个Processor,但通常我们不会这么做,因为NiFi为我们提供一个org.apache.nifi.processor.AbstractProcessor的抽象基础类,这使的开发一个Processor更容易。 并发 NiFi是一个高度并发的框架。这意味着所有扩展都必须是线程安全的。如果不熟悉用Java编写并发软件,强烈建议您熟悉Java并发原理。 3.2.1. Supporting API 为了理解Processor API,我们必须首先理解几个支持的类和接口,下面将对其进行讨论。 3.2.1.1. FlowFile FlowFile是一种逻辑概念,它包含了数据以及与数据关联的的一组属性,这些属性包括FlowFile的唯一标识符,名称,大小或者其他属性。虽然FlowFile的内容和属性可以更改,但FlowFile对象是不可变的。通过ProcessSession可以对FlowFile进行修改。 FlowFiles的核心属性在org.apache.nifi.flowfile.attributes.CoreAttributes枚举中定义。最常见的属性是文件名,路径和uuid。用引号引起来的字符串是CoreAttributes枚举中属性的值。 • • 文件名(“filename”):FlowFile的文件名。文件名不应包含任何目录结构。 UUID(“ uuid”):分配给此FlowFile的通用唯一标识符,用于将FlowFile与系统中的其他FlowFile区分开。 • • 路径(“path”):FlowFile的路径指示FlowFile所属的相对目录,并且不包含文件名。 绝对路径(“ absolute.path”):FlowFile的绝对路径指示FlowFile所属的绝对目录,并且不包含文件名。 • • • • 优先级(“priority”):指示FlowFile优先级的数字值。 MIME类型(“ mime.type”):此FlowFile的MIME类型。 丢弃原因(“ discard.reason”):指定丢弃FlowFile的原因。 备用标识符(“ alternate.identifier”):表示除FlowFile的UUID之外的已知标识符,该标识符引用该FlowFile。 3.2.1.2. ProcessSession(处理会话) ProcessSession,通常简称为“会话”,通过它可以创建,销毁,检查,克隆FlowFiles并将其转移到其他处理器。同一时间一个ProcessSession只能绑定到单一Processor上,并确同一个FlowFile不会被超过一个Processor访问。 ProcessSession提供了一种机制,用于通过添加或删除属性或修改FlowFile的内容来创建修改版本的FlowFiles。ProcessSession还公开了一种发出源事件的机制,该机制提供了跟踪FlowFile的血统和历史的功能。在一个或多个FlowFiles上执行操作后,可以提交或回滚ProcessSession。 3.2.1.3. ProcessContext(处理上下文) ProcessContext是Processor和NiFi FrameWork之间的桥梁。它提供Processor的配置信息,并允许Processor执行特定于框架的任务,例如生成其资源,以便框架将调度其他处理器运行,而不会消耗不必要的资源。 3.2.1.4. PropertyDescriptor(属性描述) PropertyDescriptor定义将由Processor,ReportingTask或ControllerService使用的属性。属性的定义包括其名称,属性说明,可选的默认值,验证逻辑以及关于是否需要该属性才能使Processor有效的指示符。通过实例化该类的实例PropertyDescriptor.Builder ,调用适当的方法以填充有关属性的详细信息并最终调用该build方法来创建PropertyDescriptor 。 3.2.1.5. Validator(验证器) PropertyDescriptor必须指定一个或多个验证器(Validator),这些验证器可用于确保用户输入的属性值有效。如果验证器指示属性值无效,则在属性变为有效之前将无法运行或使用组件。如果未指定验证器,则该组件将被视为无效,并且NiFi将报告该属性不受支持。 3.2.1.6. ValidationContext 验证属性值时,可以使用ValidationContext获取ControllerServices,创建PropertyValue对象以及使用表达式语言编译和评估属性值。 3.2.1.7. PropertyValue(属性值) 返回给Processor的所有属性值都以PropertyValue对象的形式返回。该对象具有方便的方法,可以将值从字符串转换为其他形式,例如数字和时间段,以及提供用于评估表达式语言的API。 3.2.1.8. Relationship(关系) Relationship定义了FlowFile可能从处理器传输到的路由。通过实例化该类的实例 Relationship.Builder ,调用适当的方法以填充Relationship的详细信息,最后调用该build方法,可以创建Relationship 。 3.2.1.9. StateManager(状态管理器) StateManager为Procesor,ReportingTask和ControllerService提供了一种易于存储和检索状态的机制。该API与ConcurrentHashMap相似,但是每个操作都需要一个Scope,指示状态是要在本地还是在整个集群范围内检索/存储。更多信息,请参见“ 状态管理器”部分。 3.2.1.10. ProcessorInitializationContext Processor被创建后,其initialize方法将被调用,并传入一个InitializationContext对象参数。该对象参数暴露给Processor的配置在Processor的整个生命周期内都不会更改,例如processor的唯一标识符。 3.2.1.11. ComponentLog(组件日志) NiFi建议Processor通过ComponentLog接口进行日志记录 ,而不是获取第三方记录器实例。因为通过ComponentLog进行的日志记录允许框架将超过配置的严重级别的日志消息呈现给用户界面,以便在发生重要事件时通知监视数据流的人员。此外,它可以在调试模式下记录堆栈跟踪,并在日志消息中提供处理器的唯一标识符,为所有处理器提供格式一致的日志记录。 3.2.2. AbstractProcessor API 绝大多数Processor将通过扩展AbstractProcessor来创建, AbstractProcessor提供了处理器开发人员感兴趣的几方法。 3.2.2.1.1. Init 初始化处理器 当一个Processor被创建,在调用任何其他方法之前,init方法将被调用,该方法接收一个类型为 ProcessorInitializationContext的参数。ProcessorInitializationContext为Processor提供了 ComponentLog,Procesor的唯一标识符和ControllerServiceLookup,可用于与已配置的ControllerServices进行交互。每个这样的对象是由AbstractProcessor存储,并且可以由子类通过getLogger,getIdentifier和 getControllerServiceLookup方法分别获得。 3.2.2.1.2. getRelationships 暴露Processor间关系: 为了使Processor能够将FlowFile传输到新的目的地进行后续处理,Processor必须首先能够向FrameWork暴露其当前支持的所有关系。这允许应用程序的用户通过在Processor之间创建连接并为这些连接分配适当的关系来将Processor彼此连接。 Processor通过覆盖getRelationships方法公开有效的关系集 。这个方法没有参数,并返回 Relationship 对象集合。对于大多数处理器,此集合将是静态的,但有些Processor将根据用户配置动态生成该Relationship 对象集合。对于返回静态集合的Processor,建议在Processor的构造函数或init方法中创建一个不可变的Set并返回该值,而不是动态生成Set。这种模式使其更干净的代码和更好的性能。 例如,ListFie的processor,就是在其初始化方法内,创建的不变集合 final Set this.relationships = Collections.unmodifiableSet(relationships); 3.2.2.1.3. getSupportedPropertyDescriptors 暴露Processor属性: 大多数Processor在使用前都需要进行一定的属性配置。处理器支持的属性通过 getSupportedPropertyDescriptors方法暴露给框架 。这是个无参方法,返回 PropertyDescriptor对象的List。List中对象的顺序决定了将在用户界面中属性的显示顺序。 一个PropertyDescriptor是通过创建一个新的实例构造PropertyDescriptor.Builder对象,调用构建器的适当的方法,并最终调用build方法。 尽管此方法涵盖了大多数场景,但有时还是希望允许用户配置一些名称未定的其它属性。这可以通过重写getSupportedDynamicPropertyDescriptor方法来实现 。此方法以一个String作为唯一参数,它指示属性的名称。该方法返回一个PropertyDescriptor对象,该 对象可用于验证属性名称和值。从此方法返回的任何PropertyDescriptor都应在其PropertyDescriptor.Builder类中的isDynamic值设置为true 。AbstractProcessor的默认是不允许任何动态创建的属性。 3.2.2.1.4. 验证处理器属性 如果Processor的配置无效,则无法启动。可以通过在PropertyDescriptor上设置Validator或通过PropertyDescriptor.Builder的allowableValues方法identifiesControllerService方法来限制属性的允许值来实现Processor属性的验证。 但是,有时候单独验证处理器的属性还不够。为此,AbstractProcessor暴露了一个customValidate方法。该方法接收单个ValidationContext参数。此方法的返回值是一个ValidationResult集合,用以描述验证过程中发现的所有问题。 应该只返回那些isValid方法返回false的ValidationResult对象。仅当所有属性均根据其关联的“验证器”和“允许值”有效时,才会调用customValidate方法。即,仅当所有属性本身都有效时,才调用此方法,并且此方法允许对处理器的配置进行整体验证。 3.2.2.1.5. 响应配置更改 有时需要让处理器在其属性更改时及时做出反应。onPropertyModified 方法可以达到这个目的。当用户更改处理器的属性值时,onPropertyModified将为每个修改后的属性调用该 方法。该方法具有三个参数:PropertyDescriptor,它指示要修改的属性,旧值和新值。如果该属性没有先前的值,则第二个参数为null。如果删除了该属性,则第三个参数为null。 注意,无论值是否有效,都将调用此方法。仅在实际修改值时才调用此方法,而不是在用户更新Processor而不更改其值时调用此方法。在调用此方法时,可以确保调用此方法的线程是处理器中当前正在执行代码的唯一线程,除非处理器本身创建了自己的线程。 3.2.2.1.6. 执行 当Processor需要工作时,Framwork会调用其onTrigger()方法,该方法有两个参数: 一个ProcessContext和一个 ProcessSession。onTrigger方法的通常首先通过ProcessSession的get方法得到FlowFile (对于从外部来源将数据导入NiFi的Processor,将跳过此步骤)。然后,Processor可对FlowFile的属性就行检查、添加、删除或修改属性,读取或修改FlowFile内容;并将FlowFiles传输到适当的Relationships。 3.2.2.1.7. 触发Processor 当一个Processor被调度执行且与要执行的工作,这个Processor的onTrigger方法将被框架调用。这里有要执行的工作是指满足以下任意条件: • 以当前Processor为目的的Connection的队列内有至少一个FlowFile; • 当前Processor没有接入Connection; • 当前Processor使用了@TriggerWhenEmpty注解; 有几个因素会影响处理器的onTrigger方法的调用。首先,只有被设置为run的processor才会被触发执行。如果设置Processor为run,则FrameWork会定期(由用户在用户界面中配置调度规则)检查 Processor是否有工作要做。同时FrameWork将检查procesor的下游目标,如果处理器的任何出站连接已满,则默认情况下不会安排Processor运行。 但是可以在Processor上添加@TriggerWhenAnyDestinationAvailable注解。添加该注解后,只要有一个下游目标可用”(如果连接队列未满,则将目标视为“可用”),而不是要求所有下游目标均可用。 @TriggerSerially 注解也与Processor调度有关。使用此注解的Processor永远不会有多个线程同时执行onTrigger方法。 3.2.3. 组件生命周期 NiFi API通过使用Java注解提供生命周期支持。org.apache.nifi.annotation.lifecycle软件包包含一些用于生命周期管理的注解。以下注解可以应用于NiFi组件中的Java方法,以指示框架何时应调用这些方法。我们NiFi组件包括Processor,ControllerServices和ReportingTask。 3.2.3.1.1. @OnAdded 添加@OnAdd注解的方法会导致组件被创建后,该方法会尽早的被调用。一个组件被构造出来后,其initialize方法(如果是AbstractProcessor的子类的话,可能是init方法)将会被调用,然后调用添加了@OnAdd注解的方法。任何添加了@OnAdd注解的方法抛出异常,都将返回错误,而且该组件不会添加到Flow中。同时不再调用其它用@OnAdd注解的方法。在组件的整个生命周期中,用@OnAdd注解的方法只会调用一次,且此方法不能有任何参数。 3.2.3.1.2. @OnEnabled @OnEnable注解适用于ControllerService的方法。 @OnEnabled注解用来指示在启用ControllerServices时应调用的方法。每当用户启用 ControllerServices时,都会调用具有此注解的方法。另外,当NiFi重启时,如果将NiFi配置为“auto-resume state”并启用ControllerServices,则将调用该方法。 如果有此注解的方法抛出异常,则将为该组件发出日志消息和公告,此时,服务将保持“ENABLING”状态且不可使用。在一定时间延迟后,将再次调用具有此注解的所有方法。在没有抛出任何异常的情况下,所有带有该注解的方法都返回之前,服务将不可用。 使用此注解的方法要么没有参数,要么只有一个org.apache.nifi.controller.ConfigurationContext类型的参数。 注: 如果将此注释应用于ReportingTask或Processor,该注解将被忽略。对于ControllerServices,enabling和disabling被认为是生命周期事件,因为该操作使其他组件可用或不可用。然而,对于Processor和ReportingTask,它们不是生命周期事件,而是一种机制,允许在启动或停止一组组件时排除某个组件。 3.2.3.1.3. @OnRemoved 与@OnAdd注解相反,@OnRemoved注解的作用是标注当一个组件被从Flow移除之前被@OnRemoved注解的方法需要被调用。通常我们会在此类方法中清理资源。被@OnRemoved的方法,必须没有任何参数。即使该类方法执行时抛出异常,对应的组件仍然会被移除。 3.2.3.1.4. @OnScheduled 该注解表示,当调度运行组件时,被@OnSchedule注解的方法应该被执行。由于ControllerServices没有调度概念,不应是使用这个注解。它应该只用于Processor和ReportingTask。如果带有该注解的任何方法抛出异常,则不会调用带有该注解的其他方法,并将通知用户。此时,将触发使用@OnUnscheduled注解的方法,然后触发使用@OnStopped注解的方法(期间,如果这些方法中的任何一个抛出异常,都会被忽略)。然后,组件将在一段时间(称为“Administrative Yield Duration”,这是在nifi.properties中配置的一个值)内产生其执行效果。然后,该过程将再次启动,直到使用@OnScheduled注释的所有方法都返回,并且没有抛出任何异常。具有此注解的方法可以采用零参数,也可以采用单个参数。如果使用单个参数,如果组件是Processor,则参数必须是ProcessContext类型;如果组件是ReportingTask,则参数必须是ConfigurationContext类型。 3.2.3.1.5. @OnUnscheduled 当不再计划运行Processor或ReportingTask时,将调用带有此批注的方法。此时,Processor的onTrigger方法中可能仍有许多线程处于活动状态。如果此类方法引发异常,则将生成日志消息,并忽略异常,并继续调用带有此注解的其他方法。具有此注解的方法可以采用零参数,也可以采用单个参数。如果使用单个参数,如果组件是Processor,则参数必须是ProcessContext类型;如果组件是ReportingTask,则参数必须是ConfigurationContext类型。 3.2.3.1.6. @OnStopped 当Processor或ReportingTask不再计划运行,并且所有线程都从onTrigger方法返回时,将调用具有此注解的方法。如果该方法抛出异常,将生成日志消息,并忽略该异常;同时仍将调用具有此注解的其他方法。带有此注解的方法允许接受0或1参数。如果使用了一个参数,它必须是ConfigurationContext类型(如果组件是ReportingTask),或者是ProcessContext类型(如果组件是Processor)。 3.2.3.1.7. @OnShutdown 当成功停止NiFi时,将调用所有使用@OnShutdown注解的方法。如果此类方法抛出异常,将生成一条日志消息,并忽略异常,同时仍将调用具有此注解的其他方法。具有此注释的方法必须接受零参数。注意:尽管NiFi将尝试在所有组件上调用带有该注解的方法,但在某些特殊情况下达不到预期效果。例如,进程可能被意外终止,在这种情况下,它没有机会调用这些方法。因此,尽管使用这个注释的方法可以用于清理资源,但是不应该依赖它们来处理关键的业务逻辑。 3.2.4. 组件通知 NiFi API通过使用Java注解提供通知支持。org.apache.nifi.annotation.notification软件包包含一些用于通知管理的注解。以下注解可应用于NiFi组件中的Java方法,以指示框架何时应调用这些方法。 3.2.4.1.1. @OnPrimaryNodeStateChange @OnPrimaryNodeStateChange注解标明当主节点的集群中的状态发生了变化需要调用的方法。具有此注解的方法要么不带参数,要么带一个PrimaryNodeState类型的参数。PrimaryNodeState提供关于更改内容的上下文,以便组件可以采取适当的操作。PrimaryNodeState有两个可能的值:ELECTED_PRIMARY_NODE(接收这个状态的节点已经被选为NiFi集群的主节点),或者PRIMARY_NODE_REVOKED (接收这个状态的节点是主节点,但是现在已经撤销了它的主节点角色)。。 3.2.5. 限制 受限组件是一种组件,可以用于执行操作员通过NiFi REST API / UI提供的任意未经消毒的代码,也可以用于使用NiFi OS凭据在NiFi主机系统上获取或更改数据。这些组件可能会被其他经过授权的NiFi用户使用,以超出应用程序的预期用途,提升特权,或者可能公开有关NiFi进程或主机系统内部的数据。所有这些功能都应被视为特权,并且管理员应意识到这些功能,并为一部分受信任的用户显式的启用它们。 可以使用@Restricted注解Processor,ControllerServices或ReportingTask。这将导致该组件被视为受限组件,并且需要将用户明确添加到可以访问受限组件的用户列表中。一旦允许用户访问受限制的组件,则将在允许所有其他权限的情况下允许他们创建和修改那些组件。如果无法访问受限制的组件,则用户即使u知道存在这些类型的组件,就算拥有足够的权限也无法创建或修改它们。 3.2.6. StateManager(状态管理器) 在ProcessContext、ReportingContext和ControllerServiceInitializationContext中,组件通过调用getStateManager()方法获得状态管理器。状态管理器提供用于存储和检索状态的简单API。这种机制旨在为开发人员提供一种非常容易地存储键/值对、检索这些值并自动更新它们的能力。状态可以存储在节点的本地,也可以在集群中的跨节点存储。但是,需要注意的是,此机制仅用于提供存储 “简单”状态的机制。因此,该API只允许存储和检索Map 3.2.6.1. 范围 与状态管理器进行通信时,所有方法调用都要求提供范围。此范围将为Scope.LOCAL或Scope.CLUSTER。如果NiFi在群集中运行,则此作用域向框架提供有关如何进行操作的重要信息。 如过存储状态使用Scope.CLUSTER,那么集群中的所有节点都将使用相同的状态存储机制进行通信。如果状态存储和检索使用Scope.LOCAL,则每个节点将看到状态的不同表示形式。 值得注意的是,如果将NiFi配置为作为独立实例运行,而不是在群集中运行,则始终使用范围Scope.LOCAL。这样做是为了允许NiFi组件的开发人员以一种一致的方式编写代码,而不必担心NiFi实例是否集群。开发人员应该假设实例是集群的,并相应地编写代码。 3.2.6.2. 存储和检索状态 状态使用StateManager的getState、setState、replace和clear方法进行存储。所有这些方法都需要提供一个作用域。应该注意,存储在本地范围中的状态与存储在集群范围中的状态完全不同。如果处理器使用Scope.CLUSTER作用域存储一个值。然后尝试使用Scope.CLUSTER作用域检索值。检索到的值将为null(除非使用该作用域的键也存储了一个值)。每个处理器的状态与其他处理器的状态隔离存储。 因此,两个处理器不能共享相同的状态。但是,在某些情况下,在不同类型的两个处理器或相同类型的两个处理器之间共享状态是非常必要的。这可以通过使用ControllerService来实现。通过从ControllerService存储和检索状态,多个处理器可以使用相同的ControllerService,状态可以通过控制器服务的API公开。 3.2.6.3. 单元测试 NiFi的Mock框架提供了大量工具来执行处理器的单元测试。处理器单元测试通常从TestRunner类开始。因此,TestRunner类本身包含一个getStateManager方法。然而,返回的StateManager是特定类型的:MockStateManager。除了StateManager接口定义的方法之外,这个实现还提供了几个方法,帮助开发人员更容易地开发单元测试。 首先,MockStateManager实现StateManager接口,因此可以在单元测试中检查所有状态。另外,MockStateManager公开了一些assert*方法来执行状态设置为预期状态的断言。MockStateManager还提供了一种能力,可以指示如果针对特定范围更新状态,则单元测试应该立即失败。 3.2.6.4. 报告处理器活动 处理器负责报告它们的活动,以便用户能够了解它们的数据发生了什么变化。处理器应该通过ComponentLog记录事件,可以通过InitializationContext或调用AbstractProcessor的getLogger方法访问ComponentLog。 此外,处理器应该使用ProvenanceReporter接口,该接口通过ProcessSession的 getProvenanceReporter方法获得。ProvenanceReporter应该用于指示内容从外部源接收或发送到外部位置的任何时间。ProvenanceReporter还提供了一些方法,用于报告何时复制、分叉或修改一个流文件,何时将多个流文件合并到一个流文件中,以及如何将流文件与其他标识符关联。但是,报告这些函数并不是很重要,因为框架能够检测这些内容并代表处理器发出适当的事件。然而,这是一个最佳实践的处理器开发人员发出这些事件,在这些事件的代码变得明确被释放,和开发人员能够提供额外的细节的事件,如或相关行动的时间,这是采取行动的信息。如果处理器发出事件,框架将不会发出重复的事件。相反,它总是假设处理器开发人员比框架更了解处理器上下文中发生的事情。然而,框架可能会发出不同的事件。例如,如果一个处理器修改了一个FlowFile的内容和它的属性,然后只发出一个ATTRIBUTES_MODIFIED事件,那么框架将发出一个CONTENT_MODIFIED事件。如果为该流文件(由处理器或框架发出)发出任何其他事件,则框架将不会发出ATTRIBUTES_MODIFIED事件。这是由于这样的事实,所有起源事件知道FlowFile在事件发生之前的属性以及属性发生由于FlowFile的处理,因此ATTRIBUTES_MODIFIED通常被认为是多余的,会导致FlowFile血统非常详细的呈现。但是,如果从处理器的角度考虑该事件是相关的,则处理器将此事件与其他事件一起发出是可以接受的。 3.3. 组件文档 NiFi试图通过NiFi应用程序本身通过用户界面向用户提供大量文档,从而使用户体验尽可能简单和方便。为了做到这一点,处理器开发人员必须将该文档提供给框架。NiFi公开了几种用于向框架提供文档的机制。 3.3.1. 属性文档化 可以通过调用PropertyDescriptor的生成器的描述方法来文档化单个属性,如下所示: public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder() .name(\"My Property\") .description(\"Description of the Property\") ... .build(); 如果该属性提供一组允许值,则这些值将在界面的下拉字段中显示给用户,每个值可以给出描述信息,如下: public static final AllowableValue EXTENSIVE = new AllowableValue(\"Extensive\\"Extensive\ \"Everything will be logged - use with caution!\"); public static final AllowableValue VERBOSE = new AllowableValue(\"Verbose\ \"Quite a bit of logging will occur\"); public static final AllowableValue REGULAR = new AllowableValue(\"Regular\ \"Typical logging will occur\"); public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder() .name(\"Amount to Log\") .description(\"How much the Processor should log\") .allowableValues(REGULAR, VERBOSE, EXTENSIVE) .defaultValue(REGULAR.getValue()) ... .build(); 3.3.2. RelationShips文档化 Processor的RelationShips文档化,与属性文档化类似,可以通过调用RelationShiops.Builderde description()方法完成,如下: public static final Relationship MY_RELATIONSHIP = new Relationship.Builder() .name(\"My Relationship\") .description(\"This relationship is used only if the Processor fails to process the data.\") .build(); 3.3.3. 能力和关键字文档化 NiFi的org.apache.nifi.annotation.documentation包提供了可用于文档化组件的注解。 CapabilityDescription注解可用于Processor,Reporting Task或者Controler Service,用于简单描述组件所提供的功能。Tags注解有一个类型为字符串数据组的value变量,其值可以是大括号括起来的的逗号分隔的多个字符串,然后用户就可以在界面上通过关键字(tags)进行检索组件。例如: @Tags({\"example\@CapabilityDescription(\"Example Processor that provides no real functionality but is provided\" + \" for an example in the Developer Guide\") public static final ExampleProcessor extends Processor { ... } 3.3.4. FlowFile属性交互文档化 很多时候,我们需要在在入站FlowFiles上设置适当属性以便Processor可以正常运行。另外Processor也可能在出站FlowFile上更新或创建一些属性。Processor的开发者可以使用ReadAttribure和 WritesAttribute注解来文档化这种行为能力。通过这些被文档话的属性,用户可以更好的了解一个Processor如何使用这些属性。 注意:由于Java7在一个类型上不支持在重复注解,所以您可能需要使用ReadsAttributes和 WritesAttributes指示Processor读取或写入多个FlowFile属性。该注释只能应用于处理器。下面列出了一个示例: @WritesAttributes({ @WritesAttribute(attribute = \"invokehttp.status.code\\"The status code that is returned\"), @WritesAttribute(attribute = \"invokehttp.status.message\message that is returned\"), @WritesAttribute(attribute = \"invokehttp.response.body\response body\"), @WritesAttribute(attribute = \"invokehttp.request.url\URL\"), @WritesAttribute(attribute = \"invokehttp.tx.idhat is returned after reading the response\"), @WritesAttribute(attribute = \"invokehttp.remote.dn\remote server\") }) public final class InvokeHTTP extends AbstractProcessor { 3.3.5. 关联组件文档化 通常,Processor和Controller Service相互关联。有时它是一个PUT / GET关系中PutFile和GetFile。有时,Processor使用Controller服务,例如InvokeHTTP和StandardSSLContextService。有时,一个ControllerService使用另一个ControllerServicew,比如DistributedMapCacheClientService和 DistributedMapCacheServer。开发人员可以使用SeeAlso注解将这些不同的组件相关联。 SeeAlso注解可以应用于Processors,ControllerServices和ReportingTasks。下面列出了一个示例: @SeeAlso(GetFile.class) public class PutFile extends AbstractProcessor { 3.3.6. 高级文档化 当上述文档化方法不能满足要求时,NiFi提供了一种使用“Usage”的更高级的文档方法。当用户鼠标右键点击Processor时,可以弹出一个“Usage”,另外在界面的右上角有一个“Help”的链接,可以找到同样的使用帮助信息。 Processor的高级文档以一个名为additionalDetails.html的html文件提供。这个文件应该对应Processor的jar内的docs目录下的一个名为对应组件的的全限定类名的目录下。html文件会被一个生成的HTML文件链接,这个生成的HTML文件将包含所有的功能描述、关键字、属性描述以及关系信息。所以在additionalDetails.html中没有必要再重复这些信息。 在此,您可以解释这个Procdssor正在执行的操作,其预期产生的数据类型以及其预期产生的FlowFile属性。由于是HTML格式的,所以您可以使用图像和表格来描述这个组件。可以使用相同的方法为Processors,ControllerServices和ReportingTasks提供高级文档。 3.4. 来源事件(Provenance Events) 来源事件类型列表 来源事件 描述 ADDINFO ATTRIBUTES_MODIFIED 用于添加附加信息(如新链接到新URI或UUID)的起源事件 指示以某种方式修改了FlowFile的属性。当同时报告另一个事件时,不需要此事件,因为另一个事件已经包含所有FlowFile属性 CLONE CONTENT_MODIFIED 表示一个FlowFile是其父FlowFile的精确副本 表示FlowFile的内容被某种方式修改。使用这个事件类型时,建议提供内容如何被修改的详细信息。 CREATE 表示一个由数据生成的FlowFile,数据不是从远程或外部进程获得。 DOWNLOAD DROP 表示一个FlowFile的内容被用户或外币实体下载。 表示出于某种原因(不是对象过期)而结束对象生命的起源事件 来源事件 描述 EXPIRE FETCH 指示由于未及时处理对象而结束对象生命的出处事件 指示使用某些外部资源的内容覆盖了FlowFile的内容。这类似于RECEIVE事件,但不同之处在于,RECEIVE事件旨在用作将FlowFile引入系统的事件,而FETCH用于指示现有FlowFile的内容已被覆盖。 FORK JOIN RECEIVE 指示一个或多个FlowFiles是从父FlowFile派生 指示单个FlowFile是从多个父FlowFiles派生的 指示从外部流程接收数据的来源事件。此事件类型是FlowFile的第一个事件。因此,从外部源接收数据并使用该数据替换现有FlowFile内容的处理器应使用FETCH事件类型,而不是RECEIVE事件类型。 REPLAY 重放FlowFile的来源事件。事件的UUID表示正在重播的原始FlowFile的UUID。该事件包含一个父UUID(也是正在重播的FlowFile的UUID)和一个子UUID(一个新创建的FlowFile的UUID),子UUID将重新排队以进行处理 ROUTE 指示将FlowFile路由到指定的RelationShip,并提供有关为何将FlowFile路由到此RelationShip的信息 SEND UNKNOWN 发送数据到外部流程的事件 表示来源事件的类型未知,因为尝试访问该事件的用户无权知道该类型 3.5. 通用Proccessor模式(Common Processor Patterns) 尽管有许多不同的处理器可供NiFi用户使用,但绝大多数都属于几种常见的设计模式之一。下面,我们将讨论这些模式,何时模式是适当的,我们遵循这些模式的原因,以及在应用这些模式时要注意的事项。注意,下面讨论的模式和建议是通用的指导原则,而不是严格的规则。 3.5.1. 数据入口 Processor提取数据到NiFi有一个名为“success”的Relationship。该Processor通过ProcessSession的create方法生成新的FlowFile,不会从传入的连接中提取FlowFile。Processor名称以“Get”或\\“Listen”开头(具体取决于它轮询外部源还是公开一些外部源可以连接到的接口),以所使用的通讯协议名称结尾。遵循这种模式的处理器包括GetFile,GetSFTP, ListenHTTP,和GetHTTP。 该处理器可能使用@OnScheduled注释的方法创建或初始化连接池 。但由于通信问题可能会阻止建立连接或导致连接终止,所以此时不会创建连接本身,而是在该onTrigger方法中从池中创建或租用连接。 Processor的onTrigger方法首先从连接池中租赁一个连接(如果可能的话),或者创建一个到外部服务的连接。当外部源中没有可用的数据时,处理器将调用ProcessContext的yield方法,然后返回,以避免Processor持续运行和毫无益处地资源耗费。否则,Processor将通过ProcessSession的create方法创建一个FlowFile,并向FlowFile分配适当的文件名和路径(通过添加filename和path属性),以及其他相关属性。Flowfile的OutputStream通过ProcessSession的write方法获得的,它传递一个新的 OutputStreamCallback(通常是一个匿名的内部类)。在这个回调中,Processor将来自外部资源的内容写到FlowFile的OutputStream中。如果希望将InputStream的全部内容写入到FlowFile中,那么使用ProcessSession的importFrom方法可能比使用write方法更方便。 如果Processor希望接收许多小文件,建议再提交会话之前创建多个FlowFile。这样框架可以更有效地处理新创建的FlowFiles的内容。 Processor产生一个Provenance事件,说明已接收到数据,以及数据来自何处。Processor应该记录FlowFile的创建,以便必要时可通过分析日志来确定FlowFile的来源。 Processor应对从外部源接收或移除数据进行确认,以避免接收重复文件。确认操作只能在创建FlowFile的 ProcessSession提交后。不遵守这一原则可能会导致数据丢失,因为在提交会话之前重新启动NiFi将导致临时文件被删除。但是,即使遵守这个原则也可能接收到重复的数据,因为应用程序可能在提交会话之后并在确认或从外部源删除数据之前重新启动。但是,通常数据重复比数据丢失更容易接受。连接最终被归还或添加到连接池中,这取决于是从连接池中租赁的连接还是在onTrigger方法中创建的连接。 如果出现通讯问题,则连接通常会终止,并且不会归还(或添加)到连接池中。断开与远程系统的连接,并使用带有@OnStopped注释的方法关闭连接池,以便可以回收资源。 3.5.2. 数据出口 将数据发布到外部源的Processor具有两个关系:success和failure。Procesor名称以“ Put”开头,以使用的数据传输的协议名结尾。遵循此模式的处理器包括PutEmail,PutSFTP和 PostHTTP(请注意,名称不以“ Put”开头,因为这会引起混淆,因为在处理HTTP时PUT和POST具有特殊含义)。 Processor可以使用@OnScheduled注解的方法创建或初始化连接池 。但是,由于通信问题可能会阻止建立连接或导致连接终止,因此此时不会创建连接本身。而是在onTrigger方法中从池中创建或租用连接。 OnTrigger方法首先通过该get方法从ProcessSession获得FlowFile 。如果没有FlowFile可用,该方法将返回而不获取远程资源的连接。 如果至少有一个FlowFile可用,则Processor从连接池获取连接或创建新连接。如果处理器既不能从连接池中租借连接也不能创建新连接,则将FlowFile路由到failure,记录事件,然后方法返回。 如果获得了连接,Processor通过调用ProcessSession上的read方法并传递InputStreamCallback(通常是一个匿名内部类),从这个回调中将FlowFile的内容传输到目标,从而获得一个FlowFile内容相关的InputStream。该事件与传输文件所花费的时间和传输文件的数据速率一起被记录下来。将发送事件报告给ProvenanceReporter,方法是通过getProvenanceReporter方法从ProcessSession获取报告器,然后调用报告 器上的SEND方法。连接被归还或添加到连接池中,这取决于是从池中租赁的连接还是onTrigger方法新创建的连接。 如果出现通信问题,则连接通常会终止,并且不会返回(或添加)到连接池中。如果将数据发送到远程资源时出现问题,则处理错误的方法取决于几个注意事项。 如果问题与网络条件有关,则通常会将FlowFile路由到failure。流文件没有受到惩罚,因为数据没有必要的问题( The FlowFile is not penalized because there is not necessary a problem with the data)。 与数据接收类型Processor的情况不同,我们通常不调用ProcessContext上的yield。这是因为在ingest的情况下,在Processor能够执行其功能之前FlowFile并不存在。但是,对于Put类型Processor,数据流管理器可以选择将故障路由到另一个Processor。这可以允许在一个系统出现问题时使用“备份”系统,也可以用于跨多个系统的负载分配。 如果发生与数据相关的问题,则有两种处理方法。如果问题很可能会自行解决,则对FlowFile进行惩罚,然后将其路由到 failure。例如,在PutFTP中就是这种情况,因为文件命名冲突而无法传输FlowFile。前提是该文件最终将从目录中删除,以便可以传输新文件。结果,我们对FlowFile进行惩罚并路由到, failure以便稍后再试。如果数据确实存在问题(例如数据不符合某些要求的规范),则可以采用另外一种方法。即将failure关系分解为 failure和 communications failure关系(RelationShip)。这使DataFlow Manager可以确定如何分别处理每种情况。在这些情况下,在创建关系时对RelatiojnShip的描述就很重要,描述信息应该清晰区分出两种关系的不同。 断开与远程系统的连接,并使用@OnStopped注解的方法关闭连接池,以便回收资源。 3.5.3. 基于内容的路由(一对一) 根据其内容路由数据的处理器将采用以下两种形式之一:将传入的FlowFile恰好路由到一个目的地,或将传入的数据路由到0个或多个目的地。在这里,我们将讨论第一种情况。 该处理器具有两个关系:matched和unmatched。如果期望使用特定的数据格式,则处理器也将具有一种failure关系,该关系将在输入不是期望的格式时使用。处理器公开指示路由规则的属性。 如果指定路由规则的属性需要处理(例如,编译正则表达式),则在使用被@OnScheduled注解的方法中完成处理,并将结果存储在标记为volatile的成员变量中。 onTrigger方法获得单个FlowFile,并通过ProcessSession的read方法读取FlowFile的内容,评估匹配条件。然后,processor根据条件是否匹配来确定是否应将FlowFile路由到matched或路由到unmatched,从而将FlowFile路由到适当的关系。 然后,处理器发出一个ROUTE事件,该事件指示处理器将FlowFile路由到哪个关系。 Processor使用org.apache.nifi.annotation.behavior包中的@SideEffectFree和 @SupportsBatching注解。 3.5.4. 基于内容的路由(一对多) 与一对一基于内容路由不同,一对多基于内容路由通常由用户动态定义路由匹配关系; 为使用户可以定义额外属性,必须复写getSupportedDynamicPropertyDescriptor方法。此方法返回一个PropertyDescriptor和一个确保用户指定匹配条件有效的验证器。 在这种Processor中,getRelationships方法返回的关系集合是一个标记为volatile的成员变量。此关系集合初始由一个unmatched的relationship构造。复写onPropertyModified方法,以便在添加或删除属性时,将使用相同的名称创建一个新的Relationship。如果处理器具有非用户定义的属性,则检查指定的属性是否为用户定义很重要。这可以通过调用传递给这个方法的PropertyDescriptor的isDynamic方法来实现。如果此属性是动态的,则会创建一组新的关系,并将之前的一组关系复制到其中。这个新集合要么将新创建的 关系添加到其中,要么从其中删除,这取决于是将一个新属性添加到处理器中,还是删除了一个属性(通过检查此函数的第三个参数是否为null来检测属性删除)。持有关系集的成员变量随后被更新,以指向这个新集合。 如果指定路由条件的属性需要处理(例如编译正则表达式),则此处理将在使用@OnScheduled注释的方法中完成(如果可能的话)。然后将结果存储在标记为volatile的成员变量中。这个成员变量通常是Map类型,map的key是Relationship类型,值的类型由处理属性值的结果定义。 onTrigger方法通过ProcessSession的get方法获得一个FlowFile。如果没有可用的FlowFile,立即返回,否则创建一组Relationship。onTrigger方法通过ProcessSession的read方法读取FlowFile的内容,在数据流化时评估每个匹配条件。对于任何匹配的条件,都将与该匹配条件关联的关系添加到关系集。。 读取FlowFile的内容后,检查“关系集”是否为空。如果为空,则原始FlowFile会添加一个属性,以指示将其路由到的关系以及路由到的unmatched关系,并记录日志,发出ROUTE事件,然后该方法返回。如果关系集的大小等于1,则原始FlowFile会添加一个属性,以指示该路由文件已路由到的Relation,并被路由到集合中条目指定的Relationship,记录日志,发出ROUTE事件,然后该方法返回。 如果集合包含多个Relationship,Processor将为每个Relationship创建FlowFile的克隆(第一个除外)。这通过ProcessSession 的clone方法完成的。不用报告“CLONE”事件,因为该框架将为您处理此事件。原始FlowFile和每个克隆都被路由到其适当的Relationship。并为每个FlowFile发出一个ROUTE事件。记录下来,方法返回。 该Processor用 org.apache.nifi.annotation.behavior包中的@SideEffectFree和 @SupportsBatching注解。 3.5.5. 基于内容路由流(一对多) 上述基于内容的路由(一对多)提供了创建强大Processor的抽象。不过,其假设每个FlowFile都将路由到零或多个RelationShip。假如传入的数据格式是许多不同信息片段的“流”——而我们希望将此流的不同片段发送到不同的RelationShip,该如何?例如,假设我们希望有一个RouteCSV处理器,它可以配置多个正则表达式。如果CSV文件中的某行与正则表达式匹配,则应将该行包含在RelationShip的出站FlowFile中。如果某个正则表达式与关系“has-apple”相关联,并且该正则表达式匹配流文件中的1000行,那么对于“has-apple”关系应该有一个包含1000行数据的出站FlowFile,其中包含1000行。如果不同的正则表达式与关系“has-oranges”相关联,并且该正则表达式匹配流文件中的50行,那么对于“has-oranges”关系应该有一个包含50行数据的出站FlowFile行。即一个FlowFile进来,两个FlowFile出来。这两个FlowFile可能包含来自原始流文件的一些相同的文本行,或者它们可能完全不同。这就是我们将在本节中讨论的Processor类型。 此类Processor的名称以“Route”开头,以路由的数据类型的名称结尾。在我们的示例中,我们正在路由CSV数据,因此处理器名为RouteCSV。这个Processor支持动态属性。每个用户定义的属性都有一个映射到RelationShip名称的名称。属性的值采用“匹配条件”所需的格式。在我们的示例中,属性的值必须是有效的正则表达式。 此Processor维护一个内部ConcurrentMap,其Key是Relationship,值的类型取决于匹配条件的格式。在我们的示例中,我们将维持 ConcurrentMap onPropertyModified方法。如果提供给此方法的新值(第三个参数)为null,则将从ConcurrentMap中删除其名称由属性名称(第一个参数)定义的Relationship。否则,将处理新值(在我们的示例中,通过调用Pattern.compile(newValue)),并将此值添加到ConcurrentMap,key是Relationship,RelationShip的名称由属性名称指定。 此Processor将复写customValidate方法。它将从ValidationContext检索所有属性,并计算动态的PropertyDescriptors的数量(通过调用PropertyDescriptor上的isDynamic())。如果dynamic PropertyDescriptors的数量为0,则表明用户没有添加任何RelationShip,因此Processor将返回一个ValidationResult,表明处理器因为没有添加任何RelationShip而无效。 当调用getRelationships方法时,Processor将返回用户设置的所有关系,并且还将返回一个不匹配的关系。因为这个处理器必须读写内容仓库(可相对昂贵),所以如果希望该处理器用于非常大的数据量,用户可以设置一个属性,来确定其是否关心数据是否匹配到任何匹配规则。 当onTrigger方法被调用时,Processor通过ProcessSession.get获得一个FlowFile。如果没有可用的数据,处理器返回。否则,处理器将创建一个Map。我们将把这个Map命名为flowFileMap。处理器通过调用ProcessSession.read来读取传入的FlowFile,并提供一个InputStreamCallback。在InputStreamCallback中,Processor从FlowFile中读取第一段数据,然后根据这段数据评估每个匹配条件。如果一个特定的条件(在我们的例子中是一个正则表达式)匹配,那么Processor将从对应的RelationShip的flowFileMap中获取FlowFile。如果这个flowFileMap中还没有FlowFile,那么Processor通过调用 session.create(incomingFlowFile)创建一个新的FlowFile,然后将新的FlowFile添加到flowFileMap中。处理器然后通过调用session.appen方法将这段数据写入到FlowFile中。并提供一个OutputStreamCallback。在这个OutputStreamCallback中,我们可以访问新的FlowFile的OutputStream,因此我们能够将数据写入新的FlowFile。然后从OutputStreamCallback返回。在遍历每个匹配条件之后,如果没有匹配条件,我们将对未匹配的关系执行与上面相同的例程(除非用户配置我们不写出未匹配的数据)。现在我们已经调用了session.append,我们有一个新的版本的流文件。因此,我们需要更新我们的flowFileMap,将Relationship与新的FlowFile相关联. 无论何时抛出异常,都需要将传入的FlowFile路由到failure。同时删除每个新建的FlowFile,因为不会再将他们转移到任何地方,我们可以调session.remove(flowFileMap.values())来完成此操作。然后,记录错误并返回。 如果一切OK,就遍历 flowFileMap并将每个FlowFile传输到相应的Relationship。然后,原始FlowFile要么删除要么路由到某个original关系。对于每个新创建的FlowFiles,我们还发出一个ROUTE事件,指示FlowFile转到哪个关系。在ROUTE事件的详细信息中包括此FlowFile中包含多少数据也是有益的。这使DataFlow Manager在查看“源谱系”视图时可以轻松查看给定输入FlowFile的每个关系有多少条信息。 另外,某些处理器可能需要对发送到每个关系的数据进行“分组”,以便发送到关系的每个FlowFile具有相同的值。在我们的示例中,我们可能希望正则表达式具有一个分组能力,如果CSV中的两个不同行与正则表达式匹配,但根据分组设置具有不同的值,我们希望将它们添加到两个不同的FlowFiles中。然后可以将匹配值作为属性添加到每个FlowFile。这可以通过修改flowFileMap,使其被定义为Map 3.5.6. 基于属性的路由 与基于内容的路由类似,基于属性的路由也分为一对一和一对多。不同的是此类Processor不会调用PrecessSession的read方法,因为其不会读取FlowFile的Content。此类Processor通常效率非常高,因此@SupportBatching注解的使用会非常重要。 3.5.7. 内容拆分(一对多) 此类Processor是将一个大的FlowFile根据设定的拆分大小,拆分为若干小的FlowFile,此类Processor通常除了设置拆分大小外,不需要额外配置信息。 新拆分出的FlowFile会多出如下属性: 属性名称 split.parent.uuid split.index 描述 原始FlowFile的UUID 一个单向数字,指示这是列表中的哪个FlowFile(创建的第一个FlowFile是0,第二个FlowFile是1) split.count 已创建的拆分FlowFiles的总数 3.5.8. 根据内容更新属性 该处理器与上述的基于内容的路由Processor很相似,只是这里不进行路由,而是根据配置的匹配规则设置FlowFile的属性,配置方式类似于“基于内容的路由(一对多)”,用户可以定义自己的属性。属性的名称表示要添加的属性的名称。该属性的值表示要应用于数据的某些匹配条件。如果匹配条件与数据匹配,则会添加一个属性名称与该属性相同的属性。该属性的值是来自匹配内容的条件。 例如,评估XPath表达式的处理器可以允许输入用户定义的XPath。如果XPath与FlowFile的内容匹配,则该FlowFile将添加一个属性,该属性的名称等于该属性名称的名称,并且其值等于与该XPath匹配的XML元素或属性的文本内容。如果在此示例中传入的FlowFile不是有效的XML,则将使用该failure关系。无论是否找到任何匹配项,都将使用success关系。该处理器发出类型为ATTRIBUTES_MODIFIED的事件。 3.5.9. 丰富/修改内容 “丰富/修改内容”模式非常普遍且非常通用。此模式负责任何常规的内容修改。在大多数情况下,此处理器用@SideEffectFree和@SupportsBatching注释标记 。处理器具有任意数量的必需和可选属性,具体取决于处理器的功能。处理器通常具有success和failure关系。当输入文件的格式不是预期的格式时,通常使用failure关系。 该处理器获取FlowFile并使用ProcessSession的write(StreamCallback)方法对其进行更新,以便它既可以读取FlowFile的内容,又可以写入FlowFile内容的下一个版本。如果在回调过程中遇到错误,则回调将抛出ProcessException。对ProcessSession.write方法的调用被包装在一个try/catch块中,捕获ProcessException 并路由FlowFile到failure。 如果回调成功,则将发出CONTENT_MODIFIED事件。 3.6. 错误处理 在编写Procesor时,可能会出现若干意外情况。如果Processor本身不处理错误,那么Processor开发人员理解NiFi框架的错误处理机制是很重要的,而且理解处理器应该如何处理错误也是很重要的。在这里,我们将讨论处理器在工作过程中应该如何处理意外错误。 3.6.1. Processor内的异常 在执行onTrigger方法期间,常见的故障包括: • • • • 传入数据不是预期的格式。 与外部服务的网络连接失败。 向磁盘读取或写入数据失败。 Processor本身有bug或依赖库有bug。 从框架的角度来看,只有两种异常,一种是ProcessException一种就是所有其它的异常。 如果从Processor抛出ProcessException,则框架将假定这是已知的失败。而且,这是以后尝试再次成功处理数据的前提。框架将回滚正在处理的会话,并惩罚正在处理的FlowFiles。 一旦有任何其他异常从Processor抛出,则框架将假定这是开发人员没有考虑到的故障。在这种情况下,框架还将回滚会话并惩罚流文件。然而,在这种情况下,我们会遇到一些非常糟糕的情况。例如,处理器可能处于糟糕的状态,并且可能持续运行,耗尽系统资源,而不提供任何有用的工作。例如,当连续抛出NullPointerException时。为了避免这种情况,如果ProcessException之外的异常从onTrigger方法抛出,那么框架将“Administratively Yield”处理器。即在一段时间内(可在nifi.properties中配置,默认10秒)不会触发处理器再次运行。 3.6.2. Exceptions within a callback: IOException, RuntimeException 通常情况下,当一个异常在处理器中发生时,它是在回调中发生的(如:InputStreamCallback, OutputStreamCallback,或StreamCallback)。即在处理FlowFile的content时。允许回调抛出RuntimeException或IOException。对于RuntimeException,此异常将传播回onTrigger方法。在IOException的情况下,异常将被包装在ProcessException中,然后从框架中抛出ProcessException。 出于这个原因,建议使用回调的处理器在try/catch块和catch ProcessException以及任何其他它们期望回调抛出的RuntimeException中执行回调。但是,不建议处理器捕获一般异常或可抛出的情况。不鼓励这样做有两个原因。 出于这个原因,建议使用回调的处理器在一个try/catch块内进行捕获ProcessException和预期的回调引发RuntimeException。不建议处理器捕获一般Exception或Throwable,有两个原因: 首先,如果引发了意外的RuntimeException,则可能是一个错误,允许框架回滚会话将确保没有数据丢失,并确保DataFlow Managers能够通过将相应的数据排入队列来处理。 其次,当回调抛出IOException时,实际上有两种类型的IOException:Processor代码引发的IOException(例如,数据不是预期的格式或网络连接失败),和内容仓库(Content Repository)引发的IOException。如果是后者,则框架将捕获此IOException并将其包装到FlowFileAccessException(扩展自RuntimeException)。This is done explicitly so that the Exception will escape the onTrigger method and the framework can handle this condition appropriately. Catching the general Exception prevents this from happening. 3.6.3. Penalization vs. Yielding(惩罚与屈 服) 在处理过程中发生问题时,该框架公开两种方法,允许处理器开发人员避免执行不必要的工作:“Penalization(惩罚)”和“Yielding(屈服)”。对于刚接触NiFi API的开发人员来说,这两个概念可能会造成混淆。开发人员可以通过调用ProcessSession.FlowFilepenalize(FlowFile)方法来对FlowFile进行惩罚。这将导致当前FlowFile在一段时间内无法被下游Processor访问。FlowFile不可访问的时间由DataFlow Manager通过在“处理器配置”对话框中设置“惩罚持续时间”设置来确定。默认值为30秒。通常,当Processor确定由于预期会自行解决的环境原因而导致无法处理数据时,将执行此操作。一个很好的例子是PutSFTP处理器,如果SFTP服务器上已经存在一个具有相同文件名的文件,它将惩罚FlowFile。在这种情况下,处理器会惩罚FlowFile并将其路由到失败。然后,DataFlow Manager可以将故障路由回相同的PutSFTP处理器。这样,如果文件存在相同的文件名,处理器将在30秒内(或DFM配置处理器配置的任何时间)不尝试再次发送文件。同时,它能够继续处理其他FlowFiles。 另一方面,“Yielding(屈服)”允许Processor开发人员向框架指示在一段时间内它将无法执行任何有用的功能。这通常发生在与远程资源进行通信的Processor上。如果Processor无法连接到远程资源,或者期望远程资源提供数据但没有数据,则处理器应调用ProcessContext.yield()方法,然后返回。这样,处理器就告诉框架它不应浪费资源来触发该处理器运行。 3.6.4. Session Rollback(会话回滚) 到目前为止,当我们讨论ProcessSession时,我们通常将它简单地认为是访问FlowFile的机制。然而,它提供了另一个非常重要的功能,即事务性。在ProcessSession上调用的所有方法都作为事务发生。当我们决定结束事务时,我们可以通过调用commit()或调用rollback()来完成。通常,这是由AbstractProcessor类处理的:如果onTrigger方法抛出异常,AbstractProcessor将捕获异常,调用session.rollback(),然后重新抛出异常。否则,AbstractProcessor将调用ProcessSession上的commit()。 然而,有时开发人员需要显式回滚会话。这可以通过调用rollback()或rollback(boolean)方法。如果使用后者,布尔值指示是否应该在将从队列中提取的FlowFile(通过ProcessSession.get方法)添加回其队列之前对其进行惩罚。 当调用rollback时,会话中对flowfile发生的任何修改都会被丢弃,这包括内容和属性修改。此外,所有的起源事件都回滚(除了通过为force参数传递true值而发出的任何发送事件)。从输入队列中提取的FlowFile随后被传输回输入队列(可选地惩罚),以便可以再次处理它们。 当调用commit方法时,FlowFile的新状态将持久化到FlowFile存储库中,而发生的任何起源事件将持久化到源存储库中。前一个content被销毁(除非另一个FlowFile引用相同的content片段),FlowFile被传输到出站队列,以便下一个处理器可以对数据进行操作。 注意org.apache.nifi.annotation.behavio的SupportsBatching注解对此行为的影响也很重要。如果Processor用了这个注解,对ProcessSession.commit的调用可能不会立即生效。而是,将这些提交打包在一起以提供更高的吞吐量。但是,如果处理器回滚ProcessSession,自最后一次调用commit以来的所有更改都将被丢弃,所有“批处理”提交都将生效。这些“批处理”提交不会回滚。 3.7. 通用设计考虑 在设计处理器时,需要考虑一些重要的设计。这一部分介绍开发人员在创建处理器时应该考虑的一些方面。 3.7.1. 考虑用户 开发处理器(或任何其他组件)时要记住的最重要的概念之一就是用户体验。重要的是要记住,作为组件的开发者,您对上下文有重要的了解。应始终提供文档,以便那些不熟悉该过程的人能够轻松使用它。 考虑用户体验时,还必须注意一致性。最好遵循标准的命名约定。对于处理器名称,属性名称和值,关系名称以及用户将遇到的任何其他方面,都是如此。 简单至上!避免添加您不希望用户理解或更改的属性。作为开发人员,我们被告知硬编码不好。但这有时会使开发人员暴露一些属性,这些属性在要求说明时一般会告诉用户只保留默认值即可。但这会对用户造成困扰。 3.7.2. 内聚和可重用性 为了制作一个单一的,有凝聚力的单元,有时会吸引开发人员将多个功能组合到一个处理器中。当处理器希望输入数据的格式为X以便处理器可以将数据转换为格式Y并将新格式化的数据发送到某些外部服务时,情况就是如此。 采用这种格式化特定端点的数据然后将数据发送到同一处理器中的该端点的方法有几个缺点: • • 处理器变得非常复杂,因为它必须执行数据转换任务以及将数据发送到远程服务的任务。 如果处理器无法与远程服务进行通信,它将把数据路由到failure关系。在这种情况下,处理器将负责再次执行数据转换。如果再次失败,则还会再次转换。 • 如果我们有五个不同的处理器在发送数据之前将传入的数据转换为这种新格式,那么我们将有很多重复的代码。例如,如果架构更改,则必须更新许多处理器。 • 当处理器完成向远程服务的发送后,这些中间数据将被丢弃。中间数据格式可能对其他处理器很有用。 为了避免这些问题,并使处理器具有更高的可重用性,处理器应始终坚持“做一件事情,做好事”的原 则。这样的处理器应分为两个单独的处理器:一个将数据从格式X转换为格式Y,另一个处理器将数据发送到远程资源。 3.7.3. 命名约定 为了向用户提供一致的外观,建议处理器遵循标准的命名约定。以下是使用的标准约定的列表: • 从远程系统提取数据的处理器被命名为Get • • • 将数据推送到远程系统的处理器称为Put 3.7.4. Processor行为注解 在创建处理器时,开发人员能够向框架提供关于如何最有效地利用处理器的提示。这是通过将注解应用到处理器的类来实现的。可以应用于处理器的注释存在于org.apache.nifi.annotation的三个子包中。document子包用于向用户提供文档。lifecycle子包指示框架应该在处理器上调用哪些方法,以便响应适当的生命周期事件。behavior子包帮助框架理解如何在调度和一般行为方面与处理器交互。 org.apache.nifi.annotatio.behaviorn包的以下注解用来修改框架将如何处理你的处理器: • EventDriven:指示框架可以使用事件驱动的调度策略来调度处理器。此策略目前仍处于试验阶段,但可能会导致在不处理极高数据速率的数据流上降低资源利用率。 • SideEffectFree:表示处理器在NiFi外部没有任何副作用。框架可以使用相同的输入多次调用处理器,而不会导致任何意外结果。意味着幂等行为。框架可以通过执行一些操作来提高效率,比如将一个ProcessSession从一个处理器转移到另一个处理器,这样,如果出现问题,许多处理器的操作可以回滚并再次执行。 • SupportsBatching:此注释表明框架可以将多个ProcessSession提交批处理为一个提交。如果存在此注释,则用户将能够在“处理器的调度”选项卡中选择是偏爱高吞吐量还是较低的延迟。该注释应适用于大多数处理器,但警告:如果处理器调用ProcessSession.commit,则不能保证数据已安全地存储在NiFi的Content,FlowFile和Provenance存储库中。结果,对于那些从外部源接收数据,提交会话然后删除远程数据或确认与远程资源进行事务的处理器来说,这是不合适的。 • TriggerSerially:当这个注释出现时,框架将不允许用户调度多个并发线程来一次执行onTrigger方法。相反,线程数(“并发任务”)总是设置为1。但是,这并不意味着处理器不必是线程安全的,因为执行onTrigger的线程可能在调用之间发生变化。 • PrimaryNodeOnly:群集时,Apache NiFi为处理器提供了两种执行模式:“主节点”和“所有节点”。尽管在所有节点上运行可提供更好的并行性,但已知某些处理器在多个节点上运行时会导致意外行为。例如,某些处理器列出或从远程文件系统读取文件。如果此类处理器计划在“所有节点”上运行,则将导致不必要的重复甚至错误。此类处理器应使用此注释。应用此注释将限制处理器仅在“主节点”上运行。 • TriggerWhenAnyDestinationAvailable:默认情况下,如果任何出站队列已满,NiFi不会调度Processor运行。这使得背压作用于整个处理器链。但有时候,即使出站队列之一已满,某些处理器也需要运行。此注解指示只要有任意关系“可用”( 如果所有使用该关系的连接都未满,则该关系被称为“可用”。),则Processor应该继续运行。例如,DistributeLoad Processor使用此注解。如果使用“round robin(循环)”调度策略,则如果任何出站队列已满,则处理器将不会运行。但是,如果使用“next available(下一个可用)”调度策略,处理器将在任何关系可用的情况下运行,并将FlowFile路由到那些可用的关系。 • TriggerWhenEmpty:默认仅当处理器的输入队列中至少有一个FlowFile或处理器没有输入队列(通常是“源”处理器)时,才触发处理器运行。应用此批注将导致框架忽略输入队列的大小,并触发处理器,而不管输入队列上是否有数据。例如,这在需要触发处理器定期运行,以使网络连接不会超时时很有用。 • InputRequirement:默认情况下,所有处理器都将允许用户为处理器创建传入连接,但是如果用户未创建传入连接,则该处理器仍然有效并且可以安排运行。但是,对于源处理器,这可能会使用户感到困惑,并且用户可能会尝试将FlowFiles发送到该处理器,仅使FlowFiles排队而不进行处理。相反,如果处理器期望传入的FlowFiles,但没有输入队列,则处理器将被调度运行,但不执行任何工作,因为它接收不到FlowFile,这也有点乱。因此,我们可以使用@InputRequirement注解,并为其提供 一个INPUT_REQUIRED,INPUT_ALLOWED或INPUT_FORBIDDEN值。告诉框架何时应该使处理器无效或用户是否为Processor建立连接的信息。例如,如果一个处理器带有 InputRequirement(Requirement.INPUT_FORBIDDEN)注解,则用户甚至无法创建以该处理器为目标的连接。 3.7.5. Data Buffering(数据缓冲) NiFi提供了通用的数据处理功能。数据可以是任意格式。处理器通常由几个线程调度。NiFi新手常犯的一个错误是将FlowFile中的所有内容都缓冲到内存中。虽然在某些情况下需要这样做,但是应该尽可能避免这样做,除非知道数据的格式。例如,负责对XML文档执行XPath的处理器需要将数据的全部内容加载到内存中。这通常是可以接受的,因为XML不会特别大。但是,搜索特定字节序列的Processor可以用来搜索几百gb或更多的文件。试图将其加载到内存中会导致很多问题,特别是当多个线程同时处理不同的流文件时。 与其将数据缓冲到内存中,不如在从内容存储库流式读数据时进行处理。此时,我们不希望从Content Repository中读取每个字节,因此我们将使用BufferedInputStream或其它方式缓冲少量数据。 3.8. Controller Services ControllerService接口允许开发人员以一种干净和一致的方式跨JVM共享功能和状态。该接口类似于Processor接口,但没有onTrigger方法,因为控制器服务不用定期调度运行,而且控制器服务没有RelationShip,因为它们没有直接集成到Flow中。但是,Processor、Reporting Task和其他ControllerService务使用它们。 3.8.1. 开发ControllerService 与Processor接口一样,ControllerService接口公开了用于配置、验证和初始化的方法。除了初始化方法的传入参数是ControllerServiceInitializationContext而不是ProcessorInitializationContext之外,这些方法都与Processor接口的方法相同。 ControllerService有Procesor所没有的附加约束。控制器服务必须扩展ControllerService接口。Processor只能引用接口来与ControllerService交互,而不会获得其具体实现。 这个约束主要是因为Processor可以存在于一个NAR中,而Processor所使用的ControllerService的实现可以存在于另一个NAR中。这是由FrameWork通过以如下方式动态实现公开的接口来实现的:框架可以切换到适当的ClassLoader并在具体实现上调用所需的方法。Processor和Controller Service实现必须共享Controller Service接口的相同定义。因此,这两个NAR必须都依赖于容纳Controller Service接口的NAR。有关更多信息,请参见NiFi档案(NAR)。 3.8.2. 与ControllerService交互 ControllerServices可以被Processor、其它ControllerService或ReportingTask通过 ControllerServiceLookup或使用PropertyDescriptor的Builder类的identifiesControllerService方法来获得。Processor可以从传递给initialize方法的ProcessorInitializationContext中获取ControllerServiceLookup。类 似地,它是由ControllerService从ControllerServiceInitializationContext获得的,ReportingTask通过传递给initialize方法的ReportingConfiguration对象得到。 大多数情况下,首选方法是使用PropertyDescriptor的Builder的identifiesControllerService方法,这也是最简单的方法。为了使用这个方法,我们创建了一个引用ControllerService的PropertyDescriptor,如下: public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name(\"SSL Context Service\") .description(\"Specified the SSL Context Service that can be used to create secure connections\") .required(true) .identifiesControllerService(SSLContextService.class) .build(); 使用此方法,将提示用户提供应该使用的SSL上下文服务。这是通过向用户提供一个下拉菜单来实现的,用户可以从中选择已配置的任何SSLContextService配置,而不关心具体实现。 为了使用这项服务,处理器可以使用以下代码: final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE) .asControllerService(SSLContextService.class); 请注意,这SSLContextService是扩展ControllerService的接口。目前唯一的实现是StandardSSLContextService。当然,处理器开发人员不必关心此细节。 3.9. Reporting Tasks 到目前为止,我们很少提到如何向外界传达NiFi及其组件的性能。系统是否能够跟上传入的数据速率?系统还能处理多少?在一天的高峰时间和一天中最不繁忙的时间处理了多少数据? 为了回答这些问题以及更多的问题,NiFi通过ReportingTask接口提供了向外部服务报告状态、统计、度量和监视信息的功能。ReportingTasks给出了大量信息,来确定系统的执行情况。 3.9.1. 开发Reporting Task 与Processor和ControllerService接口一样,ReportingTask接口公开了用于配置、验证和初始化的方法。这些方法都与Processor和ControllerService接口的方法相同(除了initialize方法传入参数为 ReportingConfiguration对象)。ReportingTask还有一个onTrigger方法,框架会调用该方法来触发任务来执行其任务。 在onTrigger方法中,ReportingTask被授予对ReportingContext的访问权,可以从中获得关于NiFi实例的配置和信息。允许查询BulletinRepository中的Bulletin,提交提交自己的Bulletin,从而把信息展现给用户。可通过上下文访问的ControllerServiceLookup提供对已配置的ControllerServices的访问。但是,这种获取Controller Services的方法不是首选方法。首选方法在PropertyDescriptor中引用控制器服务,如在与ControllerService交互中所讨论的。 通过ReportingContext公开的EventAccess对象提供对ProcessGroupStatus的访问, ProcessGroupStatus公开过去五分钟内由Processor组、Procesor、Connection和其他组件处理的数据量的统计信息。另外,EventAccess对象提供对ProvenanceEvent记录的访问,这些记录已经存储在 ProvenanceEventRepository中。当数据从外部源接收、发送到外部服务、从系统中删除、修改或路由时,Processor将发出这些事件。 每个ProvenanceEvent都有FlowFile的ID,事件的类型,事件的创建时间以及在组件访问FlowFile时与FlowFile关联的所有FlowFile属性以及与之关联的FlowFile属性。为ReportingTasks提供了大量信息,从而允许以多种不同的方式生成报告,以暴露所需的指标和监视功能。 3.10. UI 扩展 NiFi提供了两个UI扩展点: • • 自定义Processor用户界面 Content查看器 可以创建自定义UI,以提供大多数处理器设置中可用的标准属性/值表以外的配置选项。具有自定义UI 的处理器示例包括UpdateAttribute和JoltTransformJSON。 可以创建Content查看器来扩展可在NiFi中查看的数据类型。NiFi在lib目录中随附NAR,其中包含内容查看器,用于数据类型(例如csv,xml,avro,json(标准nar))和图像类型(例如png,jpeg和gif(media-nar))。 3.10.1. 自定义Processor UIs 为Processor添加自定义UI: 1. 创建UI. 2. 构建打包WAR包(在Processor到的NAR包内). 3. WAR包内的META-INF目录下需要包含一个nifi-processor-configuration 文件,此文件将 Processor合定制UI关联起来。 4. 将Processor的NAR包放到lib目录,NiFi启动时会扫描并加载。 5. 在Processor的配置界面,属性TAB页上,可以看到一个“高级”按钮,通过这个按钮可访问定制界 面. 以下是UpdateAttribute Processor的NAR包结构: nifi-update-attribute-bundle │ ├── nifi-update-attribute-model │ ├── nifi-update-attribute-nar │ ├── nifi-update-attribute-processor │ ├── nifi-update-attribute-ui │ ├── pom.xml │ └── src │ └── main │ ├── java │ ├── resources │ └── webapp │ └── css │ └── images │ └── js │ └── META-INF │ │ └── nifi-processor-configuration │ └── WEB-INF │ └── pom.xml nifi-processor-configuration 文件内容如下: org.apache.nifi.processors.attributes.UpdateAttribute:${project.groupId}:nifi-update-attribute-nar:${project.version} 当然,ControllerService和ReportingTask也可以定制界面。 3.10.2. Content Viewers(Content查看器) 添加一个Content查看器: 1. 构建打包WAR包(在Processor的NAR包内)。 2. WAR包的META-INF目录下需要一个名为nifi-content-viewer的文件,该文件列出了支持的content type。 3. 将Processor的NAR包放到lib目录,NiFi启动时会扫描并加载 4. 当遇到匹配的content type时,content查看器将生成适当的视图。 以下是标准content查看器的NAR包结构: nifi-standard-bundle │ ├── nifi-jolt-transform-json-ui │ ├── nifi-standard-content-viewer │ ├── pom.xml │ └── src │ └── main │ ├── java │ ├── resources │ └── webapp │ └── css │ └── META-INF │ │ └── nifi-content-viewer │ └── WEB-INF │ ├── nifi-standard-nar │ ├── nifi-standard-prioritizers │ ├── nifi-standard-processors │ ├── nifi-standard-reporting-tasks │ ├── nifi-standard-utils │ └── pom.xml nifi-content-viewer 文件内容如下: application/xml application/json text/plain text/csv avro/binary application/avro-binary application/avro+binary 3.11. 命令行工具 3.11.1. tls-toolkit The Client/Server mode of operation came about from the desire to automatically generate required TLS configuration artifacts without needing to perform that generation in a centralized place. This simplifies configuration in a clustered environment. Since we don’t necessarily have a central place to run the generation logic or a trusted Certificate Authority, a shared secret is used to authenticate the clients and server to each other. The tls-toolkit prevents man in the middle attacks using HMAC verification of the public keys of the CA server and the CSR the client sends. A shared secret (the token) is used as the HMAC key. The basic process goes as follows: 1. The client generates a KeyPair. 2. The client generates a request json payload containing a CSR and an HMAC with the token as the key and the CSR’s public key fingerprint as the data. 3. The client connects to the CA Hostname at the https port specified and validates that the CN of the CA’s certificate matches the hostname (NOTE: because we don’t trust the CA at this point, this adds NO security, it is just a way to error out early if possible). 4. The server validates the HMAC from the client payload using the token as the key and the CSR’s public key fingerprint as the data. This proves that the client knows the shared secret and that it wanted a CSR with that public key to be signed. (NOTE: a man in the middle could forward this on but wouldn’t be able to change the CSR without invalidating the HMAC, defeating the purpose). 5. The server signs the CSR and sends back a response json payload containing the certificate and an HMAC with the token as the key and a fingerprint of its public key as the data. 6. The client validates the response HMAC using the token as the key and a fingerprint of the certificate public key supplied by the TLS session. This validates that a CA that knows the shared secret is the one we are talking to over TLS. 7. The client verifies that the CA certificate from the TLS session signed the certificate in the payload. 8. The client adds the generated KeyPair to its keystore with the certificate chain and adds the CA certificate from the TLS connection to its truststore. 9. The client writes out the configuration json containing keystore, truststore passwords and other details about the exchange. 3.12. Testing Testing the components that will be used within a larger framework can often be very cumbersome and tricky. With NiFi, we strive to make testing components as easy as possible. In order to do this, we have created a nifi-mock module that can be used in conjunction with JUnit to provide extensive testing of components. The Mock Framework is mostly aimed at testing Processors, as these are by far the most commonly developed extension point. However, the framework does provide the ability to test Controller Services as well. Components have typically been tested by creating functional tests to verify component behavior. This is done because often a Processor will consist of a handful of helper methods but the logic will largely be encompassed within the onTrigger method. The TestRunner interface allows us to test Processors and Controller Services by converting more \"primitive\" objects such as files and byte arrays into FlowFiles and handles creating the ProcessSessions and ProcessContexts needed for a Processor to do its job, as well as invoking the necessary lifecycle methods in order to ensure that the Processor behaves the same way in the unit tests as it does in production. 3.12.1. Instantiate TestRunner Most unit tests for a Processor or a Controller Service start by creating an instance of the TestRunner class. In order to add the necessary classes to your Processor, you can use the Maven dependency: We create a new TestRunner by calling one of the static newTestRunner methods of the TestRunners class (located in the org.apache.nifi.util package). These methods take an argument for the Processor under test (can either be the class of the Processor to test or can be an instance of a Processor), and allow the setting of the processor name as well. 3.12.2. Add ControllerServices After creating a new Test Runner, we can add any Controller Services to the Test Runner that our Processor will need in order to perform its job. We do this by calling the addControllerService method and supply both an identifier for the Controller Service and an instance of the Controller Service. If the Controller Service needs to be configured, its properties can be set by calling the setProperty(ControllerService, PropertyDescriptor, String), setProperty(ControllerService, String, String), or setProperty(ControllerService, PropertyDescriptor, AllowableValue) method. Each of these methods returns a ValidationResult. This object can then be inspected to ensure that the property is valid by calling isValid. Annotation data can be set by calling the setAnnotationData(ControllerService, String) method. We can now ensure that the Controller Service is valid by calling assertValid(ControllerService) - or ensure that the configured values are not valid, if testing the Controller Service itself, by calling assertNotValid(ControllerService). Once a Controller Service has been added to the Test Runner and configured, it can now be enabled by calling the enableControllerService(ControllerService) method. If the Controller Service is not valid, this method will throw an IllegalStateException. Otherwise, the service is now ready to use. 3.12.3. Set Property Values After configuring any necessary Controller Services, we need to configure our Processor. We can do this by calling the same methods as we do for Controller Services, without specifying any Controller Service. I.e., we can call setProperty(PropertyDescriptor, String), and so on. Each of the setProperty methods again returns a ValidationResult property that can be used to ensure that the property value is valid. Similarly, we can also call assertValid() and assertNotValid() to ensure that the configuration of the Processor is valid or not, according to our expectations. 3.12.4. Enqueue FlowFiles Before triggering a Processor to run, it is usually necessary to enqueue FlowFiles for the Processor to process. This can be achieved by using the enqueue methods of the TestRunner class. The enqueue method has several different overrides, and allows data to be added in the form of a byte[], InputStream, or Path. Each of these methods also supports a variation that allows a Map Additionally, there is an enqueue method that takes a var-args of FlowFile objects. This can be useful, for example, to obtain the output of a Processor and then feed this to the input of the Processor. 3.12.5. Run the Processor After configuring the Controller Services and enqueuing the necessary FlowFiles, the Processor can be triggered to run by calling the run method of TestRunner. If this method is called without any arguments, it will invoke any method in the Processor with an @OnScheduled annotation, call the Processor’s onTrigger method once, and then run the @OnUnscheduled and finally @OnStopped methods. If it is desirable to run several iterations of the onTrigger method before the other @OnUnscheduled and @OnStopped life-cycle events are triggered, the run(int) method can be used to specify how many iterations of onTrigger should be called. There are times when we want to trigger the Processor to run but not trigger the @OnUnscheduled and @OnStopped life-cycle events. This is useful, for instance, to inspect the Processor’s state before these events occur. This can be achieved using the run(int, boolean) and passing false as the second argument. After doing this, though, calling the @OnScheduled life-cycle methods could cause an issue. As a result, we can now run onTrigger again without causing these events to occur by using the run(int,boolean,boolean) version of the run method and passing false as the third argument. If it is useful to test behavior that occurs with multiple threads, this can also be achieved by calling the setThreadCount method of TestRunner. The default is 1 thread. If using multiple threads, it is important to remember that the run call of TestRunner specifies how many times the Processor should be triggered, not the number of times that the Processor should be triggered per thread. So, if the thread count is set to 2 but run(1) is called, only a single thread will be used. 3.12.6. Validate Output After a Processor has finished running, a unit test will generally want to validate that the FlowFiles went where they were expected to go. This can be achieved using the TestRunners assertAllFlowFilesTransferred and assertTransferCount methods. The former method takes as arguments a Relationship and an integer to dictate how many FlowFiles should have been transferred to that Relationship. The method will fail the unit test unless this number of FlowFiles were transferred to the given Relationship or if any FlowFile was transferred to any other Relationship. The assertTransferCount method validates only that the FlowFile count was the expected number for the given Relationship. After validating the counts, we can then obtain the actual output FlowFiles via the getFlowFilesForRelationship method. This method returns a List because MockFlowFile comes with many methods for validating the contents. For example, MockFlowFile has methods for asserting that FlowFile Attributes exist (assertAttributeExists), asserting that other attributes are not present (assertAttributeNotExists), or that Attributes have the correct value (assertAttributeEquals, assertAttributeNotEquals). Similar methods exist for verifying the contents of the FlowFile. The contents of a FlowFile can be compared to a byte[], and InputStream, a file, or a String. If the data is expected to be textual, the String version is preferred, as it provides a more intuitive error message if the output is not as expected. 3.12.7. Mocking External Resources One of the biggest problems when testing a NiFi processor that connects to a remote resource is that we don’t want to actually connect to some remote resource from a unit test. We can stand up a simple server ourselves in the unit test and configure the Processor to communicate with it, but then we have to understand and implement the server-specific specification and may not be able to properly send back error messages, etc. that we would like for testing. Generally, the approach taken here is to have a method in the Processor that is responsible for obtaining a connection or a client to a remote resource. We generally mark this method as protected. In the unit test, instead of creating the TestRunner by calling TestRunners.newTestRunner(Class) and providing the Processor class, we instead create a subclass of the Processor in our unit test and use this: @Test public void testConnectionFailure() { final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() { protected Client getClient() { // Return a mocked out client here. return new Client() { public void connect() throws IOException { throw new IOException(); } // ... // other client methods // ... }; } }); // rest of unit test. } This allows us to implement a Client that mocks out all of the network communications and returns the different error results that we want to test, as well as ensure that our logic is correct for handling successful calls to the client. 3.12.8. Additional Testing Capabilities In addition to the above-mentioned capabilities provided by the testing framework, the TestRunner provides several convenience methods for verifying the behavior of a Processor. Methods are provided for ensuring that the Processor’s Input Queue has been emptied. Unit Tests are able to obtain the ProcessContext, ProcessSessionFactory, ProvenanceReporter, and other framework-specific entities that will be used by the TestRunner. The shutdown method provides the ability to test Processor methods that are annotated to be run only on shutdown of NiFi. Annotation Data can be set for Processors that make use of Custom User Interfaces. Finally, the number of threads that should be used to run the Processor can be set via the setThreadCount(int) method. 3.13. NiFi Archives (NARs) When software from many different organizations is all hosted within the same environment, Java ClassLoaders quickly become a concern. If multiple components have a dependency on the same library but each depends on a different version, many problems arise, typically resulting in unexpected behavior or NoClassDefFoundError errors occurring. In order to prevent these issues from becoming problematic, NiFi introduces the notion of a NiFi Archive, or NAR. A NAR allows several components and their dependencies to be packaged together into a single package. The NAR package is then provided ClassLoader isolation from other NAR packages. Developers should always deploy their NiFi components as NAR packages. To achieve this, a developer creates a new Maven Artifact, which we refer to as the NAR artifact. The packaging is set to nar. The dependencies section of the POM is then created so that the NAR has a dependency on all NiFi Components that are to be included within the NAR. In order to use a packaging of nar, we must use the nifi-nar-maven-plugin module. This is included by adding the following snippet to the NAR’s pom.xml: In the Apache NiFi codebase, this exists in the NiFi root POM from which all other NiFi artifacts (with the exception of the nifi-nar-maven-plugin itself) inherit, so that we do not need to include this in any of our other POM files. The NAR is able to have one dependency that is of type nar. If more than one dependency is specified that is of type nar, then the nifi-nar-maven-plugin will error. If NAR A adds a dependency on NAR B, this will not result in NAR B packaging all of the components of NAR A. Rather, this will add a Nar-Dependency-Id element to the MANIFEST.MF file of NAR A. This will result in setting the ClassLoader of NAR B as the Parent ClassLoader of NAR A. In this case, we refer to NAR B as the Parent of NAR A. This linkage of Parent ClassLoaders is the mechanism that NiFi uses in order to enable Controller Services to be shared across all NARs. As mentioned in the Developing a ControllerService section, A Controller Service must be separated into an interface that extends ControllerService and an implementation that implements that interface. Controller Services can be referenced from any Processor, regardless of which NAR it is in, as long as both the Controller Service Implementation and the Processor share the same definition of the Controller Service interface. In order to share this same definition, both the Processor’s NAR and the Controller Service Implementation’s NAR must have as a Parent the Controller Service definition’s NAR. An example hierarchy may look like this: Controller Service NAR Layout root ├── my-controller-service-api │ ├── pom.xml │ └── src │ └── main │ └── java │ └── org │ └── my │ └── services │ └── MyService.java │ ├── my-controller-service-api-nar │ └── pom.xml │ │ │ ├── my-controller-service-impl │ ├── pom.xml │ └── src │ ├── main │ │ ├── java │ │ │ └── org │ │ │ └── my │ │ │ └── services │ │ │ └── MyServiceImpl.java │ │ └── resources │ │ └── META-INF │ │ └── services │ │ └── org.apache.nifi.controller.ControllerService │ └── test │ └── java │ └── org │ └── my │ └── services │ └── TestMyServiceImpl.java │ │ ├── my-controller-service-nar │ └── pom.xml │ │ └── other-processor-nar └── pom.xml This POM file has a type of nar. It has a dependency on nifi-standard-services-api-nar. This POM file is of type jar. It has a dependency on my- controller-service-api. It does not have a dependency on any nar artifacts. This POM file has a type of nar. It has a dependency on my-controller-service-api-nar. While these may seem very complex at first, after creating such a hierarchy once or twice, it becomes far less complicated. Note here that the my-controller-service-api-nar has a dependency on nifi-standard-services-api-nar. This is done so that any NAR that has a dependency on my-controller-service-api-nar will also be able to access all of the Controller Services that are provided by the nifi-standard-services-api-nar, such as the SSLContextService. In this same vein, it is not necessary to create a different \"service-api\" NAR for each service. Instead, it often makes sense to have a single \"service-api\" NAR that encapsulates the APIs for many different Controller Services, as is done by the nifi-standard-services-api-nar. Generally, the API will not include extensive dependencies, and as a result, ClassLoader isolation may be less important, so lumping together many API artifacts into the same NAR is often acceptable. 3.14. Per-Instance ClassLoading A component developer may wish to add additional resources to the component’s classpath at runtime. For example, you may want to provide the location of a JDBC driver to a processor that interacts with a relational database, thus allowing the processor to work with any driver rather than trying to bundle a driver into the NAR. This may be accomplished by declaring one or more PropertyDescriptor instances with dynamicallyModifiesClasspath set to true. For example: PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder() .name(\"Extra Resources\") .description(\"The path to one or more resources to add to the classpath.\") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .dynamicallyModifiesClasspath(true) .build(); When these properties are set on a component, the framework identifies all properties where dynamicallyModifiesClasspath is set to true. For each of these properties, the framework attempts to resolve filesystem resources from the value of the property. The value may be a comma-separated list of one or more directories or files, where any paths that do not exist are skipped. If the resource represents a directory, the directory is listed, and all of the files in that directory are added to the classpath individually. Each property may impose further restrictions on the format of the value through the validators. For example, using StandardValidators.FILE_EXISTS_VALIDATOR restricts the property to accepting a single file. Using StandardValidators.NON_EMPTY_VALIDATOR allows any combination of comma-separated files or directories. Resources are added to the instance ClassLoader by adding them to an inner ClassLoader that is always checked first. Anytime the value of these properties change, the inner ClassLoader is closed and re-created with the new resources. NiFi provides the @RequiresInstanceClassLoading annotation to further expand and isolate the libraries available on a component’s classpath. You can annotate a component with @RequiresInstanceClassLoading to indicate that the instance ClassLoader for the component requires a copy of all the resources in the component’s NAR ClassLoader. When @RequiresInstanceClassLoading is not present, the instance ClassLoader simply has its parent ClassLoader set to the NAR ClassLoader, rather than copying resources. The @RequiresInstanceClassLoading annotation also provides an optional flag `cloneAncestorResources'. If set to true, the instance ClassLoader will include ancestor resources up to the first ClassLoader containing a controller service API referenced by the component, or up to the Jetty NAR. If set to false, or not specified, only the resources from the component’s NAR will be included. Because @RequiresInstanceClassLoading copies resources from the NAR ClassLoader for each instance of the component, use this capability judiciously. If ten instances of one component are created, all classes from the component’s NAR ClassLoader are loaded into memory ten times. This could eventually increase the memory footprint significantly when enough instances of the component are created. Additionally, there are some restrictions when using @RequiresInstanceClassLoading when using Controller Services. Processors, Reporting Tasks, and Controller Services can reference a Controller Service API in one of its Property Descriptors. An issue may arise when the Controller Service API is bundled in the same NAR with a component that references it or with the Controller Service implementation. If either of these cases are encountered and the extension requires instance classloading, the extension will be skipped and an appropriate ERROR will be logged. To address this issue, the Controller Service API should be bundled in a parent NAR. The service implementation and extensions that reference that service should depend on the Controller Service API NAR. Please refer to the Controller Service NAR Layout in the NiFi Archives (NARs) section. Anytime a Controller Service API is bundled with an extension that requires it, even if @RequiresInstanceClassLoading isn’t used, a WARNING will be logged to help avoid this bad practice. 3.15. Deprecating a Component Sometimes it may be desirable to deprecate a component. Whenever this occurs the developer may use the @DeprecationNotice annotation to indicate that a component has been deprecated, allowing the developer to describe a reason for the deprecation and suggest alternative components. An example of how to do this can be found below: @DeprecationNotice(alternatives = {ListenSyslog.class}, classNames = {\"org.apache.nifi.processors.standard.ListenRELP\reason = \"Technology has been superseded\ public class ListenOldProtocol extends AbstractProcessor { As you can see, the alternatives can be used to define and array of alternative Components, while classNames can be used to represent the similar content through an array of strings. 4. 三大仓库 4.1. 介绍 FlowFiles是NiFi设计的核心。FlowFile是一个数据记录,由指向其Content的指针和一组属性组成,并关联了Provenance事件。无论是FlowFile、Content还是Provenance事件,NiFi都为其设计了专门的Repository来进行数据存储。 4.2. Repository NiFi提供了三个存储库,并提供特定的功能。这三个存储库都是本地存储中NiFi用于保存数据的目录。 • • • FlowFile Repository: 存储Flow中当前FlowFiles的所有元数据。 Content Repository:存储当前和过去FlowFiles的Content。 Procenance Repository:保存FlowFiles的历史事件。 4.2.1. FlowFile Repository 系统当前郑处理的FlowFiles会在JVM的内存中以HASH Map存储,这种内存处理效率很高。但如遇系统功能宕机、断电等情况,则内存数据将丢失,NiFi重启后,将无法延续重启前的处理工作。为此,NiFi提供了持久化的跨进程重启机制,即FlowFile Repository,FlowFile Repository存储当前每个FlowFile的元数据的ahead log(或叫数据记录),FlowFile的元数据包括所有与其相关的属性,以及一个指向FlowFile的Content(保存在Content Repository中)和状态的指针,例如FlowFile所属的Connection/Queue。这为NiFi提供给处理插重启、系统故障等提供了可能。 FlowFile在NiFi中流转时,每个变化都记录到FlowFil Repository中,这样系统在处理数据时准确地知道节点处于哪个的步骤,如果节点在处理数据时宕机,那么重新启动时可以很容易地从停止的地方继续处理。Ahead Log中是以FlowFile的一系列变化增量。NiFi通过恢复流文件的“快照”来恢复流文件,然后重新播放这些增量。 系统会定期自动为每一个FlowFile创建新快照,系统会将内存中的每个FlowFile写到名为”.partial”的文件中来计算新的检查点(checkpoint)。随着检查点处理的进行,新的FlowFile基线写入到“.partial”文件。一旦这个工作完成,旧的快照文件就会被删除,同时将“.partial”文件被重命名为”snaphot”。 两次快照时间间隔可以在nifi.properties中进行配置,默认为两分钟。 当Connection Queue中的FlowFiles的数量达到一定限制时(通过nifi.queue.swap.threshold设置), 将Queue中优先级较低的FlowFiles序列化到磁盘,并从内存的HashMap中移除,Connection Queue负责觉得何时再将其读取到内存。 4.2.2. Content Repository(内容仓库) Content Repository存储FlowFile的Content,通常是三个仓库中最大的。其设计理念时,将FlowFile的Content保存在磁盘,需要的时候再读取到JVM内存中。 与JVM的垃圾回收机制类似,NiFi也有专门线程分析ContentRepository中是否有不再使用的Content。当Content被标记为不再使用时,就会被删除或归档(是否启用归档,在nifi.properties中配置)。如果启用了归档,则Content不会被删除,直到过期或由于ContentRepository占用空间过大被删除。存档和/或删除的条件在“ nifi.properties”文件中配置(“ nifi.content.repository.archive.max.retention.period”,“ nifi.content.repository.archive.max.usage.percentage”) 4.2.3. ProvenanceRepository来源库 ProcenanceRepository用来存储FlowFile的历史。该历史记录了每段数据的沿袭。当FlowFile被created、forked、cloned、modified等时,都会创建一个Provenance Event。此Provenance Event是FlowFile的快照。当一个ProvenanceEvent被创建,将复制所有的FlowFile属性和指向FlowFile Content的指针,并与FlowFile的状态(例如它与其他Provenanve Event的关系)一起聚合到Provenance报告中。除非数据已过期,否则此快照将不会更改。ProvenanceRepository在一定周期内(可在nifi.properties中设定)保存Provenance Event。 由于Procenance Repository中保存了所有FlowFile的属性和指向FlowFile Content的指针,DataFlow Manager不仅可以看到数据沿袭历史,还可以查看数据本身,甚至从Flow中任意点对数据进行回放。典型示例,当一个下游系统说没有收到数据,那么通过数据沿袭可以准确显示数据何时交付到下游系统、是什么数据、文件名和被发送到的URL是什么的等,或者确认数据确实没有送达下游系统。然后可以通过回放功能将数据重新发送到特定下游系统。 4.2.4. 通用说明 4.2.4.1. 多个物理存储点 ProvenanceRepository和ContentRepository可以选择在多个物理分区上进行存储,逻辑上是一个Repository,系统将自动在多个卷/分区之间划分写入。存储目录在“ nifi.properties”文件中指定。 4.2.4.2. 最佳实践 尽可能少地分析FlowFile的Copntent,而是从内容中提取关键信息到FlowFile的属性中。然后从FlowFile属性读取/写入信息。一个示例是ExtractText处理器,该处理器从FlowFile内容中提取文本并将其 作为属性放置,以便其他处理器可以使用它。与连续处理FlowFile的整个Content相比,这提供了更好的性能,因为到每个属性中存储的数据量,更新FlowFile存储库比更新Content存储库要快得多。 因篇幅问题不能全部显示,请点此查看更多更全内容