Apache NiFi

Saturday Jan 10, 2015

Integrating Apache NiFi with Apache Kafka

Mark Payne - markap14@hotmail.com


A couple of weeks ago, Joey Echeverria wrote a fantastic blog post about how to get started with Apache NiFi (Incubating), available at http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/. In it, Joey outlines how to quickly build a simple dataflow that automatically picks up any data from the /dropbox directory on your computer and pushes the data to HDFS. He then goes on to use the ExecuteStreamCommand Processor to make use of the Kite SDK's ability to push CSV data into Hadoop.

In this blog post, we will expand upon Joey’s example to build a dataflow that’s a bit more complicated and illustrate a few additional features of Apache NiFi. If you haven’t already read Joey’s blog, we would recommend you read it before following along here, as this post assumes that you have a very basic understanding of NiFi, such as what NiFi is and how to add a Processor to the graph.

We will continue with the MovieLens dataset, this time using the "MovieLens 10M" dataset, which contains "10 million ratings and 100,000 tag applications applied to 10,000 movies by 72,000 users." However, rather than downloading this dataset and placing the data that we care about in the /dropbox directory, we will use NiFi to pull the data directly from the MovieLens site.

To do this, we first drag a Processor onto the graph. When we do this, we are given the option of choosing many different types of Processors. We know that we want to retrieve data from an HTTP address, so we will search for Processors of interest by choosing the "by tag" option for the filter and typing "http":

[Image: httpprocessors.png - the Processor list filtered by the "http" tag]

This narrows the list of Processors to just a few. We can quickly see the GetHTTP Processor. Clicking on the Processor shows a description of the Processor just below: "Fetches a file via HTTP." This sounds like a good fit, so we click the "Add" button to add it to the graph. Next, we need to configure the Processor to pull the data that we are interested in. Right-clicking the Processor and clicking the "Configure" menu option provides us a dialog with a "Properties" tab. Here, we can enter the URL to fetch the data from (http://files.grouplens.org/datasets/movielens/ml-10m.zip) and enter the filename to use when downloading it:

[Image: configuregethttp.png - configuring the GetHTTP Processor's properties]

We can leave the other properties as they are for the sake of this blog post. In a real environment, you may want to change timeout values, provide authentication, etc. We do, however, want to look at the "Scheduling" tab. By default, Processors are scheduled to run as fast as they can. While this is great for most high-volume dataflows, we wouldn't be very good clients if we continually hit the website as fast as we can - and I doubt that those responsible for the site would appreciate it very much. To avoid this, we go to the "Scheduling" tab and change the "Run Schedule" setting from "0 sec" to something more reasonable - say "10 mins". This way, we will check for new data from the website every 10 minutes. If the E-Tag changes or the Last Modified date changes, then we will pull the data again. Otherwise, we won't continue to pull the data.
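
For readers curious how a conditional fetch like this works at the HTTP level, here is a rough Python sketch using the requests library. It only illustrates the ETag / Last-Modified mechanism described above - it is not NiFi's actual GetHTTP implementation, and the helper function name is made up for the example.

```python
# Illustration only: a conditional HTTP fetch using ETag / Last-Modified,
# conceptually similar to what the GetHTTP Processor does on each run.
# Assumes the 'requests' library is installed.
import requests

URL = "http://files.grouplens.org/datasets/movielens/ml-10m.zip"


def fetch_if_changed(etag=None, last_modified=None):
    headers = {}
    if etag:
        headers["If-None-Match"] = etag
    if last_modified:
        headers["If-Modified-Since"] = last_modified

    response = requests.get(URL, headers=headers, timeout=30)
    if response.status_code == 304:
        # Server says nothing has changed since our last pull
        return None, etag, last_modified

    # New or changed content: remember the validators for the next run
    return (response.content,
            response.headers.get("ETag"),
            response.headers.get("Last-Modified"))
```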

Next, we know that the data is in ZIP format, based on the URL. We will want to unzip the data to interact with each file individually. Again, we can drag a Processor to the graph and filter by the tag "zip." We see that the UnpackContent Processor can handle ZIP files. We add it to the graph and configure its Properties as we did for GetHTTP. This time, there is only one property: Packaging Format. We change it to "zip" and click "Apply."
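
If you want to peek at the archive before wiring up the flow, a quick Python check outside of NiFi looks something like the following. This is just a convenience sketch: it assumes the ZIP has already been downloaded to the working directory as ml-10m.zip, and the exact entry paths inside the archive may differ.

```python
# List the entries of the downloaded archive; each entry is roughly what
# UnpackContent will emit as its own FlowFile.
import zipfile

with zipfile.ZipFile("ml-10m.zip") as archive:
    for name in archive.namelist():
        # expect paths ending in movies.dat, ratings.dat, tags.dat, README, etc.
        print(name)
```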

Next, we need to send the data from GetHTTP to UnpackContent. We drag a Connection between the Processors and include the "success" relationship, as that is the only relationship defined by GetHTTP. Now that we've unzipped the data, the README tells us that the actual data included in the zip is in 3 files: movies.dat, ratings.dat, and tags.dat. We don't really care about the rest of the data in the zip file, so we will use a RouteOnAttribute to throw it out. We'll configure our RouteOnAttribute with the following properties:

[Image: RouteOnAttribute Processor properties]
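
In case the screenshot doesn't come through, the gist is: set the Routing Strategy so that FlowFiles are routed to the matching property name, then add one user-defined property per file of interest, each using the Expression Language to test the filename. A plausible set of properties (illustrative, not necessarily identical to the screenshot) would be "movies" = ${filename:equals('movies.dat')}, "ratings" = ${filename:equals('ratings.dat')}, and "tags" = ${filename:equals('tags.dat')}. If the unpacked entries carry a directory prefix in their filename, an expression like ${filename:endsWith('movies.dat')} can be used instead.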

We will connect the "success" relationship of UnpackContent to RouteOnAttribute. We don't care about the original FlowFile - we only want the unpacked data. And if the unpacking fails, there's not much we can do, so we will configure UnpackContent and, in the Settings tab, choose to Auto-terminate the "original" and "failure" relationships. Likewise, we don't care about anything routed to the "unmatched" relationship of RouteOnAttribute, so we will Auto-terminate that relationship as well.

Now, we can just decide what to do with the "movies", "ratings", and "tags" relationships. For the sake of this post, let's go ahead and push all of this data to HDFS with a PutHDFS Processor. We add a PutHDFS Processor and configure it as Joey's blog instructs. We then drag a Connection from RouteOnAttribute to PutHDFS and choose all three of these relationships.

For PutHDFS, once we have successfully sent the data, there is nothing else to do, so we Auto-terminate the "success" relationship. However, if there's a problem sending the data, we don't want to lose it. We can then connect the "failure" relationship of PutHDFS back to the PutHDFS Processor itself. Now, if we fail to send the data, we will keep retrying until we are successful.

Now, we're ready to send some data to Kafka. Let's assume that we want only the data in the `movies.dat` file to go to Kafka. We can drag a Processor onto the graph and filter by the tag "kafka." We see two Processors: GetKafka and PutKafka. Since we want to send the data, we will use PutKafka. To send the movies data, we simply draw a Connection from the RouteOnAttribute Processor to PutKafka and choose only the "movies" relationship. NiFi will take care of cloning the FlowFile in a way that's very efficient so that no data is actually copied.

We configure PutKafka by Auto-terminating "success" and looping "failure" back to itself, as with PutHDFS. Now let's look at the Properties tab. We have to choose a Kafka Topic to send the data to and a list of one or more Kafka brokers to send to. We'll set the Known Brokers property to "localhost:9092" (assuming this is running on the same box as Kafka) and set the Kafka Topic to "movies". Since the data is a CSV file, we know that it is newline-delimited. By default, NiFi will send the entire contents of a FlowFile to Kafka as a single message. However, we want each line in our file to be a new message on the Kafka Topic. We accomplish this by setting the "Message Delimiter" property to "\n". At this point, our flow should look something like this:

[Image: the completed NiFi-to-Kafka dataflow]
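
Before starting the flow, it can be handy to have a consumer watching the topic so we can verify that each line of movies.dat arrives as its own message. Here is a minimal sketch using the kafka-python client - an assumption on my part, as any Kafka consumer (for example, the console consumer that ships with Kafka) would work just as well.

```python
# Minimal consumer sketch to verify that each line of movies.dat arrives
# as its own message on the "movies" topic. Assumes the kafka-python
# client is installed and the broker is reachable at localhost:9092.
from kafka import KafkaConsumer

consumer = KafkaConsumer("movies",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")

for message in consumer:
    # e.g. b"1::Toy Story (1995)::Adventure|Animation|Children|Comedy|Fantasy"
    print(message.value.decode("utf-8"))
```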

And now we're ready to start our flow!

We can press Ctrl+A to select all of the components and then click the Start button in the middle toolbar to start everything running. While the flow is running, we can right-click on the canvas and select "Refresh Status" to update all of the stats shown on the Processors. We will then see a "1" in the top-right-hand corner of GetHTTP, indicating that there is currently a single task running for this Processor. Once this task is finished, the data will be routed to the UnpackContent Processor and then to the RouteOnAttribute Processor. At this point, some of the data will be destroyed because RouteOnAttribute will route it to "unmatched." The rest will go to the PutHDFS Processor, while only the "movies.dat" file will also be sent to the PutKafka Processor. If we keep clicking "Refresh Status" as the data is processing, we will see each of these steps happening.

We've now successfully set up a dataflow with Apache NiFi that pulls the largest of the available MovieLens datasets, unpacks the zipped contents, discards the unwanted data, routes all of the pertinent data to HDFS, and finally sends a subset of this data to Apache Kafka.

In Part Two of this series, we will look at how we can consume data from Kafka using NiFi, as well as how we can see what data we've pulled and what we've done with that data. Until then, please feel free to leave any questions, comments, or feedback in the Comments section.

 

Comments:

Isn't it easier to wget to an NFS-mounted partition? What are the advantages of NiFi over other ingestion technologies?

Posted by 140.211.11.75 on January 20, 2015 at 12:32 PM GMT #

Running wget and pushing to an NFS-mounted file share would accomplish part of what the GetHTTP Processor does. However, that is a manual process and will not monitor for any changes. It also simply downloads the file, whereas this flow then unpacks the zipped data and routes specific files within the .zip file to Kafka and HDFS. The flow could then be expanded to do many more tasks as needs arise. For more information about what NiFi is and does and how it's different from other ingestion technologies, I would recommend you visit http://nifi.incubator.apache.org. Thanks -Mark

Posted by Mark Payne on January 20, 2015 at 12:42 PM GMT #

"If the E-Tag changes or the Last Modified date changes, then we will pull the data again. Otherwise, we won't continue to pull the data." -- I'm wondering how this is done exactly. Is this a built-in functionality of the GetHTTP processor to pull only if the file.lastModifiedTime was changed? I'm following your instructions but I'm using GetFTP instead of GetHTTP. The file is fetched every time the schedule comes and creates duplicate messages in the Kafka topic.

Posted by Oscar on March 05, 2015 at 07:37 AM GMT #

Oscar, Yes, this is a function of the GetHTTP processor. GetFTP does not do this because with GetFTP, the file is copied over and then the original is deleted from the FTP server. While there is a "Delete Original" property, it is really intended for testing purposes so that you can pull the data without destroying it. In an operational environment, though, NiFi is expected to handle all distribution of the data from the point that it pulls from the FTP server, so there's no reason to leave the file there.

Posted by Mark Payne on March 05, 2015 at 01:02 PM GMT #

Looking forward to Part two of this article!

Posted by Travis Collins on May 15, 2015 at 12:22 PM GMT #

Hi, thank you for this article. I am trying to connect to HDFS from NiFi, which is running on a worker node. I got the error below: Failed on local exception: java.io.IOException: Broken Pipe: Host Details: local host is: "worker1/"; destination host is: "master:8020". The Name Node is running on master. NiFi is running on worker1. I am using the correct config files and directory in the PutHDFS processor. Any suggestions appreciated. Satya

Posted by Satya Kondapalli on August 31, 2015 at 01:55 AM GMT #

Great stuff! Easy to understand and see the power of NiFi. Would love to see part 2.

Posted by A curious reader on October 08, 2015 at 11:15 AM GMT #

Great tool. I have started using it to ingest data to Spark via Kafka, and to HDFS.

Posted by Satya Kondapalli on October 08, 2015 at 01:11 PM GMT #

Hello, I installed Apache NiFi and created a process, but the processor configuration settings are not changing / not reflecting. Kindly help me.

Posted by mahesh on April 08, 2016 at 08:03 AM GMT #

Hello, nice article. I have tried to build a similar dataflow but am having the following issues. 1. It gets the data over GetHTTP but immediately tries to unpack it using the UnpackContent processor and fails with "Unable to unpack because it does not appear to have any entries". 2. I tried to store the original/failed file and unzip it from my local Ubuntu machine, and that too fails with "End-of-central-directory signature not found. Either this file is not a zipfile, or it constitutes one disk of a multi-part archive. In the latter case the central directory and zipfile comment will be found on the last disk(s) of this archive." It looks like the entire .zip file is not being downloaded, and the flow is trying to unzip it before the download completes and failing. Any help would be appreciated. Thank you! Anshu

Posted by Anshu on February 21, 2017 at 11:48 AM GMT #

I am setting up a NiFi cluster on Kubernetes, and it is working as expected. I am wondering what the best practice is: 1. A separate NiFi for each app, bundled with the microservice. 2. A shared cluster, like Elasticsearch, adding resources as and when needed.

Posted by cloudcar on September 14, 2017 at 07:29 AM GMT #

@cloudcar I would recommend using a single cluster. NiFi is pretty heavyweight to be bundled with a microservice. You also don't want to have to maintain a separate NiFi for every service that you've got. It is much easier to have a single instance/cluster.

Posted by Mark on September 14, 2017 at 01:17 PM GMT #
