Apache NiFi

Wednesday Dec 16, 2015

Getting Syslog Events to HBase

Bryan Bende -  bbende@gmail.com @bbende


In the Apache NiFi 0.4.0 release there are several new integration points including processors for interacting with Syslog and HBase. In this post we'll demonstrate how to use NiFi to receive messages from Syslog over UDP, and store those messages in HBase.

The flow described in this post was created using Apache NiFi 0.4.0, rsyslog 5.8.10, and Apache HBase 1.1.2.

Setting up Syslog

In order for NiFi to receive syslog messages, rsyslog needs to forward messages to a port that NiFi will be listening on. Forwarding of messages can be configured in rsyslog.conf, generally located in /etc on most Linux operating systems.

Edit rsyslog.conf and add the following line:

*.* @localhost:7780

This tells rsyslog to forward all messages over UDP to port 7780 on localhost. Using '@@' instead of a single '@' forwards the messages over TCP.
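For example, forwarding the same messages over TCP instead would look like:

*.* @@localhost:7780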

Restart rsyslog for the changes to take effect:

/etc/init.d/rsyslog restart
Shutting down system logger:                               [  OK  ]
Starting system logger:                                    [  OK  ]

Setting up HBase

In order to store the syslog messages, we'll create an HBase table called 'syslog' with one column family called 'msg'. From the command line enter the following:

hbase shell
create 'syslog', {NAME => 'msg'}
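
To confirm the table and column family exist, they can be described from the same shell:

describe 'syslog'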

Configure an HBase Client Service

The HBase processors added in Apache NiFi 0.4.0 use a controller service to interact with HBase. This allows the processors to remain unchanged when the HBase client changes, and allows a single NiFi instance to support multiple versions of the HBase client. The class-loader isolation provided by NiFi's NARs allows a single NiFi instance to interact with HBase instances of different versions at the same time.

The HBase Client Service can be configured by providing paths to external configuration files, such as hbase-site.xml, or by providing several properties directly in the service. For this example we will take the latter approach. From the Controller Services configuration window in NiFi, add an HBase_1_1_2_ClientService with the following configuration (adjusting values appropriately for your system):

client-service-config.jpg

After configuring the service, enable it in order for it to be usable by processors:

client-service-enabled.jpg

Building the Dataflow

The dataflow we are going to build will consist of the following components:

  • ListenSyslog for receiving syslog messages over UDP
  • UpdateAttribute for renaming attributes and creating a row id for HBase
  • AttributesToJSON for creating a JSON document from the syslog attributes
  • PutHBaseJSON for inserting each JSON document as a row in HBase

The overall flow looks like the following:

syslog-hbase-flow.jpg
Let's walk through the configuration of each processor...

ListenSyslog

config-listensyslog.jpg

Set the Port to the same port that rsyslog is forwarding messages to, in this case 7780. Leave everything else as the default values.

With a Max Batch Size of "1" and Parse Messages as "true", each syslog message will be emitted as a single FlowFile, with the content of the FlowFile being the original message, and the results of parsing the message being stored as FlowFile attributes.

The attributes we will be interested in are:

  • syslog.priority
  • syslog.severity
  • syslog.facility
  • syslog.version
  • syslog.timestamp
  • syslog.hostname
  • syslog.sender
  • syslog.body
  • syslog.protocol
  • syslog.port

UpdateAttribute

config-updateattr.jpg

The attributes produced by ListenSyslog all start with "syslog." which keeps them nicely namespaced in NiFi. However, we are going to use these attribute names as column qualifiers in HBase. We don't really need this prefix since the data will already be in the syslog table.

Add a property for each syslog attribute to remove the prefix, and use the Delete Attributes Expression to remove the original attributes. In addition, create an id attribute of the form "timestamp_uuid" where timestamp is the long representation of the timestamp on the syslog message, and uuid is the uuid of the FlowFile in NiFi. This id attribute will be used as the row id in HBase.

The expression language for the id attribute is:

${syslog.timestamp:toDate('MMM d HH:mm:ss'):toNumber()}_${uuid}
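
For example, the test message shown later in this post, with a syslog.timestamp of "Dec 10 19:20:15", produces an id like the row keys in the scan output below (because the date format contains no year, the parsed timestamp falls in 1970, which is why the numeric prefix is relatively small):

29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13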
  

AttributesToJSON

config-attrstojson.jpg

Set the Destination to "flowfile-content" so that the JSON document replaces the FlowFile content, and set Include Core Attributes to "false" so that the standard NiFi attributes are not included.
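
For reference, the JSON document produced for the test message shown later in this post would look roughly like this (reformatted for readability; the exact fields depend on the attributes present):

{
  "id" : "29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13",
  "timestamp" : "Dec 10 19:20:15",
  "hostname" : "localhost",
  "sender" : "/127.0.0.1",
  "protocol" : "UDP",
  "port" : "7780",
  "body" : "root: this is a test message",
  "version" : ""
}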

PutHBaseJSON

config-puthbasejson.jpg

Select the HBase Client Service we configured earlier and set the Table Name and Column Family to "syslog" and "msg" based on the table we created earlier. In addition set the Row Identifier Field Name to "id" to instruct the processor to use the id field from the JSON for the row id.

Verifying the Flow

From a terminal we can send a test message to syslog using the logger utility:

logger "this is a test syslog message"

Using the HBase shell we can inspect the contents of the syslog table:

hbase shell
hbase(main):002:0> scan 'syslog'
ROW                                          COLUMN+CELL
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:body, timestamp=1449775215481,
  value=root: this is a test message
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:hostname, timestamp=1449775215481,
  value=localhost
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:port, timestamp=1449775215481,
  value=7780
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:protocol, timestamp=1449775215481,
  value=UDP
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:sender, timestamp=1449775215481,
  value=/127.0.0.1
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:timestamp, timestamp=1449775215481,
  value=Dec 10 19:20:15
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:version, timestamp=1449775215481,
  value=
1 row(s) in 0.1120 seconds

Performance Considerations

In some cases the volume of syslog messages being pushed to ListenSyslog may be very high. There are several options to help scale the processing depending on the given use-case.

Concurrent Tasks

ListenSyslog has a background thread reading messages as fast as possible and placing them on a blocking queue to be de-queued and processed by the onTrigger method of the processor. By increasing the number of concurrent tasks for the processor, we can scale up the rate at which messages are processed, ensuring new messages can continue to be queued.

Parsing

One of the more expensive operations during the processing of a message is parsing the message in order to provide the attributes. Parsing messages is controlled on the processor through a property and can be turned off in cases where the attributes are not needed, and the original message just needs to be delivered somewhere.

Batching

In cases where parsing the messages is not necessary, an additional option is batching many messages together during one call to onTrigger. This is controlled through the Max Batch Size property, which defaults to "1". This would be appropriate in cases where having individual messages is not necessary, such as storing the messages in HDFS where you need them batched into appropriately sized files.

ParseSyslog

In addition to parsing messages directly in ListenSyslog, there is also a ParseSyslog processor. An alternative to the flow described in this post would be to have ListenSyslog produce batches of 100 messages at a time, followed by SplitText, followed by ParseSyslog. The tradeoff here is that we can scale the different components independently, and take advantage of backpressure between processors.

Summary

At this point you should be able to get your syslog messages ingested into HBase and can experiment with different configurations. The template for this flow can be found here.

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.apache.org.

Monday Sep 21, 2015

Indexing Tweets with NiFi and Solr

Bryan Bende -  bbende@gmail.com


This post will cover how to use Apache NiFi to pull in the public stream of tweets from the Twitter API, identify specific tweets of interest, and deliver those tweets to Solr for indexing. The example developed here was built against Apache NiFi 0.3.0 and Apache Solr 5.3.

In addition, you will also need to create a Twitter application for accessing their API. A good set of instructions for doing that can be found in this article - How to Create a Twitter App in 8 Easy Steps.

Setting up Solr

For this example we will start Solr in cloud mode, and create a tweets collection based off the data-driven configuration provided by Solr. The data-driven configuration allows Solr to create fields on the fly based off the incoming data, and is a good place to start for quickly prototyping. The following two commands can be used to get Solr running and create the collection:

./bin/solr start -c
./bin/solr create_collection -c tweets -d data_driven_schema_configs -shards 1 -replicationFactor 1

At this point we should have a running Solr instance on port 8983, with an embedded ZooKeeper on port 9983. Navigate to http://localhost:8983/solr/#/~cloud in your browser to verify Solr is running and the tweets collection was created successfully.
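
You can also check the collection from the command line with a standard select query, which should return zero documents at this point:

curl 'http://localhost:8983/solr/tweets/select?q=*:*&rows=0'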

Building the Dataflow

The dataflow we are going to build in NiFi will consist of the following processors:

  • GetTwitter for accessing the Twitter API and producing FlowFiles containing JSON Tweets
  • EvaluateJsonPath for extracting values from JSON documents into FlowFile attributes
  • RouteOnAttribute for making decisions about incoming FlowFiles based on their attributes
  • MergeContent for merging together many JSON documents to a single document
  • PutSolrContentStream for streaming JSON documents to Solr

The overall flow looks like the following:

nifi-twitter-solr-flow.png

Let's walk through the configuration of each processor...

GetTwitter

config-gettwitter.png

Set the end-point to the Sample Endpoint and fill in the credentials corresponding with the Twitter application you created earlier.

For this example we are going to specify a language of "en" to get only English tweets.

EvaluateJsonPath

config-extract-attributes.png

Now we want to extract the text and language of each tweet into FlowFile attributes in order to make decisions on these values later. Set the Destination property to "flowfile-attribute" and add two user-defined properties using the New Property icon on the top right. The name of the property will be the FlowFile attribute name, and the value is the JSON path we would like to extract from the Tweet. This will allow us to access these values later by using expression language, such as ${twitter.text} or ${twitter.lang}.
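
Since the screenshot is not reproduced here, the two properties might look like the following (the JSON paths assume the standard tweet structure, where "text" and "lang" are top-level fields):

twitter.text = $.text
twitter.lang = $.lang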

RouteOnAttribute

config-route-nonempty.png

At this point we would like to ensure we are only indexing tweets we are interested in. So we add a user-defined property called "tweet" and specify the following expression language as the value:

${twitter.text:isEmpty():not():and(${twitter.lang:equals("en")})}

The first part of this expression filters out all the tweets which have an empty message. This occurs frequently as some of the JSON documents coming through the end-point are not actual tweets. The second part of the expression ensures that we are only selecting English tweets. We already set GetTwitter to filter on "en", but if we hadn't, the language could be used here to route different languages to different relationships. Any tweets matching the above conditions will be routed to the "tweet" relationship based on the property we defined, and anything else will be routed to the "unmatched" relationship. We can auto-terminate the "unmatched" relationship to have the non-matching FlowFiles discarded.

MergeContent

config-merge.png

In this example we could send the JSON documents from RouteOnAttribute directly to Solr, but in most real-world scenarios accessing the Solr cluster will require network communication, and it will likely produce better performance if we batch together multiple JSON documents into a single request to reduce the number of requests sent over the network. The MergeContent processor is a powerful processor that was built with just this scenario in mind, and is capable of merging FlowFiles based on a number of criteria, such as the number of FlowFiles or their age. MergeContent also performs merges in a streaming manner and as a result is capable of merging a significant number of FlowFiles without worrying about exceeding memory constraints.

To configure MergeContent, set a Minimum and Maximum Number of Entries, as well as a Max Bin Age to trigger a merge in cases where no new data has come in for a period of time. Also set the Delimiter Strategy to "Text", and specify the Header as "[", the Footer as "]", and the Demarcator as ",". This allows us to create a large JSON document composed of many incoming documents.
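
To illustrate the effect of those delimiter values, two merged tweet documents (abbreviated here) come out as a single JSON array:

[
{"id": "1", "text": "first tweet"},
{"id": "2", "text": "second tweet"}
]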

PutSolrContentStream

config-putsolr.png

Configure PutSolrContentStream to point to the Solr instance we started earlier by setting the Solr Type to "Cloud" and the Solr Location to the embedded ZooKeeper that was started earlier (localhost:9983). Also specify a Commit Within of "1000" to commit the incoming documents every second (this may not be needed if you set autoCommit settings in your solrconfig.xml).

Any user-defined properties will get sent to Solr on the request as key value pairs, so we need to tell Solr how to transform the incoming JSON document into a Solr document. A good explanation of how this JSON to Solr mapping works can be found here, and an explanation of the user-defined properties can be found here.

For this example we will provide the following mappings:

  • split=/ to treat each child JSON document that we merged together as individual Solr documents
  • id:/id to map the id field of the JSON to the id field in the Solr schema (already defined)
  • twitter_text_t:/text to map the text field of the JSON to a dynamic field of type text_general in Solr
  • twitter_username_s:/user/name to map the user of the tweet to a dynamic string field in Solr
  • twitter_created_at_s:/created_at to map the creation date string to a dynamic string field in Solr
  • twitter_timestamp_ms_tl:/timestamp_ms to map the timestamp to a dynamic trie-long field in Solr

The naming conventions of the fields allow us to leverage dynamic fields in Solr. Dynamic fields take effect by using a suffix on the field name to indicate the field type, so the "_t" on "twitter_text_t" tells Solr that this field will map to a text_general field type. It is possible to not provide any field mappings and just send the JSON to Solr, and let Solr guess the field types on the fly. However, in that case all of the fields would get added as multiValued fields, and multiValued fields can't be used in a sorting clause which would prevent us from sorting on the timestamp. So we opt for the dynamic fields here.
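
Putting those mappings together, a single indexed tweet ends up as a Solr document along these lines (field values are invented for illustration):

{
  "id": "646134703385391104",
  "twitter_text_t": "the text of the tweet",
  "twitter_username_s": "Some User",
  "twitter_created_at_s": "Mon Sep 21 14:30:00 +0000 2015",
  "twitter_timestamp_ms_tl": 1442845800000
}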

Also worth noting, this processor has two failure relationships. The regular "failure" relationship is for failures that would generally require some intervention to resolve, such as an invalid document being submitted. The "connection_failure" relationship is for failures related to communication issues between NiFi and Solr. A typical approach in a production scenario would be to route the "connection_failure" back to itself to retry during connection failures, and to route the "failure" relationship to a PutFile processor that could write out failed documents for later inspection.

Summary

At this point you should be able to hit Start on your processors and start seeing tweets flowing to Solr. Happy indexing!

A template of the flow created in this example can be found here. We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.apache.org.

Thursday Mar 19, 2015

Stream Processing: NiFi and Spark

Mark Payne -  markap14@hotmail.com


Without doubt, Apache Spark has become wildly popular for processing large quantities of data. One of the key features that Spark provides is the ability to process data in either a batch processing mode or a streaming mode with very little change to your code. Batch processing is typically performed by reading data from HDFS. There have been a few different articles posted about using Apache NiFi (incubating) to publish data to HDFS. This article does a great job of explaining how to accomplish this.

In many contexts, though, operating on the data as soon as it is available can provide great benefits. In order to provide the right data as quickly as possible, NiFi has created a Spark Receiver, available in the 0.0.2 release of Apache NiFi. This post will examine how we can write a simple Spark application to process data from NiFi and how we can configure NiFi to expose the data to Spark.

Incorporating the Apache NiFi Receiver into your Spark application is pretty easy. First, you'll need to add the Receiver to your application's POM:

  <dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-spark-receiver</artifactId>
    <version>0.0.2-incubating</version>
  </dependency>

That's all that is needed in order to be able to use the NiFi Receiver. So now we'll look at how to use the Receiver in your code.

The NiFi Receiver is a Reliable Java Receiver. This means that if we lose a node after it pulls the data from NiFi, the data will not be lost. Instead, another node will simply pull and process the data. In order to create a NiFi Receiver, we need to first create a configuration that tells the Receiver where to pull the data from. The simplest form is to just tell the config where NiFi is running and which Port to pull the data from:

SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  .url("http://localhost:8080/nifi")
  .portName("Data For Spark")
  .buildConfig();

To briefly explain the terminology here, NiFi refers to its mechanism for transferring data between clusters or instances as Site-to-Site. It exposes options for pushing data or pulling, so that the most appropriate approach can be used for each situation. Spark doesn't supply a mechanism to have data pushed to it - instead, it wants to pull data from other sources. In NiFi, this data can be exposed in such a way that a receiver can pull from it by adding an Output Port to the root process group. For Spark, we will use this same mechanism - we will use this Site-to-Site protocol to pull data from NiFi's Output Ports. In order for this to work, we need two pieces of information: the URL to connect to NiFi and the name of the Output Port to pull data from.

If the NiFi instance to connect to is clustered, the URL should be that of the NiFi Cluster Manager. In this case, the Receiver will automatically contact the Cluster Manager to determine which nodes are in the cluster and start pulling data from all of them. By communicating with the Cluster Manager, the Receiver also determines which nodes have the most data backed up and pulls from those nodes more heavily than the others. This information is updated periodically, so as new nodes are added to the cluster, nodes leave the cluster, or nodes become more or less bogged down, the Receiver will automatically adjust to handle this.

Next, we need to instruct the Receiver which Port to pull data from. Since NiFi can have many different Output Ports, we need to provide either a Port Identifier or a Port Name. If desired, we can also configure communications timeouts; SSL information for secure data transfer, authentication, and authorization; compression; and preferred batch sizes. See the JavaDocs for the SiteToSiteClient.Builder for more information.
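
As a rough sketch of what a more fully configured builder might look like (the exact method names, such as timeout, useCompression, and requestBatchCount, are based on the SiteToSiteClient.Builder API and should be verified against the JavaDocs for your version; java.util.concurrent.TimeUnit is needed in addition to the Site-to-Site imports):

SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  .url("http://localhost:8080/nifi")      // URL of the NiFi instance (or Cluster Manager)
  .portName("Data For Spark")             // name of the root-group Output Port to pull from
  .timeout(30, TimeUnit.SECONDS)          // communications timeout
  .useCompression(true)                   // compress data on the wire
  .requestBatchCount(5)                   // preferred number of FlowFiles per transaction
  .buildConfig();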

Once we have constructed this configuration object, we can now create the Receiver:

 SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
 
 // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from 
 // specified Port
 JavaReceiverInputDStream<NiFiDataPacket> packetStream = 
     ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));

This gives us a JavaReceiverInputDStream of type NiFiDataPacket. The NiFiDataPacket is a simple construct that packages an arbitrary byte array with a map of Key/Value pairs (referred to as attributes) that correspond to the data. As an example, we can process the data without paying attention to the attributes:

 // Map the data from NiFi to text, ignoring the attributes
 JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
   }
 });

Or we can make use of the attributes:

 // Extract the 'uuid' attribute
 JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return dataPacket.getAttributes().get("uuid");
   }
 });
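
From there, the streaming job is started just like any other Spark Streaming application, for example by printing a few elements of each batch and blocking until the job is stopped:

 // Print the first elements of each batch, then start the streaming context
 text.print();
 ssc.start();
 ssc.awaitTermination();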

So now we have our Receiver ready to pull data from NiFi. Let's look at how we can configure NiFi to expose this data.

First, NiFi has to be configured to allow site-to-site communication. This is accomplished by setting the nifi.remote.input.socket.port property in the nifi.properties file to the desired port to use for site-to-site (if this value is changed, it will require a restart of NiFi for the changes to take effect).

Now that NiFi is set up to allow site-to-site, we will build a simple flow to feed data to Spark. We will start by adding two GetFile processors to the flow. One will pick up from /data/in/notifications and the other will pick up from /data/in/analysis:

GetFile

Next, let's assume that we want to assign different priorities to the data that is picked up from each directory. Let's assign a priority of "1" (the highest priority) to data coming from the analysis directory and assign a lower priority to the data from the notifications directory. We can use the UpdateAttribute processor to do this. Drag the Processor onto the graph and configure it. In the Settings tab, we set the name to "Set High Priority". In the Properties tab, we have no properties available. However, there's a button in the top-right of the dialog that says "New Property." Clicking that button lets us add a property with the name priority and a value of 1. When we click OK, we see it added to the table:

Set Priority

Let's click Apply and add another UpdateAttribute processor for the data coming from the notifications directory. Here, we will add a property with the name priority but give it a value of 2. After configuring these processors and connecting the GetFile processors to them, we end up with a graph that looks like this:

Connect GetFile and UpdateAttribute

Now, we want to combine all of the data into a single queue so that we can prioritize it before sending the data to Spark. We do this by adding a Funnel to the graph and connecting both of the UpdateAttribute processors to the Funnel:

With Funnel

Now we can add an Output Port that our Spark Streaming application can pull from. We drag an Output Port onto our graph. When prompted for a name, we will name it Data For Spark, as this is the name that we gave to our Spark Streaming application. Once we connect the Funnel to the Output Port, we have a graph like this:

Before Running

We haven't actually told NiFi to prioritize the data yet, though. We've simply added an attribute named priority. To prioritize the data based on that, we can right-click on the connection that feeds the Output Port and choose Configure. From the Settings tab, we can drag the PriorityAttributePrioritizer from the list of Available Prioritizers to the list of Selected Prioritizers:

Prioritize

Once we click Apply, we're done. We can start all of the components and we should see the data start flowing to our Spark Streaming application:

Running

Now any data that appears in our /data/in/notifications or /data/in/analysis directory will make its way to our streaming application!

By using NiFi's Output Port mechanism, we are able to create any number of different named Output Ports, as well. This allows you, as a NiFi user, to choose exactly which data gets exposed to Spark. Additionally, if NiFi is configured to be secure, each Output Port can be configured to provide the data to only the hosts and users that are authorized.

Let's consider, though, that this data has significant value for processing in a streaming fashion as soon as we have the data, but it may also be of value to a batch processing analytic. This is easy to handle, as well. We can add a MergeContent processor to our graph and configure it to merge data into 64-128 MB TAR files. Then, when we have a full TAR file, we can push the data to HDFS. We can configure MergeContent to make these bundles like so:

Merge configuration

We can then send the merged files to PutHDFS and auto-terminate the originals. We can feed all the data from our Funnel to this MergeContent processor, and this will allow us to dual-route all data to both HDFS (bundled into 64-128 MB TAR files) and to Spark Streaming (making the data available as soon as possible with very low latency):

hdfs and spark streaming

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.incubator.apache.org.

Monday Feb 02, 2015

Say Good-Bye to Canned Data

Mark Payne -  markap14@hotmail.com


We've all been there. After months of development and exhaustive testing, our killer new web service (or app or analytic or what have you) is ready for production! We've tested it with all of the random data that we've mocked up, including all of the samples that we've concocted to ensure that it handles every bad input that we can imagine. We've handled it all well, so it's time to deploy to production. So we do. And all is great!

Until an hour later, when our logs start filling with errors because, despite all of our due diligence in testing, we never could have envisioned getting that as input. And now we're in a mad frenzy to fix our problem, because we're now responsible for all of the errors that are happening in production.

If there's one thing that I've learned in my years as a software developer, it's that no matter how diligent we are in testing our code, we get data in production that we just haven't accounted for.

So what can we do about it? Test with live production data!

Now I'm not suggesting that we skip the testing phase all together and go straight into production - quite the opposite really. I'm just suggesting that we test "smarter, not harder." One of the benefits of Apache NiFi (incubating) is that it allows us to have real-time command and control of our data. It allows us to change our dataflow in just a few minutes to send multiple copies of our data to anywhere we want, while providing different reliability guarantees and qualities of service to different parts of our flow. So let's look at how we might accomplish this.

Let's assume that we typically get our feed of data to our web service from NiFi in a flow that looks something like this:

Original Flow

Now let's say that we want to send a duplicate copy of this data feed to our "staging" environment that is running the new version of our web service - or a new web service all together. We can simply copy and paste the InvokeHTTP processor that we're using to send the data to our production instance (select the processor and press Ctrl-C, Ctrl-V to copy and paste) and then right-click on the new one and choose "Configure..." In the Properties tab, we will change the URL to point to our staging instance. Now, all we have to do is draw a second connection from the preceding processor to our newly created InvokeHTTP. We will give it the same relationship that feeds the production instance - "splits." And now any data that goes to our production instance will also be sent to our staging environment:

Flow with second InvokeHTTP

Of course, since we know this is a staging environment, it may not be as powerful as the production instance and it may not handle the entire stream of live data. What we really would like to do is send only about 20% of our data to our staging environment. So how can we accomplish that? We can easily insert a DistributeLoad Processor just ahead of our new InvokeHTTP processor, like so:

Flow with DistributeLoad added in

We can now configure the DistributeLoad Processor to have two relationships. We want 80% of the load to go to relationship "1" and 20% to go to relationship "2." We can accomplish this by adding two user-defined properties. We right-click on DistributeLoad and choose "Configure..." In the Properties tab, click the New Property icon in the top-right to add the first property. We give it the name "1" and a value of "80". Then we click OK and add another property. This time, we give the property the name "2" and a value of "20":

Configure DistributeLoad

Now, all we have to do is go to the Settings Tab and choose to Auto-Terminate Relationship 1. This will throw away 80% of our data. (Not to worry, we don't actually make copies of this data just to throw away 80% of it. The actual work required to "clone" a FlowFile is very small, as it doesn't actually copy any of the content but rather just creates a new pointer to the content.) Now we add a Connection from DistributeLoad to InvokeHTTP and use Relationship "2." Start the Processors, and we've now got 20% of the data being pushed to our staging environment:

20% of data going to staging area

Now, we have just one more concern that we need to think about. Since we're sending data to our staging area, which may go down pretty often as we are debugging and testing things, won't the data back up in NiFi on our production dataflow? At what point is this going to cause a problem?

This is where my former comment about NiFi offering "different Quality of Service guarantees to different parts of the flow" comes in. For this endpoint, we just want to try to send the data and if it can't handle the data, we don't want to risk causing issues in our production environment. To ensure that this happens, we right-click on the connection that feeds the new InvokeHTTP processor and click "Configure..." (You'll have to first stop the connection's source and destination processors in order to modify it). In the settings tab here, we have an option for "File expiration." The default is "0 sec," which means that the data will not age off. Let's change this value to 3 minutes.

Now, when we click Apply, we can see that the Connection's label has a small "clock" icon on it, indicating that the connection has an expiration set.

Age off after 3 minutes

Any FlowFile in the connection that becomes more than 3 minutes old will automatically be deleted. This means that we will buffer up to three minutes' worth of data in our production instance to send to the staging environment, but no more. We still will not expire any data that is waiting to go to the production instance. Because of this capability, we can extend our example a bit to perform load testing as well. While in this example we decided that we only wanted to send 20% of our data to the staging environment, we could easily remove the DistributeLoad processor altogether. In this way, we will send 100% of our production data to the staging environment, as long as it is able to handle the data rate. However, if it falls behind, it won't hurt our production servers because they'll destroy any data more than 3 minutes old. If concerns were to arise, we can disable this in a matter of seconds: simply stop the DistributeLoad processor and the SplitText processor feeding it, remove the connection between them, and restart the SplitText processor.

As always, I'd love to hear feedback in the Comments section about how we could improve, or how you've solved similar problems.

Monday Jan 12, 2015

Apache NiFi: Thinking Differently About DataFlow

Mark Payne -  markap14@hotmail.com


Recently a question was posed to the Apache NiFi (Incubating) Developer Mailing List about how best to use Apache NiFi to perform Extract, Transform, Load (ETL) types of tasks. The question was "Is it possible to have NiFi service setup and running and allow for multiple dataflows to be designed and deployed (running) at the same time?"

The idea here was to create several disparate dataflows that run alongside one another in parallel. Data comes from Source X and it's processed this way. That's one dataflow. Other data comes from Source Y and it's processed this way. That's a second dataflow entirely. Typically, this is how we think about dataflow when we design it with an ETL tool. And this is a pretty common question for new NiFi users. With NiFi, though, we tend to think about designing dataflows a little bit differently. Rather than having several disparate, "stovepiped" flows, the preferred approach with NiFi is to have several inputs feed into the same dataflow. Data can then be easily routed (via RouteOnAttribute, for example) to "one-off subflows" if need be.

One of the benefits of having several disparate dataflows, though, is that it makes it much easier to answer when someone comes to you and says "I sent you a file last week. What did you do with it?" or "How do you process data that comes from this source?" You may not know exactly what happened to a specific file that they sent you, step-by-step, because of the different decision points in the flow, but at least you have a good idea by looking at the layout of the dataflow.

So we can avoid merging the data if we would like. For the sake of an example, let's assume that we have 4 different data sources. Each of them is going to send us some text data that needs to be pushed into HDFS. Maybe it's compressed, maybe it's not. So your flow will look like this:

Disparate Flows

Now, let's say that you've got a new requirement. When you're sending text data to HDFS, each file that is pushed to HDFS needs to have 1,000 lines of text or less (yes, that's a contrived example and probably never a good idea, but the point is valid). Now, consider how much work it is to make all of those modifications. And let's hope that you don't miss any!

If we continue down this path, this can get hairy quickly, as we have several different dataflows side-by-side on the same graph. In order to aid in the visual representation, we can use Process Groups to provide a nice logical separation. If we do that for each of those, we end up with something like:

Grouped Disparate Flows

We can then double-click each of those Process Groups and see/edit what's inside. But we still have the issue of having to change 4 different flows to make the change mentioned.

So let us consider the alternate approach of merging it all into a single dataflow, and we end up with a flow like this:

Merged Flow

Now, we have all of the data going to a single stream. If we want to update it, we just insert one new Processor:

Updated Merged Flow

And we're done. We don't have to make this change to insert a SplitText processor 3 more times.

The concern that we have here, though, as mentioned earlier, is that if all of the data is mixed together, as the dataflow grows larger, how do we know what happens to data that came from Source X, for example?

This is where the Data Provenance feature comes in. In the top right-hand corner there's a toolbar with 8 icons. The 4th one is the Provenance icon. If we click on that, we can then search for data that has been processed. For this example, let's simply search for RECEIVE events.

This shows us all of the RECEIVE events that this instance of NiFi has seen within the time range searched:

Provenance Search Results

If we click the Lineage Graph icon on the right for the first file, we see exactly what happened to this piece of data:

Data Lineage

We see that a RECEIVE event occurred, and that generated a FlowFile. That FlowFile's attributes were then modified, its content was modified, and then the FlowFile was forked, and dropped. At this point, we can right-click on each of these event nodes and choose to view the details of the event. For the RECEIVE event, we see:

Receive Event

From here, we can see that the RECEIVE event took place at 16:55:51 EST on 01/11/2015. The component that reported the event was named "Data From Source X" and was a GetFile Processor. We can also see that the URI of the data was file:/C:/temp/in/debug/LICENSE.gz. If we follow the lineage of that FlowFile, we see that the next event is an ATTRIBUTES_MODIFIED Event:

Attrs Modified Event

Since the event is of type ATTRIBUTES_MODIFIED, it immediately begs the question "What attributes were modified?" So clicking the "Attributes" tab shows us this:

Attrs Modified Event

As with any Provenance Event, we can see all of the attributes that were present on the FlowFile when the event occurred. Of interest here, we can see that the value of the "mime.type" attribute was changed from "No value set" (the attribute didn't exist) to "application/gzip". The next event in our lineage is a CONTENT_MODIFIED Event. If we view the details here, we will see:

Content Modified Event

Here, we can see that the content was modified by a Processor named Decompress. This makes sense, since the previous Event showed us that the MIME Type was "application/gzip". After decompressing the data, we arrive at a FORK Event:

Fork Event

This event shows us that the FORK happened by the SplitText Processor. That is, the SplitText Processor broke a large FlowFile into many smaller FlowFiles. On the right-hand side of this dialog, we see that the file was broken into six different "child" FlowFiles. This is where things get fun! If we then close this dialog, we can right-click on the FORK Event and choose the "Expand" option. This will then pull back the lineage for each of those children, providing us with a more holistic view of what happened to this piece of data:

Expanded Lineage

Now, we can see that each of those six children was sent somewhere. Viewing the details of these events shows us where they were sent:

Send Event

The first file, for instance, was sent via the PutHDFS Processor with the filename "/nifi/blogs/thinking-differently/LICENSE.gz". This occurred at 16:55:53 EST on 01/11/2015. We can also see in this dialog the "Lineage Duration" was "00:00:02.712" or 2.712 seconds. The "Lineage Duration" field tells us how long elapsed between the time when the original source data was received and the time at which this event occurred.

Finally, we have the DROP event. The DROP event signifies the end of line for a FlowFile. If we look at the details of this event, we see:

Drop Event

Of note here, we see that the DROP event was emitted by PutHDFS. That is, PutHDFS was the last component in NiFi to process this piece of information. We can also see in the "Details" field why the FlowFile was dropped: it was Auto-terminated by the "success" relationship.

NiFi's Data Provenance capability allows us to understand exactly what happens to each piece of data that is received. We are given a directed graph that shows when a FlowFile was received, when it was modified, when it was routed in a particular way, and when and where it was sent - as well as which component performed the action. We are also able to see, for each event, the attributes (or metadata) associated with the data so that we can understand why the particular event occurred. Additionally, when many pieces of data are merged together or a single piece of data is split apart, we are able to understand fully the provenance of this data from the time that it was received until the time at which it exited the flow. This makes it very easy to answer the question "I sent you a file last week. What did you do with it?" while providing a much more holistic view of the enterprise dataflow than would be available if we used many disparate flows.

Hopefully this post helps you to understand not only the way that we like to set up the flows with NiFi but also the benefits that we have as a result and the features that allow us to overcome any challenges that this approach may create.

Saturday Jan 10, 2015

Integrating Apache NiFi with Apache Kafka

Mark Payne -  markap14@hotmail.com


A couple of weeks ago, Joey Echeverria wrote a fantastic blog post about how to get started with Apache NiFi (Incubating), available at http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/ . In it, Joey outlines how to quickly build a simple dataflow that automatically picks up any data from the /dropbox directory on your computer and pushes the data to HDFS. He then goes on to use the ExecuteStreamCommand Processor to make use of the Kite SDK’s ability to push CSV data into Hadoop.

In this blog post, we will expand upon Joey’s example to build a dataflow that’s a bit more complicated and illustrate a few additional features of Apache NiFi. If you haven’t already read Joey’s blog, we would recommend you read it before following along here, as this post assumes that you have a very basic understanding of NiFi, such as what NiFi is and how to add a Processor to the graph.

We will continue with the MovieLens dataset, this time using the "MovieLens 10M" dataset, which contains "10 million ratings and 100,000 tag applications applied to 10,000 movies by 72,000 users." However, rather than downloading this dataset and placing the data that we care about in the /dropbox directory, we will use NiFi to pull the data directly from the MovieLens site.

To do this, we first drag a Processor onto the graph. When we do this, we are given the option of choosing many different types of Processors. We know that we want to retrieve data from an HTTP address, so we will search for Processors of interest by choosing the "by tag" option for the filter and typing "http":

httpprocessors.png

This narrows the list of Processors to just a few. We can quickly see the GetHTTP Processor. Clicking on the Processor shows a description of the Processor just below: "Fetches a file via HTTP." This sounds like a good fit, so we click the "Add" button to add it to the graph. Next, we need to configure the Processor to pull the data that we are interested in. Right-clicking the Processor and clicking the "Configure" menu option provides us a dialog with a "Properties" tab. Here, we can enter the URL to fetch the data from (http://files.grouplens.org/datasets/movielens/ml-10m.zip) and enter the filename to use when downloading it:

configuregethttp.png

We can leave the other properties as they are for the sake of this blog post. In a real environment, you may have an interest in changing timeout values, providing authentication, etc. We do, however, want to look at the "Scheduling" tab. By default, Processors are scheduled to run as fast as they can. While this is great for most high-volume dataflows, we wouldn't be very good clients if we continually hit the website nonstop as fast as we can - and I doubt that those responsible for the site would appreciate it very much. In order to avoid this, we go to the "Scheduling" tab and change the "Run Schedule" setting from "0 sec" to something more reasonable - say "10 mins". This way, we will check for new data from the website every 10 minutes. If the E-Tag changes or the Last Modified date changes, then we will pull the data again. Otherwise, we won't continue to pull the data.

Next, we know that the data is in ZIP format, based on the URL. We will want to unzip the data to interact with each file individually. Again, we can drag a Processor to the graph and filter by the tag "zip." We see that the UnpackContent processor can handle ZIP files. We add it to the graph and configure its Properties as we did for GetHTTP. This time, there is only one property: Packing Format. We change it to "zip" and click "Apply."

Next, we need to send the data from GetHTTP to UnpackContent. We drag a Connection between the Processors and include the "success" relationship, as that is the only relationship defined by GetHTTP. Now that we've unzipped the data, the README tells us that the actual data included in the zip is in 3 files: movies.dat, ratings.dat, and tags.dat. We don't really care about the rest of the data in the zip file, so we will use a RouteOnAttribute to throw it out. We'll configure our RouteOnAttribute with the following properties:

RouteOnAttributeProperties
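
The screenshot is not reproduced here, but the three user-defined properties would look something like the following, each matching on the filename attribute of the unpacked FlowFiles (illustrative; the exact entry names depend on the contents of the zip):

movies = ${filename:endsWith('movies.dat')}
ratings = ${filename:endsWith('ratings.dat')}
tags = ${filename:endsWith('tags.dat')}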

We will connect the "success" relationship of UnpackContent to RouteOnAttribute. We don't care about the original FlowFile - we only want the unpacked data. And if the Unpack fails, then there's not much we can do, so we will configure UnpackContent and in the Settings tab choose to Auto-terminate the "original" and "failure" relationships. Likewise, we don't care about anything routed to the "unmatched" relationship on the RouteOnAttribute, so we will Auto-terminate that relationship as well.

Now, we can just decide what to do with the "movies", "ratings", and "tags" relationships. For the sake of this post, let's go ahead and push all of this data to HDFS with a PutHDFS Processor. We add a PutHDFS Processor and configure it as Joey's blog instructs. We then drag a Connection from RouteOnAttribute to PutHDFS and choose all three of these relationships.

For PutHDFS, once we have successfully sent the data, there is nothing else to do, so we Auto-terminate the "success" relationship. However, if there's a problem sending the data, we don't want to lose this data. We can then connect the PutHDFS' "failure" relationship back to the PutHDFS Processor. Now, if we fail to send the data we will keep retrying until we are successful.

Now, we're ready to send some data to Kafka. Let's assume that we want only the data in the `movies.dat` file to go to Kafka. We can drag a Processor onto the graph and filter by the tag "kafka." We see two Processors: GetKafka and PutKafka. Since we want to send the data, we will use PutKafka. To send the movies data, we simply draw a Connection from the RouteOnAttribute Processor to PutKafka and choose only the "movies" relationship. NiFi will take care of cloning the FlowFile in a way that's very efficient so that no data is actually copied.

We configure PutKafka by Auto-terminating "success" and looping "failure" back to itself as with PutHDFS. Now let's look at the Properties tab. We have to choose a Kafka Topic to send the data to and a list of 1 or more Kafka servers to send to. We'll set the Known Brokers to "localhost:9092" (assuming this is running on the same box as Kafka) and set the Kafka Topic to "movies". Since the data is a CSV file, we know that it is new-line delimited. By default, NiFi will send the entire contents of a FlowFile to Kafka as a single message. However, we want each line in our CSV file to be a new message on the Kafka Topic. We accomplish this by setting the "Message Delimiter" property to "\n". At this point, our flow should look something like this:

NiFi Kafka integration

And now we're ready to start our flow!

We can press Ctrl+A to select all and then click the Start button in the middle toolbar to start everything running. While the flow is running, we can right-click on the canvas and select "Refresh Status" to update all of the stats shown on the Processors. Then we will see in the top-right-hand corner of GetHTTP a "1" indicating that there is currently a single task running for this Processor. Once this task is finished, the data will be routed to the UnpackContent Processor, then the RouteOnAttribute Processor. At this point, some of the data will be destroyed because RouteOnAttribute will route it to "unmatched." The rest will go to the PutHDFS Processor, while only the "movies.dat" file will be sent to the PutKafka Processor. If we keep clicking "Refresh Status" as the data is processing, we will see each of these steps happening.
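
To confirm that messages are making it onto the "movies" topic, the console consumer that ships with Kafka can be used (the flags shown are for 0.8.x-era releases, which take a ZooKeeper connect string; newer versions use a bootstrap server option instead):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic movies --from-beginning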

We've now successfully set up a dataflow with Apache NiFi that pulls the largest of the available MovieLens datasets, unpacks the zipped contents, filters out the unwanted data, routes all of the pertinent data to HDFS, and finally sends a subset of this data to Apache Kafka.

In Part Two of this series, we will look at how we can consume data from Kafka using NiFi, as well as how we can see what data we've pulled and what we've done with that data. Until then, please feel free to leave any questions, comments, or feedback in the Comments section.