Apache NiFi

Monday Jan 12, 2015

Apache NiFi: Thinking Differently About DataFlow

Mark Payne -  markap14@hotmail.com


Recently a question was posed to the Apache NiFi (Incubating) Developer Mailing List about how best to use Apache NiFi to perform Extract, Transform, Load (ETL) types of tasks. The question was "Is it possible to have NiFi service setup and running and allow for multiple dataflows to be designed and deployed (running) at the same time?"

The idea here was to create several disparate dataflows that run alongside one another in parallel. Data comes from Source X and it's processed this way. That's one dataflow. Other data comes from Source Y and it's processed this way. That's a second dataflow entirely. Typically, this is how we think about dataflow when we design it with an ETL tool. And this is a pretty common question for new NiFi users. With NiFi, though, we tend to think about designing dataflows a little bit differently. Rather than having several disparate, "stovepiped" flows, the preferred approach with NiFi is to have several inputs feed into the same dataflow. Data can then be easily routed (via RouteOnAttribute, for example) to "one-off subflows" if need be.
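
To make that concrete, suppose each input branch tags its FlowFiles with a "source" attribute (the attribute name and values here are assumptions for illustration, set by something like an UpdateAttribute processor). A RouteOnAttribute processor in the shared flow could then peel data off to a one-off subflow using dynamic properties, sketched roughly as:

    RouteOnAttribute
      Routing Strategy: Route to Property name
      source-x  =>  ${source:equals('X')}
      source-y  =>  ${source:equals('Y')}

Each dynamic property becomes its own relationship, so "source-x" can be wired to its special-case handling while everything else continues down the common path via the "unmatched" relationship.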

One of the benefits of having several disparate dataflows, though, is that it makes it much easier to answer questions like "I sent you a file last week. What did you do with it?" or "How do you process data that comes from this source?" You may not know exactly what happened to a specific file, step by step, because of the different decision points in the flow, but at least you can get a good idea just by looking at the layout of the dataflow.

So we could avoid merging the flows if we'd like. For the sake of an example, let's assume that we have 4 different data sources. Each of them is going to send us some text data that needs to be pushed into HDFS. Maybe it's compressed, maybe it's not. So our flow will look like this:

Disparate Flows

Now, let's say that you've got a new requirement. When you're sending text data to HDFS, each file that is pushed to HDFS needs to have 1,000 lines of text or less (yes, that's a contrived example and probably never a good idea, but the point is valid). Now consider how much work it is to make that modification in each of those four flows. And let's hope that you don't miss any!

If we continue down this path, this can get hairy quickly, as we have several different dataflows side-by-side on the same graph. In order to aid in the visual representation, we can use Process Groups to provide a nice logical separation. If we do that for each of those, we end up with something like:

Grouped Disparate Flows

We can then double-click each of those Process Groups and see/edit what's inside. But we still have the issue of having to change 4 different flows to make the change mentioned.

So let's consider the alternative approach of merging it all into a single dataflow. We end up with a flow like this:

Merged Flow
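
One way to handle the "maybe it's compressed, maybe it's not" aspect of a merged flow like this is to branch on the mime.type attribute and decompress only the data that needs it. The sketch below is an assumption about how such a check could be wired up, not a literal readout of the flow pictured above (the processor called CheckedIfCompress that a commenter mentions below is presumably playing this role):

    RouteOnAttribute  (the compression check)
      gzipped  =>  ${mime.type:equals('application/gzip')}

The "gzipped" relationship would feed the Decompress processor, while "unmatched" data bypasses it, with both paths rejoining before the push to HDFS. The mime.type attribute itself would be populated upstream (by an IdentifyMimeType processor, for instance), which lines up with the ATTRIBUTES_MODIFIED provenance event we will see later.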

Now, we have all of the data going to a single stream. If we want to update it, we just insert one new Processor:

Updated Merged Flow

And we're done. We don't have to insert a SplitText processor three more times, once for each of the other flows.
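
For reference, the split size is controlled by SplitText's "Line Split Count" property; a minimal sketch of the configuration for the 1,000-line requirement, with other properties left at their defaults, would be:

    SplitText
      Line Split Count: 1000

Each incoming FlowFile is then forked into child FlowFiles of at most 1,000 lines apiece, which is exactly the FORK provenance event we will walk through below.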

The concern here, though, as mentioned earlier, is that if all of the data is mixed together, how do we know what happens to the data that came from Source X, for example, as the dataflow grows larger?

This is where the Data Provenance feature comes in. In the top right-hand corner there's a toolbar with 8 icons; the 4th one is the Provenance icon. If we click on that, we can then search for data that has been processed. For this example, let's simply search for RECEIVE events.

This shows us all of the RECEIVE events that this instance of NiFi has seen within the time range searched:

Provenance Search Results

If we click the Lineage Graph icon on the right for the first file, we see exactly what happened to this piece of data:

Data Lineage

We see that a RECEIVE event occurred, and that generated a FlowFile. That FlowFile's attributes were then modified, its content was modified, and then the FlowFile was forked, and dropped. At this point, we can right-click on each of these event nodes and choose to view the details of the event. For the RECEIVE event, we see:

Receive Event

From here, we can see that the RECEIVE event took place at 16:55:51 EST on 01/11/2015. The component that reported the event was named "Data From Source X" and was a GetFile Processor. We can also see that the URI of the data was file:/C:/temp/in/debug/LICENSE.gz. If we follow the lineage of that FlowFile, we see that the next event is an ATTRIBUTES_MODIFIED Event:

Attrs Modified Event

Since the event is of type ATTRIBUTES_MODIFIED, it immediately raises the question "What attributes were modified?" Clicking the "Attributes" tab shows us this:

Attrs Modified Event

As with any Provenance Event, we can see all of the attributes that were present on the FlowFile when the event occurred. Of interest here, we can see that the value of the "mime.type" attribute was changed from "No value set" (the attribute didn't exist) to "application/gzip". The next event in our lineage is a CONTENT_MODIFIED Event. If we view the details here, we will see:

Content Modified Event

Here, we can see that the content was modified by a Processor named Decompress. This makes sense, since the previous Event showed us that the MIME Type was "application/gzip". After decompressing the data, we arrive at a FORK Event:

Fork Event

This event shows us that the FORK happened by the SplitText Processor. That is, the SplitText Processor broke a large FlowFile into many smaller FlowFiles. On the right-hand side of this dialog, we see that the file was broken into six different "child" FlowFiles. This is where things get fun! If we then close this dialog, we can right-click on the FORK Event and choose the "Expand" option. This will then pull back the lineage for each of those children, providing us with a more holistic view of what happened to this piece of data:

Expanded Lineage

Now, we can see that each of those six children was sent somewhere. Viewing the details of these events shows us where they were sent:

Send Event

The first file, for instance, was sent via the PutHDFS Processor with the filename "/nifi/blogs/thinking-differently/LICENSE.gz". This occurred at 16:55:53 EST on 01/11/2015. We can also see in this dialog that the "Lineage Duration" was "00:00:02.712", or 2.712 seconds. The "Lineage Duration" field tells us how long elapsed between the time when the original source data was received and the time at which this event occurred - here, from the RECEIVE event at 16:55:51 to this SEND event at 16:55:53.

Finally, we have the DROP event. The DROP event signifies the end of the line for a FlowFile. If we look at the details of this event, we see:

Drop Event

Of note here, we see that the DROP event was emitted by PutHDFS. That is, PutHDFS was the last component in NiFi to process this piece of information. We can also see in the "Details" field why the FlowFile was dropped: it was Auto-terminated by the "success" relationship.

NiFi's Data Provenance capability allows us to understand exactly what happens to each piece of data that is received. We are given a directed graph that shows when a FlowFile was received, when it was modified, when it was routed in a particular way, and when and where it was sent - as well as which component performed the action. We are also able to see, for each event, the attributes (or metadata) associated with the data so that we can understand why the particular event occurred. Additionally, when many pieces of data are merged together or a single piece of data is split apart, we are able to fully understand the provenance of this data from the time that it was received until the time at which it exited the flow. This makes it very easy to answer the question "I sent you a file last week. What did you do with it?" while providing a much more holistic view of the enterprise dataflow than would be available if we used many disparate flows.

Hopefully this post helps you to understand not only the way that we like to set up flows with NiFi but also the benefits that we gain as a result, and the features that allow us to overcome any challenges that this approach may create.

 

Comments:

First impressions are half the battle.

Posted by itxxz on January 13, 2015 at 01:08 AM GMT #

Thank you again Mark - with details like this - I think I've got it now. :-)

Posted by Orrin on January 19, 2015 at 02:02 PM GMT #

Hi, I think a Funnel can be used there, right? I have a request: can you explain the difference between the two approaches, going directly from the data sources to 1 processor vs. going from the data sources to a funnel and then to 1 processor? Thanks! :)

Posted by mamm0nth on July 09, 2015 at 07:35 AM GMT #

mamm0nth, Great question! Absolutely, the funnel can be used there. It's not really necessary but can make the flow easier to visualize and manage. Funnels become particularly useful in the case that the Processor you want to connect to is far away on the graph - in that case, instead of drawing 4 connections all the way across the graph, you can funnel them into a single connection and draw 1 connection across the graph. The other case when funnels are important is if you want to prioritize the data in a connection. If you have 4 separate connections, you can't prioritize across all of them. To achieve this, you can simply funnel the data into a single connection and then prioritize that one connection. For more information on prioritizing the data in your queues, see http://nifi.incubator.apache.org/docs/nifi-docs/html/user-guide.html#settings Thanks -Mark

Posted by Mark on July 09, 2015 at 11:43 AM GMT #

Thanks Mark. I have another question. For the processor named CheckedIfCompress, if FlowFiles come from the data sources at the same time, how will it handle them? Will they run in parallel? Let's say I set Concurrent Tasks to 4.

Posted by mamm0nt on July 10, 2015 at 12:53 AM GMT #

mamm0nth, yes, they would be processed in parallel (assuming that concurrent tasks is high enough to allow for that). If there are not enough tasks to handle it, the data will be queued. The processor will round-robin between all of the incoming connections to pull data, so which one gets processed first just depends on which queue it pulled from last time.

Posted by Mark on July 11, 2015 at 12:23 PM GMT #

Thanks, Mark (y)

Posted by mamm0nth on July 21, 2015 at 12:41 AM GMT #

Hi Mark, your article is really great. I am currently evaluating NiFi for one of our use cases. I have one question; it would be great if you could answer it. We need to find a way to execute a whole dataflow from the command line. Is there a way to do that in NiFi?

Posted by Srujana on February 14, 2016 at 11:17 PM GMT #

@Srujana, NiFi was designed to be an always-on automated dataflow system. So the flows are not really intended to be run from the command-line. However, there has been some talk recently about a new sub-project of NiFi that is intended to be more of an Agent. I would encourage you to send an email to dev@nifi.apache.org and explain the use case in a little more detail, so that we can provide better guidance.

Posted by Mark on February 15, 2016 at 03:35 PM GMT #

All the responses are great, but I'd like to go back to a variation of the original question. How do you develop and manage a system where there may be 50 or 100 dataflows into a system? Is there a way to isolate them so that you can just look at and control a single flow on one sheet/view? We use NiFi at my company and I get the impression that although NiFi can do a lot of general-purpose ETL work, it is still designed to do more basic dataflow control: get a file or files and make simple decisions to figure out where to send it or place it on a file system so a real ingest subsystem can do the real ETL. Is this a fair assumption?

Posted by Gerald on February 19, 2016 at 08:18 PM GMT #
