Apache NiFi

Wednesday Dec 16, 2015

Getting Syslog Events to HBase

Bryan Bende -  bbende@gmail.com @bbende


In the Apache NiFi 0.4.0 release there are several new integration points, including processors for interacting with Syslog and HBase. In this post we'll demonstrate how to use NiFi to receive messages from Syslog over UDP and store those messages in HBase.

The flow described in this post was created using Apache NiFi 0.4.0, rsyslog 5.8.10, and Apache HBase 1.1.2.

Setting up Syslog

In order for NiFi to receive syslog messages, rsyslog needs to forward messages to a port that NiFi will be listening on. Forwarding is configured in rsyslog.conf, which is located in /etc on most Linux distributions.

Edit rsyslog.conf and add the following line:

*.* @localhost:7780

This tells rsyslog to forward all messages over UDP to localhost port 7780. A double '@@' can be used to forward over TCP.

Restart rsyslog for the changes to take effect:

/etc/init.d/rsyslog restart
Shutting down system logger:                               [  OK  ]
Starting system logger:                                    [  OK  ]
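With forwarding in place, it can also be handy to exercise the NiFi side without going through rsyslog. The following Python sketch builds a message in the traditional BSD (RFC 3164) layout, <PRI>TIMESTAMP HOSTNAME TAG: MSG, and sends it as a single UDP datagram to port 7780. The helper names format_rfc3164 and send_syslog_udp are hypothetical, and the assumption that forwarded messages follow this layout is ours. PRI is computed as facility * 8 + severity.

```python
import socket
from datetime import datetime

# English month abbreviations, spelled out so the output does not depend on the locale
MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def format_rfc3164(facility: int, severity: int, when: datetime,
                   hostname: str, tag: str, message: str) -> str:
    """Build a BSD-style line: <PRI>Mmm dd hh:mm:ss HOSTNAME TAG: MSG."""
    pri = facility * 8 + severity  # PRI packs facility and severity into one number
    # RFC 3164 pads single-digit days with a space, e.g. "Dec  1"
    timestamp = f"{MONTHS[when.month - 1]} {when.day:2d} {when:%H:%M:%S}"
    return f"<{pri}>{timestamp} {hostname} {tag}: {message}"


def send_syslog_udp(line: str, host: str = "localhost", port: int = 7780) -> None:
    """Send one syslog line as a single UDP datagram, similar to rsyslog's '@host:port'."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("utf-8"), (host, port))


if __name__ == "__main__":
    # Facility 1 (user) with severity 5 (notice) gives PRI 13
    line = format_rfc3164(1, 5, datetime.now(), "localhost", "root",
                          "this is a test message")
    send_syslog_udp(line)
```

Facility 1 (user) and severity 5 (notice) correspond to user.notice, the priority the logger utility uses when none is given.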

Setting up HBase

In order to store the syslog messages, we'll create an HBase table called 'syslog' with one column family called 'msg'. From the command line, start the HBase shell and create the table:

hbase shell
create 'syslog', {NAME => 'msg'}

Configure an HBase Client Service

The HBase processors added in Apache NiFi 0.4.0 use a controller service to interact with HBase. This allows the processors to remain unchanged when the HBase client changes. Because NiFi provides class-loader isolation for each NAR, a single NiFi instance can also support multiple versions of the HBase client and interact with HBase instances of different versions at the same time.

The HBase Client Service can be configured by providing paths to external configuration files, such as hbase-site.xml, or by setting several properties directly on the service. For this example we will take the latter approach. From the Controller Services configuration window in NiFi, add an HBase_1_1_2_ClientService with the following configuration (adjusting values appropriately for your system):

client-service-config.jpg

After configuring the service, enable it so that processors can use it:

client-service-enabled.jpg

Building the Dataflow

The dataflow we are going to build will consist of the following components:

  • ListenSyslog for receiving syslog messages over UDP
  • UpdateAttribute for renaming attributes and creating a row id for HBase
  • AttributesToJSON for creating a JSON document from the syslog attributes
  • PutHBaseJSON for inserting each JSON document as a row in HBase

The overall flow looks like the following:

syslog-hbase-flow.jpg

Let's walk through the configuration of each processor.

ListenSyslog

config-listensyslog.jpg

Set the Port to the same port that rsyslog is forwarding messages to, in this case 7780. Leave all other properties at their default values.

With a Max Batch Size of "1" and Parse Messages as "true", each syslog message will be emitted as a single FlowFile, with the content of the FlowFile being the original message, and the results of parsing the message being stored as FlowFile attributes.

The attributes we will be interested in are:

  • syslog.priority
  • syslog.severity
  • syslog.facility
  • syslog.version
  • syslog.timestamp
  • syslog.hostname
  • syslog.sender
  • syslog.body
  • syslog.protocol
  • syslog.port
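To make the attribute list concrete, here is a simplified Python sketch of what parsing a BSD-style message into these attributes might look like. It is an assumption for illustration, not NiFi's parser: it only understands the RFC 3164 layout, leaves syslog.version empty (RFC 3164 messages have no version field), and takes sender, protocol, and port as arguments because those describe the connection rather than the message. The function name parse_syslog is hypothetical.

```python
import re
from typing import Dict, Optional

# <PRI>Mmm dd hh:mm:ss HOSTNAME BODY  (BSD / RFC 3164 layout)
RFC3164 = re.compile(
    r"^<(\d{1,3})>"                                # PRI
    r"([A-Z][a-z]{2} [ \d]\d \d{2}:\d{2}:\d{2}) "  # timestamp, e.g. "Dec 10 19:20:15"
    r"(\S+) "                                      # hostname
    r"(.*)$"                                       # body, tag included
)


def parse_syslog(message: str, sender: str, protocol: str,
                 port: int) -> Optional[Dict[str, str]]:
    """Return syslog.* attributes for an RFC 3164 message, or None if it does not match."""
    match = RFC3164.match(message.rstrip("\r\n"))
    if match is None:
        return None  # not RFC 3164; this sketch simply gives up
    pri = int(match.group(1))
    return {
        "syslog.priority": str(pri),
        "syslog.severity": str(pri % 8),    # low three bits of PRI
        "syslog.facility": str(pri // 8),   # remaining bits of PRI
        "syslog.version": "",               # RFC 3164 has no version field
        "syslog.timestamp": match.group(2),
        "syslog.hostname": match.group(3),
        "syslog.sender": sender,            # from the connection, not the message
        "syslog.body": match.group(4),
        "syslog.protocol": protocol,
        "syslog.port": str(port),
    }
```

For a message like the one logger produces by default, this yields a priority of 13, a facility of 1, and a severity of 5, with the tag kept in the body (root: this is a test message).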

UpdateAttribute

config-updateattr.jpg

The attributes produced by ListenSyslog all start with "syslog.", which keeps them nicely namespaced in NiFi. However, we are going to use these attribute names as column qualifiers in HBase, and we don't need the prefix there since the columns will already be within a syslog table.

Add a property for each syslog attribute that copies its value to a name without the prefix (for example, a property named hostname with the value ${syslog.hostname}), and use the Delete Attributes Expression to remove the original attributes. In addition, create an id attribute of the form "timestamp_uuid", where timestamp is the long representation of the timestamp on the syslog message and uuid is the uuid of the FlowFile in NiFi. This id attribute will be used as the row id in HBase.

The expression language for the id attribute is:

${syslog.timestamp:toDate('MMM d HH:mm:ss'):toNumber()}_${uuid}
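The renaming and the id expression can be approximated in Python as below, as a sketch only. Two details are assumptions worth calling out. First, the syslog timestamp has no year; the row keys in the scan later in this post show that NiFi filled in 1970, so the sketch supplies 1970 explicitly (Python's strptime would otherwise default to 1900). Second, toDate interprets the time in the JVM's default time zone, and the sketch assumes UTC, which is consistent with that same row key: "Dec 10 19:20:15" becomes 29704815000. The function names strip_prefix and row_id are hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict

PREFIX = "syslog."


def strip_prefix(attributes: Dict[str, str]) -> Dict[str, str]:
    """Copy each syslog.* attribute to a name without the prefix and drop the original."""
    renamed = {}
    for name, value in attributes.items():
        if name.startswith(PREFIX):
            renamed[name[len(PREFIX):]] = value
        else:
            renamed[name] = value  # other attributes (e.g. uuid) pass through unchanged
    return renamed


def row_id(syslog_timestamp: str, flowfile_uuid: str) -> str:
    """Approximate ${syslog.timestamp:toDate('MMM d HH:mm:ss'):toNumber()}_${uuid}."""
    # Assumption: the missing year is 1970, matching the row keys NiFi produced
    parsed = datetime.strptime(f"1970 {syslog_timestamp}", "%Y %b %d %H:%M:%S")
    # Assumption: the NiFi JVM runs in UTC (toDate uses the JVM's default time zone)
    millis = int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)
    return f"{millis}_{flowfile_uuid}"
```

Because every id is computed as if the message were from 1970, ids sort by month, day, and time regardless of the actual year; the uuid suffix is what keeps them unique.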
  

AttributesToJSON

config-attrstojson.jpg

Set the Destination to "flowfile-content" so that the JSON document replaces the FlowFile content, and set Include Core Attributes to "false" so that the standard NiFi attributes are not included.
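Conceptually, AttributesToJSON with these settings turns the remaining attributes into a flat JSON object of strings. A minimal Python sketch of that behavior follows; attributes_to_json is a hypothetical name, and CORE_ATTRIBUTES is an illustrative subset of NiFi's core attribute names, not the complete list.

```python
import json
from typing import Dict

# Illustrative subset of NiFi's core attribute names; the real list is longer
CORE_ATTRIBUTES = {"filename", "path", "uuid"}


def attributes_to_json(attributes: Dict[str, str], include_core: bool = False) -> str:
    """Serialize FlowFile attributes as a flat JSON object, optionally skipping core ones."""
    selected = {
        name: value
        for name, value in attributes.items()
        if include_core or name not in CORE_ATTRIBUTES
    }
    return json.dumps(selected, sort_keys=True)
```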

PutHBaseJSON

config-puthbasejson.jpg

Select the HBase Client Service we configured earlier, and set the Table Name and Column Family to "syslog" and "msg" to match the table we created. In addition, set the Row Identifier Field Name to "id" so that the processor uses the id field of the JSON document as the row id.
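The mapping PutHBaseJSON performs for a flat document can be sketched as follows: the field named by Row Identifier Field Name becomes the row key, and every other field becomes a column in the configured family. The helper json_to_hbase_put is hypothetical and handles only flat documents with scalar values; leaving the id field out of the columns matches the scan output later in this post, which has no msg:id column.

```python
import json
from typing import Dict, Tuple


def json_to_hbase_put(document: str, column_family: str = "msg",
                      row_id_field: str = "id") -> Tuple[bytes, Dict[bytes, bytes]]:
    """Turn one flat JSON document into a row key and a {b'family:qualifier': value} map."""
    fields = json.loads(document)
    # The row id field becomes the key and is not stored as a column (raises KeyError if absent)
    row_key = str(fields.pop(row_id_field)).encode("utf-8")
    cells = {
        f"{column_family}:{name}".encode("utf-8"): str(value).encode("utf-8")
        for name, value in fields.items()
    }
    return row_key, cells
```

The returned pair has the shape a Python HBase client such as happybase accepts for Table.put(row, data), if you want to experiment outside NiFi.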

Verifying the Flow

From a terminal we can send a test message to syslog using the logger utility:

logger "this is a test message"

Using the HBase shell we can inspect the contents of the syslog table:

hbase shell
hbase(main):002:0> scan 'syslog'
ROW                                          COLUMN+CELL
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:body, timestamp=1449775215481,
  value=root: this is a test message
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:hostname, timestamp=1449775215481,
  value=localhost
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:port, timestamp=1449775215481,
  value=7780
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:protocol, timestamp=1449775215481,
  value=UDP
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:sender, timestamp=1449775215481,
  value=/127.0.0.1
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:timestamp, timestamp=1449775215481,
  value=Dec 10 19:20:15
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:version, timestamp=1449775215481,
  value=
1 row(s) in 0.1120 seconds

Performance Considerations

In some cases the volume of syslog messages being pushed to ListenSyslog may be very high. There are several options to help scale the processing, depending on the use case.

Concurrent Tasks

ListenSyslog uses a background thread that reads messages as fast as possible and places them on a blocking queue, from which they are dequeued and processed by the processor's onTrigger method. Increasing the number of concurrent tasks for the processor scales up the rate at which messages are dequeued and processed, ensuring new messages can continue to be queued.
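The pattern described above, a single reader feeding a bounded queue that several tasks drain, can be sketched with Python's standard threading and queue modules. The names run_listener, reader, and worker are hypothetical. In this sketch the reader blocks when the queue is full; that is a simplification, not a statement about how ListenSyslog itself handles a full queue.

```python
import queue
import threading
from typing import List, Optional


def run_listener(messages: List[str], concurrent_tasks: int,
                 queue_size: int = 100) -> List[str]:
    """Simulate one reader thread feeding a bounded queue drained by several tasks."""
    inbox: "queue.Queue[Optional[str]]" = queue.Queue(maxsize=queue_size)
    processed: List[str] = []
    lock = threading.Lock()

    def reader() -> None:
        # Stands in for the background thread reading from the socket
        for message in messages:
            inbox.put(message)  # in this sketch, blocks while the queue is full
        for _ in range(concurrent_tasks):
            inbox.put(None)  # one stop marker per task

    def worker() -> None:
        # Stands in for one concurrent task calling onTrigger
        while True:
            message = inbox.get()
            if message is None:
                return
            with lock:
                processed.append(message)  # placeholder for parsing and emitting a FlowFile

    threads = [threading.Thread(target=reader)]
    threads += [threading.Thread(target=worker) for _ in range(concurrent_tasks)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return processed
```

Raising concurrent_tasks adds consumers without changing the reader, which mirrors the effect of raising Concurrent Tasks on the processor. The Python version illustrates the structure, not the throughput.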

Parsing

One of the more expensive operations during the processing of a message is parsing it in order to provide the attributes. Parsing is controlled by the Parse Messages property on the processor and can be turned off in cases where the attributes are not needed and the original message just needs to be delivered somewhere.

Batching

In cases where parsing the messages is not necessary, an additional option is to batch many messages together during one call to onTrigger. This is controlled through the Max Batch Size property, which defaults to "1". Batching is appropriate when individual messages are not needed, for example when storing the messages in HDFS, where you want them grouped into appropriately sized files.
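A batch in this sense can be pictured as draining up to Max Batch Size messages from the internal queue and writing them into one FlowFile. In the Python sketch below, drain_batch is a hypothetical helper, and joining messages with a newline is an assumption, chosen so that a downstream SplitText could split the batch back into individual messages.

```python
import queue
from typing import Optional


def drain_batch(inbox: "queue.Queue[str]", max_batch_size: int) -> Optional[str]:
    """Pull up to max_batch_size queued messages and join them into one FlowFile body."""
    batch = []
    while len(batch) < max_batch_size:
        try:
            batch.append(inbox.get_nowait())
        except queue.Empty:
            break  # take whatever is available rather than waiting for a full batch
    if not batch:
        return None  # nothing to emit on this trigger
    return "\n".join(batch)  # assumption: newline as the message delimiter
```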

ParseSyslog

In addition to parsing messages directly in ListenSyslog, there is also a ParseSyslog processor. An alternative to the flow described in this post would be to have ListenSyslog produce batches of 100 messages at a time, followed by SplitText, followed by ParseSyslog. The advantage of this approach is that the different components can be scaled independently and can take advantage of backpressure between processors.

Summary

At this point you should be able to get your syslog messages ingested into HBase and can experiment with different configurations. The template for this flow can be found here.

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.apache.org.