Apache NiFi: Thinking Differently About Dataflow
Recently a question was posed to the Apache NiFi (Incubating) Developer Mailing List about how best to use Apache NiFi to perform Extract, Transform, Load (ETL) types of tasks. The question was "Is it possible to have NiFi service setup and running and allow for multiple dataflows to be designed and deployed (running) at the same time?"
The idea here was to create several disparate dataflows that run alongside one another in parallel. Data comes from Source X and it's processed this way. That's one dataflow. Other data comes from Source Y and it's processed this way. That's a second dataflow entirely. Typically, this is how we think about dataflow when we design it with an ETL tool. And this is a pretty common question for new NiFi users. With NiFi, though, we tend to think about designing dataflows a little bit differently. Rather than having several disparate, "stovepiped" flows, the preferred approach with NiFi is to have several inputs feed into the same dataflow. Data can then be easily routed (via RouteOnAttribute, for example) to "one-off subflows" if need be.
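To make the routing idea concrete, here is a minimal sketch of attribute-based routing in plain Python. It is not NiFi code and does not use NiFi's API. The FlowFile is modelled as a simple dictionary of attributes, and the `source` attribute and the `source-x-subflow` rule name are made up for illustration.

```python
from typing import Callable, Dict, List

# A FlowFile is modelled here as a plain dict of attributes (an assumption
# for illustration only; real FlowFiles also carry content).
Attributes = Dict[str, str]


def route_on_attribute(
    attributes: Attributes,
    rules: Dict[str, Callable[[Attributes], bool]],
) -> List[str]:
    """Return the name of every rule that matches, or ["unmatched"] if none do."""
    matched = [name for name, predicate in rules.items() if predicate(attributes)]
    return matched or ["unmatched"]


# Hypothetical rule: only data from Source X needs a one-off subflow.
rules = {
    "source-x-subflow": lambda attrs: attrs.get("source") == "X",
}

from_x = route_on_attribute({"source": "X", "filename": "a.txt"}, rules)
from_y = route_on_attribute({"source": "Y", "filename": "b.txt"}, rules)
print(from_x)
print(from_y)
```

Everything that does not match a rule stays on the shared path, so the one-off handling is the exception rather than a separate flow.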
One of the benefits of having several disparate dataflows, though, is that it makes it much easier to answer when someone comes to you and says "I sent you a file last week. What did you do with it?" or "How do you process data that comes from this source?" You may not know exactly what happened to a specific file that they sent you, step-by-step, because of the different decision points in the flow, but at least you have a good idea by looking at the layout of the dataflow.
So we can avoid merging the data if we would like. For the sake of an example, let's assume that we have 4 different data sources. Each of them is going to send us some text data that needs to be pushed into HDFS. Maybe it's compressed, maybe it's not. So your flow will look like this:
Now, let's say that you've got a new requirement. When you're sending text data to HDFS, each file that is pushed to HDFS needs to have 1,000 lines of text or fewer (yes, that's a contrived example and that's probably never a good idea, but the point is valid). Now, consider how much work it is to make all of those modifications. And let's hope that you don't miss any!
If we continue down this path, this can get hairy quickly, as we have several different dataflows side-by-side on the same graph. In order to aid in the visual representation, we can use Process Groups to provide a nice logical separation. If we do that for each of those, we end up with something like:
We can then double-click each of those Process Groups and see/edit what's inside. But we still have the issue of having to change 4 different flows to make the change mentioned.
So let us consider the alternate approach of merging it all into a single dataflow, and we end up with a flow like this:
Now, we have all of the data going to a single stream. If we want to update it, we just insert one new Processor:
And we're done. We don't have to make this change to insert a SplitText processor 3 more times.
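For readers who want to see what that one inserted step does to the data, here is a rough sketch of the splitting logic in plain Python. It is not how the SplitText processor is implemented. The function name `split_lines` and its `max_lines` parameter are assumptions made for this example.

```python
from typing import Iterator


def split_lines(text: str, max_lines: int = 1000) -> Iterator[str]:
    """Yield consecutive chunks of `text`, each holding at most `max_lines` lines."""
    if max_lines < 1:
        raise ValueError("max_lines must be at least 1")
    # keepends=True preserves line terminators so the chunks rejoin losslessly.
    lines = text.splitlines(keepends=True)
    for start in range(0, len(lines), max_lines):
        yield "".join(lines[start:start + max_lines])


# 2,500 lines of sample text should come out as chunks of 1000, 1000 and 500 lines.
sample = "".join(f"line {i}\n" for i in range(2500))
chunks = list(split_lines(sample))
print([len(chunk.splitlines()) for chunk in chunks])
```

Because the step sits on the single merged stream, it applies to data from all four sources at once.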
The concern that we have here, though, as mentioned earlier, is that if all of the data is mixed together, as the dataflow grows larger, how do we know what happens to data that came from Source X, for example?
This is where the Data Provenance feature comes in. In the top right-hand corner there's a toolbar with 8 icons. The 4th one is the Provenance icon. If we click on that, we can then search for data that has been processed. For this example, let's simply search for RECEIVE events.
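Conceptually, a search like this is a filter over recorded events by type and time range. The sketch below models that in plain Python. It is not NiFi's provenance API: the `ProvenanceEvent` class, its fields, and the "Data From Source Y" component are assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List


@dataclass(frozen=True)
class ProvenanceEvent:
    """Simplified, hypothetical stand-in for a provenance event record."""
    event_type: str
    component_name: str
    timestamp: datetime


def search_events(
    events: Iterable[ProvenanceEvent],
    event_type: str,
    start: datetime,
    end: datetime,
) -> List[ProvenanceEvent]:
    """Return events of the given type whose timestamp lies in [start, end]."""
    return [
        e for e in events
        if e.event_type == event_type and start <= e.timestamp <= end
    ]


# Illustrative records only; the second source and its timestamp are made up.
events = [
    ProvenanceEvent("RECEIVE", "Data From Source X", datetime(2015, 1, 11, 16, 55, 51)),
    ProvenanceEvent("SEND", "PutHDFS", datetime(2015, 1, 11, 16, 55, 53)),
    ProvenanceEvent("RECEIVE", "Data From Source Y", datetime(2015, 1, 12, 9, 0, 0)),
]

found = search_events(
    events, "RECEIVE", datetime(2015, 1, 11), datetime(2015, 1, 11, 23, 59, 59)
)
print([e.component_name for e in found])
```

In NiFi itself the search is done through the Provenance dialog, as in our example.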
This shows us all of the RECEIVE events that this instance of NiFi has seen within the time range searched:
If we click the Lineage Graph icon on the right, for the first file, we see exactly what happened to this piece of data:
We see that a RECEIVE event occurred, and that generated a FlowFile. That FlowFile's attributes were then modified, its content was modified, and then the FlowFile was forked, and dropped. At this point, we can right-click on each of these event nodes and choose to view the details of the event. For the RECEIVE event, we see:
From here, we can see that the RECEIVE event took place at 16:55:51 EST on 01/11/2015. The component that reported the event was named "Data From Source X" and was a GetFile Processor. We can also see that the URI of the data was file:/C:/temp/in/debug/LICENSE.gz. If we follow the lineage of that FlowFile, we see that the next event is an ATTRIBUTES_MODIFIED Event:
Since the event is of type ATTRIBUTES_MODIFIED, it immediately raises the question "What attributes were modified?" Clicking the "Attributes" tab shows us this:
As with any Provenance Event, we can see all of the attributes that were present on the FlowFile when the event occurred. Of interest here, we can see that the value of the "mime.type" attribute was changed from "No value set" (the attribute didn't exist) to "application/gzip". The next event in our lineage is a CONTENT_MODIFIED Event. If we view the details here, we will see:
Here, we can see that the content was modified by a Processor named Decompress. This makes sense, since the previous Event showed us that the MIME Type was "application/gzip". After decompressing the data, we arrive at a FORK Event:
This event shows us that the FORK was performed by the SplitText Processor. That is, the SplitText Processor broke a large FlowFile into many smaller FlowFiles. On the right-hand side of this dialog, we see that the file was broken into six different "child" FlowFiles. This is where things get fun! If we then close this dialog, we can right-click on the FORK Event and choose the "Expand" option. This will then pull back the lineage for each of those children, providing us with a more holistic view of what happened to this piece of data:
Now, we can see that each of those six children was sent somewhere. Viewing the details of these events shows us where they were sent:
The first file, for instance, was sent via the PutHDFS Processor with the filename "/nifi/blogs/thinking-differently/LICENSE.gz". This occurred at 16:55:53 EST on 01/11/2015. We can also see in this dialog that the "Lineage Duration" was "00:00:02.712", or 2.712 seconds. The "Lineage Duration" field tells us how much time elapsed between the time when the original source data was received and the time at which this event occurred.
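The arithmetic behind that field is a simple subtraction. Here is a small sketch in plain Python that reproduces the "00:00:02.712" formatting. The function names are made up for this example, the sub-second parts of the two timestamps are assumed (the dialogs shown here only display whole seconds), and the date is read as January 11, 2015.

```python
from datetime import datetime, timedelta


def lineage_duration(lineage_start: datetime, event_time: datetime) -> timedelta:
    """Time elapsed between receipt of the source data and a later event."""
    if event_time < lineage_start:
        raise ValueError("event_time must not precede lineage_start")
    return event_time - lineage_start


def format_duration(duration: timedelta) -> str:
    """Format a non-negative duration as HH:MM:SS.mmm."""
    total_ms = duration // timedelta(milliseconds=1)  # whole milliseconds
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    seconds, millis = divmod(remainder, 1_000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"


# Assumed timestamps: received at 16:55:51.000, sent 2.712 seconds later.
received = datetime(2015, 1, 11, 16, 55, 51)
sent = received + timedelta(milliseconds=2712)

formatted = format_duration(lineage_duration(received, sent))
print(formatted)
```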
Finally, we have the DROP event. The DROP event signifies the end of line for a FlowFile. If we look at the details of this event, we see:
Of note here, we see that the DROP event was emitted by PutHDFS. That is, PutHDFS was the last component in NiFi to process this piece of information. We can also see in the "Details" field why the FlowFile was dropped: it was Auto-terminated by the "success" relationship.
NiFi's Data Provenance capability allows us to understand exactly what happens to each piece of data that is received. We are given a directed graph that shows when a FlowFile was received, when it was modified, when it was routed in a particular way, and when and where it was sent - as well as which component performed the action. We are also able to see, for each event, the attributes (or metadata) associated with the data so that we can understand why the particular event occurred. Additionally, when many pieces of data are merged together or a single piece of data is split apart, we are able to understand fully the provenance of this data from the time that it was received until the time at which it exited the flow. This makes it very easy to answer the question "I sent you a file last week. What did you do with it?" while providing a much more holistic view of the enterprise dataflow than would be available if we used many disparate flows.
Hopefully this post helps you to understand not only the way that we like to set up the flows with NiFi but also the benefits that we have as a result and the features that allow us to overcome any challenges that this approach may create.
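The directed graph itself is easy to picture as data. The sketch below is a simplified model in plain Python, not NiFi's internal representation. The `Event` class is hypothetical, the component name for the ATTRIBUTES_MODIFIED event is a placeholder because the example above never names it, and only two of the six children are shown to keep it short.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Event:
    """Hypothetical lineage node: an event plus the ids of the events that follow it."""
    event_id: int
    event_type: str
    component: str
    children: List[int] = field(default_factory=list)


def walk_lineage(events: Dict[int, Event], root_id: int) -> List[str]:
    """Breadth-first walk of the lineage graph starting at `root_id`."""
    order: List[str] = []
    seen = set()
    queue = deque([root_id])
    while queue:
        event_id = queue.popleft()
        if event_id in seen:
            continue  # guard against visiting a shared node twice
        seen.add(event_id)
        event = events[event_id]
        order.append(f"{event.event_type} ({event.component})")
        queue.extend(event.children)
    return order


# Loosely modelled on the example above, with two children instead of six.
lineage = {
    1: Event(1, "RECEIVE", "Data From Source X", [2]),
    2: Event(2, "ATTRIBUTES_MODIFIED", "placeholder", [3]),
    3: Event(3, "CONTENT_MODIFIED", "Decompress", [4]),
    4: Event(4, "FORK", "SplitText", [5, 6]),
    5: Event(5, "SEND", "PutHDFS", [7]),
    6: Event(6, "SEND", "PutHDFS", [8]),
    7: Event(7, "DROP", "PutHDFS"),
    8: Event(8, "DROP", "PutHDFS"),
}

steps = walk_lineage(lineage, root_id=1)
for step in steps:
    print(step)
```

Walking from the RECEIVE event through the FORK and into each child is the same thing the "Expand" option does for us visually.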