Apache Flume

Tuesday November 27, 2012

Streaming data into Apache HBase using Apache Flume

Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink, which can be used to write events into HDFS, and two different HBase sink implementations to write events into HBase. You can read about the basic architecture of Apache Flume 1.x in this blog post. You can also read about how Flume’s File Channel persists events while still providing extremely high performance in an earlier blog post. In this article, we will explore how to configure Flume to write events into HBase, and how to write custom serializers that write events into HBase in a format of the user’s choice.

Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found in the HBase documentation.
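For instance, creating a table like the one used in the sample configuration later in this post and inserting a single cell would look roughly like this in the hbase shell (the row key and value here are illustrative):

```text
hbase> create 'transactions', 'clients'
hbase> put 'transactions', 'row1', 'clients:charges', '12.50'
hbase> get 'transactions', 'row1'
```

Note that the put command names all four coordinates: the table, the row key, the column family and the column, plus the value to store.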

Flume has two HBase sinks, the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other:

  • The AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase.
  • The HBaseSink will soon support secure HBase clusters (FLUME-1626) and the new HBase IPC which was introduced in HBase 0.96.


The configuration for both of these sinks is very similar. A sample configuration is shown below:

#Use the AsyncHBaseSink
host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
#Use the HBaseSink
#host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
host1.sinks.sink1.channel = ch1
host1.sinks.sink1.table = transactions
host1.sinks.sink1.columnFamily = clients
host1.sinks.sink1.column = charges
host1.sinks.sink1.batchSize = 5000
#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
#Use the SimpleHbaseEventSerializer that comes with Flume
#host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
host1.sinks.sink1.serializer.incrementColumn = icol
host1.channels.ch1.type = memory

In the above config, the “table” parameter specifies the HBase table the sink has to write to - in this case, “transactions”; the “columnFamily” parameter specifies the column family in that table to insert the data into, in this case “clients”; and the “column” parameter specifies the column in that column family to write to, in this case “charges”. Apart from these, the sink requires a channel to be configured, like all other Flume sinks.

The other interesting configuration parameters are “serializer” and the “serializer.*” parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume event into an HBase-friendly format. Because this “translation” is usually specific to the schema used by the user’s HBase cluster, it is usually implemented by the user. All configuration parameters passed in as “serializer.*” are handed to the serializer, and can be used to set up any internal state the serializer needs.
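The prefix scoping of the “serializer.*” parameters can be pictured with a short standalone sketch (plain Java, with a Map standing in for Flume’s Context; the class and method names here are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class SerializerConfigDemo {

  // Keep only the keys under the given prefix, with the prefix stripped --
  // this mirrors how "serializer." sub-properties reach the serializer
  // while sink-level keys like "batchSize" do not.
  public static Map<String, String> subProperties(Map<String, String> conf, String prefix) {
    Map<String, String> out = new HashMap<String, String>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        out.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> sinkConf = new HashMap<String, String>();
    sinkConf.put("table", "transactions");
    sinkConf.put("serializer.incrementColumn", "icol");
    // The serializer sees {incrementColumn=icol}, not the sink-level keys.
    System.out.println(subProperties(sinkConf, "serializer."));
  }
}
```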

In the case of the HBaseSink, the serializer converts a Flume event into one or more HBase Puts and/or Increments. The serializer must implement the HbaseEventSerializer interface. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the serializer’s initialize method. The serializer must “translate” the Flume event into HBase Puts and Increments, which it returns from its getActions and getIncrements methods. These Puts and Increments are then sent over the wire to the HBase cluster. When the sink stops, this instance of the serializer is closed by the HBaseSink.
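For orientation, the contract looks roughly like this (a sketch based on the Flume 1.x sources, not a compilable unit on its own - Event, Row, Increment and the Configurable/ConfigurableComponent super-interfaces come from the Flume and HBase client libraries):

```java
public interface HbaseEventSerializer extends Configurable, ConfigurableComponent {
  void initialize(Event event, byte[] columnFamily); // called once per event
  List<Row> getActions();          // the Puts to execute for this event
  List<Increment> getIncrements(); // the Increments for this event
  void close();                    // called when the sink stops
}
```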

The AsyncHBaseSink’s serializer must implement AsyncHbaseEventSerializer.

In this case, the initialize method is called once by the sink, when it starts up. For every event, the sink calls the setEvent method and then calls the getActions and getIncrements methods - similar to the HBaseSink. When the sink is stopped, the serializer’s cleanUp method is called. Notice that the methods do not return the standard HBase Puts and Increments, but PutRequest and AtomicIncrementRequest from the asynchbase API. These are roughly equivalent to the HBase Puts and Increments respectively, with some differences.

An example of such a serializer is shown below.

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

/**
 * A serializer for the AsyncHBaseSink, which splits the event body into
 * multiple columns and inserts them into a row whose key is available in
 * the headers
 */
public class SplittingSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] colFam;
  private Event currentEvent;
  private byte[][] columnNames;
  private final List<PutRequest> puts = new ArrayList<PutRequest>();
  private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
  private byte[] currentRowKey;
  private final byte[] eventCountCol = "eventCount".getBytes();

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.colFam = cf;
  }

  @Override
  public void setEvent(Event event) {
    // Set the event and verify that the rowKey is not present
    this.currentEvent = event;
    String rowKeyStr = currentEvent.getHeaders().get("rowKey");
    if (rowKeyStr == null) {
      throw new FlumeException("No row key found in headers!");
    }
    currentRowKey = rowKeyStr.getBytes();
  }

  @Override
  public List<PutRequest> getActions() {
    // Split the event body and get the values for the columns
    String eventStr = new String(currentEvent.getBody());
    String[] cols = eventStr.split(",");
    puts.clear();
    for (int i = 0; i < cols.length; i++) {
      //Generate a PutRequest for each column.
      PutRequest req = new PutRequest(table, currentRowKey, colFam,
              columnNames[i], cols[i].getBytes());
      puts.add(req);
    }
    return puts;
  }

  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    incs.clear();
    //Increment the number of events received
    incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
    return incs;
  }

  @Override
  public void cleanUp() {
    table = null;
    colFam = null;
    currentEvent = null;
    columnNames = null;
    currentRowKey = null;
  }

  @Override
  public void configure(Context context) {
    //Get the column names from the configuration
    String cols = context.getString("columns");
    String[] names = cols.split(",");
    // Assign to the field, not a new local, so getActions() can see the names
    columnNames = new byte[names.length][];
    int i = 0;
    for(String name : names) {
      columnNames[i++] = name.getBytes();
    }
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }
}

This serializer splits the event body on a delimiter and inserts each resulting value into a different column. The row key is taken from the event headers. For each event received, a counter is also incremented to keep track of the total number of events.

This serializer can be configured as follows:

host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
host1.sinks.sink1.channel = ch1
host1.sinks.sink1.table = transactions
host1.sinks.sink1.columnFamily = clients
host1.sinks.sink1.batchSize = 5000
#The serializer to use
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer
#List of columns each event writes to.
host1.sinks.sink1.serializer.columns = charges,date,priority
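To see what this configuration produces for a single event, here is a small standalone sketch (plain Java; the cells helper and the sample event body are made up for illustration) of the body-to-column mapping the serializer performs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SplitMappingDemo {

  // Pair each configured column name with the corresponding
  // comma-separated field of the event body, as SplittingSerializer does.
  public static Map<String, String> cells(String columnsConf, String body) {
    String[] names = columnsConf.split(",");
    String[] values = body.split(",");
    Map<String, String> cells = new LinkedHashMap<String, String>();
    for (int i = 0; i < names.length && i < values.length; i++) {
      cells.put(names[i], values[i]);
    }
    return cells;
  }

  public static void main(String[] args) {
    // An event with body "12.50,2012-11-27,high" and a "rowKey" header
    // would yield one Put per column under the "clients" column family.
    System.out.println(cells("charges,date,priority", "12.50,2012-11-27,high"));
  }
}
```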

Internals of the HBaseSink and AsyncHBaseSink
The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. The HBase sink is currently single-threaded, and calls the serializer to get the Puts and Increments once per event it processes. Puts and Increments are sent to HBase via blocking calls, so the next event is read and passed to the serializer only once the current event is successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if any of these events fails to be written successfully, the sink retries the entire transaction.

On the other hand, the AsyncHBaseSink uses the asynchbase API and sends events to HBase asynchronously. Like the HBaseSink, it generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. If an error callback is received, the entire transaction is retried, in true Flume style.

A word of caution
As you can see, if HBase reports failure to write even one Put or Increment, the entire transaction is retried - this is how Flume’s at-least-once semantics work, and most Flume sinks operate in the same way. In the case of HBase Increments, this means the same event can cause a counter to be incremented more than once, which is something to keep in mind when using Flume to perform Increments. Similarly, if the serializer is not idempotent, the same event can cause multiple different Puts to be written to HBase. Imagine a case where each event represents a credit card transaction: if the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired.
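The double-counting effect can be simulated in a few lines (plain Java; the in-memory counter and the simulated mid-batch failure are made up for illustration - a real sink would be incrementing a cell in HBase):

```java
public class RetryIncrementDemo {

  // Deliver a batch of increments, failing partway through the first
  // attempt, the way a sink retrying a whole transaction would.
  public static long deliverWithRetry(int batchSize, int failAfter) {
    long counter = 0L;
    boolean delivered = false;
    boolean firstAttempt = true;
    while (!delivered) {
      boolean failed = false;
      for (int i = 0; i < batchSize; i++) {
        if (firstAttempt && i == failAfter) {
          failed = true; // simulated failure partway through the batch
          break;         // the sink will retry the whole transaction
        }
        counter++;       // each event's increment is applied
      }
      firstAttempt = false;
      delivered = !failed;
    }
    return counter;
  }

  public static void main(String[] args) {
    // 5 events, failure after 3 increments: the counter ends at 8, not 5,
    // because the 3 increments from the failed attempt are applied again.
    System.out.println(deliverWithRetry(5, 3));
  }
}
```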

The AsyncHBaseSink is known to give better performance than the HBaseSink, primarily because of the non-blocking nature of the underlying API it uses. The HBase community is working on improving the performance of the HBase client API, which should substantially improve the HBaseSink’s performance as well.

Conclusion
Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema, allowing the user to “map” the Flume event to HBase data.

Comments:

Great post, very informative. One quick observation. I could be wrong, but I believe the following line of code in the SplittingSerializer class: byte[][] columnNames = new byte[names.length][]; should be columnNames = new byte[names.length][]; I mention this because columnNames is declared earlier as private byte[][] columnNames; Otherwise the columnNames reference below will evaluate as null: PutRequest req = new PutRequest(table, currentRowKey, colFam, columnNames[i], cols[i].getBytes());

Posted by Dan Sandler on March 23, 2013 at 08:31 AM PDT #

When I use this method, I found that all the data is loaded into HBase (including the raw data), rather than just the incremental data, so there is a lot of useless data. I've found that the source of the SimpleAsyncHbaseEventSerializer class behaves the same way. I don't know whether my Flume configuration is correct. Please help me, thank you.

Posted by Shimon on May 20, 2015 at 12:49 AM PDT #

Hello, I would like to use the Flume HBaseSink on a machine that is not an HBase node to write data to a remote HBase cluster. How do I configure this?

Posted by jay on July 31, 2015 at 12:30 AM PDT #
