Apache NiFi

Monday Oct 29, 2018

Load Balancing Across the Cluster

Load Balancing Data Across a NiFi Cluster

Mark Payne -  @dataflowmark

With each release of Apache NiFi, we tend to see at least one pretty powerful new application-level feature, in addition to all of the new and improved Processors that are added. And the latest release of NiFi, version 1.8.0, is no exception! Version 1.8.0 brings us a very powerful new feature, known as Load-Balanced Connections, which makes it much easier to move data around a cluster. Prior to this feature, when a user needed to spread data from one node in a cluster to all the nodes of the cluster, the best option was to use Remote Process Groups and Site-to-Site to move the data. The approach looks like this:

First, an Input Port has to be added to the Root Group. Then, a Remote Process Group has to be added to the flow in order to transfer data to that Root Group Input Port. The connection has to be drawn from the Processor to the Remote Process Group, and the Input Port has to be chosen. Next, the user will configure the specific Port within the Remote Process Group and set the Batch Size to 1 FlowFile, so that data is Round-Robin'ed between the nodes. A connection must then be made from the Root Group Input Port all the way back down, through the Process Groups, to the desired destination, so that the FlowFiles that are sent to the Remote Process Group are transferred to where they need to go in the flow. This may involve adding several more Local Input Ports to the Process Groups, to ensure that the data can flow to the correct destination. Finally, all of those newly created components have to be started.

Unfortunately, this approach has a few problems:

  • It's a lot to set up! The user has to create not just the Remote Process Group, but also the Input Port and any number of intermediate Input Ports and connections to move data from the Root Group to the desired group, and then configure and start all of this.
  • It requires an Input Port at the Root Group level and every level in between. This means that the user either needs permissions to update all of these Process Groups, or needs the assistance of someone who does. In a large, multi-tenant environment, this can become problematic.
  • This results in a disconnected graph. Rather than connecting Processor A to Processor B, we end up connecting Processor A to a Remote Process Group. Then, we have a second graph that connects Input Port 1 to Processor B. Even if there are no intermediate Process Groups, the flow is split into two separate pieces. The intent of this, and the understanding of how data flows through the system, is far less clear than if we simply connect Processor A to Processor B.
  • This approach does not lend itself well to reuse. When a flow is built using this paradigm, and the flow is then versioned using the NiFi Flow Registry, the URL of the Remote Process Group becomes part of that Versioned Flow. When it is then imported into a second environment, the user has to be sure to update the URL of the Remote Process Group, or else risk sending data to the wrong cluster.
  • Remote Process Groups do not support node affinity. When using an RPG to transfer data between nodes, the protocol is designed to balance the load across the cluster pretty evenly. However, it does not allow sending "like data" to the same node.

So if using Remote Process Groups to distribute data across the cluster has all of these drawbacks, it raises the question: why did we ever design it this way to begin with? The simple answer is that it was never designed as a mechanism to redistribute data between nodes in the same cluster. It was designed as a mechanism to send data from one NiFi instance/cluster to another, hence the names Remote Process Group and Site-to-Site protocol. It still does a pretty good job of this (though I would still love to see "Remote Input Ports" at all levels, not just the Root Group level, and hopefully that will come soon!). When we started seeing this constant need to distribute data across the cluster, the Remote Process Group became the answer because the capability already existed and it worked for most cases, even if not optimally.

But version 1.8.0 aims to fix all of these problems by allowing the data to be load-balanced across the cluster at the Connection level. Now, any time that a user decides to create a Connection between two components on the graph - or configure an existing connection - the user is given some new options in the Settings dialog:

To start, we can now configure a Load Balance Strategy. The default value is "Do Not Load Balance," which behaves the same as connections always have: data stays on the node that it's on. However, we can change the value to one of several other strategies:

  • Round Robin. Data will be spread across the cluster evenly. This will be the most commonly chosen option.
  • Partition by Attribute. This allows "like data" to be sent to the same node in the cluster. When this value is chosen, the user is able to configure the "Attribute name." Any two FlowFiles that have the same value for this attribute will be sent to the same node. This is super helpful, for example, when using a MergeContent or MergeRecord processor, and we want to ensure that data spread across the cluster is merged together based on a "customerId" field.
  • Single Node. While this will likely be the least used strategy, there are times when we want to ensure that all data in a cluster is sent to a single node. For instance, we may want this before using an EnforceOrder processor to push to a database, or when we want to merge data together into larger batches but the data rate is fairly low, so we would rather have all the data on a single node in order to meet our minimum thresholds more quickly.
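To make the strategies concrete, here is a minimal Java sketch of how a connection might choose a destination node for each FlowFile. It is a simplified illustration under our own assumptions (the `NodeSelector` class, the node list, and the plain hash-modulo partitioning are hypothetical), not NiFi's actual partitioner code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: how each Load Balance Strategy *could* pick a destination node.
// This is NOT NiFi's partitioner implementation.
enum LoadBalanceStrategy { DO_NOT_LOAD_BALANCE, ROUND_ROBIN, PARTITION_BY_ATTRIBUTE, SINGLE_NODE }

final class NodeSelector {
    private final List<String> nodes;      // connected nodes, in a stable order
    private final String localNode;        // node the FlowFile currently lives on
    private final AtomicLong counter = new AtomicLong();

    NodeSelector(List<String> nodes, String localNode) {
        this.nodes = new ArrayList<>(nodes);
        this.localNode = localNode;
    }

    String select(LoadBalanceStrategy strategy, Map<String, String> attributes, String attributeName) {
        switch (strategy) {
            case ROUND_ROBIN:
                // Cycle through the nodes so data is spread evenly.
                return nodes.get((int) (counter.getAndIncrement() % nodes.size()));
            case PARTITION_BY_ATTRIBUTE: {
                // Same attribute value -> same hash -> same node ("like data" stays together).
                String value = attributes.getOrDefault(attributeName, "");
                return nodes.get(Math.floorMod(value.hashCode(), nodes.size()));
            }
            case SINGLE_NODE:
                // All data converges on one node (here, simply the first in the list).
                return nodes.get(0);
            case DO_NOT_LOAD_BALANCE:
            default:
                // Data stays on the node that it's on.
                return localNode;
        }
    }
}
```

In this sketch, changing the number of nodes changes the modulus, so many attribute values map to a different node; this illustrates why queued data must be re-partitioned when the cluster size changes.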

So, to make it clear how we can use these features in a full flow, let's take the following use case as an example:

We need to get data from a remote SFTP server. The data is Point-of-Sale data in CSV format and is compressed using GZIP. One of the fields in the CSV data is the Store Identifier field, "storeId." We need to use this Store Identifier to perform some enrichment, adding in the City, State, and Region where the store is located. Once we have enriched the data, we want to send the data to three destinations: HDFS for archiving, Kafka for our real-time analytics to pull from, and Elasticsearch for querying and dashboarding. We want to push to Kafka in Avro format as quickly as possible. For HDFS, we want to merge together larger batches and store as JSON. For Elasticsearch, we can also benefit from having data bundled together because its Bulk API is more efficient than sending individual messages. We can build this flow fairly simply. Our entire flow looks like this:

The first connection, from ListSFTP to FetchSFTP, uses a Load Balance Strategy of "Round Robin." We simply want to spread the data across the cluster so that we can have all nodes in the cluster working together to pull the data, decompress it, enrich it, and split it apart based on Region. The connection before MergeRecord is configured to use a Load Balance Strategy of "Partition by Attribute" and uses the "region" attribute that was added by the PartitionRecord Processor.

Now, notice the new icon on a couple of the connections:

This lets us know that the Connection is currently configured to Load Balance data across the cluster. If we hover over it, we get a little bit more information:

Now, when we start our flow, we can see the data quickly flowing. We can also see that this icon has now changed from a grey color to a blue color and the orientation is different:

This tells us that not only is Load Balancing configured for this connection, but data is also currently being transferred between nodes in the cluster. This is very important information if, for example, we notice that the destination of our connection is not processing the data. If the indicator is blue, then the data is probably not being processed because it does not yet live on the correct node and is still being transferred there.

It's also important to note that if the Load Balance Strategy is changed while data is queued up in the connection, then the connection will immediately start re-balancing the data to ensure that the data goes to the correct node. For example, if "Do Not Load Balance" is configured, and then it is changed to "Round Robin," it will immediately start binning the data to go to the different nodes in the cluster, and you'll see the indicator in blue. Similarly, if the Load Balance Strategy is changed from "Round Robin" to "Partition by Attribute," the data will be partitioned and binned differently, to ensure that data ends up on the correct node. If the number of nodes in our cluster changes (i.e., a node is removed from the cluster or added to the cluster), then the data will also be re-partitioned.

Now, let's consider what happens in a failure scenario. Specifically, what happens if a node disconnects from the cluster, or if Node A simply cannot communicate with Node B? In this case, what happens to the data that is queued up waiting to go to Node B? Will the data just queue up, or will it fail over to another node?

Well, that depends on the Load Balance Strategy configured. If "Round Robin" is used, and a failure occurs, the data will be rebalanced to another node. In this case, the data is rebalanced fairly slowly, though. Up to 1,000 FlowFiles, or 10 MB of data, will be rebalanced per second. This is done so that if the node reconnects to the cluster or is able to communicate again momentarily, the data is not immediately redistributed to other nodes. But if that does not happen, the data is still redistributed throughout the cluster in a timely manner.

If the "Partition by Attribute" or "Single Node" strategy is used, then the data will just queue up, waiting to send, until the node is reconnected and able to communicate. This is because these strategies expect that a given piece of data go to a specific node, whereas "Round Robin" just expects that data gets spread across the cluster.
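The failover rules above can be summarized in a short sketch. The limits of 1,000 FlowFiles and 10 MB per second come from the text; the class and method names are hypothetical, we assume "10 MB" means 10 × 1024 × 1024 bytes, and we always allow at least one FlowFile per window so a single oversized file cannot block the queue forever (our own choice, not documented NiFi behavior).

```java
import java.util.List;

// Hypothetical sketch of the failover rules described above; not NiFi's code.
final class FailoverPolicy {
    static final int MAX_FLOWFILES_PER_SECOND = 1_000;
    static final long MAX_BYTES_PER_SECOND = 10L * 1024 * 1024; // assumption: "10 MB" = 10 MiB

    enum Strategy { ROUND_ROBIN, PARTITION_BY_ATTRIBUTE, SINGLE_NODE }

    /** Only Round Robin fails over; the other strategies wait for the node to come back. */
    static boolean rebalancesOnFailure(Strategy strategy) {
        return strategy == Strategy.ROUND_ROBIN;
    }

    /**
     * Given the sizes (in bytes) of FlowFiles queued for an unreachable node, returns how
     * many of them, from the head of the queue, may be moved to other nodes in one
     * one-second window.
     */
    static int flowFilesToMoveThisSecond(Strategy strategy, List<Long> queuedSizes) {
        if (!rebalancesOnFailure(strategy)) {
            return 0; // keep queuing until the node reconnects
        }
        int count = 0;
        long bytes = 0;
        for (long size : queuedSizes) {
            boolean countLimitHit = count == MAX_FLOWFILES_PER_SECOND;
            // Always allow at least one FlowFile so an oversized file cannot stall forever.
            boolean byteLimitHit = count > 0 && bytes + size > MAX_BYTES_PER_SECOND;
            if (countLimitHit || byteLimitHit) {
                break;
            }
            count++;
            bytes += size;
        }
        return count;
    }
}
```

Throttling per window, rather than moving everything at once, is what gives a briefly unreachable node a chance to reconnect before most of its queue has been handed to other nodes.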

While this is a very powerful feature that will significantly improve the user experience offered, this feature also lays the groundwork for some other very exciting features. The newly added "Offload" feature makes use of this mechanism to allow a node in the cluster to be disconnected and then Offloaded. Doing this results in stopping any and all Processors on that node and distributing all data across the cluster to the still-connected nodes. This, in turn, enables easy decommissioning of a NiFi node and truly elastic clustering capabilities.

While this is just a quick walkthrough of the new feature, I hope it has piqued your interest enough to get you excited about it and get you started. As always, please don't hesitate to reach out if you have any questions or comments.

Tuesday Jul 11, 2017

Encrypted Provenance Repositories in Apache NiFi


Andy LoPresto -  @yolopey

There has been a surprising level of Twitter demand for more security-focused NiFi blogs. I’ll try to address this underserved market with a post about a new feature in Apache NiFi 1.2.0 – encrypted provenance repositories (NIFI-3388). As a mentor of mine often said, “You don’t understand something until you can teach it.” In this article, we’ll find out if I actually understand the code I wrote. First I’ll give a brief explanation of the provenance feature for anyone unfamiliar with it, describe how the existing implementations work, and introduce a new option that is sure to have people very excited (no guarantee of excited people actually offered).

What is Provenance?

Provenance is a term we use in the NiFi community with a very specific intent. The great bastion of knowledge, Wikipedia, defines it as “the chronology of the ownership, custody or location of a historical object.” Substitute “data” for “historical object”, and you’re on the right track for NiFi. A serious concern in any dataflow routing & management platform is “what happens to the data?” and “when?”. Only slightly less important is “how do we prove it?”

NiFi allows users to answer these questions by automatically recording everything that happens to your data on a very granular level. Think of “data lineage” as a family tree where each person has a Ken Burns biography – a very complete history of everything they did and how it relates to everyone else. Any event (data being created, ingested, routed, modified, tagged, viewed, exported, or deleted) is recorded, along with the time, the identity of the component that acted on it, where it was sent, what was changed, etc.

Provenance records listing

Not only does this extensive dataset allow users to query the history of their data, it enables some other powerful features as well.

Provenance lineage chain


Users can explore the provenance chain (the directed graph representation of the provenance events) of any flowfile to review the path it followed through the data flow. This allows both broad and focused analysis of things like timing bottlenecks, system performance, and identifying critical paths. Sometimes open-ended exploration of this data will reveal or inspire potential improvements in the flow itself.


The provenance data is also crucial to a key feature of NiFi – allowing the user to replay flowfiles. As long as the provenance data has not been aged off and the referenced content is still available in the content repository, any flowfile can be replayed from any point in the flow. This greatly tightens the flow development lifecycle – rather than a “build and deploy” cycle, this encourages rapid refinement of a flow while watching live (or slightly stale, but consistent) data proceed through different branches. A metaphor we use frequently in the NiFi community is that the data, much like water, is always flowing. NiFi is less about building permanent water pipes, and more like digging irrigation ditches from a constantly-flowing river.

While replay supports rapid flow refinement, the open secret is that it was developed for a different reason. NiFi connects many disparate systems, and in an enterprise environment, these are often owned and managed by different teams. Sometimes, data coming from System A, managed by Alice’s team, flows through a NiFi instance run by Norman, and then is routed to Bob’s System B. Saturday at 0300, an urgent alert comes into Norman saying that Bob hasn’t received data for the last 3 hours. A quick check of NiFi’s stats shows that Alice’s app is still producing messages on the correct Kafka topic and NiFi is consuming and delivering the data. After some troubleshooting, the RCA (root cause analysis) is that while the data was being written to a triage directory managed by Bob’s team, their application server was down, and the triage directory has aged out half of the delivered data.

In other loosely-coupled systems, the data may be lost forever. NiFi’s provenance not only allows Norman to prove that the data was delivered (important for the inevitable “blame game” that will be played by managers and budget people on Monday), but also lets him embrace the “we’re all on the same team” mentality and help Bob out by replaying all the lost data back through the same flow.

“My CPU is a neural-net processor, a learning computer”

One of the buzzwords that you can’t go 20 minutes without hearing today is “machine learning” (that’s two words). While this means different things to different people (seriously, ask a data scientist sometime if you’re having trouble falling asleep), it’s generally accepted as “the computer learning something without being explicitly programmed”. Will Song has done some fascinating research on using NiFi provenance data for anomaly detection. Rather than examining the actual content of a flowfile (which can inhabit a broad, aschematic domain or be in a format that is not easily consumed or parsed), Will took the approach of examining the provenance metadata, which is tightly defined. By identifying anomalous data in timing, routing, etc., he could build a clustering model and predictor system.

The possibilities for NiFi are impressive – from early fault detection (think identifying timing increases to predict failing external hardware or network issues), to flow recommender systems (a Markov chain lookup of the most frequent follow-on processors when Processor X is added to the flow or configured with a certain collection of attributes), to flow recovery (self-healing flows that can intelligently re-route data when a specific destination is not available or bypass an expensive enrichment step when the flow volume increases above a threshold rather than delay downstream delivery).

Uphill Both Ways

The Provenance Repository was originally developed to provide a basic storage facility and sequential iteration. As noted by Mark Payne in NIFI-3356: Provide a newly refactored provenance repository:

The Persistent Provenance Repository has been redesigned a few different times over several years. The original design for the repository was to provide storage of events and sequential iteration over those events via a Reporting Task. After that, we added the ability to compress the data so that it could be held longer. We then introduced the notion of indexing and searching via Lucene. We’ve since made several more modifications to try to boost performance.

At this point, however, the repository is still the bottleneck for many flows that handle large volumes of small FlowFiles. We need a new implementation that is based around the current goals for the repository and that can provide better throughput.

The PersistentProvenanceRepository served well for a long time, and while it is still the default implementation for a vanilla installation of NiFi, Mark did amazing work building a completely redesigned backing store called the WriteAheadProvenanceRepository.

Faster, Stronger, Better

The WriteAheadProvenanceRepository (or WAPR from here on out) uses a write-ahead log for the backing EventStore, rather than writing directly to journal files as the PersistentProvenanceRepository did. By combining the EventStore, which simply reports back an EventIdentifier to locate the written data, with an EventIndex (powered by Apache Lucene), the two components can work in conjunction to provide high throughput and efficient querying and retrieval.

Out of the box, WAPR is about 3x as fast as PPR, and this scales better as more disks and CPU resources are made available. In addition, with the Lucene EventIndex, event records are immediately available for query and retrieval, as opposed to the batch processing and ingesting done by PPR.

The WAPR implementation follows the classic Java design pattern philosophy “composition over inheritance”, so the underlying EventStore field belonging to the repository contains RecordWriter and RecordReader members. By providing a factory for each of these fields during EventStore construction, the store is responsible for instantiating these objects when necessary, but the repository itself can inject the relevant behavior through DI (Dependency Injection)/IoC (Inversion of Control). This is a crucial decision that made implementing the encrypted version much easier and cleaner.
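A minimal sketch of this composition-over-inheritance arrangement, using hypothetical, heavily simplified interfaces (`RecordWriter`, `RecordReader`, their factories, and `EventStore` here are stand-ins, not NiFi's real signatures). The store only decides when to create readers and writers; whoever constructs it decides which implementations to inject, and a wrapping factory can add behavior to an existing one without subclassing it.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.UnaryOperator;

// Hypothetical, heavily simplified stand-ins; NOT NiFi's real interfaces or signatures.
interface RecordWriter { byte[] write(String event); }
interface RecordReader { String read(byte[] stored); }
interface RecordWriterFactory { RecordWriter create(); }
interface RecordReaderFactory { RecordReader create(); }

final class EventStore {
    private final RecordWriterFactory writerFactory;
    private final RecordReaderFactory readerFactory;

    // The store decides WHEN to create writers/readers; the caller injects WHICH kind.
    EventStore(RecordWriterFactory writerFactory, RecordReaderFactory readerFactory) {
        this.writerFactory = writerFactory;
        this.readerFactory = readerFactory;
    }

    byte[] store(String event) { return writerFactory.create().write(event); }

    String load(byte[] stored) { return readerFactory.create().read(stored); }
}

final class Factories {
    static RecordWriterFactory plainWriters() {
        return () -> event -> event.getBytes(StandardCharsets.UTF_8);
    }

    static RecordReaderFactory plainReaders() {
        return () -> stored -> new String(stored, StandardCharsets.UTF_8);
    }

    // Decorate an existing writer factory: serialize with the delegate, then transform the bytes.
    static RecordWriterFactory wrapWriters(RecordWriterFactory delegate, UnaryOperator<byte[]> onWrite) {
        return () -> {
            RecordWriter inner = delegate.create();
            return event -> onWrite.apply(inner.write(event));
        };
    }

    // Decorate an existing reader factory: transform the bytes back, then deserialize with the delegate.
    static RecordReaderFactory wrapReaders(RecordReaderFactory delegate, UnaryOperator<byte[]> onRead) {
        return () -> {
            RecordReader inner = delegate.create();
            return stored -> inner.read(onRead.apply(stored));
        };
    }
}
```

In this model, an encrypted repository would pass wrapping factories whose transforms encrypt on write and decrypt on read; `EventStore` itself would not change at all.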

Encrypt All The Things!

That’s 8500 characters to get to the point of this article – why and how do we encrypt the provenance repository?

The why is pretty straightforward – why not? In all seriousness, provenance data contains, in addition to generic timing and routing metadata, the value of the attributes at each point in the lineage. Many of these values can be quite sensitive. While NiFi has access controls for the REST API and UI, these details are written in plaintext (or compressed via GZIP) to the backing file system. NiFi was originally designed with the expectation that it would run on managed hardware, but many users are now requesting cloud deployments, or as I call it, “storing the crown jewels in someone else’s safe”. As part of an ongoing effort to harden NiFi for deployment on remote/“untrusted” hardware, we are continually looking for exposed surfaces. I have plans to provide an encrypted version of all three repositories (content and flowfile are the other two). Provenance seemed like the place to start: it is the most ephemeral, and the WAPR was the most recently written, so it was likely to be the cleanest, having learned the most from continual community feedback.

The how isn’t much more complicated. Because of Mark’s clean architecture, I was able to extend the WAPR and intercept the RecordReaderFactory and RecordWriterFactory that were injected into the EventStore constructor and simply replace those with my own implementations. This means the EncryptedWriteAheadProvenanceRepository file is a grand total of 159 lines (the unit test is 346).

The classes we will actually examine here are the KeyProvider interface and its two (current) implementations StaticKeyProvider and FileBasedKeyProvider, the ProvenanceEventEncryptor interface and its sole (current) implementation AESProvenanceEventEncryptor, and the new implementations of EncryptedSchemaRecordReader and EncryptedSchemaRecordWriter.

Vinz Clortho, Keymaster of Gozer

Providing access to encryption keys is one of the great challenges of data protection. Handling key material protection and availability, as well as key migration, rotation, and expiry are broad and complicated topics. For the initial implementation, I focused on two providers – StaticKeyProvider and FileBasedKeyProvider. The common interface is quite simple, providing four methods (Javadoc elided):

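import java.security.KeyManagementException;
import java.util.List;
import javax.crypto.SecretKey;
import javax.naming.OperationNotSupportedException;
public interface KeyProvider {
    SecretKey getKey(String keyId) throws KeyManagementException;
    boolean keyExists(String keyId);
    List<String> getAvailableKeyIds();
    boolean addKey(String keyId, SecretKey key) throws OperationNotSupportedException, KeyManagementException;
}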

The methods are fairly self-documenting. By storing the keyId alongside the encrypted data, we are able to encrypt records with different keys over time, which limits the impact of any single key compromise to the records encrypted with that key.

The static implementation simply contains a HashMap of key IDs to SecretKey objects. It is initialized by reading the provided keys from the $NIFI_HOME/conf/nifi.properties file. These keys can be in plaintext or encrypted (along with any other sensitive configuration value) using the ./bin/encrypt-config.sh tool provided by the NiFi Toolkit.

if (StaticKeyProvider.class.getName().equals(implementationClassName)) {
    // Get all the keys (map) from config
    if (CryptoUtils.isValidKeyProvider(implementationClassName, config.getKeyProviderLocation(), config.getKeyId(), config.getEncryptionKeys())) {
        Map<String, SecretKey> formedKeys = config.getEncryptionKeys().entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> {
                            try {
                                return CryptoUtils.formKeyFromHex(e.getValue());
                            } catch (KeyManagementException e1) {
                                // This should never happen because the hex has already been validated
                                logger.error("Encountered an error: ", e1);
                                return null;
                            }
                        }));
        keyProvider = new StaticKeyProvider(formedKeys);
    } else {
        final String msg = "The StaticKeyProvider definition is not valid";
        throw new KeyManagementException(msg);
    }
}
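For experimenting outside NiFi, here is a self-contained sketch of a static key provider in the same spirit: a map of key IDs to AES keys formed from hex strings. `SimpleStaticKeyProvider` and its hex parsing are our own illustration, not NiFi's `StaticKeyProvider` or `CryptoUtils`; like the providers described here, it does not support `addKey`.

```java
import java.security.KeyManagementException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import javax.naming.OperationNotSupportedException;

// Hypothetical sketch: key IDs mapped to AES keys built from hex strings
// (as they might be read from a properties file). Not NiFi's implementation.
final class SimpleStaticKeyProvider {
    private final Map<String, SecretKey> keys = new HashMap<>();

    SimpleStaticKeyProvider(Map<String, String> hexKeysById) throws KeyManagementException {
        for (Map.Entry<String, String> e : hexKeysById.entrySet()) {
            keys.put(e.getKey(), formKeyFromHex(e.getValue()));
        }
    }

    /** Parses 32, 48, or 64 hex characters into a 128-, 192-, or 256-bit AES key. */
    static SecretKey formKeyFromHex(String hex) throws KeyManagementException {
        if (hex == null || !(hex.length() == 32 || hex.length() == 48 || hex.length() == 64)) {
            throw new KeyManagementException("Key must be 128, 192, or 256 bits of hex");
        }
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new KeyManagementException("Key contains non-hex characters");
            }
            bytes[i] = (byte) ((hi << 4) | lo);
        }
        return new SecretKeySpec(bytes, "AES");
    }

    SecretKey getKey(String keyId) throws KeyManagementException {
        SecretKey key = keys.get(keyId);
        if (key == null) {
            throw new KeyManagementException("No key with ID " + keyId);
        }
        return key;
    }

    boolean keyExists(String keyId) {
        return keys.containsKey(keyId);
    }

    List<String> getAvailableKeyIds() {
        List<String> ids = new ArrayList<>(keys.keySet());
        Collections.sort(ids); // deterministic order for callers
        return ids;
    }

    boolean addKey(String keyId, SecretKey key) throws OperationNotSupportedException {
        // Static keys come only from configuration.
        throw new OperationNotSupportedException("This provider does not support adding keys");
    }
}
```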

The file-based implementation expands on this by reading from a separate file (which can be located on a remote volume, etc.) and parsing a key-value listing of key IDs and keys. Each key is stored as a Base64-encoded value: the hexadecimal key representation, encrypted and signed (using AES G/CM).

Currently neither implementation supports the addKey method, but in the future, I expect to add a PKCS#11-compatible HSM (Hardware Security Module) provider as well as bridges to sensitive value containers like Square’s Keywhiz and HashiCorp’s Vault.

The KeyProvider interface and the implementations are also contained within the provenance package and module, but I expect to refactor them out to a framework-level service as part of changes in NIFI-3890: Create Key Management Controller Service.

Superfluous Interface Development

I’m kind of joking in the section title – the ProvenanceEventEncryptor interface has only four methods and only one implementation, but I wanted to ensure it could be cleanly extended in the future.

public interface ProvenanceEventEncryptor {
    void initialize(KeyProvider keyProvider) throws KeyManagementException;
    byte[] encrypt(byte[] plainRecord, String recordId, String keyId) throws EncryptionException;
    byte[] decrypt(byte[] encryptedRecord, String recordId) throws EncryptionException;
    String getNextKeyId() throws KeyManagementException;
}

Warning: Crypto nerd stuff ahead

The provided implementation uses AES (Advanced Encryption Standard) in G/CM (Galois/Counter Mode). AES is a symmetric block cipher, a variant of the Rijndael cipher built on a substitution-permutation network, with a fixed block size of 128 bits and a key length of 128, 192, or 256 bits. This is in contrast to one of its most-common predecessors, DES (Data Encryption Standard), which used a Feistel network. If you want more detail than this, buy me a beer sometime and sit back. If you absolutely do not want more detail, consider yourself sane, and accept that it is sufficient for what we are covering here. In addition to the cipher selection, G/CM is an AEAD (Authenticated Encryption with Associated Data) mode of operation, which means that not only does it provide confidentiality (only people with the secret key can decrypt the cipher text), it also provides integrity (the cipher text cannot be undetectably modified by a party without the secret key). This is crucial for authenticity, especially in the application of provenance data, and common modes like ECB, CBC, and CTR do not provide this feature. An alternative construction would be to use a message authentication code (MAC) like HMAC/SHA-256 or keyed BLAKE2 with a separate key over the cipher text, but G/CM satisfies the requirements without a separate key and operation, so it’s a personal favorite for scenarios like this.
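The confidentiality-plus-integrity property of G/CM is easy to see with the standard Java Cryptography Architecture. This minimal sketch (the `GcmDemo` class is hypothetical and is not NiFi's `AESProvenanceEventEncryptor`) encrypts with `AES/GCM/NoPadding`, a random 96-bit IV, and a 128-bit tag. Flipping a single bit of the cipher text makes decryption fail with an `AEADBadTagException` instead of silently returning corrupted data.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Minimal AES-G/CM sketch using standard JCA APIs (illustrative, not NiFi's encryptor).
final class GcmDemo {
    static final int IV_BYTES = 12;   // 96-bit IV, the size recommended for GCM
    static final int TAG_BITS = 128;  // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    static SecretKey newKey() throws GeneralSecurityException {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        return generator.generateKey();
    }

    static byte[] newIv() {
        byte[] iv = new byte[IV_BYTES];
        RANDOM.nextBytes(iv); // unique, unpredictable IV for every message
        return iv;
    }

    static byte[] encrypt(SecretKey key, byte[] iv, byte[] plaintext, byte[] aad) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        cipher.updateAAD(aad);            // authenticated, but not encrypted
        return cipher.doFinal(plaintext); // cipher text with the tag appended
    }

    static byte[] decrypt(SecretKey key, byte[] iv, byte[] ciphertext, byte[] aad) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        cipher.updateAAD(aad);
        return cipher.doFinal(ciphertext); // throws AEADBadTagException if anything was modified
    }
}
```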

In the future, there may be ChaCha20-Poly1305 implementations for better performance, RSA or GPG implementations for asymmetric encryption using public-private key pairs, or even HSM implementations for encryption performed on remote/network-attached encryption appliances with completely contained keys.

Regardless, the actual interface contract is quite straightforward. The encrypt() and decrypt() methods accept arbitrary byte[] messages and some metadata, check that the specified key exists and is valid, and then perform the desired cryptographic operation (including serializing/deserializing the encryption metadata) and return the resulting byte[].

How to Lose a Super Bowl and Other Great Interceptions

We now turn to the pieces that actually make the repository work (everything up to this point has been pretty standard point-n’-click cryptography). The EncryptedSchemaRecordWriter and EncryptedSchemaRecordReader classes that we define will be wrappers extending and adding bonus functionality to the EventIdFirstSchemaRecord* classes. By providing different factories to the EventStore constructor as mentioned earlier, we’ll provide compatible instances which intercept the byte[] serialization/deserialization and also encrypt/decrypt the data. This means we don’t need any additional work to handle event indexing/querying, compression, etc. This saves us about 800 lines of repetition.

The encryption is easy – as stated above, the record is already serialized to a byte[] by the existing record writer, and that byte[] is handed to the ProvenanceEventEncryptor already described. The EncryptionMetadata (key ID, algorithm, IV, version, and cipher text length) is also serialized and prepended to allow on-demand retrieval and decryption (as a good crypto student, you did know we were going to use unique and non-predictable IVs for every event record, right?). This also allows records encrypted by two different keys to reside side-by-side in the repository with no ill effects.
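The prepend-the-metadata idea can be sketched as follows. Note the assumptions: the `RecordEncryptor` class and its byte layout (version, key ID, algorithm, IV, cipher text length, cipher text, written with `DataOutputStream`) are hypothetical, whereas NiFi serializes its `EncryptionMetadata` with Java serialization; a plain `Map` stands in for the `KeyProvider`; and binding the record ID as G/CM associated data is our own design choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical framing: [version][keyId][algorithm][IV length][IV][cipher text length][cipher text].
final class RecordEncryptor {
    private static final String ALGORITHM = "AES/GCM/NoPadding";
    private static final int VERSION = 1;
    private static final int TAG_BITS = 128;
    private final Map<String, SecretKey> keys; // stand-in for a KeyProvider
    private final SecureRandom random = new SecureRandom();

    RecordEncryptor(Map<String, SecretKey> keys) {
        this.keys = keys;
    }

    byte[] encrypt(byte[] plainRecord, String recordId, String keyId) throws GeneralSecurityException, IOException {
        byte[] iv = new byte[12];
        random.nextBytes(iv); // fresh IV for every record
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.ENCRYPT_MODE, lookUp(keyId), new GCMParameterSpec(TAG_BITS, iv));
        cipher.updateAAD(recordId.getBytes(StandardCharsets.UTF_8)); // bind cipher text to this record
        byte[] cipherText = cipher.doFinal(plainRecord);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(VERSION);
        out.writeUTF(keyId);
        out.writeUTF(ALGORITHM);
        out.writeInt(iv.length);
        out.write(iv);
        out.writeInt(cipherText.length);
        out.write(cipherText);
        out.flush();
        return bytes.toByteArray();
    }

    byte[] decrypt(byte[] encryptedRecord, String recordId) throws GeneralSecurityException, IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(encryptedRecord));
        if (in.readInt() != VERSION) {
            throw new IOException("Unsupported metadata version");
        }
        String keyId = in.readUTF(); // metadata says which key encrypted this record
        String algorithm = in.readUTF();
        if (!ALGORITHM.equals(algorithm)) {
            throw new GeneralSecurityException("Unexpected algorithm: " + algorithm);
        }
        byte[] iv = new byte[in.readInt()];
        in.readFully(iv);
        byte[] cipherText = new byte[in.readInt()];
        in.readFully(cipherText);

        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.DECRYPT_MODE, lookUp(keyId), new GCMParameterSpec(TAG_BITS, iv));
        cipher.updateAAD(recordId.getBytes(StandardCharsets.UTF_8));
        return cipher.doFinal(cipherText); // AEADBadTagException if tampered or wrong record ID
    }

    private SecretKey lookUp(String keyId) throws GeneralSecurityException {
        SecretKey key = keys.get(keyId);
        if (key == null) {
            throw new GeneralSecurityException("Unknown key ID: " + keyId);
        }
        return key;
    }
}
```

Because each record carries its own key ID and IV, records written under two different keys can be decrypted side by side, and presenting a record under the wrong record ID fails authentication.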

The decryption operation is simply the inverse: retrieve the blob of data identified by the event ID from the repository (read using random access via a stored offset, not sequential reads), retrieve the key identified in the metadata and decrypt the record, and then pass the plaintext serialized form to the delegated readRecord() method to be rehydrated into an object.

Configuring The App

As described in the Apache NiFi User Guide and Apache NiFi Admin Guide (light reading for insomniacs), the encrypted provenance repository does need a little bit of configuration in nifi.properties. Every property is verbosely described there, but here is the simplest valid configuration:


Does It Actually Work?

Encryption always adds costs to any software. I ran some basic performance tests to provide some metrics in PR 1686, and I found that with low flow volume, using an encrypted provenance repository was more than twice as fast as the old PPR and almost identical to the new WAPR. This made me do a double take and ask, “is it actually encrypting anything?”

Small Event Low Volume benchmarks

I used my handy Hex Fiend to examine the actual files on disk to ensure the data was being encrypted. Here you can see the EncryptionMetadata being serialized via Java serialization and the cipher text of the event record following.

Encrypted repository file

Ok, so once the data is encrypted, is it still useful in the app? Sure enough, a provenance query returns perfectly human-readable records through the REST API and the UI.

Encrypted provenance repository query

With high flow volume, I did “luckily” see more performance cost. Still, running with an encrypted provenance repository, the flow could handle about 13k events per second. While the flow performance was slower than the original PPR, the provenance queries were almost identical (and sometimes even faster).

Small Event High Volume benchmarks

Long story short, if you like writing encrypted provenance data to your disk, NiFi’s got you covered.

Back to the Future

There is still plenty of work to do here. Repository implementation migration is not as smooth as it could be, tools for migration and key rotation would be nice, the KeyProvider and other services can be extracted to the framework level, content and flowfile repository implementations are still necessary, and provenance records themselves do not have cryptographic signatures over their content (for chain of custody, governance, and integrity guarantees). The User Guide has a section devoted just to “Potential Issues” involved. As provenance data isn’t intended to be stored long term in NiFi, but offloaded to a glacial store like Apache Atlas, these aren’t priority issues. I would recommend you try the encrypted provenance repository on non-business-critical data at first, but in our tests, it has been pretty stable.

Tuesday Jun 06, 2017

Real-Time SQL on Event Streams


Mark Payne -  @dataflowmark

Apache NiFi has grown tremendously over the past 2 and a half years since it was open sourced. The community is continuously thinking of, implementing, and contributing amazing new features. The newly released version 1.2.0 of NiFi is no exception. One of the most exciting features of this new release is the QueryRecord Processor and the Record Reader and Record Writer components that go along with it. If you aren't familiar with those components, there's a blog post that explains how those work.

This new Processor, powered by Apache Calcite, allows users to write SQL SELECT statements to run over their data as it streams through the system. Each FlowFile in NiFi can be treated as if it were a database table named FLOWFILE. These SQL queries can be used to filter specific columns or fields from your data, rename those columns/fields, filter rows, perform calculations and aggregations on the data, route the data, or whatever else you may want to use SQL for. All from the comfy confines of the most widely known and used Domain Specific Language.

This is a big deal! Of course, other platforms already allow you to run SQL over arbitrary data. So let's touch on how NiFi differs from the platforms that run SQL over data outside of an RDBMS:

  • Queries are run locally. There is no need to push the data to some external service, such as S3, in order to run queries over your data. There's also no need to pay for that cloud storage or the bandwidth, or to create temporary "staging tables" in a database.
  • Queries are run inline. Since your data is already streaming through NiFi, it is very convenient to add a new QueryRecord Processor to your canvas. You are already streaming your data through NiFi, aren't you? If not, you may find our docs page helpful to get started learning more about NiFi.
  • Query data in any format. Write results in any data format. One of the goals of this effort was to allow data to be queried regardless of the format. In order to accomplish this, the Processor was designed to be configured with a "Record Reader" Controller Service and a "Record Writer" Controller Service. Out of the box, there are readers for CSV, JSON, Avro, and even log data. The results of the query can be written out in CSV, JSON, Avro, or free-form text (for example, a log format) using the NiFi Expression Language. If your data is in another format, you are free to write your own implementation of the Record Reader and/or Record Writer Controller Service. A simple implementation can wrap an existing library to return Record objects from an InputStream. That's all that is needed to run SQL over your own custom data format! Writing the results in your own custom data format is similarly easy.
  • It's fast - very fast! Just how fast largely depends on the performance of your disks and the number of disks that you have available. The data must be read from disk, queried, and then the results written to disk. In most scenarios, reading of the data from disk is actually avoided due to operating system disk caching, though. We've tested the performance on a server with 32 cores and 64 GB RAM. We ran a continuous stream of JSON-formatted Provenance data through the Processor. To do this, we used the NiFi Provenance Reporting Task to send the Provenance data back to the NiFi instance. Because we wanted to stress the Processor, we then used a DuplicateFlowFile processor to create 200 copies of the data (this really just creates 200 pointers to the same piece of data; it doesn't copy the data itself). We used a SQL query to pull out any Provenance "SEND" event (a small percentage of the data, in this case). Using 12 concurrent tasks, we were able to see the query running at a consistent rate of about 1.2 GB per second on a single node - using less than half of the available CPU!
  • Data Provenance keeps a detailed trail of what happened. One of the biggest differentiators between NiFi and other dataflow platforms is the detailed Data Provenance that NiFi records. If the data that you end up with is not what you expect, the Data Provenance feature makes it easy to see exactly what the data looked like at each point in the flow and pinpoint exactly what went wrong, as well as understand where the data came from and where the data went. When the flow has been updated, the data can be replayed with the click of a button and the new results can be verified. If the results are still not right, update your flow and replay again.
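The point about wrapping an existing parser to produce records can be sketched in Python. The class below is hypothetical: NiFi's real Record Reader API is a Java interface with its own method names. This sketch simply wraps Python's csv module so that a caller pulls one record at a time from a text stream until none remain (Python 3.8 or later, for the walrus operator).

```python
import csv
import io
from typing import Dict, Iterator, Optional, TextIO

class SimpleCsvRecordReader:
    """Hypothetical reader: returns one record (field name -> string value)
    at a time from a text stream. Loosely modelled on the Record Reader idea,
    not on NiFi's actual Java interface."""

    def __init__(self, stream: TextIO, delimiter: str = ","):
        # skipinitialspace drops the blank that often follows each delimiter
        self._rows: Iterator[Dict[str, str]] = csv.DictReader(
            stream, delimiter=delimiter, skipinitialspace=True)

    def next_record(self) -> Optional[Dict[str, str]]:
        # None signals that the stream holds no more records
        return next(self._rows, None)

stream = io.StringIO("id, name\n1, alice\n2, bob\n")
reader = SimpleCsvRecordReader(stream)
records = []
while (record := reader.next_record()) is not None:
    records.append(record)
print(records)  # [{'id': '1', 'name': 'alice'}, {'id': '2', 'name': 'bob'}]
```

Everything downstream only needs next_record(), which is why supporting a new format mostly comes down to the parsing library you wrap.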

How It Works

In order to get started, we need to add a QueryRecord Processor to the graph. Once we've added the Processor to the graph, we need to configure three things: the Controller Service to use for reading data, the service to use for writing the results, and one or more SQL queries. The first time that you set this all up, it may seem like there's a lot going on. But once you've done it a couple of times, it becomes pretty trivial. The rest of this post will be dedicated to setting everything up. First, we will configure the Record Reader Controller Service. We'll choose to create a new service:

We are given a handful of different types of services to choose from. For this example, we will use CSV data, so we will use a CSVReader:

We will come back and configure our new CSVReader service in a moment. For now, we will click the "Create" button and then choose to create a new Controller Service to write the results. We will write the results in JSON format:

We can again click the "Create" button to create the service. Now that we have created our Controller Services, we will click the "Go To" button on our reader service to jump to the Controller Service configuration page:

Configuring the Reader

We can click the pencil in the right-hand column to configure our CSV Reader:

The CSV Reader gives us plenty of options to customize the reader to our format, as can be seen in the above image. For this example, we will leave most of the defaults, but we will change the "Skip Header Line" Property from the default value of "false" to "true" because our data will contain a header line that we don't want to process as an actual record.

In order to run SQL over our data and make sense of the columns in our data, we need to also configure the reader with a schema for the data. We have several options for doing this. We can use an attribute on the FlowFile that includes an Avro-formatted schema. Or we can use a Schema Registry to store our schema and access it by name or by identifier and version. Let's go ahead and use a local Schema Registry and add our schema to that Registry. To do so, we will change the "Schema Access Strategy" to "Use 'Schema Name' Property." This means that we want to look up the Schema to use by name. The name of the schema is specified by the "Schema Name" Property. The default value for that property is "${schema.name}", which means that we will just use the "schema.name" attribute to identify which schema we want to use. Instead of using an attribute, we could just type the name of the schema here. Doing so would mean that we would need a different CSV Reader for each schema that we want to read, though. By using an attribute, we can have a single CSV Reader that works for all schemas.
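A single reader can serve every schema because the "Schema Name" value is evaluated against each FlowFile's attributes before the registry lookup. Here is a minimal sketch of that two-step lookup. The regular expression handles only plain ${attribute} references (the real Expression Language is much richer), and lookup_schema and the registry dictionary are hypothetical stand-ins for the reader and the Schema Registry.

```python
import re
from typing import Dict

# Matches a plain attribute reference such as ${schema.name}; no functions.
_ATTR_REF = re.compile(r"\$\{([^}:]+)\}")

def resolve_attribute_refs(template: str, attributes: Dict[str, str]) -> str:
    """Replace each ${name} with the FlowFile attribute value (empty if absent)."""
    return _ATTR_REF.sub(lambda m: attributes.get(m.group(1), ""), template)

def lookup_schema(schema_name_property: str, attributes: Dict[str, str],
                  registry: Dict[str, str]) -> str:
    """Evaluate the property for this FlowFile, then fetch the schema text by name."""
    name = resolve_attribute_refs(schema_name_property, attributes)
    if name not in registry:
        raise KeyError(f"No schema named {name!r} in registry")
    return registry[name]

registry = {"hello-world": '{"name": "helloWorld", "type": "record", "fields": []}'}
schema = lookup_schema("${schema.name}", {"schema.name": "hello-world"}, registry)
```

Typing a literal name such as "hello-world" into the property also works, but then the reader is tied to that one schema.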

Next, we need to specify the Schema Registry to use. We click the value of the "Schema Registry" property and choose to "Create new service..." We will use the AvroSchemaRegistry. (Note that our incoming data is in CSV format and the output will be in JSON. That's okay, Avro in this sense only refers to the format of the schema provided. So we will provide a schema in the same way that we would if we were using Avro.) We will click the "Create" button and then click the "Go To" arrow that appears in the right-hand column in order to jump to the Schema Registry service (and click 'Yes' to save changes to our CSV Reader service).

This will take us back to our Controller Services configuration screen. It is important to note that from this screen, each Controller Service has a "Usage" icon on the left-hand side (it looks like a little book). That icon will take you to the documentation on how to use that specific Controller Service. The documentation is fairly extensive. Under the "Description" heading, each of the Record Readers and Writers has an "Additional Details..." link that provides much more detailed information about how to use the service and provides examples.

We will click the Edit ("pencil") icon next to the newly created AvroSchemaRegistry and go to the Properties tab. Notice that the service has no properties, so we click the New Property ("+") icon in the top-right corner. The name of the property is the name that we will use to refer to the schema. Let's call it "hello-world". For the value, we can just type or paste in the schema that we want to use using the Avro Schema syntax. For this example, we will use the following schema:

{
  "name": "helloWorld",
  "namespace": "org.apache.nifi.blogs",
  "type": "record",
  "fields": [
      { "name": "purchase_no", "type": "long" },
      { "name": "customer_id", "type": "long" },
      { "name": "item_id", "type": ["null", "long"] },
      { "name": "item_name", "type": ["null", "string"] },
      { "name": "price", "type": ["null", "double"] },
      { "name": "quantity", "type": ["null", "int"] },
      { "name": "total_price", "type": ["null", "double"] }
  ]
}

Now we can click "OK" and apply our changes. Clicking Enable (the lightning bolt icon) enables the service. We can now also enable our CSV Reader.
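In this schema, a ["null", "long"] union means the field may be either null or a long, while a bare "long" must always be present. The hand-rolled check below illustrates those rules for the primitive types used here. It is an illustrative sketch, not the Avro library, and validate and matches are hypothetical helpers; numeric ranges (int versus long) are not checked.

```python
import json
from typing import Any, Dict, List

HELLO_WORLD_SCHEMA = json.loads("""
{
  "name": "helloWorld", "namespace": "org.apache.nifi.blogs", "type": "record",
  "fields": [
    {"name": "purchase_no", "type": "long"},
    {"name": "customer_id", "type": "long"},
    {"name": "item_id", "type": ["null", "long"]},
    {"name": "item_name", "type": ["null", "string"]},
    {"name": "price", "type": ["null", "double"]},
    {"name": "quantity", "type": ["null", "int"]},
    {"name": "total_price", "type": ["null", "double"]}
  ]
}
""")

# Python types accepted for each Avro primitive that appears in the schema.
_PRIMITIVES = {
    "null": (type(None),),
    "int": (int,),
    "long": (int,),
    "double": (float, int),
    "string": (str,),
}

def matches(value: Any, avro_type: Any) -> bool:
    if isinstance(avro_type, list):  # a union such as ["null", "long"]
        return any(matches(value, t) for t in avro_type)
    if isinstance(value, bool):  # bool is a subclass of int; reject it explicitly
        return False
    return isinstance(value, _PRIMITIVES[avro_type])

def validate(record: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return the names of fields whose values do not fit the schema.
    A field missing from the record is treated as null."""
    return [f["name"] for f in schema["fields"]
            if not matches(record.get(f["name"]), f["type"])]

ok = {"purchase_no": 28340, "customer_id": 41028, "total_price": 3459.61}
bad = {"purchase_no": None, "customer_id": 41028}
print(validate(ok, HELLO_WORLD_SCHEMA))   # []
print(validate(bad, HELLO_WORLD_SCHEMA))  # ['purchase_no']
```

Making every field except the two identifiers nullable is what allows a query result that contains only a few columns to still fit this schema.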

Configuring the Writer

Similarly, we need to configure our writer with a schema so that NiFi knows how we expect our data to look. If we click the Pencil icon next to our JSONRecordSetWriter, in the Properties tab, we can configure whether we want our JSON data to be pretty-printed or not and how we want to write out date and time fields. We also need to specify how to access the schema and how to convey the schema to downstream processing. For the "Schema Write Strategy," since we are using JSON, we will just set the "schema.name" attribute, so we will leave the default value. If we were writing in Avro, for example, we would probably want to include the schema in the data itself. For the "Schema Access Strategy," we will use the "Schema Name" property, and set the "Schema Name" property to "${schema.name}" just as we did with the CSV Reader. We then select the same AvroSchemaRegistry service for the "Schema Registry" property.

Again, we click "Apply" and then click the Lightning icon to enable our Controller Service and click the Enable button. We can then click the "X" to close out this dialog.

Write the SQL

Now comes the fun part! We can go back to configure our QueryRecord Processor. In the Properties tab, we can start writing our queries. For this example, let's take the following CSV data:

purchase_no, customer_id, item_id, item_name, price, quantity
10280, 40070, 1028, Box of pencils, 6.99, 2
10280, 40070, 4402, Stapler, 12.99, 1
12440, 28302, 1029, Box of ink pens, 8.99, 1
28340, 41028, 1028, Box of pencils, 6.99, 18
28340, 41028, 1029, Box of ink pens, 8.99, 18
28340, 41028, 2038, Printer paper, 14.99, 10
28340, 41028, 4018, Clear tape, 2.99, 10
28340, 41028, 3329, Tape dispenser, 14.99, 10
28340, 41028, 5192, Envelopes, 4.99, 45
28340, 41028, 3203, Laptop Computer, 978.88, 2
28340, 41028, 2937, 24\" Monitor, 329.98, 2
49102, 47208, 3204, Powerful Laptop Computer, 1680.99, 1

In our Properties tab, we can click the "Add Property" button to add a new property. Because we can add multiple SQL queries in a single Processor, we need a way to distinguish the results of each query and route the data appropriately. As such, the name of the property is the name of the Relationship that data matching the query should be routed to. We will create two queries. The first will be named "over.1000" and will include the purchase_no and customer_id fields of any purchase that cost more than $1,000.00 and will also include a new field named total_price that is the dollar amount for the entire purchase. Note that when entering a value for a property in NiFi, you can use Shift + Enter to insert a newline in your value:

SELECT purchase_no, customer_id, SUM(price * quantity) AS total_price
FROM FLOWFILE
GROUP BY purchase_no, customer_id
HAVING SUM(price * quantity) > 1000

The second property will be named "largest.order" and will contain the purchase_no, customer_id, and total price of the most expensive single purchase (as defined by price times quantity) in the data:

SELECT purchase_no, customer_id, SUM(price * quantity) AS total_price
FROM FLOWFILE
GROUP BY purchase_no, customer_id
ORDER BY total_price DESC
LIMIT 1
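As a sanity check on what the two queries should return for the CSV above, the same aggregation can be written in plain Python. The QUERIES dictionary mirrors how QueryRecord uses each property name as a relationship name. It is an illustration, not how the processor is implemented, and purchase_totals is a hypothetical helper. Totals are rounded to cents for readability.

```python
import csv
import io
from collections import defaultdict

# The sample data from this post, verbatim.
CSV_DATA = r"""purchase_no, customer_id, item_id, item_name, price, quantity
10280, 40070, 1028, Box of pencils, 6.99, 2
10280, 40070, 4402, Stapler, 12.99, 1
12440, 28302, 1029, Box of ink pens, 8.99, 1
28340, 41028, 1028, Box of pencils, 6.99, 18
28340, 41028, 1029, Box of ink pens, 8.99, 18
28340, 41028, 2038, Printer paper, 14.99, 10
28340, 41028, 4018, Clear tape, 2.99, 10
28340, 41028, 3329, Tape dispenser, 14.99, 10
28340, 41028, 5192, Envelopes, 4.99, 45
28340, 41028, 3203, Laptop Computer, 978.88, 2
28340, 41028, 2937, 24\" Monitor, 329.98, 2
49102, 47208, 3204, Powerful Laptop Computer, 1680.99, 1
"""

def purchase_totals(text):
    """SUM(price * quantity) grouped by (purchase_no, customer_id)."""
    totals = defaultdict(float)
    # skipinitialspace drops the space that follows each comma in the sample
    for row in csv.DictReader(io.StringIO(text), skipinitialspace=True):
        key = (int(row["purchase_no"]), int(row["customer_id"]))
        totals[key] += float(row["price"]) * int(row["quantity"])
    return [{"purchase_no": p, "customer_id": c, "total_price": round(t, 2)}
            for (p, c), t in totals.items()]

# Key = relationship name (the property name); value = that relationship's "query".
QUERIES = {
    "over.1000": lambda rows: [r for r in rows if r["total_price"] > 1000],
    "largest.order": lambda rows: sorted(rows, key=lambda r: r["total_price"], reverse=True)[:1],
}

rows = purchase_totals(CSV_DATA)
routed = {relationship: query(rows) for relationship, query in QUERIES.items()}
for relationship, results in routed.items():
    print(relationship, results)
```

With this data, "over.1000" should receive purchases 28340 (3459.61) and 49102 (1680.99), and "largest.order" should receive purchase 28340 alone.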

Now we will wire our QueryRecord processor up in our graph so that we can use it. For this demo, we will simply use a GenerateFlowFile to feed it data. We will set the "Custom Text" property to the CSV that we have shown above. In the "Scheduling" tab, I'll configure the processor to run once every 10 seconds so that I don't flood the system with data. We need to add a "schema.name" attribute, so we will route the "success" relationship of GenerateFlowFile to an UpdateAttribute processor. To this processor, we will add a new Property named "schema.name" with a value of "hello-world" (to match the name of the schema that we added to the AvroSchemaRegistry service). We will route the "success" relationship to QueryRecord.

Next, we will create two UpdateAttribute Processors and connect the "over.1000" relationship to the first and the "largest.order" relationship to the other. This just gives us a simple place to hold the data so that we can view it. I will loop the "failure" relationship back to the QueryRecord processor so that if there is a problem evaluating the SQL, the data will remain in my flow. I'll also auto-terminate the "original" relationship because once I have evaluated the SQL, I don't care about the original data anymore. I'll also auto-terminate the "success" relationship of each terminal UpdateAttribute processor. When we start the QueryRecord, GenerateFlowFile, and the first UpdateAttribute processors, we end up with a FlowFile queued up before each UpdateAttribute processor:

If we right-click on the "over.1000" connection and click "List queue," we are able to see the FlowFiles in the queue:

Clicking the "information" icon on the left-hand-side of the FlowFile gives us the ability to view the FlowFile's attributes and download or view its contents. We can click the "View" button to view the content. Changing the "View as" drop-down at the top from "original" to "formatted" gives us a pretty-printed version of the JSON, and we quickly see the results that we want:

Note that we have a null value for the columns that are defined in our schema that were not part of our results. If we wanted to, we could certainly update our schema in order to avoid having these fields show up at all.
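The null values come from writing every field that the writer's schema declares, whether or not the query produced it. The sketch below shows that behaviour and the effect of trimming the schema. write_json is a hypothetical helper, not the JsonRecordSetWriter implementation.

```python
import json
from typing import Any, Dict, List

def write_json(records: List[Dict[str, Any]], schema_fields: List[str]) -> str:
    """Emit every field named in the writer's schema, using null for any
    field the query did not produce."""
    return json.dumps([{name: record.get(name) for name in schema_fields}
                       for record in records])

result = [{"purchase_no": 49102, "customer_id": 47208, "total_price": 1680.99}]
full_schema = ["purchase_no", "customer_id", "item_id", "item_name",
               "price", "quantity", "total_price"]
trimmed_schema = ["purchase_no", "customer_id", "total_price"]

out = write_json(result, full_schema)         # item_id, item_name, price, quantity are null
trimmed = write_json(result, trimmed_schema)  # only the three selected fields
print(out)
print(trimmed)
```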

Viewing the FlowFiles of the "largest.order" connection shows us a single FlowFile also, with the content that we expect there as well:

Of course, if we have already run the data through the flow and want to go back and inspect that data and what happened to it, we can easily do that through NiFi's powerful Data Provenance feature.


Here, I have given but a small glimpse into the capabilities of the new QueryRecord Processor for NiFi. The possibilities that it opens up are huge. I'd love to hear thoughts and ideas on how to make it even better or questions that you may have! Feel free to reach out to the NiFi mailing list at users@nifi.apache.org or dev@nifi.apache.org.

Sunday May 21, 2017

Record-Oriented Data with NiFi

Mark Payne -  @dataflowmark

Intro - The What

Apache NiFi is being used by many companies and organizations to power their data distribution needs. One of NiFi's strengths is that the framework is data agnostic. It doesn't care what type of data you are processing. There are processors for handling JSON, XML, CSV, Avro, images and video, and several other formats. There are also several general-purpose processors, such as RouteText and CompressContent. Data can be hundreds of bytes or many gigabytes. This makes NiFi a powerful tool for pulling data from external sources; routing, transforming, and aggregating it; and finally delivering it to its final destinations.

While this ability to handle any arbitrary data is incredibly powerful, we often see users working with record-oriented data. That is, large volumes of small "records," or "messages," or "events." Everyone has their own way to talk about this data, but we all mean the same thing. Lots of really small, often structured, pieces of information. This data comes in many formats. Most commonly, we see CSV, JSON, Avro, and log data.

While there are many tasks that NiFi makes easy, there are some common tasks that we could handle better. So in version 1.2.0 of NiFi, we released a new set of Processors and Controller Services for working with record-oriented data. The new Processors are configured with a Record Reader and a Record Writer Controller Service. There are readers for JSON, CSV, Avro, and log data. There are writers for JSON, CSV, and Avro, as well as a writer that allows users to enter free-form text. This can be used to write data out in a log format, like it was read in, or any other custom textual format.

The Why

These new processors make building flows to handle this data simpler. This approach also means that we can build processors that accept any data format without having to worry about the parsing and serialization logic. Another big advantage of this approach is that we are able to keep FlowFiles larger, each consisting of multiple records, which results in far better performance.

The How - Explanation

In order to make sense of the data, Record Readers and Writers need to know the schema that is associated with the data. Some Readers (for example, the Avro Reader) allow the schema to be read from the data itself. The schema can also be included as a FlowFile attribute. Most of the time, though, it will be looked up by name from a Schema Registry. In this version of NiFi, two Schema Registry implementations exist: an Avro-based Schema Registry service and a client for an external Hortonworks Schema Registry.

Configuring all of this can feel a bit daunting at first. But once you've done it once or twice, it becomes rather quick and easy to configure. Here, we will walk through how to set up a local Schema Registry service, configure a Record Reader and a Record Writer, and then start putting to use some very powerful processors. For this post, we will keep the flow simple. We'll ingest some CSV data from a file and then use the Record Readers and Writers to transform the data into JSON, which we will then write to a new directory.

The How - Tutorial

In order to start reading and writing the data, as mentioned above, we are going to need a Record Reader service. We want to parse CSV data and turn that into JSON data. So to do this, we will need a CSV Reader and a JSON Writer. So we will start by clicking the "Settings" icon on our Operate palette. This will allow us to start configuring our Controller Services. When we click the 'Add' button in the top-right corner, we see a lot of options for Controller Services:

Since we know that we want to read CSV data, we can type "CSV" into our Filter box in the top-right corner of the dialog, and this will narrow down our choices pretty well:

We will choose to add the CSVReader service and then configure the Controller Service. In the Properties tab, we have a lot of different properties that we can set:

Fortunately, most of these properties have default values that are good for most cases, but you can choose which delimiter character you want to use, if it's not a comma. You can choose whether or not to skip the first line, treating it as a header, etc. For my case, I will set the "Skip Header Line" property to "true" because my data contains a header line that I don't want to process as a record. The first few properties are very important, though. The "Schema Access Strategy" is used to instruct the reader on how to obtain the schema. By default, it is set to "Use String Fields From Header." Since we are also going to be writing the data, though, we will have to configure a schema anyway. So for this demo, we will change this strategy to "Use 'Schema Name' Property." This means that we are going to look up the schema from a Schema Registry. As a result, we are now going to have to create our Schema Registry. If we click on the "Schema Registry" property, we can choose to "Create new service...":
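The default "Use String Fields From Header" strategy derives a schema from the header line itself, with every column typed as a string. That is convenient, but it means a column such as "id" would be read as text rather than as the "int" declared in the schema we configure next. This rough sketch shows the idea. schema_from_header and the record name "inferred" are hypothetical, and splitting on commas ignores CSV quoting.

```python
import json
from typing import Any, Dict

def schema_from_header(header_line: str, name: str = "inferred") -> Dict[str, Any]:
    """Build an Avro-style record schema in which every header column
    becomes a field of type "string". Splitting on commas ignores quoting,
    so this only suits simple headers."""
    columns = [c.strip() for c in header_line.split(",")]
    return {
        "name": name,
        "type": "record",
        "fields": [{"name": c, "type": "string"} for c in columns],
    }

schema = schema_from_header("id,firstName,lastName,email,gender")
print(json.dumps(schema["fields"][0]))  # {"name": "id", "type": "string"}
```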

We will choose to create an AvroSchemaRegistry. It is important to note here that we are reading CSV data and writing JSON data - so why are we using an Avro Schema Registry? Because this Schema Registry allows us to convey the schema using the Apache Avro Schema format, but it does not imply anything about the format of the data being read. The Avro format is used because it is already a well-known way of storing data schemas.

Once we've added our Avro Schema Registry, we can configure it and see in the Properties tab that it has no properties at all. We can add a schema by adding a new user-defined property (by clicking the 'Add' / 'Plus' button in the top-right corner). We will give our schema the name "demo-schema" by using this as the name of the property. We can then type or paste in our schema. For those unfamiliar with Avro schemas, it is a JSON formatted representation that has a syntax like the following:

{
  "name": "recordFormatName",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" }
  ]
}

Here, we have a simple schema that is of type "record." This is typically the case, as we want multiple fields. We then specify all of the fields that we have. There is a field named "id" of type "int" and all other fields are of type "string." See the Avro Schema documentation for more information. We've now configured our schema! We can enable our Controller Services now.

We can now add our JsonRecordSetWriter controller service as well. When we configure this service, we see some familiar options for indicating how to determine the schema. For the "Schema Access Strategy," we will again use the "Use 'Schema Name' Property," which is the default. Also note that the default value for the "Schema Name" property uses the Expression Language to reference an attribute named "schema.name". This provides a great deal of flexibility, because now we can re-use our Record Readers and Writers and simply convey the schema by using an UpdateAttribute processor to specify the schema name. No need to keep creating Record Readers and Writers. We will set the "Schema Registry" property to the AvroSchemaRegistry that we just created and configured.

Because this is a Record Writer instead of a Record Reader, we also have another interesting property: "Schema Write Strategy." Now that we have configured how to determine the data's schema, we need to tell the writer how to convey that schema to the next consumer of the data. The default option is to add the name of the schema as an attribute. We will accept the default. But we could also write the entire schema as a FlowFile attribute or use some strategies that are useful for interacting with the Hortonworks Schema Registry.

Now that we've configured everything, we can apply the settings and enable our JsonRecordSetWriter as well. We've now got all of our Controller Services set up and enabled:

Now for the fun part of building our flow! The above will probably take about 5 minutes, but it makes laying out the flow super easy! For our demo, we will have a GetFile processor bring data into our flow. We will use UpdateAttribute to add a "schema.name" attribute of "demo-schema" because that is the name of the schema that we configured in our Schema Registry:

We will then use the ConvertRecord processor to convert the data into JSON. Finally, we want to write the data out using PutFile:

We still need to configure our ConvertRecord processor, though. To do so, all that we need to configure in the Properties tab is the Record Reader and Writer that we have already configured:

Now, starting the processors, we can see the data flowing through our system!
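Conceptually, ConvertRecord with this reader and writer parses each CSV row using the "demo-schema" types and serializes the resulting records as JSON. Here is a minimal Python sketch of that conversion. convert_csv_to_json is hypothetical, the sample row is invented, and fields are matched to columns by position after the header line is skipped.

```python
import csv
import io
import json
from typing import Any, Callable, Dict, List

DEMO_SCHEMA = {
    "name": "recordFormatName",
    "namespace": "nifi.examples",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "firstName", "type": "string"},
        {"name": "lastName", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "gender", "type": "string"},
    ],
}

# How each Avro primitive in the schema is parsed from CSV text.
_CONVERTERS: Dict[str, Callable[[str], Any]] = {"int": int, "long": int, "double": float, "string": str}

def convert_csv_to_json(text: str, schema: Dict[str, Any], skip_header: bool = True) -> str:
    fields = schema["fields"]
    rows = csv.reader(io.StringIO(text))
    if skip_header:
        next(rows, None)  # mirrors "Skip Header Line" = true
    records: List[Dict[str, Any]] = []
    for row in rows:
        if not row:
            continue  # ignore blank lines
        records.append({f["name"]: _CONVERTERS[f["type"]](value)
                        for f, value in zip(fields, row)})
    return json.dumps(records)

sample = "id,firstName,lastName,email,gender\n1,Jane,Doe,jane@example.com,Female\n"
print(convert_csv_to_json(sample, DEMO_SCHEMA))
# [{"id": 1, "firstName": "Jane", "lastName": "Doe", "email": "jane@example.com", "gender": "Female"}]
```

Because the schema is looked up separately from the parsing, swapping in a different reader or writer leaves the rest of this logic untouched, which is the reuse the next paragraph describes.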

Also, now that we have defined these readers and writers and the schema, we can easily create a JSON Reader, and an Avro Writer, for example. And adding additional processors to split the data up, query and route the data becomes very simple because we've already done the "hard" part.


In version 1.2.0 of Apache NiFi, we introduced a handful of new Controller Services and Processors that will make managing dataflows that process record-oriented data much easier. I fully expect that the next release of Apache NiFi will have several additional processors that build on this. Here, we have only scratched the surface of the power that this provides to us. We can delve more into how we are able to transform and route the data, split the data up, and migrate schemas. In the meantime, each of the Controller Services for reading and writing records and most of the new processors that take advantage of these services have fairly extensive documentation and examples. To see that information, you can right-click on a processor and click "Usage" or click the "Usage" icon on the left-hand side of your controller service in the configuration dialog. From there, clicking the "Additional Details..." link in the component usage will provide pretty significant documentation. For example, the JsonTreeReader provides a wealth of knowledge in its Additional Details.

Wednesday Dec 16, 2015

Getting Syslog Events to HBase

Bryan Bende -  bbende@gmail.com @bbende

In the Apache NiFi 0.4.0 release there are several new integration points including processors for interacting with Syslog and HBase. In this post we'll demonstrate how to use NiFi to receive messages from Syslog over UDP, and store those messages in HBase.

The flow described in this post was created using Apache NiFi 0.4.0, rsyslog 5.8.10, and Apache HBase 1.1.2.

Setting up Syslog

In order for NiFi to receive syslog messages, rsyslog needs to forward messages to a port that NiFi will be listening on. Forwarding of messages can be configured in rsyslog.conf, generally located in /etc on most Linux operating systems.

Edit rsyslog.conf and add the following line:

*.* @localhost:7780

This tells rsyslog to forward all messages over UDP to localhost port 7780. A double '@@' can be used to forward over TCP.
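To test the listener without going through rsyslog, you can send a single syslog-style datagram yourself. This sketch builds an RFC 3164-style line and sends it over UDP to port 7780. The function names and the "test" tag are hypothetical, and the month abbreviation assumes an English/C locale.

```python
import socket
import time

def build_syslog_message(body: str, facility: int = 1, severity: int = 5,
                         hostname: str = "localhost", tag: str = "test") -> str:
    """Format <PRI>Mmm dd hh:mm:ss HOSTNAME TAG: BODY, where
    PRI = facility * 8 + severity. Facility 1 (user) with severity 5 (notice)
    gives <13>, the same default priority the logger utility uses."""
    now = time.localtime()
    # RFC 3164 pads single-digit days with a space ("Dec  1"), not a zero.
    timestamp = f"{time.strftime('%b', now)} {now.tm_mday:>2} {time.strftime('%H:%M:%S', now)}"
    return f"<{facility * 8 + severity}>{timestamp} {hostname} {tag}: {body}"

def send_udp(message: str, host: str = "localhost", port: int = 7780) -> None:
    """Send one datagram, as a single-'@' rsyslog forwarding rule would."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message.encode("utf-8"), (host, port))

if __name__ == "__main__":
    send_udp(build_syslog_message("this is a test syslog message"))
```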

Restart rsyslog for the changes to take effect:

/etc/init.d/rsyslog restart
Shutting down system logger:                               [  OK  ]
Starting system logger:                                    [  OK  ]

Setting up HBase

In order to store the syslog messages, we'll create an HBase table called 'syslog' with one column family called 'msg'. From the command line enter the following:

hbase shell
create 'syslog', {NAME => 'msg'}

Configure an HBase Client Service

The HBase processors added in Apache NiFi 0.4.0 use a controller service to interact with HBase. This allows the processors to remain unchanged when the HBase client changes, and allows a single NiFi instance to support multiple versions of the HBase client. The class-loader isolation provided by NiFi's NARs allows a single NiFi instance to interact with HBase instances of different versions at the same time.

The HBase Client Service can be configured by providing paths to external configuration files, such as hbase-site.xml, or by providing several properties directly on the service. For this example we will take the latter approach. From the Controller Services configuration window in NiFi, add an HBase_1_1_2_ClientService with the following configuration (adjusting values appropriately for your system):


After configuring the service, enable it in order for it to be usable by processors:


Building the Dataflow

The dataflow we are going to build will consist of the following components:

  • ListenSyslog for receiving syslog messages over UDP
  • UpdateAttribute for renaming attributes and creating a row id for HBase
  • AttributesToJSON for creating a JSON document from the syslog attributes
  • PutHBaseJSON for inserting each JSON document as a row in HBase

The overall flow looks like the following:

Let's walk through the configuration of each processor...

ListenSyslog

Set the Port to the same port that rsyslog is forwarding messages to, in this case 7780. Leave everything else as the default values.

With a Max Batch Size of "1" and Parse Messages as "true", each syslog message will be emitted as a single FlowFile, with the content of the FlowFile being the original message, and the results of parsing the message being stored as FlowFile attributes.

The attributes we will be interested in are:

  • syslog.priority
  • syslog.severity
  • syslog.facility
  • syslog.version
  • syslog.timestamp
  • syslog.hostname
  • syslog.sender
  • syslog.body
  • syslog.protocol
  • syslog.port
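Several of these attributes are derived arithmetically: the priority number in angle brackets encodes facility * 8 + severity. The following simplified Python parser for RFC 3164-style messages shows how the attributes relate. It is not ListenSyslog's actual parser. parse_syslog is a hypothetical helper, syslog.version is omitted because it applies only to RFC 5424 messages, and sender, protocol and port come from the connection rather than the message text.

```python
import re
from typing import Dict, Optional

# Simplified RFC 3164 pattern: <PRI>Mmm dd hh:mm:ss HOSTNAME BODY
_RFC3164 = re.compile(
    r"^<(\d{1,3})>([A-Z][a-z]{2} [ \d]\d \d{2}:\d{2}:\d{2}) (\S+) (.*)$", re.DOTALL)

def parse_syslog(message: str, sender: str, protocol: str = "UDP",
                 port: int = 7780) -> Optional[Dict[str, str]]:
    m = _RFC3164.match(message)
    if m is None:
        return None  # not parseable with this simplified pattern
    pri = int(m.group(1))
    return {
        "syslog.priority": str(pri),
        "syslog.severity": str(pri % 8),   # low three bits
        "syslog.facility": str(pri // 8),  # remaining bits
        "syslog.timestamp": m.group(2),
        "syslog.hostname": m.group(3),
        "syslog.body": m.group(4),
        "syslog.sender": sender,
        "syslog.protocol": protocol,
        "syslog.port": str(port),
    }

attrs = parse_syslog("<13>Dec 10 19:20:15 myhost root: this is a test message", "127.0.0.1")
```

Here <13> decodes to facility 1 (user) and severity 5 (notice).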

UpdateAttribute

The attributes produced by ListenSyslog all start with "syslog." which keeps them nicely namespaced in NiFi. However, we are going to use these attribute names as column qualifiers in HBase. We don't really need this prefix since we will already be within a syslog table.

Add a property for each syslog attribute to remove the prefix, and use the Delete Attributes Expression to remove the original attributes. In addition, create an id attribute of the form "timestamp_uuid" where timestamp is the long representation of the timestamp on the syslog message, and uuid is the uuid of the FlowFile in NiFi. This id attribute will be used as the row id in HBase.

The expression language for the id attribute is:

${syslog.timestamp:toDate('MMM d HH:mm:ss'):toNumber()}_${uuid}
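The expression can be mirrored in Python to see where the row key in the scan output comes from. Because the pattern 'MMM d HH:mm:ss' carries no year, Java's date parsing falls back to 1970, so the number is milliseconds since the epoch for a date in 1970. This sketch assumes the NiFi JVM runs in UTC. Under that assumption, "Dec 10 19:20:15" yields 29704815000, which matches the row key in the scan shown later in this post. row_id is a hypothetical helper.

```python
from datetime import datetime, timezone

def row_id(syslog_timestamp: str, flowfile_uuid: str, tz=timezone.utc) -> str:
    """Mirror ${syslog.timestamp:toDate('MMM d HH:mm:ss'):toNumber()}_${uuid}.
    The syslog timestamp has no year, so 1970 is supplied explicitly
    (assumed to match Java's default for an unparsed year)."""
    parsed = datetime.strptime("1970 " + syslog_timestamp, "%Y %b %d %H:%M:%S")
    millis = int(parsed.replace(tzinfo=tz).timestamp()) * 1000
    return f"{millis}_{flowfile_uuid}"

print(row_id("Dec 10 19:20:15", "84f91b21-d35f-4a24-8e0e-aaed4a521c13"))
# 29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13
```

Prefixing the timestamp keeps rows roughly in time order, and appending the FlowFile uuid keeps two messages from the same second from overwriting each other.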

AttributesToJSON

Set the Destination to "flowfile-content" so that the JSON document replaces the FlowFile content, and set Include Core Attributes to "false" so that the standard NiFi attributes are not included.

PutHBaseJSON

Select the HBase Client Service we configured earlier and set the Table Name and Column Family to "syslog" and "msg" based on the table we created earlier. In addition set the Row Identifier Field Name to "id" to instruct the processor to use the id field from the JSON for the row id.
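The last three processors can be summarized as one transformation from FlowFile attributes to HBase cells. The following pure-Python sketch follows that path without talking to HBase. All function names are hypothetical, CORE_ATTRIBUTES lists only a few of NiFi's core attributes, and excluding the row id field from the columns is an assumption consistent with the scan output, where no msg:id column appears.

```python
import json
from typing import Dict, List, Tuple

# A few NiFi core attributes, for illustration only.
CORE_ATTRIBUTES = {"uuid", "filename", "path"}

def strip_prefix(attributes: Dict[str, str], prefix: str = "syslog.") -> Dict[str, str]:
    """UpdateAttribute step: copy each syslog.* attribute without the prefix
    and drop the original prefixed attribute."""
    result = {k: v for k, v in attributes.items() if not k.startswith(prefix)}
    result.update({k[len(prefix):]: v for k, v in attributes.items() if k.startswith(prefix)})
    return result

def attributes_to_json(attributes: Dict[str, str]) -> str:
    """AttributesToJSON step with Include Core Attributes set to false."""
    return json.dumps({k: v for k, v in attributes.items() if k not in CORE_ATTRIBUTES},
                      sort_keys=True)

def to_hbase_cells(json_doc: str, row_id_field: str = "id",
                   family: str = "msg") -> List[Tuple[str, str, str]]:
    """PutHBaseJSON step: the row id field becomes the row key and every
    other field becomes a family:qualifier cell."""
    doc = json.loads(json_doc)
    row = doc.pop(row_id_field)
    return [(row, f"{family}:{qualifier}", value) for qualifier, value in sorted(doc.items())]

attrs = {"uuid": "84f91b21-d35f-4a24-8e0e-aaed4a521c13", "filename": "1234",
         "syslog.hostname": "myhost", "syslog.body": "root: this is a test message"}
renamed = strip_prefix(attrs)
renamed["id"] = "29704815000_" + renamed["uuid"]
cells = to_hbase_cells(attributes_to_json(renamed))
for cell in cells:
    print(cell)
```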

Verifying the Flow

From a terminal we can send a test message to syslog using the logger utility:

logger "this is a test syslog message"

Using the HBase shell we can inspect the contents of the syslog table:

hbase shell
hbase(main):002:0> scan 'syslog'
ROW                                          COLUMN+CELL
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:body, timestamp=1449775215481,
  value=root: this is a test message
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:hostname, timestamp=1449775215481,
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:port, timestamp=1449775215481,
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:protocol, timestamp=1449775215481,
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:sender, timestamp=1449775215481,
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:timestamp, timestamp=1449775215481,
  value=Dec 10 19:20:15
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:version, timestamp=1449775215481,
1 row(s) in 0.1120 seconds

Performance Considerations

In some cases the volume of syslog messages being pushed to ListenSyslog may be very high. There are several options to help scale the processing depending on the given use-case.

Concurrent Tasks

ListenSyslog has a background thread reading messages as fast as possible and placing them on a blocking queue to be de-queued and processed by the onTrigger method of the processor. By increasing the number of concurrent tasks for the processor, we can scale up the rate at which messages are processed, ensuring new messages can continue to be queued.
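The producer/consumer arrangement described above can be sketched with Python's threading and queue modules. One thread plays the socket reader and fills a bounded queue, and num_tasks worker threads play the concurrent tasks that drain it. run_listener is hypothetical, and upper-casing each message stands in for real processing.

```python
import queue
import threading
from typing import List

def run_listener(messages: List[str], num_tasks: int = 4) -> List[str]:
    q: "queue.Queue[object]" = queue.Queue(maxsize=10000)
    processed: List[str] = []
    lock = threading.Lock()
    stop = object()  # sentinel telling a worker to exit

    def reader() -> None:  # stands in for the background socket-reading thread
        for m in messages:
            q.put(m)  # blocks if the queue is full
        for _ in range(num_tasks):
            q.put(stop)

    def worker() -> None:  # stands in for one concurrent task calling onTrigger
        while True:
            m = q.get()
            if m is stop:
                return
            with lock:
                processed.append(m.upper())

    threads = [threading.Thread(target=reader)]
    threads += [threading.Thread(target=worker) for _ in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return processed
```

Adding workers raises the rate at which the queue is drained; the single reader only has to keep up with the network.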


One of the more expensive operations during the processing of a message is parsing the message in order to provide the attributes. Parsing messages is controlled on the processor through a property and can be turned off in cases where the attributes are not needed, and the original message just needs to be delivered somewhere.


In cases where parsing the messages is not necessary, an additional option is batching many messages together during one call to onTrigger. This is controlled through the Max Batch Size property, which defaults to "1". This would be appropriate in cases where having individual messages is not necessary, such as storing the messages in HDFS where you need them batched into appropriately sized files.


In addition to parsing messages directly in ListenSyslog, there is also a ParseSyslog processor. An alternative to the flow described in the post would be to have ListenSyslog produce batches of 100 messages at a time, followed by SplitText, followed by ParseSyslog. The tradeoff here is that we can scale the different components independently, and take advantage of backpressure between processors.


At this point you should be able to get your syslog messages ingested into HBase and can experiment with different configurations. The template for this flow can be found here.

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.apache.org.

Monday Sep 21, 2015

Indexing Tweets with NiFi and Solr


Bryan Bende -  bbende@gmail.com

This post will cover how to use Apache NiFi to pull in the public stream of tweets from the Twitter API, identify specific tweets of interest, and deliver those tweets to Solr for indexing. The example developed here was built against Apache NiFi 0.3.0 and Apache Solr 5.3.

In addition, you will also need to create a Twitter application for accessing their API. A good set of instructions for doing that can be found in this article - How to Create a Twitter App in 8 Easy Steps.

Setting up Solr

For this example we will start Solr in cloud mode and create a tweets collection based on the data-driven configuration provided by Solr. The data-driven configuration allows Solr to create fields on the fly based on the incoming data, and is a good place to start for quick prototyping. The following two commands can be used to get Solr running and create the collection:

./bin/solr start -c
./bin/solr create_collection -c tweets -d data_driven_schema_configs -shards 1 -replicationFactor 1

At this point we should have a running Solr instance on port 8983, with an embedded ZooKeeper on port 9983. Navigate to http://localhost:8983/solr/#/~cloud in your browser to verify Solr is running and the tweets collection was created successfully.

Building the Dataflow

The dataflow we are going to build in NiFi will consist of the following processors:

  • GetTwitter for accessing the Twitter API and producing FlowFiles containing JSON Tweets
  • EvaluateJsonPath for extracting values from JSON documents into FlowFile attributes
  • RouteOnAttribute for making decisions about incoming FlowFiles based on their attributes
  • MergeContent for merging together many JSON documents to a single document
  • PutSolrContentStream for streaming JSON documents to Solr
The overall flow looks like the following:


Let's walk through the configuration of each processor...



For GetTwitter, set the end-point to the Sample Endpoint and fill in the credentials corresponding to the Twitter application you created earlier.

For this example we are going to specify a language of "en" to get only English tweets.



Now we want to extract the text and language of each tweet into FlowFile attributes in order to make decisions on these values later. In EvaluateJsonPath, set the Destination property to "flowfile-attribute" and add two user-defined properties using the New Property icon on the top right. The name of each property will be the FlowFile attribute name, and the value is the JSON path we would like to extract from the Tweet. This will allow us to access these values later by using expression language, such as ${twitter.text} or ${twitter.lang}.



At this point we would like to ensure we are only indexing tweets we are interested in. So in RouteOnAttribute we add a user-defined property called "tweet" and specify the following expression language as the value:


The first part of this expression filters out all the tweets which have an empty message. This occurs frequently as some of the JSON documents coming through the end-point are not actual tweets. The second part of the expression ensures that we are only selecting English tweets. We already set GetTwitter to filter on "en", but if we hadn't, the language could be used here to route different languages to different relationships. Any tweets matching the above conditions will be routed to the "tweet" relationship based on the property we defined, and anything else will be routed to the "unmatched" relationship. We can auto-terminate the "unmatched" relationship to have the non-matching FlowFiles discarded.
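The routing decision is described above only in prose, so here is the same two-part check written as plain Java rather than NiFi Expression Language. The sketch assumes the attribute names twitter.text and twitter.lang from the EvaluateJsonPath step and treats null or whitespace-only text as empty. TweetRouter is a hypothetical name.

```java
import java.util.Map;

// Hypothetical Java rendering of the RouteOnAttribute rule (not Expression Language):
// route to "tweet" only if twitter.text is non-empty and twitter.lang equals "en".
class TweetRouter {

    static String route(Map<String, String> attributes) {
        String text = attributes.get("twitter.text");
        String lang = attributes.get("twitter.lang");
        boolean hasText = text != null && !text.trim().isEmpty(); // null/blank counts as empty
        boolean isEnglish = "en".equals(lang);
        return (hasText && isEnglish) ? "tweet" : "unmatched";
    }
}
```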



In this example we could send the JSON documents from RouteOnAttribute directly to Solr, but in most real-world scenarios accessing the Solr cluster will require network communication, and it will likely produce better performance if we batch together multiple JSON documents into a single request to reduce the number of requests sent over the network. The MergeContent processor is a powerful processor that was built with exactly this scenario in mind, and is capable of merging FlowFiles based on a number of criteria, such as the number of FlowFiles or their age. MergeContent also performs merges in a streaming manner, and as a result is capable of merging a significant number of FlowFiles without worrying about exceeding memory constraints.

To configure MergeContent, set a Minimum and Maximum Number of Entries, as well as a Max Bin Age to trigger a merge in cases where no new data has come in for a period of time. Also set the Delimiter Strategy to "Text", and set the Header to [, the Footer to ], and the Demarcator to , (a comma). This allows us to create one large JSON array composed of many incoming documents.
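The effect of the Header, Footer, and Demarcator settings can be sketched with java.util.StringJoiner, treating each JSON document as opaque text. This illustrates the resulting bytes only and is not MergeContent's implementation. JsonArrayMerger is a hypothetical name.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical illustration of the "Text" delimiter strategy with Header "[",
// Footer "]" and Demarcator ",": documents are concatenated into one JSON array.
// Nothing is parsed; each document is treated as plain text.
class JsonArrayMerger {

    static String merge(List<String> jsonDocs) {
        StringJoiner joiner = new StringJoiner(",", "[", "]"); // demarcator, header, footer
        for (String doc : jsonDocs) {
            joiner.add(doc);
        }
        return joiner.toString();
    }
}
```

Merging {"id":"1"} and {"id":"2"} produces [{"id":"1"},{"id":"2"}], a single valid JSON array that can be sent in one request.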



Configure PutSolrContentStream to point to the Solr instance we started earlier by setting the Solr Type to "Cloud" and the Solr Location to the embedded ZooKeeper (localhost:9983). Also specify a Commit Within of "1000" to commit the incoming documents every second (this may not be needed if you set autoCommit settings in your solrconfig.xml).

Any user-defined properties will get sent to Solr on the request as key-value pairs, so we need to tell Solr how to transform the incoming JSON document into a Solr document. A good explanation of how this JSON to Solr mapping works can be found here, and an explanation of the user-defined properties can be found here.

For this example we will provide the following mappings:

  • split=/ to treat each child JSON document that we merged together as individual Solr documents
  • id:/id to map the id field of the JSON to the id field in the Solr schema (already defined)
  • twitter_text_t:/text to map the text field of the JSON to a dynamic field of type text_general in Solr
  • twitter_username_s:/user/name to map the user of the tweet to a dynamic string field in Solr
  • twitter_created_at_s:/created_at to map the creation date string to a dynamic string field in Solr
  • twitter_timestamp_ms_tl:/timestamp_ms to map the timestamp to a dynamic trie-long field in Solr
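The following small Java model shows how the mappings above pull values out of each tweet. It represents the JSON document as nested Maps and resolves slash-separated paths such as /user/name. The sketch rests on several assumptions: the split=/ behavior is not modeled, fields missing from the tweet are simply skipped, and TweetFieldMapper is a hypothetical name rather than Solr's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of "solrField:/json/path" mappings (not Solr's code). The JSON
// document is represented as nested Maps instead of a parsed JSON tree.
class TweetFieldMapper {

    // Follows a path such as "/user/name" through nested maps; null if any step is missing.
    static Object resolve(Map<String, Object> doc, String path) {
        Object current = doc;
        for (String key : path.substring(1).split("/")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    // Builds a flat Solr-style document from the mappings, skipping absent fields.
    static Map<String, Object> toSolrDoc(Map<String, Object> tweet, Map<String, String> mappings) {
        Map<String, Object> solrDoc = new LinkedHashMap<>();
        for (Map.Entry<String, String> m : mappings.entrySet()) {
            Object value = resolve(tweet, m.getValue());
            if (value != null) {
                solrDoc.put(m.getKey(), value);
            }
        }
        return solrDoc;
    }
}
```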

The naming conventions of the fields allow us to leverage dynamic fields in Solr. Dynamic fields take effect by using a suffix on the field name to indicate the field type, so the "_t" on "twitter_text_t" tells Solr that this field will map to a text_general field type. It is possible to omit the field mappings entirely, send the JSON to Solr as-is, and let Solr guess the field types on the fly. However, in that case all of the fields would get added as multiValued fields, and multiValued fields can't be used in a sorting clause, which would prevent us from sorting on the timestamp. So we opt for the dynamic fields here.

Also worth noting, this processor has two failure relationships. The regular "failure" relationship is for failures that would generally require some intervention to resolve, such as an invalid document being submitted. The "connection_failure" relationship is for failures related to communication issues between NiFi and Solr. A typical approach in a production scenario would be to route the "connection_failure" back to itself to retry during connection failures, and to route the "failure" relationship to a PutFile processor that could write out failed documents for later inspection.


At this point you should be able to hit Start on your processors and start seeing tweets flowing to Solr. Happy indexing!

A template of the flow created in this example can be found here. We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.apache.org.

Thursday Mar 19, 2015

Stream Processing: NiFi and Spark


Mark Payne -  markap14@hotmail.com

Without doubt, Apache Spark has become wildly popular for processing large quantities of data. One of the key features that Spark provides is the ability to process data in either a batch processing mode or a streaming mode with very little change to your code. Batch processing is typically performed by reading data from HDFS. There have been a few different articles posted about using Apache NiFi (incubating) to publish data to HDFS. This article does a great job of explaining how to accomplish this.

In many contexts, though, operating on the data as soon as it is available can provide great benefits. In order to provide the right data as quickly as possible, NiFi has created a Spark Receiver, available in the 0.0.2 release of Apache NiFi. This post will examine how we can write a simple Spark application to process data from NiFi and how we can configure NiFi to expose the data to Spark.

Incorporating the Apache NiFi Receiver into your Spark application is pretty easy. First, you'll need to add the Receiver to your application's POM:


That's all that is needed in order to be able to use the NiFi Receiver. So now we'll look at how to use the Receiver in your code.

The NiFi Receiver is a Reliable Java Receiver. This means that if we lose a node after it pulls the data from NiFi, the data will not be lost. Instead, another node will simply pull and process the data. In order to create a NiFi Receiver, we need to first create a configuration that tells the Receiver where to pull the data from. The simplest form is to just tell the config where NiFi is running and which Port to pull the data from:

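SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  .url("http://localhost:8080/nifi")   // example URL of the NiFi instance (or Cluster Manager)
  .portName("Data For Spark")          // name of the Output Port to pull from
  .buildConfig();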

To briefly explain the terminology here, NiFi refers to its mechanism for transferring data between clusters or instances as Site-to-Site. It exposes options for pushing data or pulling, so that the most appropriate approach can be used for each situation. Spark doesn't supply a mechanism to have data pushed to it - instead, it wants to pull data from other sources. In NiFi, this data can be exposed in such a way that a receiver can pull from it by adding an Output Port to the root process group. For Spark, we will use this same mechanism - we will use this Site-to-Site protocol to pull data from NiFi's Output Ports. In order for this to work, we need two pieces of information: the URL to connect to NiFi and the name of the Output Port to pull data from.

If the NiFi instance to connect to is clustered, the URL should be that of the NiFi Cluster Manager. In this case, the Receiver will contact the Cluster Manager to determine which nodes are in the cluster and will automatically start pulling data from all of them. By communicating with the Cluster Manager, the Receiver also determines which nodes have the most data backed up and pulls from those nodes more heavily than from the others. This information is updated periodically, so as nodes join or leave the cluster, or become more or less bogged down, the Receiver adjusts automatically.
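The Java sketch below gives a rough model of "pull more heavily from nodes with more data backed up". It divides a number of pulls among nodes in proportion to their reported queue sizes. The proportional rule, the node names, and the class name BacklogWeightedPuller are assumptions for illustration only. The actual Site-to-Site client may weight nodes differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simplification (not the Site-to-Site client's algorithm): allocate pulls
// to nodes in proportion to how many FlowFiles each reports as queued. Shares are
// rounded down, so a few pulls may be left unassigned.
class BacklogWeightedPuller {

    static Map<String, Integer> allocatePulls(Map<String, Integer> queuedPerNode, int totalPulls) {
        long totalQueued = 0;
        for (int queued : queuedPerNode.values()) {
            totalQueued += queued;
        }
        Map<String, Integer> allocation = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : queuedPerNode.entrySet()) {
            int share = totalQueued == 0 ? 0 : (int) ((long) totalPulls * e.getValue() / totalQueued);
            allocation.put(e.getKey(), share);
        }
        return allocation;
    }
}
```

With 600, 300, and 100 FlowFiles queued on three nodes and 10 pulls to hand out, this model assigns 6, 3, and 1 pulls respectively.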

Next, we need to instruct the Receiver which Port to pull data from. Since NiFi can have many different Output Ports, we need to provide either a Port Identifier or a Port Name. If desired, we can also configure communications timeouts; SSL information for secure data transfer, authentication, and authorization; compression; and preferred batch sizes. See the JavaDocs for the SiteToSiteClient.Builder for more information.

Once we have constructed this configuration object, we can now create the Receiver:

 SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
 // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
 // the specified Port
 JavaReceiverInputDStream<NiFiDataPacket> packetStream =
     ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));

This gives us a JavaReceiverInputDStream of type NiFiDataPacket. The NiFiDataPacket is a simple construct that packages an arbitrary byte array with a map of Key/Value pairs (referred to as attributes) that correspond to the data. As an example, we can process the data without paying attention to the attributes:

 // Map the data from NiFi to text, ignoring the attributes
 JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
   }
 });

Or we can make use of the attributes:

 // Extract the 'uuid' attribute
 JavaDStream<String> uuids = packetStream.map(new Function<NiFiDataPacket, String>() {
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return dataPacket.getAttributes().get("uuid");
   }
 });

So now we have our Receiver ready to pull data from NiFi. Let's look at how we can configure NiFi to expose this data.

First, NiFi has to be configured to allow site-to-site communication. This is accomplished by setting the nifi.remote.input.socket.port property in the nifi.properties file to the desired port to use for site-to-site (if this value is changed, it will require a restart of NiFi for the changes to take effect).
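A small Java helper can serve as a quick sanity check before starting NiFi. It reads nifi.properties-style text with java.util.Properties and confirms that the site-to-site port is set. The class name and the example port number 10000 are assumptions, not values NiFi requires.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: returns the configured site-to-site socket port from
// nifi.properties-style text, failing fast if the property is missing or blank.
class SiteToSiteConfigCheck {

    static int siteToSitePort(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // StringReader should not fail, but load() declares it
        }
        String value = props.getProperty("nifi.remote.input.socket.port", "").trim();
        if (value.isEmpty()) {
            throw new IllegalStateException("nifi.remote.input.socket.port is not set");
        }
        return Integer.parseInt(value);
    }
}
```

For example, siteToSitePort("nifi.remote.input.socket.port=10000") returns 10000, while a blank value raises an IllegalStateException.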

Now that NiFi is set up to allow site-to-site, we will build a simple flow to feed data to Spark. We will start by adding two GetFile processors to the flow. One will pick up from /data/in/notifications and the other will pick up from /data/in/analysis:


Next, let's assume that we want to assign different priorities to the data that is picked up from each directory. Let's assign a priority of "1" (the highest priority) to data coming from the analysis directory and assign a lower priority to the data from the notifications directory. We can use the UpdateAttribute processor to do this. Drag the Processor onto the graph and configure it. In the Settings tab, we set the name to "Set High Priority". In the Properties tab, we have no properties available. However, there's a button in the top-right of the dialog that says "New Property." Clicking that button lets us add a property with the name priority and a value of 1. When we click OK, we see it added to the table:

Set Priority

Let's click Apply and add another UpdateAttribute processor for the data coming from the notifications directory. Here, we will add a property with the name priority but give it a value of 2. After configuring these processors and connecting the GetFile processors to them, we end up with a graph that looks like this:

Connect GetFile and UpdateAttribute

Now, we want to combine all of the data into a single queue so that we can prioritize it before sending the data to Spark. We do this by adding a Funnel to the graph and connecting both of the UpdateAttribute processors to the Funnel:

With Funnel

Now we can add an Output Port that our Spark Streaming application can pull from. We drag an Output Port onto our graph. When prompted for a name, we will name it Data For Spark, as this is the name that we gave to our Spark Streaming application. Once we connect the Funnel to the Output Port, we have a graph like this:

Before Running

We haven't actually told NiFi to prioritize the data yet, though. We've simply added an attribute named priority. To prioritize the data based on that, we can right-click on the connection that feeds the Output Port and choose Configure. From the Settings tab, we can drag the PriorityAttributePrioritizer from the list of Available Prioritizers to the list of Selected Prioritizers:


Once we click Apply, we're done. We can start all of the components and we should see the data start flowing to our Spark Streaming application:


Now any data that appears in our /data/in/notifications or /data/in/analysis directory will make its way to our streaming application!
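The ordering produced by the prioritizer configured above can be modeled with a java.util.PriorityQueue keyed on the priority attribute. This simplified sketch assumes that priorities are integers and that the smallest value goes first, matching "1" as the highest priority above. It also assumes that FlowFiles without a numeric priority go last. PriorityOrderingModel is a hypothetical name, not NiFi's PriorityAttributePrioritizer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Simplified model (an assumption, not NiFi's prioritizer source): FlowFiles, represented
// as attribute maps, leave the queue in ascending order of their numeric "priority".
class PriorityOrderingModel {

    static long priorityOf(Map<String, String> attributes) {
        try {
            return Long.parseLong(attributes.getOrDefault("priority", ""));
        } catch (NumberFormatException e) {
            return Long.MAX_VALUE; // missing or non-numeric priority sorts last
        }
    }

    // Returns the "filename" attributes in the order the queue would hand them out.
    static List<String> drainInOrder(List<Map<String, String>> flowFiles) {
        Comparator<Map<String, String>> byPriority = Comparator.comparingLong(PriorityOrderingModel::priorityOf);
        PriorityQueue<Map<String, String>> queue = new PriorityQueue<>(byPriority);
        queue.addAll(flowFiles);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().get("filename"));
        }
        return order;
    }
}
```

A notifications file with priority 2 that arrives before an analysis file with priority 1 is still handed out second.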

By using NiFi's Output Port mechanism, we are able to create any number of different named Output Ports, as well. This allows you, as a NiFi user, to choose exactly which data gets exposed to Spark. Additionally, if NiFi is configured to be secure, each Output Port can be configured to provide the data to only the hosts and users that are authorized.

Let's consider, though, that this data has significant value for processing in a streaming fashion as soon as we have the data, but it may also be of value to a batch processing analytic. This is easy to handle, as well. We can add a MergeContent processor to our graph and configure it to merge data into 64-128 MB TAR files. Then, when we have a full TAR file, we can push the data to HDFS. We can configure MergeContent to make these bundles like so:

Merge configuration
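The 64-128 MB bundling can be thought of as bin packing by size. This simplified model assumes that a bin is emitted as soon as it reaches the minimum size. It also assumes that a FlowFile which would push a bin past the maximum starts a new bin. SizeBinner is a hypothetical name, and MergeContent's real binning logic has more options (entry counts, Max Bin Age, and so on).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical size-based binning sketch: fill a bin until it reaches minBytes, and
// never let it exceed maxBytes (an overflowing FlowFile closes the bin and starts a new one).
class SizeBinner {

    static List<List<Long>> bin(List<Long> sizes, long minBytes, long maxBytes) {
        List<List<Long>> completed = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : sizes) {
            if (!current.isEmpty() && currentBytes + size > maxBytes) {
                completed.add(current); // adding this FlowFile would overflow: close the bin
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
            if (currentBytes >= minBytes) {
                completed.add(current); // large enough: bundle it (e.g. into one TAR file)
                current = new ArrayList<>();
                currentBytes = 0;
            }
        }
        // Anything left in 'current' is still waiting for more data.
        return completed;
    }
}
```

With sizes in MB, bin(List.of(50L, 20L, 70L, 70L, 10L), 64, 128) yields [[50, 20], [70], [70]]. The final 10 MB FlowFile stays in an open bin until more data arrives.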

We can then send the merged files to PutHDFS and auto-terminate the originals. We can feed all the data from our Funnel to this MergeContent processor, and this will allow us to dual-route all data to both HDFS (bundled into 64-128 MB TAR files) and to Spark Streaming (making the data available as soon as possible with very low latency):

hdfs and spark streaming

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.incubator.apache.org.

Monday Feb 02, 2015

Say Good-Bye to Canned Data


Mark Payne -  markap14@hotmail.com

We've all been there. After months of development and exhaustive testing, our killer new web service (or app or analytic or what have you) is ready for production! We've tested it with all of the random data that we've mocked up, including all of the samples that we've concocted to ensure that it handles every bad input that we can imagine. We've handled it all well, so it's time to deploy to production. So we do. And all is great!

Until an hour later, when our logs start filling with errors because, despite all of our due diligence in testing, we never could have envisioned getting that as input. And now we're in a mad frenzy to fix our problem, because we're now responsible for all of the errors that are happening in production.

If there's one thing that I've learned in my years as a software developer, it's that no matter how diligent we are in testing our code, we get data in production that we just haven't accounted for.

So what can we do about it? Test with live production data!

Now I'm not suggesting that we skip the testing phase altogether and go straight into production - quite the opposite, really. I'm just suggesting that we test "smarter, not harder." One of the benefits of Apache NiFi (incubating) is that it allows us to have real-time command and control of our data. It allows us to change our dataflow in just a few minutes to send multiple copies of our data anywhere we want, while providing different reliability guarantees and qualities of service to different parts of our flow. So let's look at how we might accomplish this.

Let's assume that we typically get our feed of data to our web service from NiFi in a flow that looks something like this:

Original Flow

Now let's say that we want to send a duplicate copy of this data feed to our "staging" environment that is running the new version of our web service - or a new web service altogether. We can simply copy and paste the InvokeHTTP processor that we're using to send the data to our production instance (select the processor and press Ctrl-C, Ctrl-V to copy and paste) and then right-click on the new one and choose "Configure..." In the Properties tab, we will change the URL to point to our staging instance. Now, all we have to do is draw a second connection from the preceding processor to our newly created InvokeHTTP. We will give it the same relationship that feeds the production instance - "splits." And now any data that goes to our production instance will also be sent to our staging environment:

Flow with second InvokeHTTP

Of course, since we know this is a staging environment, it may not be as powerful as the production instance and it may not handle the entire stream of live data. What we really would like to do is send only about 20% of our data to our staging environment. So how can we accomplish that? We can easily insert a DistributeLoad Processor just ahead of our new InvokeHTTP processor, like so:

Flow with DistributeLoad added in

We can now configure the DistributeLoad Processor to have two relationships. We want 80% of the load to go to relationship "1" and 20% to go to relationship "2." We can accomplish this by adding two user-defined properties. We right-click on DistributeLoad and choose "Configure..." In the Properties tab, click the icon to add the first property. We give it the name "1" and a value of 80. Then we click OK and add another property. This time, we give the property the name "2" and a value of "20":

Configure DistributeLoad
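An 80/20 split like this can be sketched as weighted round robin, where each relationship appears in a repeating schedule as many times as its weight. This is a hypothetical model: WeightedDistributor is not a NiFi class and is not thread-safe. The sketch assumes DistributeLoad's weighted behavior works out the same way over each cycle of 100 FlowFiles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical weighted round-robin model of relationship "1" (weight 80) and
// relationship "2" (weight 20). Not DistributeLoad's source code.
class WeightedDistributor {

    private final List<String> schedule = new ArrayList<>();
    private int next = 0;

    WeightedDistributor(Map<String, Integer> weights) {
        // Expand each relationship name 'weight' times into a repeating schedule.
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                schedule.add(e.getKey());
            }
        }
        if (schedule.isEmpty()) {
            throw new IllegalArgumentException("at least one positive weight is required");
        }
    }

    // Returns the relationship the next FlowFile is routed to.
    String nextRelationship() {
        String relationship = schedule.get(next);
        next = (next + 1) % schedule.size();
        return relationship;
    }
}
```

Over 1,000 calls to nextRelationship() with weights of 80 and 20, exactly 200 return "2", the relationship that feeds staging.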

Now, all we have to do is go to the Settings Tab and choose to Auto-Terminate Relationship 1. This will throw away 80% of our data. (Not to worry, we don't actually make copies of this data just to throw away 80% of it. The actual work required to "clone" a FlowFile is very small, as it doesn't actually copy any of the content but rather just creates a new pointer to the content.) Now we add a Connection from DistributeLoad to InvokeHTTP and use Relationship "2." Start the Processors, and we've now got 20% of the data being pushed to our staging environment:

20% of data going to staging area
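The cheap clone described above can be pictured as copying only the small attribute map while sharing a reference to the content. The classes below are hypothetical stand-ins, not NiFi's FlowFile or content repository API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for stored content; only a reference to it is ever copied.
final class ContentRef {
    final byte[] bytes;

    ContentRef(byte[] bytes) {
        this.bytes = bytes;
    }
}

// Hypothetical FlowFile model: cloning copies the attribute map but shares the content.
final class SimpleFlowFile {
    final Map<String, String> attributes;
    final ContentRef content;

    SimpleFlowFile(Map<String, String> attributes, ContentRef content) {
        this.attributes = attributes;
        this.content = content;
    }

    SimpleFlowFile cloneFlowFile() {
        // New attribute map (so edits don't leak back), same content reference (no byte copy).
        return new SimpleFlowFile(new HashMap<>(attributes), content);
    }
}
```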

Now, we have just one more concern to think about. Since we're sending data to our staging area, which may go down pretty often as we debug and test things, won't the data back up in NiFi on our production dataflow? At what point is this going to cause a problem?

This is where my earlier comment about NiFi offering "different Quality of Service guarantees to different parts of the flow" comes in. For this endpoint, we just want to try to send the data, and if the staging environment can't handle it, we don't want to risk causing issues in our production environment. To ensure that this happens, we right-click on the connection that feeds the new InvokeHTTP processor and click "Configure..." (You'll have to first stop the connection's source and destination processors in order to modify it.) In the settings tab here, we have an option for "File expiration." The default is "0 sec," which means that the data will not age off. Let's change this value to 3 minutes.

Now, when we click Apply, we can see that the Connection's label has a small "clock" icon on it, indicating that the connection has an expiration set.

Age off after 3 minutes

Any FlowFile in the connection that becomes more than 3 minutes old will automatically be deleted. This means that we will buffer up to three minutes' worth of data in our production instance to send to the staging environment, but no more. We still will not expire any data that is waiting to go to the production instance. Because of this capability, we can extend our example a bit to perform load testing as well. While in this example we decided that we only wanted to send 20% of our data to the staging environment, we could easily remove the DistributeLoad processor altogether. In this way, we will send 100% of our production data to the staging environment, as long as it is able to handle the data rate. However, if it falls behind, it won't hurt our production servers because they'll destroy any data more than 3 minutes old. If concerns were to arise, we can disable this in a matter of seconds: simply stop the DistributeLoad processor and the SplitText processor feeding it, remove the connection between them, and restart the SplitText processor.
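The age-off behavior can be modeled as a queue that discards entries older than the configured expiration whenever the next item is requested. This sketch is an assumption about the shape of the logic, not NiFi's code. Times are passed in as milliseconds to make testing easy. An expiration of 0 means nothing ever expires, mirroring the "0 sec" default. ExpiringQueue is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a connection with "File expiration" set: items older than the
// expiration are dropped instead of being handed to the downstream processor.
class ExpiringQueue {

    private static final class Entry {
        final String id;
        final long createdAtMillis; // the time from which the item's age is measured

        Entry(String id, long createdAtMillis) {
            this.id = id;
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final long expirationMillis; // 0 means "never expire"
    private int expiredCount = 0;

    ExpiringQueue(long expirationMillis) {
        this.expirationMillis = expirationMillis;
    }

    void offer(String id, long createdAtMillis) {
        entries.addLast(new Entry(id, createdAtMillis));
    }

    // Returns the next non-expired id, or null if nothing usable is queued.
    String poll(long nowMillis) {
        while (!entries.isEmpty()) {
            Entry head = entries.pollFirst();
            boolean expired = expirationMillis > 0 && nowMillis - head.createdAtMillis > expirationMillis;
            if (expired) {
                expiredCount++; // aged off: deleted rather than delivered
                continue;
            }
            return head.id;
        }
        return null;
    }

    int expiredCount() {
        return expiredCount;
    }
}
```

With a 3-minute expiration (180,000 ms), an item that is 200 seconds old is discarded and the next, 100-second-old item is delivered instead.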

As always, I'd love to hear feedback in the Comments section about how we could improve, or how you've solved similar problems.

Monday Jan 12, 2015

Apache NiFi: Thinking Differently About Dataflow


Mark Payne -  markap14@hotmail.com

Recently a question was posed to the Apache NiFi (Incubating) Developer Mailing List about how best to use Apache NiFi to perform Extract, Transform, Load (ETL) types of tasks. The question was "Is it possible to have NiFi service setup and running and allow for multiple dataflows to be designed and deployed (running) at the same time?"

The idea here was to create several disparate dataflows that run alongside one another in parallel. Data comes from Source X and it's processed this way. That's one dataflow. Other data comes from Source Y and it's processed this way. That's a second dataflow entirely. Typically, this is how we think about dataflow when we design it with an ETL tool. And this is a pretty common question for new NiFi users. With NiFi, though, we tend to think about designing dataflows a little bit differently. Rather than having several disparate, "stovepiped" flows, the preferred approach with NiFi is to have several inputs feed into the same dataflow. Data can then be easily routed (via RouteOnAttribute, for example) to "one-off subflows" if need be.

One of the benefits to having several disparate dataflows, though, is that it makes it much easier to answer when someone comes to you and says "I sent you a file last week. What did you do with it?" or "How do you process data that comes from this source?" You may not know exactly what happened to a specific file that they sent you, step-by-step, because of the different decision points in the flow, but at least you have a good idea by looking at the layout of the dataflow.

So we can avoid merging the data if we would like. For the sake of an example, let's assume that we have 4 different data sources. For each of them, they are going to send us some text data that needs to be pushed into HDFS. Maybe it's compressed, maybe it's not. So your flow will look like this:

Disparate Flows

Now, let's say that you've got a new requirement. When you're sending text data to HDFS, each file that is pushed to HDFS needs to have 1,000 lines of text or less (yes, that's a contrived example and that's probably never a good idea, but the point is valid.) Now, consider how much work it is to make all of those modifications. And let's hope that you don't miss any!
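For reference, the new requirement amounts to breaking text into chunks of at most 1,000 lines, which is the kind of job SplitText handles. Here is a minimal Java sketch, assuming newline-delimited text and a hypothetical LineChunker class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the requirement: split text into chunks of at most maxLines lines.
class LineChunker {

    static List<String> chunk(String text, int maxLines) {
        if (maxLines < 1) {
            throw new IllegalArgumentException("maxLines must be at least 1");
        }
        List<String> lines = Arrays.asList(text.split("\n")); // trailing empty lines are dropped
        List<String> chunks = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += maxLines) {
            int end = Math.min(start + maxLines, lines.size());
            chunks.add(String.join("\n", lines.subList(start, end)));
        }
        return chunks;
    }
}
```

A 2,500-line file becomes three chunks of 1,000, 1,000, and 500 lines.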

If we continue down this path, this can get hairy quickly, as we have several different dataflows side-by-side on the same graph. In order to aid in the visual representation, we can use Process Groups to provide a nice logical separation. If we do that for each of those, we end up with something like:

Grouped Disparate Flows

We can then double-click each of those Process Groups and see/edit what's inside. But we still have the issue of having to change 4 different flows to make the change mentioned.

So let us consider the alternate approach of merging it all into a single dataflow, and we end up with a flow like this:

Merged Flow

Now, we have all of the data going to a single stream. If we want to update it, we just insert one new Processor:

Updated Merged Flow

And we're done. We don't have to repeat this change by inserting a SplitText processor 3 more times.

The concern that we have here, though, as mentioned earlier, is that if all of the data is mixed together, as the dataflow grows larger, how do we know what happens to data that came from Source X, for example?

This is where the Data Provenance feature comes in. In the top right-hand corner there's a toolbar with 8 icons. The 4th one is the Provenance icon. If we click on that, we can then search for data that has been processed. For this example, let's simply search for RECEIVE events.

This shows us all of the RECEIVE events that this instance of NiFi has seen within the time range searched:

Provenance Search Results

If we click the Lineage Graph icon on the right for the first file, we see exactly what happened to this piece of data:

Data Lineage

We see that a RECEIVE event occurred, and that generated a FlowFile. That FlowFile's attributes were then modified, its content was modified, and then the FlowFile was forked, and dropped. At this point, we can right-click on each of these event nodes and choose to view the details of the event. For the RECEIVE event, we see:

Receive Event

From here, we can see that the RECEIVE event took place at 16:55:51 EST on 01/11/2015. The component that reported the event was named "Data From Source X" and was a GetFile Processor. We can also see that the URI of the data was file:/C:/temp/in/debug/LICENSE.gz. If we follow the lineage of that FlowFile, we see that the next event is an ATTRIBUTES_MODIFIED Event:

Attrs Modified Event

Since the event is of type ATTRIBUTES_MODIFIED, it immediately raises the question "What attributes were modified?" Clicking the "Attributes" tab shows us this:

Attrs Modified Event

As with any Provenance Event, we can see all of the attributes that were present on the FlowFile when the event occurred. Of interest here, we can see that the value of the "mime.type" attribute was changed from "No value set" (the attribute didn't exist) to "application/gzip". The next event in our lineage is a CONTENT_MODIFIED Event. If we view the details here, we will see:

Content Modified Event

Here, we can see that the content was modified by a Processor named Decompress. This makes sense, since the previous Event showed us that the MIME Type was "application/gzip". After decompressing the data, we arrive at a FORK Event:

Fork Event

This event shows us that the FORK was performed by the SplitText Processor. That is, the SplitText Processor broke a large FlowFile into many smaller FlowFiles. On the right-hand side of this dialog, we see that the file was broken into six different "child" FlowFiles. This is where things get fun! If we then close this dialog, we can right-click on the FORK Event and choose the "Expand" option. This will then pull back the lineage for each of those children, providing us with a more holistic view of what happened to this piece of data:

Expanded Lineage

Now, we can see that each of those six children was sent somewhere. Viewing the details of these events shows us where they were sent:

Send Event

The first file, for instance, was sent via the PutHDFS Processor with the filename "/nifi/blogs/thinking-differently/LICENSE.gz". This occurred at 16:55:53 EST on 01/11/2015. We can also see in this dialog the "Lineage Duration" was "00:00:02.712" or 2.712 seconds. The "Lineage Duration" field tells us how long elapsed between the time when the original source data was received and the time at which this event occurred.

Finally, we have the DROP event. The DROP event signifies the end of line for a FlowFile. If we look at the details of this event, we see:

Drop Event

Of note here, we see that the DROP event was emitted by PutHDFS. That is, PutHDFS was the last component in NiFi to process this piece of information. We can also see in the "Details" field why the FlowFile was dropped: it was Auto-terminated by the "success" relationship.

NiFi's Data Provenance capability allows us to understand exactly what happens to each piece of data that is received. We are given a directed graph that shows when a FlowFile was received, when it was modified, when it was routed in a particular way, and when and where it was sent - as well as which component performed the action. We are also able to see, for each event, the attributes (or metadata) associated with the data so that we can understand why the particular event occurred. Additionally, when many pieces of data are merged together or a single piece of data is split apart, we are able to understand fully the provenance of this data from the time that it was received until the time at which it exited the flow. This makes it very easy to answer the question "I sent you a file last week. What did you do with it?" while providing a much more holistic view of the enterprise dataflow than would be available if we used many disparate flows.

Hopefully this post helps you to understand not only the way that we like to set up flows with NiFi but also the benefits that we gain as a result and the features that allow us to overcome any challenges that this approach may create.

Saturday Jan 10, 2015

Integrating Apache NiFi with Kafka

Integrating Apache NiFi with Apache Kafka

Mark Payne -  markap14@hotmail.com

A couple of weeks ago, Joey Echeverria wrote a fantastic blog post about how to get started with Apache NiFi (Incubating), available at http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/. In it, Joey outlines how to quickly build a simple dataflow that automatically picks up any data from the /dropbox directory on your computer and pushes the data to HDFS. He then goes on to use the ExecuteStreamCommand Processor to make use of the Kite SDK’s ability to push CSV data into Hadoop.

In this blog post, we will expand upon Joey’s example to build a dataflow that’s a bit more complicated and illustrate a few additional features of Apache NiFi. If you haven’t already read Joey’s blog, we would recommend you read it before following along here, as this post assumes that you have a very basic understanding of NiFi, such as what NiFi is and how to add a Processor to the graph.

We will continue with the MovieLens dataset, this time using the "MovieLens 10M" dataset, which contains "10 million ratings and 100,000 tag applications applied to 10,000 movies by 72,000 users." However, rather than downloading this dataset and placing the data that we care about in the /dropbox directory, we will use NiFi to pull the data directly from the MovieLens site.

To do this, we first drag a Processor onto the graph. When we do this, we are given the option of choosing many different types of Processors. We know that we want to retrieve data from an HTTP address, so we will search for Processors of interest by choosing the "by tag" option for the filter and typing "http":


This narrows the list of Processors to just a few. We can quickly see the GetHTTP Processor. Clicking on the Processor shows a description of the Processor just below: "Fetches a file via HTTP." This sounds like a good fit, so we click the "Add" button to add it to the graph. Next, we need to configure the Processor to pull the data that we are interested in. Right-clicking the Processor and clicking the "Configure" menu option provides us with a dialog that has a "Properties" tab. Here, we can enter the URL to fetch the data from (http://files.grouplens.org/datasets/movielens/ml-10m.zip) and the filename to use when downloading it:


We can leave the other properties as they are for the sake of this blog post. In a real environment, you may want to change timeout values, provide authentication, etc. We do, however, want to look at the "Scheduling" tab. By default, Processors are scheduled to run as fast as they can. While this is great for most high-volume dataflows, we wouldn't be very good clients if we hit the website nonstop - and I doubt that those responsible for the site would appreciate it very much. To avoid this, we go to the "Scheduling" tab and change the "Run Schedule" setting from "0 sec" to something more reasonable - say "10 mins". This way, we will check the website for new data every 10 minutes. If the ETag or the Last-Modified date changes, then we will pull the data again. Otherwise, we won't download it again.
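The ETag/Last-Modified behavior described above is standard HTTP conditional fetching: send back the validators from the previous response, and treat a 304 Not Modified reply as "nothing new". The following stdlib-only sketch illustrates that idea on a 10-minute schedule. It is not GetHTTP's implementation; the function names and the 30-second timeout are our own assumptions.

```python
import time
import urllib.error
import urllib.request

URL = "http://files.grouplens.org/datasets/movielens/ml-10m.zip"
RUN_SCHEDULE_SECONDS = 10 * 60  # mirrors the "10 mins" Run Schedule


def conditional_headers(etag=None, last_modified=None):
    """Build validator headers so the server can answer 304 Not Modified."""
    headers = {}
    if etag:
        headers["If-None-Match"] = etag
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    return headers


def fetch_if_changed(url, state):
    """Return the body if the resource changed since the last fetch, else None.
    `state` remembers the ETag and Last-Modified values of the previous response."""
    request = urllib.request.Request(
        url, headers=conditional_headers(state.get("etag"), state.get("last_modified")))
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            state["etag"] = response.headers.get("ETag")
            state["last_modified"] = response.headers.get("Last-Modified")
            return response.read()
    except urllib.error.HTTPError as err:
        if err.code == 304:  # unchanged on the server: nothing new to pull
            return None
        raise


def poll(url=URL, interval_seconds=RUN_SCHEDULE_SECONDS):
    """Check the URL once per interval, downloading only when it has changed."""
    state = {}
    while True:
        body = fetch_if_changed(url, state)
        if body is not None:
            print(f"downloaded {len(body)} bytes")
        time.sleep(interval_seconds)
```

Calling `poll()` would run forever, so in practice a scheduler (NiFi's Run Schedule, in our flow) owns the loop and only the single conditional fetch happens per trigger.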

Next, we know that the data is in ZIP format, based on the URL. We will want to unzip the data to interact with each file individually. Again, we can drag a Processor to the graph and filter by the tag "zip." We see that the UnpackContent processor can handle ZIP files. We add it to the graph and configure its Properties as we did for GetHTTP. This time, there is only one property: Packing Format. We change it to "zip" and click "Apply."
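Unpacking turns one incoming FlowFile (the ZIP archive) into many FlowFiles, one per file entry. Here is a rough sketch of that fan-out using Python's `zipfile` module. The attribute layout (a base `filename` plus its containing `path`) is a hypothetical simplification, and the archive contents are made up for the example.

```python
import io
import zipfile


def unpack_zip(content: bytes):
    """Turn one ZIP archive into one (attributes, data) pair per file entry,
    roughly what UnpackContent does with Packing Format set to "zip"."""
    unpacked = []
    with zipfile.ZipFile(io.BytesIO(content)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # directory entries have no content of their own
            directory, _, name = info.filename.rpartition("/")
            # Hypothetical attribute layout: base name plus containing directory.
            attributes = {"filename": name, "path": directory}
            unpacked.append((attributes, archive.read(info)))
    return unpacked


# Build a small in-memory archive with made-up contents to try it out.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("ml-10m/movies.dat", "1::Example Movie::Comedy\n")
    archive.writestr("ml-10m/README.html", "<html></html>")

for attributes, data in unpack_zip(buffer.getvalue()):
    print(attributes["filename"], len(data))
```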

Next, we need to send the data from GetHTTP to UnpackContent. We drag a Connection between the Processors and include the "success" relationship, as that is the only relationship defined by GetHTTP. Now that we've unzipped the data, the README tells us that the actual data included in the zip is in 3 files: movies.dat, ratings.dat, and tags.dat. We don't really care about the rest of the data in the zip file, so we will use a RouteOnAttribute Processor to throw it out. We'll configure our RouteOnAttribute with the following properties:


We will connect the "success" relationship of UnpackContent to RouteOnAttribute. We don't care about the original FlowFile - we only want the unpacked data. And if unpacking fails, then there's not much we can do, so in the Settings tab of UnpackContent we choose to Auto-terminate the "original" and "failure" relationships. Likewise, we don't care about anything routed to the "unmatched" relationship of RouteOnAttribute, so we will Auto-terminate that relationship as well.
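With RouteOnAttribute's "Route to Property name" strategy, each user-defined property becomes a relationship, and a FlowFile goes to every relationship whose condition matches, or to "unmatched" otherwise. The sketch below models that with plain Python predicates. We assume one property per file, named "movies", "ratings" and "tags" and testing the `filename` attribute; in NiFi each condition would be an Expression Language predicate such as `${filename:equals('movies.dat')}`.

```python
# Hypothetical rules, one per user-defined RouteOnAttribute property.
ROUTES = {
    "movies": lambda attrs: attrs.get("filename") == "movies.dat",
    "ratings": lambda attrs: attrs.get("filename") == "ratings.dat",
    "tags": lambda attrs: attrs.get("filename") == "tags.dat",
}

AUTO_TERMINATED = {"unmatched"}


def route(attributes):
    """Return every relationship whose rule matches, or ["unmatched"]."""
    matched = [name for name, rule in ROUTES.items() if rule(attributes)]
    return matched or ["unmatched"]


def dispatch(flowfiles):
    """Group FlowFile filenames by relationship, dropping auto-terminated ones."""
    queues = {}
    for attributes in flowfiles:
        for relationship in route(attributes):
            if relationship in AUTO_TERMINATED:
                continue  # auto-terminated: the FlowFile is dropped here
            queues.setdefault(relationship, []).append(attributes["filename"])
    return queues


print(dispatch([{"filename": "movies.dat"}, {"filename": "README.html"}]))
# {'movies': ['movies.dat']}
```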

Now, we can just decide what to do with the "movies", "ratings", and "tags" relationships. For the sake of this post, let's go ahead and push all of this data to HDFS with a PutHDFS Processor. We add a PutHDFS Processor and configure it as Joey's blog instructs. We then drag a Connection from RouteOnAttribute to PutHDFS and choose all three of these relationships.

For PutHDFS, once we have successfully sent the data, there is nothing else to do, so we Auto-terminate the "success" relationship. However, if there's a problem sending the data, we don't want to lose it. So, rather than auto-terminating "failure", we connect PutHDFS's "failure" relationship back to PutHDFS itself. Now, if we fail to send the data, we will keep retrying until we are successful.
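Looping "failure" back into the same Processor amounts to a retry-until-success loop with a pause between attempts. Here is a minimal sketch, assuming failed FlowFiles are penalized for a fixed period before being retried; the 30-second default is modeled on NiFi's default Penalty Duration, and `send` is a hypothetical callable standing in for the HDFS write.

```python
import time


def deliver_until_success(send, flowfile, penalty_seconds=30.0, sleep=time.sleep):
    """Keep re-submitting a FlowFile whose "failure" relationship loops back to
    the same Processor. Returns the number of attempts it took."""
    attempts = 0
    while True:
        attempts += 1
        if send(flowfile):
            return attempts      # routed to "success", which is auto-terminated
        sleep(penalty_seconds)   # back off before the looped-back retry
```

Passing `sleep` in as a parameter keeps the sketch testable: a test can record the waits instead of actually pausing.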

Now, we're ready to send some data to Kafka. Let's assume that we want only the data in the `movies.dat` file to go to Kafka. We can drag a Processor onto the graph and filter by the tag "kafka." We see two Processors: GetKafka and PutKafka. Since we want to send the data, we will use PutKafka. To send the movies data, we simply draw a Connection from the RouteOnAttribute Processor to PutKafka and choose only the "movies" relationship. NiFi will take care of cloning the FlowFile in a way that's very efficient so that no data is actually copied.
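The reason cloning is cheap is that a FlowFile refers to its content (held in NiFi's content repository) rather than carrying its own copy of the bytes, so a clone needs only its own attributes. The classes below are a hypothetical toy model of that idea, not NiFi's classes; the content is immutable (a frozen dataclass holding `bytes`), which is what makes sharing it safe.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ContentClaim:
    """Toy stand-in for content stored once in a content repository."""
    data: bytes


@dataclass
class FlowFile:
    attributes: dict
    content: ContentClaim

    def clone(self):
        # Copy the (small) attribute map, but share the (large) content claim.
        return FlowFile(dict(self.attributes), self.content)


original = FlowFile({"filename": "movies.dat"}, ContentClaim(b"1::Example Movie::Comedy\n"))
for_kafka = original.clone()
print(for_kafka.content is original.content)  # True: no bytes were copied
```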

We configure PutKafka by Auto-terminating "success" and looping "failure" back to itself, as with PutHDFS. Now let's look at the Properties tab. We have to choose a Kafka Topic to send the data to and a list of one or more Kafka brokers to send to. We'll set the Known Brokers to "localhost:9092" (assuming NiFi is running on the same machine as Kafka) and set the Kafka Topic to "movies". Since the data is a CSV file, we know that it is newline-delimited. By default, NiFi will send the entire contents of a FlowFile to Kafka as a single message. However, we want each line in our CSV file to be a new message on the Kafka Topic. We accomplish this by setting the "Message Delimiter" property to "\n". At this point, our flow should look something like this:

NiFi Kafka integration
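Setting the "Message Delimiter" means one FlowFile becomes many Kafka messages, one per record. Here is a minimal sketch of that splitting, paired with the topic. Skipping empty records (such as the one after a trailing newline) is our own assumption, not documented PutKafka behavior, and no actual Kafka client is involved here.

```python
def split_messages(content: bytes, delimiter: bytes = b"\n"):
    """Split FlowFile content into one message per delimited record,
    skipping empty records (an assumption of this sketch)."""
    return [record for record in content.split(delimiter) if record]


def to_kafka_records(content: bytes, topic: str = "movies", delimiter: bytes = b"\n"):
    """Pair each message with the topic it would be published to."""
    return [(topic, message) for message in split_messages(content, delimiter)]


print(to_kafka_records(b"1::First Movie::Comedy\n2::Second Movie::Drama\n"))
# [('movies', b'1::First Movie::Comedy'), ('movies', b'2::Second Movie::Drama')]
```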

And now we're ready to start our flow!

We can press Ctrl+A to select all and then click the Start button in the middle toolbar to start everything running. While the flow is running, we can right-click on the canvas and select "Refresh Status" to update all of the stats shown on the Processors. We will then see a "1" in the top-right corner of GetHTTP, indicating that there is currently a single task running for this Processor. Once this task is finished, the data will be routed to the UnpackContent Processor, then to the RouteOnAttribute Processor. At this point, some of the data will be dropped, because RouteOnAttribute routes it to the auto-terminated "unmatched" relationship. The rest will go to the PutHDFS Processor, and the "movies.dat" file will additionally be sent to the PutKafka Processor. If we keep clicking "Refresh Status" as the data is processing, we will see each of these steps happening.

We've now successfully set up a dataflow with Apache NiFi that pulls the largest of the available MovieLens datasets, unpacks the zipped contents, filters out the unwanted data, routes all of the pertinent data to HDFS, and finally sends a subset of this data to Apache Kafka.

In Part Two of this series, we will look at how we can consume data from Kafka using NiFi, as well as how we can see what data we've pulled and what we've done with that data. Until then, please feel free to leave any questions, comments, or feedback in the Comments section.