Entries tagged [flume]

Friday Jan 11, 2013

Flume Performance Tuning - part 1

This is part 1 in a series of articles about tuning the performance of Apache Flume, a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of event data.

To kick off this series, I’d like to discuss some important Flume concepts that come into play when tuning your Flume flows for maximum performance: the channel and the transaction batch size.

Setting up a data flow

Imagine you want to take a heavy stream of user activity from your application server logs and store it on your Hadoop cluster for analytics. If you have a large deployment of application servers, you would likely want to build a fan-in architecture, where you are sending data from many nodes to relatively fewer nodes.

A Tiered Flume Topology

If you are sending these user events one at a time, each time waiting for acknowledgment that it was delivered, your throughput may be limited by network latency. Naturally, you would want to batch up the events into larger transactions, so that you amortize the latency of the acknowledgment over a larger number of events and therefore get more throughput.
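
As a rough illustration of this fan-in pattern and of batching on the sending side, here is a hedged sketch of what one application-tier agent might look like; many such agents would point at the same collector. The agent name, log path, host name, and port are hypothetical, while the exec source, the Avro sink, and its batch-size parameter are standard Flume components.

# One of many application-tier agents sending to a shared collector.
app.sources = logSrc
app.channels = ch1
app.sinks = avro1

app.sources.logSrc.type = exec
app.sources.logSrc.command = tail -F /var/log/app/access.log
app.sources.logSrc.channels = ch1

app.sinks.avro1.type = avro
app.sinks.avro1.channel = ch1
app.sinks.avro1.hostname = collector01.example.com
app.sinks.avro1.port = 4141
# Amortize the acknowledgment latency over 100 events per transaction.
app.sinks.avro1.batch-size = 100

app.channels.ch1.type = memory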

Channels

So, what happens if the storage tier goes down momentarily, as in the case of a network partition? What happens to the events if a Flume agent machine crashes? We still want to be able to serve our users on the application tier and retain our data somehow. In order to accomplish this, we need a buffering mechanism on each agent that allows it to store events in the case of downstream failures or slowdowns. In Flume, the channel is what persists events at each hop in the flow. Below is a diagram that illustrates where the channel sits in the architecture of a Flume agent.

Flume agent architecture

Memory Channel vs. File Channel

An important decision to make when designing your Flume flow is what type of channel you want to use. At the time of this writing, the two recommended channels are the file channel and the memory channel. The file channel is a durable channel, as it persists all events that are stored in it to disk. So, even if the Java virtual machine is killed, or the operating system crashes or reboots, events that were not successfully transferred to the next agent in the pipeline will still be there when the Flume agent is restarted. The memory channel is a volatile channel, as it buffers events in memory only: if the Java process dies, any events stored in the memory channel are lost. Naturally, the memory channel also exhibits very low put/take latencies compared to the file channel, even for a batch size of 1. Since the number of events that can be stored is limited by available RAM, its ability to buffer events in the case of temporary downstream failure is quite limited. The file channel, on the other hand, has far superior buffering capability due to utilizing cheap, abundant hard disk space.
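
For reference, here is a hedged sketch of how each channel type is typically declared. The agent and channel names are made up and the values are only starting points; type, capacity, transactionCapacity, checkpointDir, and dataDirs are standard Flume channel parameters.

# Memory channel: low-latency, volatile buffering in RAM.
agent1.channels.memCh.type = memory
agent1.channels.memCh.capacity = 100000
agent1.channels.memCh.transactionCapacity = 1000

# File channel: durable buffering on local disk; events survive JVM
# and operating system restarts.
agent1.channels.fileCh.type = file
agent1.channels.fileCh.checkpointDir = /flume/checkpoint
agent1.channels.fileCh.dataDirs = /flume/data
agent1.channels.fileCh.capacity = 1000000
agent1.channels.fileCh.transactionCapacity = 1000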

Flume event batching

As mentioned earlier, Flume can batch events. The batch size is the maximum number of events that a sink or client will attempt to take from a channel in a single transaction. Tuning the batch size trades throughput vs. latency and duplication under failure. With a small batch size, throughput decreases, but the risk of event duplication is reduced if a failure were to occur. With a large batch size, you get much better throughput, but increased latency, and in the case of a transaction failure, the number of possible duplicates increases.

Transactions are a critical concept in Flume, because the delivery and durability guarantees made by channels only take effect at the end of each successful transaction. For example, when a source receives or generates an event, in order to store that event into a channel a transaction must first be opened on that channel. Within the transaction, the source puts up to the batch size number of events into the channel, and on success commits the transaction. A sink must go through the same process of operating within a transaction when taking events from a channel.

Batch size is configured at the sink level. The larger the batch, the faster the channels operate (but there is a caveat). In the case of the file channel, this speed difference is because all buffers are flushed and then synced to disk when each transaction is committed. A disk sync is a time-consuming operation (it may take several milliseconds), but it is required to ensure the durability of the data. Likewise, with the memory channel, there is a memory synchronization step when each transaction is committed. Of course, memory synchronization is much faster than a disk sync. For more information on the inner workings of the file channel, please see Brock Noland's article about the Apache Flume File Channel.
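
Concretely, the batch size is set on the sink definition, not on the channel. Here is a minimal sketch; the agent and component names are hypothetical, while batch-size is the actual parameter name for the Avro sink.

# The Avro sink drains the file channel in transactions of up to 500
# events, so the channel performs one disk sync per committed batch
# rather than one per event.
agent1.sinks.avro1.type = avro
agent1.sinks.avro1.channel = fileCh
agent1.sinks.avro1.hostname = collector01.example.com
agent1.sinks.avro1.port = 4141
agent1.sinks.avro1.batch-size = 500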

The downside of using a large batch size is that if there is some type of failure in the middle of a transaction, such as a downstream host or network failure, there is a possibility of duplicates being created. So, for example, if you set your batch size to 1000, and the machine you are writing to goes offline, duplicates may be generated in groups of up to 1000. This may occur in special cases, for example, if the events got written to the downstream machine but then the connection failed before it could acknowledge that it had received them. However, duplicate events will only appear in exceptional circumstances.

Choosing a batch size

To squeeze all the performance possible out of a Flume system, batch sizes should be tuned with care through experimentation. While I will get into this in more detail in the follow-up to this post, here are some rules of thumb for selecting batch sizes.

  1. Start off with batch sizes equal to the sum of the batch sizes of the input streams coming into that tier. For example, if you have a downstream Flume agent running an Avro source with 10 upstream agents sending events via Avro sinks using a batch size of 100 each, consider starting that downstream agent with a batch size of 1,000. Tune / experiment from there.
  2. If you find yourself setting the batch size very high (say, higher than 10,000), then consider adding another sink instead, in order to increase parallelism (each sink typically runs on its own thread). Say you were going to use one HDFS sink with a batch size of 20,000: experiment with using two HDFS sinks with batch sizes of 5,000 or 10,000 each and see whether that performs better (see the configuration sketch after this list).
  3. Prefer the lowest batch size that gives you acceptable performance.
  4. Monitor the steady-state channel sizes to get more tuning insight (more on this in the next article).
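
To make rule 2 concrete, here is a hedged sketch of splitting one heavily batched HDFS sink into two sinks that drain the same channel, each on its own thread. The agent, channel, sink, and path names are hypothetical; hdfs.path, hdfs.filePrefix, and hdfs.batchSize are standard HDFS sink parameters.

# Instead of a single HDFS sink with hdfs.batchSize = 20000, run two
# sinks against the same channel with smaller batches each.
agent1.sinks = hdfs1 hdfs2

agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.channel = fileCh
agent1.sinks.hdfs1.hdfs.path = hdfs://namenode/flume/events
agent1.sinks.hdfs1.hdfs.filePrefix = events-1
agent1.sinks.hdfs1.hdfs.batchSize = 10000

agent1.sinks.hdfs2.type = hdfs
agent1.sinks.hdfs2.channel = fileCh
agent1.sinks.hdfs2.hdfs.path = hdfs://namenode/flume/events
agent1.sinks.hdfs2.hdfs.filePrefix = events-2
agent1.sinks.hdfs2.hdfs.batchSize = 10000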

Different batch sizes in different situations

Due to the performance / duplication tradeoff of the batch size parameter, I often see varying batch size settings depending on the use case. When using Flume to write to HBase with the HBase sink, for example to increment counters, a smaller batch size on the order of 100 is often used to reduce the impact of hiccups in the system. On the other hand, with an HDFS sink, in order to get maximum throughput, I see people running with batch sizes of 1,000 or even 10,000, since typically they can easily run map-reduce jobs to de-duplicate the data at processing time. Note that when writing large events with large batch sizes to HDFS, other parameters often need to be increased as well. One such parameter is hdfs.callTimeout, which may be increased to 60 seconds or more to account for the long tail of occasional higher-latency calls to HDFS.
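
For illustration, here is a hedged sketch contrasting the two cases described above. The agent, channel, and table names are made up; batchSize, hdfs.batchSize, and hdfs.callTimeout (in milliseconds) are the actual parameter names.

# HBase sink used for counter increments: keep batches small so a
# retried transaction has limited impact.
agent1.sinks.hbase1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.hbase1.channel = fileCh
agent1.sinks.hbase1.table = counters
agent1.sinks.hbase1.columnFamily = c
agent1.sinks.hbase1.batchSize = 100

# HDFS sink tuned for throughput: large batches, with a longer call
# timeout to absorb the occasional slow call to HDFS.
agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.channel = fileCh
agent1.sinks.hdfs1.hdfs.path = hdfs://namenode/flume/events
agent1.sinks.hdfs1.hdfs.batchSize = 10000
agent1.sinks.hdfs1.hdfs.callTimeout = 60000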

At the other end of the spectrum, in cases where batching events at the application (Flume client) tier is not possible, the memory channel is often used at the collector-tier Flume agent (or a localhost agent on the app servers) to get acceptable performance with a batch size of 1, while using larger batch sizes and file channels in the downstream agents in order to get most of the benefits of durability there. For the best performance, however, all tiers including the client/application tier would perform some level of batching. (Please see above diagram for an illustration of the tiers referenced in this scenario.)

Configuration parameters and gotchas

The actual parameter used to set the batch size varies between sinks, but for most sinks it’s simply called batchSize or batch-size. For the HDFS sink, it’s called hdfs.batchSize for historical reasons: in that sink, the number of events taken from the channel in a single transaction (hdfs.txnEventMax) can differ from the number of events written to HDFS before a sync() operation (hdfs.batchSize). In practice there is little reason for these to differ, so I recommend setting hdfs.txnEventMax to the same value as hdfs.batchSize for simplicity.

One potentially confusing gotcha when tuning the batch size on a sink is that it must be less than or equal to the transactionCapacity set on the corresponding channel. The transactionCapacity should be set to the value of the largest batch size that will be used to store or remove events from that channel.
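
Putting the last two points together, a minimal sketch of a matched sink and channel might look like the following. Names and values are illustrative starting points; hdfs.batchSize, hdfs.txnEventMax, and transactionCapacity are the real parameter names.

# Keep hdfs.batchSize and hdfs.txnEventMax equal, and make sure the
# channel's transactionCapacity is at least as large as the largest
# batch size used against that channel.
agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.channel = fileCh
agent1.sinks.hdfs1.hdfs.path = hdfs://namenode/flume/events
agent1.sinks.hdfs1.hdfs.batchSize = 1000
agent1.sinks.hdfs1.hdfs.txnEventMax = 1000

agent1.channels.fileCh.type = file
agent1.channels.fileCh.checkpointDir = /flume/checkpoint
agent1.channels.fileCh.dataDirs = /flume/data
agent1.channels.fileCh.transactionCapacity = 1000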

tl;dr: Below is a “cheat sheet” batch size tuning summary for your convenience. Please note that these are just starting points for tuning.

Sink type        Config parameter                   Typical value
Avro             batch-size                         100
HDFS             hdfs.batchSize, hdfs.txnEventMax   1000
HBaseSink        batchSize                          100
AsyncHBaseSink   batchSize                          100

That’s all I have space for in one blog post. Please leave feedback and questions in the comments. Look for another post in the future with tips on using Flume’s monitoring capabilities to take advantage of important information which can aid you in your quest for optimum performance.

Mike Percy is a committer and PMC member on the Apache Flume project.

Tuesday Nov 27, 2012

Streaming data into Apache HBase using Apache Flume

Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into HBase. You can read about the basic architecture of Apache Flume 1.x in this blog post. You can also read about how Flume’s File Channel persists events and still provides extremely high performance in an earlier blog post. In this article, we will explore how to configure Flume to write events into HBase, and how to write custom serializers that insert events into HBase in a format of the user’s choice.

Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found in the HBase documentation.

Flume has two HBase sinks, the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other:

  • The AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase.
  • The HBaseSink will soon support secure HBase clusters (FLUME-1626) and the new HBase IPC which was introduced in HBase 0.96.


The configuration for both of these sinks is very similar. A sample configuration is shown below:

#Use the AsyncHBaseSink
host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
#Use the HBaseSink
#host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
host1.sinks.sink1.channel = ch1
host1.sinks.sink1.table = transactions
host1.sinks.sink1.columnFamily = clients
host1.sinks.sink1.column = charges
host1.sinks.sink1.batchSize = 5000
#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
#Use the SimpleHbaseEventSerializer that comes with Flume
#host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
host1.sinks.sink1.serializer.incrementColumn = icol
host1.channels.ch1.type = memory

In the above config, the “table” parameter specifies the table in HBase that the sink has to write to - in this case, “transactions”; the “columnFamily” parameter specifies the column family in that table to insert the data into, in this case, “clients”; and the “column” parameter specifies the column in the column family to write to, in this case “charges”. Apart from this, the sink requires the channel to be configured, like all other Flume sinks. The other interesting configuration parameters are the “serializer” and the “serializer.*” parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume Event into an HBase-friendly format. This piece of code that “translates” the events is usually specific to the schema used by the user’s HBase cluster and is usually implemented by the user. All configuration parameters passed in as “serializer.*” are passed to the serializer. This configuration can be used to set up any internal state the serializer needs.

In the case of the HBaseSink, the serializer converts a Flume Event into one or more HBase Puts and/or Increments. The serializer must implement the HbaseEventSerializer interface. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the initialize method on the serializer. The serializer must “translate” the Flume Event into HBase Puts and Increments, which are returned by the getActions and getIncrements methods. These Puts and Increments are then sent over the wire to the HBase cluster. When the sink stops, this instance of the serializer is closed by the HBaseSink.

The AsyncHBaseSink’s serializer must implement AsyncHbaseEventSerializer.

In this case, the initialize method is called once by the sink, when it starts up. For every event, the sink calls the setEvent method and then calls the getActions and getIncrements methods - similar to the HBaseSink. When the sink is stopped, the serializer’s cleanUp method is called. Notice that the methods do not return the standard HBase Puts and Increments, but PutRequest and AtomicIncrementRequest from the asynchbase API. These are roughly equivalent to the HBase Puts and Increments respectively, with some differences.

An example of such a serializer is below.


import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

/**
 * A serializer for the AsyncHBaseSink, which splits the event body into
 * multiple columns and inserts them into a row whose key is available in
 * the headers
 */
public class SplittingSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] colFam;
  private Event currentEvent;
  private byte[][] columnNames;
  private final List<PutRequest> puts = new ArrayList<PutRequest>();
  private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
  private byte[] currentRowKey;
  private final byte[] eventCountCol = "eventCount".getBytes();

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.colFam = cf;
  }

  @Override
  public void setEvent(Event event) {
    // Store the event and make sure a row key is present in the headers
    this.currentEvent = event;
    String rowKeyStr = currentEvent.getHeaders().get("rowKey");
    if (rowKeyStr == null) {
      throw new FlumeException("No row key found in headers!");
    }
    currentRowKey = rowKeyStr.getBytes();
  }

  @Override
  public List<PutRequest> getActions() {
    // Split the event body and get the values for the columns
    String eventStr = new String(currentEvent.getBody());
    String[] cols = eventStr.split(",");
    puts.clear();
    for (int i = 0; i < cols.length; i++) {
      //Generate a PutRequest for each column.
      PutRequest req = new PutRequest(table, currentRowKey, colFam,
              columnNames[i], cols[i].getBytes());
      puts.add(req);
    }
    return puts;
  }

  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    incs.clear();
    //Increment the number of events received
    incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
    return incs;
  }

  @Override
  public void cleanUp() {
    table = null;
    colFam = null;
    currentEvent = null;
    columnNames = null;
    currentRowKey = null;
  }

  @Override
  public void configure(Context context) {
    //Get the column names from the configuration and store them in the
    //columnNames field so that getActions() can use them.
    String cols = context.getString("columns");
    String[] names = cols.split(",");
    columnNames = new byte[names.length][];
    int i = 0;
    for (String name : names) {
      columnNames[i++] = name.getBytes();
    }
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }
}

This serializer splits the event body on a delimiter (here, a comma) and inserts each piece into a different column. The row key is taken from the event header. In addition, a counter is incremented for each event received, to keep track of the total number of events.

This serializer can be configured as follows:

host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
host1.sinks.sink1.channel = ch1
host1.sinks.sink1.table = transactions
host1.sinks.sink1.columnFamily = clients
host1.sinks.sink1.batchSize = 5000
#The serializer to use
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer
#List of columns each event writes to.
host1.sinks.sink1.serializer.columns = charges,date,priority

Internals of the HBaseSink and AsyncHBaseSink

The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. Currently, the HBaseSink is single-threaded and calls the serializer to get the Puts and Increments once per event it processes. Puts and Increments are sent to HBase via blocking calls, which means the next event is read and passed to the serializer only once the current event has been successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if any of these events fails to be written successfully, the sink retries the entire transaction.

On the other hand, the AsyncHBaseSink uses the asynchbase API, and sends out events asynchronously to HBase. The AsyncHBaseSink, in the same way as the HBase sink, generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. If an error callback is received, the entire transaction is retried, in true Flume style.

A word of caution

As you can see, if HBase reports a failure to write even one Put or Increment, the entire transaction is retried - this is how Flume’s at-least-once semantics work, and most Flume sinks operate in the same way. In the case of HBase Increments, this means the same event could cause a counter to be incremented more than once. This is something to keep in mind when using Flume to perform Increments. Also, if the serializer is not idempotent, the same event could cause multiple different Puts to be written to HBase. Imagine a case where each event represents a credit card transaction: if the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired.

The AsyncHBaseSink is known to give better performance than the HBaseSink primarily because of the non-blocking nature of the underlying API it uses. The HBase community is working on improving the HBase client API to improve its performance, which would vastly improve the HBaseSink performance.

Conclusion

Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema, and allow the user to “map” the Flume event to HBase data.

Tuesday Dec 20, 2011

Apache Flume Hackathon

On December 16, 2011, more than 40 people from various companies gathered at Cloudera's headquarters to participate in the Apache Flume Hackathon. This post summarizes the hackathon, along with a picture and video from the session.

Friday Dec 09, 2011

Apache Flume - Architecture of Flume NG

Apache Flume is currently undergoing incubation at The Apache Software Foundation. Work related to its next major revision is informally referred to as Flume NG. This blog post covers the high-level architecture of Flume NG. It is the first in a series of blog posts that will drill into the detailed design and implementation of Flume NG.
