Apache NiFi

Monday Sep 21, 2015

Indexing Tweets with NiFi and Solr

Bryan Bende -  bbende@gmail.com


This post will cover how to use Apache NiFi to pull in the public stream of tweets from the Twitter API, identify specific tweets of interest, and deliver those tweets to Solr for indexing. The example developed here was built against Apache NiFi 0.3.0 and Apache Solr 5.3.

In addition, you will also need to create a Twitter application for accessing their API. A good set of instructions for doing that can be found in this article - How to Create a Twitter App in 8 Easy Steps.

Setting up Solr

For this example we will start Solr in cloud mode and create a tweets collection based on the data-driven configuration provided by Solr. The data-driven configuration allows Solr to create fields on the fly based on the incoming data, and is a good place to start for quick prototyping. The following two commands will get Solr running and create the collection:

./bin/solr start -c
./bin/solr create_collection -c tweets -d data_driven_schema_configs -shards 1 -replicationFactor 1

At this point we should have a running Solr instance on port 8983, with an embedded ZooKeeper on port 9983. Navigate to http://localhost:8983/solr/#/~cloud in your browser to verify Solr is running and the tweets collection was created successfully.
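
If you prefer the command line, the Solr scripts can confirm the same thing (the healthcheck command assumes the embedded ZooKeeper on its default port):

./bin/solr status
./bin/solr healthcheck -c tweets -z localhost:9983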

Building the Dataflow

The dataflow we are going to build in NiFi will consist of the following processors:

  • GetTwitter for accessing the Twitter API and producing FlowFiles containing JSON Tweets
  • EvaluateJsonPath for extracting values from JSON documents into FlowFile attributes
  • RouteOnAttribute for making decisions about incoming FlowFiles based on their attributes
  • MergeContent for merging many JSON documents into a single document
  • PutSolrContentStream for streaming JSON documents to Solr
The overall flow looks like the following:

nifi-twitter-solr-flow.png

Let's walk through the configuration of each processor...

GetTwitter

config-gettwitter.png

Set the endpoint to the Sample Endpoint and fill in the credentials corresponding to the Twitter application you created earlier.

For this example we are going to specify a language of "en" to get only English tweets.
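
The relevant GetTwitter properties end up looking roughly like the following (the exact property labels may differ slightly between NiFi versions, and the credential values come from the Twitter application you created):

Twitter Endpoint: Sample Endpoint
Consumer Key: <your consumer key>
Consumer Secret: <your consumer secret>
Access Token: <your access token>
Access Token Secret: <your access token secret>
Languages: en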

EvaluateJsonPath

config-extract-attributes.png

Now we want to extract the text and language of each tweet into FlowFile attributes in order to make decisions on these values later. Set the Destination property to "flowfile-attribute" and add two user-defined properties using the New Property icon on the top right. The name of the property will be the FlowFile attribute name, and the value is the JSON path we would like to extract from the Tweet. This will allow us to access these values later by using expression language, such as ${twitter.text} or ${twitter.lang}.
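
For example, the two properties might look like the following, assuming the standard fields in the tweet JSON (the tweet text at $.text and the language at $.lang):

twitter.text: $.text
twitter.lang: $.lang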

RouteOnAttribute

config-route-nonempty.png

At this point we would like to ensure we are only indexing tweets we are interested in, so we add a user-defined property called "tweet" and specify the following expression language as the value:

${twitter.text:isEmpty():not():and(${twitter.lang:equals("en")})}

The first part of this expression filters out tweets with an empty message, which occurs frequently because some of the JSON documents coming through the endpoint are not actual tweets. The second part of the expression ensures that we are only selecting English tweets. We already set GetTwitter to filter on "en", but if we hadn't, the language could be used here to route different languages to different relationships. Any tweets matching the above conditions will be routed to the "tweet" relationship based on the property we defined, and anything else will be routed to the "unmatched" relationship. We can auto-terminate the "unmatched" relationship so that non-matching FlowFiles are discarded.
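
As a hypothetical example of that kind of language-based routing (not part of this flow), an additional property named "spanish_tweet" with the following value would send Spanish tweets to their own relationship:

${twitter.lang:equals("es")}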

MergeContent

config-merge.png

In this example we could send the JSON documents from RouteOnAttribute directly to Solr, but in most real-world scenarios reaching the Solr cluster requires network communication, and we will likely get better performance by batching multiple JSON documents into a single request to reduce the number of requests sent over the network. The MergeContent processor was built with exactly this scenario in mind and is capable of merging FlowFiles based on a number of criteria, such as the number of FlowFiles or their age. MergeContent also performs merges in a streaming manner, so it can merge a significant number of FlowFiles without exceeding memory constraints.

To configure MergeContent, set a Minimum and Maximum Number of Entries, as well as a Max Bin Age to trigger a merge in cases where no new data has come in for a period of time. Also set the Delimiter Strategy to "Text", and specify the Header as "[", the Footer as "]", and the Demarcator as ",". This wraps the incoming documents into one large JSON array.
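
One reasonable starting point is shown below; the entry counts and bin age are illustrative values, not settings taken from the original flow:

Minimum Number of Entries: 100
Maximum Number of Entries: 1000
Max Bin Age: 5 min
Delimiter Strategy: Text
Header: [
Footer: ]
Demarcator: ,

With these settings a merged FlowFile is simply a JSON array of tweets, e.g. [ {...}, {...}, {...} ], which Solr can later split back into individual documents.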

PutSolrContentStream

config-putsolr.png

Configure PutSolrContentStream to point to the Solr instance we started earlier by setting the Solr Type to "Cloud" and the Solr Location to the embedded ZooKeeper (localhost:9983). Also specify a Commit Within of "1000" to commit the incoming documents every second (this may not be needed if you set autoCommit in your solrconfig.xml).
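
The core properties would look something like the following (the Collection value matches the collection created earlier):

Solr Type: Cloud
Solr Location: localhost:9983
Collection: tweets
Commit Within: 1000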

Any user-defined properties will be sent to Solr on the request as key/value pairs, so we need to tell Solr how to transform the incoming JSON document into a Solr document. A good explanation of how this JSON-to-Solr mapping works can be found here, and an explanation of the user-defined properties can be found here.

For this example we will provide the following mappings:

  • split=/ to treat each child JSON document that we merged together as individual Solr documents
  • id:/id to map the id field of the JSON to the id field in the Solr schema (already defined)
  • twitter_text_t:/text to map the text field of the JSON to a dynamic field of type text_general in Solr
  • twitter_username_s:/user/name to map the user of the tweet to a dynamic string field in Solr
  • twitter_created_at_s:/created_at to map the creation date string to a dynamic string field in Solr
  • twitter_timestamp_ms_tl:/timestamp_ms to map the timestamp to a dynamic trie-long field in Solr
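
For reference, these mappings correspond to parameters on Solr's JSON update handler, so a hand-built request with the same mappings would look roughly like the following (the merged-tweets.json file name is just a stand-in for the merged FlowFile content):

curl 'http://localhost:8983/solr/tweets/update/json/docs?split=/&f=id:/id&f=twitter_text_t:/text&f=twitter_username_s:/user/name&f=twitter_created_at_s:/created_at&f=twitter_timestamp_ms_tl:/timestamp_ms&commitWithin=1000' \
  -H 'Content-Type: application/json' --data-binary @merged-tweets.json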

The naming conventions of the fields allow us to leverage dynamic fields in Solr. Dynamic fields work by using a suffix on the field name to indicate the field type, so the "_t" on "twitter_text_t" tells Solr that this field will map to a text_general field type. It is possible to provide no field mappings at all, send the JSON to Solr as-is, and let Solr guess the field types on the fly. However, in that case all of the fields would get added as multiValued fields, and multiValued fields can't be used in a sorting clause, which would prevent us from sorting on the timestamp. So we opt for the dynamic fields here.

Also worth noting, this processor has two failure relationships. The regular "failure" relationship is for failures that would generally require some intervention to resolve, such as an invalid document being submitted. The "connection_failure" relationship is for failures related to communication issues between NiFi and Solr. A typical approach in a production scenario would be to route the "connection_failure" back to itself to retry during connection failures, and to route the "failure" relationship to a PutFile processor that could write out failed documents for later inspection.

Summary

At this point you should be able to hit Start on your processors and start seeing tweets flowing into Solr. Happy indexing!
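
A quick way to verify that documents are arriving (assuming the field mappings above) is to query the collection and sort by the timestamp field:

curl 'http://localhost:8983/solr/tweets/select?q=*:*&sort=twitter_timestamp_ms_tl+desc&rows=5&wt=json'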

A template of the flow created in this example can be found here. We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.apache.org.

 

Comments:

Thanks for sharing, good post. I started taking a look at NiFi after Hortonworks took it seriously. What kind of projects are you using NiFi in? Are you putting it into production? Have you integrated with enterprise schedulers? How do you enable multiple developers to work in parallel?

Posted by kashiks on November 10, 2015 at 04:21 AM GMT #

Thank you for the feedback. NiFi is definitely a production-ready application. Although the version number is fairly low (0.3.0), it was worked on for almost 9 years prior to becoming an open-source project, and the team chose to start at 0.0.1 when open-sourcing. So Apache NiFi version 0.0.1 was years of development :)

Posted by Bryan on November 11, 2015 at 10:22 PM GMT #

As far as enterprise schedulers, if you are referring to things like YARN and Mesos, there hasn't been any integration thus far. NiFi is not always intended to be deployed in a core data center where one of these schedulers would live. Many times NiFi would be deployed closer to the sources of data and ship data back to a central data center. In these cases NiFi needs to run on its own and can't rely on these kinds of schedulers.

Posted by Bryan on November 11, 2015 at 10:23 PM GMT #

Currently, multiple developers can work in isolation on their own NiFi instances and share templates of their flows. Templates can then be deployed to other shared instances, such as test or production instances. There are some future improvements around this area which can be read about here: https://cwiki.apache.org/confluence/display/NIFI/Multi-Tentant+Dataflow https://cwiki.apache.org/confluence/display/NIFI/Configuration+Management+of+Flows https://cwiki.apache.org/confluence/display/NIFI/Extension+and+Template+Registry

Posted by Bryan on November 11, 2015 at 10:23 PM GMT #

Hi, I am new to NiFi. I have been following this blog post and did exactly as suggested, but the tweets are not flowing into SolrCloud. Btw, I could only start the GetTwitter processor, the rest couldn't start. Need help to fix this at the earliest.

Posted by Lobo on February 01, 2016 at 05:20 AM GMT #

Lobo, If you can't start the other processors, it is most likely because they are not configured properly and in an invalid state. Each processor should have a yellow icon in the top left that you can hover over which tells you exactly why the processor is considered invalid.

Posted by Bryan on February 01, 2016 at 02:24 PM GMT #

How do I ingest fields in Solr based on properties - Indexed, Stored, Required, MultiValued, Unique Key? Where in NiFi will I make such configuration? Kindly provide an example for better understanding. Thanks.

Posted by Lobo on February 12, 2016 at 08:41 AM GMT #

Lobo, I'm not sure I understand the question... the properties you mentioned (Indexed, Stored, Required, etc.) are properties that you would define in your Solr schema.xml; they are not passed to Solr when indexing. In this example, the fields it is indexing are id, twitter_text_t, twitter_username_s, twitter_created_at_s, twitter_created_at_ms_tl. This example was relying on Solr to automatically create those fields upon indexing, but if you wanted to control the values you mentioned then you would need to define those fields in your schema.xml ahead of time.

Posted by Bryan on February 12, 2016 at 02:32 PM GMT #

Can anyone please help me with how to move files from an HDFS location into Solr? Please tell me which processors need to be used here.. HDFS files ----> Solr using NiFi......

Posted by Sharath on February 24, 2016 at 05:20 AM GMT #

Thanks much Bryan, it helped me achieve my task. I am interested in using NiFi to ingest data from a database into Solr. Can you guide me as to which processors are involved and the specific settings for each if required? Thanks!!

Posted by Lobo on February 25, 2016 at 11:47 AM GMT #

Sharath, for indexing data from HDFS, take a look at the ListHDFS and FetchHDFS processors; they are used together to pull data out of HDFS. Lobo, for indexing data from a database, take a look at slide 30 here: http://www.slideshare.net/BryanBende/building-data-pipelines-for-solr-with-apache-nifi

Posted by Bryan on February 25, 2016 at 05:33 PM GMT #

Thank you for this walk-thru. I have everything configured and runnable, but when I start GetTwitter, nothing happens (I don't get any tweets); it just sits there running. Can you give me some ideas on how I can troubleshoot what the issue might be? Thank you in advance!

Posted by Francis on April 21, 2016 at 04:15 PM GMT #

Hello Team, Thanks for the great post. I have followed all the steps and am able to pull the tweets and store them in Solr, but I'm stuck with one problem: the field "twitter_created_at_s" is of datatype string. I want to store it as type Date. How can I achieve this? I spent almost two days searching but no luck :( :( Please help

Posted by SHIVADEEP GUNDOJU on July 20, 2016 at 12:18 PM GMT #

You need to use the dynamic field type for a date in Solr... "_s" is a string, "_dt" is a date https://github.com/apache/lucene-solr/blob/67b638880d81fbb11abfbfc1ec93a5f3d86c3d3b/solr/server/solr/configsets/basic_configs/conf/managed-schema#L153 Keep in mind the value might have to be formatted properly into a valid Solr date string first.

Posted by Bryan on July 20, 2016 at 01:04 PM GMT #

Thank you so much Bryan. When I change the field name to _dt I am getting the error "Invalid date string". I understand the date format we are getting from Twitter is different from the format Solr can accept. Could you please help me resolve this? Please help

Posted by SHIVADEEP GUNDOJU on July 20, 2016 at 02:09 PM GMT #

Right now it's taking the value of created_at directly out of the JSON and inserting it into Solr. You would need to modify the JSON document so it had the correct value before it got to PutSolrContentStream. One way to do this would be to extract all of the fields being inserted into Solr into attributes using EvaluateJsonPath, then use UpdateAttribute with expression language to modify the date, then use AttributesToJSON to replace the content with a new JSON document. Another approach might be to use the new JoltTransformJson processor in NiFi 0.7.0.
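
As a rough sketch, assuming the created_at value was extracted into an attribute named twitter.created_at, the UpdateAttribute expression might look like:

${twitter.created_at:toDate("EEE MMM dd HH:mm:ss Z yyyy"):format("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")}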

Posted by Bryan on July 20, 2016 at 02:31 PM GMT #

I have added <str>EEE MMM d HH:mm:ss Z yyyy</str> in solrconfig.xml. The string in the tweet looks like "Mon Jul 04 08:23:48 +0000 2016". Please help

Posted by SHIVADEEP GUNDOJU on July 20, 2016 at 02:34 PM GMT #

Okay, will try and get back, Bryan.. Thank you so much for your quick help

Posted by SHIVADEEP GUNDOJU on July 20, 2016 at 02:35 PM GMT #

Can I use NiFi to convert TCP messages and stream them over to web clients?

Posted by Santosh on December 22, 2016 at 12:07 AM GMT #

Santosh, yes you can. Documentation for all of the available processors is here https://nifi.apache.org/docs.html

Posted by Bryan on December 22, 2016 at 01:51 PM GMT #

Thank you Bryan. I will look into the documentation. I need to publish TCP events to 500 browser clients without delay. Any design inputs are highly appreciated.

Posted by Santosh on December 22, 2016 at 03:12 PM GMT #

@Bryan, I am not able to find the managed-schema for the cloud collection "gettingstarted"... I am trying to get text files from a local repo and index them into Solr using PutSolrContentStream. Input files are in the below format: userid,pcname,employeename,email,role,date,activity. How do I add the index to Solr?

Posted by sukesh on August 20, 2017 at 09:46 PM GMT #

@Bryan: please let me know where I can find the managed-schema for the collection "gettingstarted" for SolrCloud....

Posted by sukesh on August 20, 2017 at 09:50 PM GMT #
