Apache NiFi

Thursday Mar 19, 2015

Stream Processing: NiFi and Spark


Mark Payne - markap14@hotmail.com


Without doubt, Apache Spark has become wildly popular for processing large quantities of data. One of the key features that Spark provides is the ability to process data in either a batch processing mode or a streaming mode with very little change to your code. Batch processing is typically performed by reading data from HDFS. A few different articles have been posted about using Apache NiFi (incubating) to publish data to HDFS, and one of them does a great job of explaining how to accomplish this.

In many contexts, though, operating on the data as soon as it is available can provide great benefits. To get the right data to Spark as quickly as possible, the NiFi project has created a Spark Receiver, available in the 0.0.2 release of Apache NiFi. This post examines how to write a simple Spark application that processes data from NiFi and how to configure NiFi to expose the data to Spark.

Incorporating the Apache NiFi Receiver into your Spark application is pretty easy. First, you'll need to add the Receiver to your application's POM:

  <dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-spark-receiver</artifactId>
    <version>0.0.2-incubating</version>
  </dependency>

That's all that is needed to use the NiFi Receiver. Now let's look at how to use the Receiver in your code.

The NiFi Receiver is a Reliable Java Receiver. This means that if we lose a node after it pulls data from NiFi, the data will not be lost; instead, another node will simply pull and process it. To create a NiFi Receiver, we first need to create a configuration that tells the Receiver where to pull the data from. The simplest form just tells the config where NiFi is running and which Port to pull the data from:

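import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  .url("http://localhost:8080/nifi")
  .portName("Data For Spark")
  .buildConfig();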

To briefly explain the terminology here, NiFi refers to its mechanism for transferring data between clusters or instances as Site-to-Site. It exposes options for either pushing or pulling data, so that the most appropriate approach can be used for each situation. Spark doesn't supply a mechanism to have data pushed to it; instead, it wants to pull data from other sources. In NiFi, data can be exposed for a receiver to pull by adding an Output Port to the root process group. For Spark, we use exactly this mechanism: the Receiver uses the Site-to-Site protocol to pull data from NiFi's Output Ports. For this to work, we need two pieces of information: the URL to connect to NiFi and the name of the Output Port to pull data from.

If the NiFi instance to connect to is clustered, the URL should be that of the NiFi Cluster Manager. In this case, the Receiver will contact the Cluster Manager to determine which nodes are in the cluster and will automatically start pulling data from all of them. By communicating with the Cluster Manager, the Receiver also determines which nodes have the most data backed up and pulls from those nodes more heavily than from the others. This information is refreshed periodically, so as nodes join or leave the cluster, or become more or less bogged down, the Receiver automatically adjusts.
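As a rough illustration of backlog-aware pulling, here is a minimal sketch that splits a fixed number of pull requests across nodes in proportion to how many FlowFiles each node reports as queued. It is not the Receiver's actual algorithm: the class `BacklogWeightedPuller`, the method `allocatePulls`, and the largest-remainder proportional policy are all hypothetical, and the per-node queue counts are assumed to have been obtained from the Cluster Manager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch, not NiFi's implementation: distribute pull requests
 * across cluster nodes in proportion to each node's queued FlowFile count.
 */
class BacklogWeightedPuller {

    static Map<String, Integer> allocatePulls(Map<String, Integer> queuedByNode, int totalPulls) {
        Map<String, Integer> allocation = new LinkedHashMap<>();
        List<String> nodes = new ArrayList<>(queuedByNode.keySet());
        int n = nodes.size();
        if (n == 0) {
            return allocation;
        }
        long[] weights = new long[n];
        long totalWeight = 0;
        for (int i = 0; i < n; i++) {
            weights[i] = Math.max(0, queuedByNode.get(nodes.get(i)));
            totalWeight += weights[i];
        }
        // If no node reports a backlog, fall back to an even split.
        if (totalWeight == 0) {
            Arrays.fill(weights, 1L);
            totalWeight = n;
        }

        long[] remainders = new long[n];
        int assigned = 0;
        for (int i = 0; i < n; i++) {
            // Integer arithmetic keeps the proportional split exact.
            long scaled = (long) totalPulls * weights[i];
            int share = (int) (scaled / totalWeight);
            remainders[i] = scaled % totalWeight;
            allocation.put(nodes.get(i), share);
            assigned += share;
        }

        // Hand the few leftover pulls to the nodes with the largest remainders.
        while (assigned < totalPulls) {
            int best = 0;
            for (int i = 1; i < n; i++) {
                if (remainders[i] > remainders[best]) {
                    best = i;
                }
            }
            allocation.merge(nodes.get(best), 1, Integer::sum);
            remainders[best] = -1; // each node receives at most one leftover pull
            assigned++;
        }
        return allocation;
    }
}
```

With backlogs of 600, 300, and 100 FlowFiles and 10 pulls to hand out, the busiest node gets 6 pulls, the next 3, and the last 1; if nothing is queued anywhere, the pulls are split as evenly as possible.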

Next, we need to tell the Receiver which Port to pull data from. Since NiFi can have many different Output Ports, we need to provide either a Port Identifier or a Port Name. If desired, we can also configure communication timeouts; SSL information for secure data transfer, authentication, and authorization; compression; and preferred batch sizes. See the JavaDocs for SiteToSiteClient.Builder for more information.
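The reliability guarantee mentioned earlier rests on acknowledging data only after it has been stored. The following in-memory model sketches that idea under stated assumptions: `AckedQueue` stands in for a NiFi Output Port, `receiveOnce` for one receive cycle, and the `store` callback plays the role of Spark storing the batch. All of these names are hypothetical; this is not the NiFi or Spark API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for an Output Port: pulled items stay owned by the
 * source until the receiver confirms them, and go back on rollback.
 * Models one open transaction at a time.
 */
class AckedQueue {
    private final Deque<String> queued = new ArrayDeque<>();
    private final List<String> inFlight = new ArrayList<>();

    void offer(String item) {
        queued.addLast(item);
    }

    List<String> pull(int maxItems) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxItems && !queued.isEmpty()) {
            batch.add(queued.pollFirst());
        }
        inFlight.addAll(batch);
        return batch;
    }

    /** The receiver has stored the batch, so the source may forget it. */
    void confirm() {
        inFlight.clear();
    }

    /** The receiver failed before confirming: return the batch to the head of the queue, in order. */
    void rollback() {
        for (int i = inFlight.size() - 1; i >= 0; i--) {
            queued.addFirst(inFlight.get(i));
        }
        inFlight.clear();
    }

    int size() {
        return queued.size();
    }
}

/** Hypothetical receive cycle: pull, store, and acknowledge only after a successful store. */
class ReliableReceiveSketch {
    static boolean receiveOnce(AckedQueue source, int batchSize, Consumer<List<String>> store) {
        List<String> batch = source.pull(batchSize);
        if (batch.isEmpty()) {
            return false;
        }
        try {
            store.accept(batch); // stands in for Spark storing the records
        } catch (RuntimeException e) {
            source.rollback(); // nothing was acknowledged, so nothing is lost
            return false;
        }
        source.confirm();
        return true;
    }
}
```

If `store` throws (say, because the node is lost mid-batch), the batch goes back to the head of the queue and the next pull, from this or any other receiver, gets it again. The flip side, discussed in the comments at the end of this post, is that a failure after storing but before acknowledging can produce duplicates.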

Once we have constructed this configuration object, we can now create the Receiver:

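 import org.apache.nifi.spark.NiFiDataPacket;
 import org.apache.nifi.spark.NiFiReceiver;
 import org.apache.spark.SparkConf;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;

 SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));

 // Create a JavaReceiverInputDStream that uses a NiFi receiver to pull data from the specified Port
 JavaReceiverInputDStream<NiFiDataPacket> packetStream =
     ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));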

This gives us a JavaReceiverInputDStream of NiFiDataPacket objects. A NiFiDataPacket is a simple construct that packages an arbitrary byte array with a map of key/value pairs (referred to as attributes) associated with the data. As an example, we can process the data without paying attention to the attributes:

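 import java.nio.charset.StandardCharsets;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.api.java.JavaDStream;

 // Map the data from NiFi to text, ignoring the attributes
 JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
   @Override
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
   }
 });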

Or we can make use of the attributes:

 // Extract the 'uuid' attribute
 JavaDStream<String> uuids = packetStream.map(new Function<NiFiDataPacket, String>() {
   @Override
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return dataPacket.getAttributes().get("uuid");
   }
 });

So now we have our Receiver ready to pull data from NiFi. Let's look at how we can configure NiFi to expose this data.

First, NiFi has to be configured to allow site-to-site communication. This is accomplished by setting the nifi.remote.input.socket.port property in the nifi.properties file to the port that site-to-site should use. If this value is changed, NiFi must be restarted for the change to take effect.
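As a small illustration of that setting, the sketch below reads nifi.properties-style text with java.util.Properties and reports which site-to-site port, if any, is configured, treating a missing or blank value as site-to-site not being enabled, in line with the description above. The class `SiteToSiteSettings` is hypothetical, and the port number 10000 in the usage example is only a placeholder, not a NiFi default.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.OptionalInt;
import java.util.Properties;

/** Hypothetical helper: which port does this nifi.properties text enable for site-to-site? */
class SiteToSiteSettings {

    static OptionalInt siteToSitePort(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // StringReader does no real I/O, but load() still declares IOException.
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("nifi.remote.input.socket.port", "").trim();
        // A missing or blank value is treated as "site-to-site not enabled".
        if (value.isEmpty()) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(Integer.parseInt(value));
    }
}
```

For example, `SiteToSiteSettings.siteToSitePort("nifi.remote.input.socket.port=10000")` returns an OptionalInt holding 10000, while the blank default line yields an empty OptionalInt.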

Now that NiFi is set up to allow site-to-site, we will build a simple flow to feed data to Spark. We start by adding two GetFile processors to the flow. One will pick up files from /data/in/notifications and the other from /data/in/analysis:

GetFile

Next, let's assume that we want to assign different priorities to the data picked up from each directory. Let's assign a priority of "1" (the highest priority) to data coming from the analysis directory and a lower priority to data from the notifications directory. We can use the UpdateAttribute processor to do this. We drag the Processor onto the graph and configure it. In the Settings tab, we set the name to "Set High Priority". The Properties tab has no properties yet, but the "New Property" button in the top-right of the dialog lets us add a property named priority with a value of 1. When we click OK, we see it added to the table:

Set Priority

Let's click Apply and add another UpdateAttribute processor for the data coming from the notifications directory. Here, we add a property named priority but give it a value of 2. After configuring these processors and connecting the GetFile processors to them, we end up with a graph that looks like this:

Connect GetFile and UpdateAttribute

Now, we want to combine all of the data into a single queue so that we can prioritize it before sending the data to Spark. We do this by adding a Funnel to the graph and connecting both of the UpdateAttribute processors to the Funnel:

With Funnel

Now we can add an Output Port that our Spark Streaming application can pull from. We drag an Output Port onto our graph. When prompted for a name, we name it Data For Spark, as this is the port name we configured in our Spark Streaming application. Once we connect the Funnel to the Output Port, we have a graph like this:

Before Running

We haven't actually told NiFi to prioritize the data yet, though. We've simply added an attribute named priority. To prioritize the data based on that, we can right-click on the connection that feeds the Output Port and choose Configure. From the Settings tab, we can drag the PriorityAttributePrioritizer from the list of Available Prioritizers to the list of Selected Prioritizers:

Prioritize

Once we click Apply, we're done. We can start all of the components and we should see the data start flowing to our Spark Streaming application:

Running

Now any data that appears in our /data/in/notifications or /data/in/analysis directory will make its way to our streaming application!
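To make the effect of the prioritizer concrete, here is a simplified, hypothetical model of priority-attribute ordering in plain Java. It represents each FlowFile only by its attributes, serves a lower numeric `priority` first (so the "1" set by Set High Priority beats "2"), and places FlowFiles without a numeric `priority` last. The class `PriorityOrderSketch` and its helpers are illustrative only; NiFi's real PriorityAttributePrioritizer has its own rules (for example, for non-numeric values) that are not modelled here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified, hypothetical model of priority-attribute ordering. Each FlowFile
 * is represented only by its attribute map.
 */
class PriorityOrderSketch {

    /** Lower numeric "priority" first; missing or non-numeric values sort last. */
    static final Comparator<Map<String, String>> BY_PRIORITY =
            Comparator.comparingLong(PriorityOrderSketch::priorityOf);

    static long priorityOf(Map<String, String> attributes) {
        String value = attributes.get("priority");
        if (value == null) {
            return Long.MAX_VALUE;
        }
        try {
            return Long.parseLong(value.trim());
        } catch (NumberFormatException e) {
            return Long.MAX_VALUE;
        }
    }

    /** Builds the attributes for one FlowFile; a null priority means "no priority attribute". */
    static Map<String, String> flowFile(String filename, String priority) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", filename);
        if (priority != null) {
            attributes.put("priority", priority);
        }
        return attributes;
    }

    /** Returns filenames in the order a prioritized queue would hand them out. */
    static List<String> drainOrder(List<Map<String, String>> arrivals) {
        List<Map<String, String>> queue = new ArrayList<>(arrivals);
        queue.sort(BY_PRIORITY); // stable: equal priorities keep arrival order
        List<String> order = new ArrayList<>();
        for (Map<String, String> flowFile : queue) {
            order.add(flowFile.get("filename"));
        }
        return order;
    }
}
```

Because `List.sort` is stable, FlowFiles that share a priority keep their arrival order in this model, so all analysis files drain before any notification files.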

NiFi's Output Port mechanism also lets us create any number of differently named Output Ports. This allows you, as a NiFi user, to choose exactly which data gets exposed to Spark. Additionally, if NiFi is configured to be secure, each Output Port can be configured to provide its data only to authorized hosts and users.

Suppose, though, that this data is valuable to process in a streaming fashion as soon as we have it, but may also be valuable to a batch processing analytic. This is easy to handle as well. We can add a MergeContent processor to our graph and configure it to merge data into 64-128 MB TAR files. Then, when we have a full TAR file, we can push the data to HDFS. We can configure MergeContent to make these bundles like so:

Merge configuration

We can then send the merged files to PutHDFS and auto-terminate the originals. By also feeding all the data from our Funnel to this MergeContent processor, we dual-route all data to both HDFS (bundled into 64-128 MB TAR files) and Spark Streaming (making the data available as soon as possible, with very low latency):

hdfs and spark streaming
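The 64-128 MB bundling can be approximated with a small model, again a hypothetical sketch rather than MergeContent's implementation: FlowFile sizes are added to a single open bin, the bin is emitted once it holds at least the minimum size, and a FlowFile that would push it past the maximum starts a new bin (in this single-bin model, the partially filled bin is flushed early in that case). Real MergeContent can keep several bins open and also honors limits such as entry counts and maximum bin age, none of which is modelled. The class `SizeBinSketch` is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of size-based bundling with a single open bin.
 * Sizes are in bytes; each completed bundle is the list of FlowFile sizes in it.
 */
class SizeBinSketch {
    static final long MB = 1024L * 1024L;

    private final long minBytes;
    private final long maxBytes;
    private final List<Long> current = new ArrayList<>();
    private long currentBytes = 0;
    private final List<List<Long>> completed = new ArrayList<>();

    SizeBinSketch(long minBytes, long maxBytes) {
        this.minBytes = minBytes;
        this.maxBytes = maxBytes;
    }

    void add(long sizeBytes) {
        // A FlowFile that would overflow the open bin starts a new one; with only
        // one open bin, this model flushes the partial bin early in that case.
        if (!current.isEmpty() && currentBytes + sizeBytes > maxBytes) {
            closeBin();
        }
        current.add(sizeBytes);
        currentBytes += sizeBytes;
        // Once the minimum size is reached, the bundle is ready to be packaged and sent on.
        if (currentBytes >= minBytes) {
            closeBin();
        }
    }

    private void closeBin() {
        completed.add(new ArrayList<>(current));
        current.clear();
        currentBytes = 0;
    }

    List<List<Long>> completedBundles() {
        return completed;
    }

    long pendingBytes() {
        return currentBytes;
    }
}
```

With a 64 MB minimum and 128 MB maximum, feeding it FlowFiles of 30, 30, 30, 100, and 10 MB yields one 90 MB bundle and one 100 MB bundle, with 10 MB still waiting for more data.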

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.incubator.apache.org.

 

Comments:

Hi Mark, Thanks for the post. I think there is a minor syntax error in the SiteToSiteClientBuilder piece. In the 0.0.2 code, the calls are url() and portName(). No sets. Thanks again. J.H.

Posted by JHowix on April 28, 2015 at 10:27 PM GMT #

J.H.: Thanks, you're right. I fixed the typo. I did some refactoring of the API a few times before it was released. Hopefully I caught any other change that I may have made :) Sorry it took so long to reply - email got lost in the shuffle. Good catch, though! I appreciate the heads-up.

Posted by Mark on May 15, 2015 at 01:30 AM GMT #

Hello. I've managed to use the receiver in Spark Scala, but I wonder about reliability. If I enable the WAL in Spark, together with the NiFi receiver's data acknowledgment, will the processing be exactly-once? Thanks.

Posted by Aurélien on November 18, 2015 at 04:09 PM GMT #

Aurélien, This is a fairly common question. However, it is important to note that truly having exactly-once delivery is not possible (there is a good discussion of why that is true at http://bravenewgeek.com/you-cannot-have-exactly-once-delivery/). The "best" one can really do is to provide at-least-once processing. This is what NiFi provides. You are guaranteed to get the data at least once. It is possible, however, that you can receive the data into Spark, commit the transaction in Spark, and then lose the connection to NiFi or have NiFi shut down before the acknowledgement is sent back. So there is a small chance that you could get duplicates. If you really need to receive the data exactly once, you would have to perform de-duplication on the Spark/receiver side. This approach has been discussed in several articles online that typically term it "exactly-once" processing. It is not true exactly-once delivery but provides the same semantics. Unfortunately, though, those semantics cannot be provided by NiFi alone: the receiver must perform the deduplication because only the receiver knows for sure what has been received. Thanks -Mark

Posted by Mark on November 23, 2015 at 06:34 PM GMT #

Aurélien, This is a fairly common question. However, it is important to note that truly having exactly-once delivery is not possible (there is a good discussion of why that is true at http://bravenewgeek.com/you-cannot-have-exactly-once-delivery/). The "best" one can really do from the producer side is to provide at-least-once processing. This is what NiFi provides. You are guaranteed to get the data, but it's possible that you can receive the data into Spark, commit the transaction in Spark, and then lose the connection to NiFi before the acknowledgement is sent back. So there is a slim chance that you could get duplicates. If you really need to receive the data exactly once, you would have to perform de-duplication on the Spark/receiver side. This approach has been discussed in several articles online that typically term it "exactly-once" processing. It is not true exactly-once delivery but provides the same semantics. Thanks -Mark

Posted by Mark on November 23, 2015 at 06:54 PM GMT #
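Following the suggestion above to de-duplicate on the receiver side, here is a minimal sketch of one common approach: remember recently seen keys in a bounded LRU set and drop packets whose key has already been processed. It assumes that a FlowFile redelivered by NiFi carries the same `uuid` attribute as the first attempt, and the class `RecentKeyDeduplicator` is hypothetical. A bounded in-memory window only catches duplicates that arrive within it and is lost on restart, so a production job would more likely make its writes to the final sink idempotent.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical receiver-side de-duplicator: remembers the most recently seen
 * keys (assumed here to be FlowFile "uuid" attributes) in a bounded LRU set.
 */
class RecentKeyDeduplicator {
    private final Map<String, Boolean> seen;

    RecentKeyDeduplicator(final int capacity) {
        // accessOrder = true turns LinkedHashMap into an LRU structure;
        // removeEldestEntry evicts the least recently used key once over capacity.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time a key is seen within the window, false for a repeat. */
    synchronized boolean firstTime(String key) {
        return seen.put(key, Boolean.TRUE) == null;
    }
}
```

In a Spark job, a check such as `dedup.firstTime(packet.getAttributes().get("uuid"))` could back a filter, though the state would have to be shared by all executors for the guarantee to hold across the cluster.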
