Apache Flume

Friday Jan 11, 2013

Flume Performance Tuning - part 1

This is part 1 in a series of articles about tuning the performance of Apache Flume, a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of event data.

To kick off this series, I’d like to discuss some important Flume concepts that come into play when tuning your Flume flows for maximum performance: the channel and the transaction batch size.

Setting up a data flow

Imagine you want to take a heavy stream of user activity from your application server logs and store that onto your Hadoop cluster for analytics. If you have a large deployment of application servers, you would likely want to build a fan-in architecture, where you are sending data from many nodes to relatively fewer nodes.

A Tiered Flume Topology

If you are sending these user events one at a time, each time waiting for acknowledgment that it was delivered, your throughput may be limited by network latency. Naturally, you would want to batch up the events into larger transactions, so that you amortize the latency of the acknowledgment over a larger number of events and therefore get more throughput.

Channels

So, what happens if the storage tier goes down momentarily, as in the case of a network partition? What happens to the events if a Flume agent machine crashes? We still want to be able to serve our users on the application tier and retain our data somehow. In order to accomplish this, we need a buffering mechanism on each agent that allows it to store events in the case of downstream failures or slowdowns. In Flume, the channel is what persists events at each hop in the flow. Below is a diagram that illustrates where the channel sits in the architecture of a Flume agent.

Flume agent architecture

Memory Channel vs. File Channel

An important decision to make when designing your Flume flow is what type of channel you want to use. At the time of this writing, the two recommended channels are the file channel and the memory channel. The file channel is a durable channel, as it persists all events that are stored in it to disk. So, even if the Java virtual machine is killed, or the operating system crashes or reboots, events that were not successfully transferred to the next agent in the pipeline will still be there when the Flume agent is restarted. The memory channel is a volatile channel, as it buffers events in memory only: if the Java process dies, any events stored in the memory channel are lost. Naturally, the memory channel also exhibits very low put/take latencies compared to the file channel, even for a batch size of 1. However, since the number of events that can be stored is limited by available RAM, its ability to buffer events in the case of temporary downstream failure is quite limited. The file channel, on the other hand, has far superior buffering capability due to utilizing cheap, abundant hard disk space.
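
To make the choice concrete, here is a rough sketch of how the two channel types might be configured. The agent name, channel names, capacities, and paths below are placeholders for this example, not recommended values:

# Volatile, low-latency buffering: events are lost if the JVM dies
agent1.channels.memCh.type = memory
agent1.channels.memCh.capacity = 100000
agent1.channels.memCh.transactionCapacity = 1000

# Durable, disk-backed buffering: events survive a process crash or reboot
agent1.channels.fileCh.type = file
agent1.channels.fileCh.checkpointDir = /var/flume/checkpoint
agent1.channels.fileCh.dataDirs = /var/flume/data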

Flume event batching

As mentioned earlier, Flume can batch events. The batch size is the maximum number of events that a sink or client will attempt to take from a channel in a single transaction. Tuning the batch size trades throughput vs. latency and duplication under failure. With a small batch size, throughput decreases, but the risk of event duplication is reduced if a failure were to occur. With a large batch size, you get much better throughput, but increased latency, and in the case of a transaction failure, the number of possible duplicates increases.

Transactions are a critical concept in Flume, because the delivery and durability guarantees made by channels only take effect at the end of each successful transaction. For example, when a source receives or generates an event, in order to store that event into a channel a transaction must first be opened on that channel. Within the transaction, the source puts up to the batch size number of events into the channel, and on success commits the transaction. A sink must go through the same process of operating within a transaction when taking events from a channel.
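
To make the transaction lifecycle concrete, here is a minimal sketch (not taken from any particular Flume source) of how a client or source might put a batch of events into a channel using the Channel and Transaction interfaces from the org.apache.flume package. It assumes channel and batchSize are already defined, and the event bodies are arbitrary:

Transaction txn = channel.getTransaction();
txn.begin();
try {
  for (int i = 0; i < batchSize; i++) {
    Event event = EventBuilder.withBody(("event " + i).getBytes());
    channel.put(event);
  }
  // The channel's delivery guarantee only takes effect here.
  txn.commit();
} catch (Throwable t) {
  // None of the puts in this batch become visible to sinks.
  txn.rollback();
  throw new FlumeException("Failed to commit batch", t);
} finally {
  txn.close();
}

A sink-side take follows the same pattern, with channel.take() calls in place of channel.put() and the events written to the destination before the commit.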

Batch size is configured at the sink level. The larger the batch, the faster the channels operate (but there is a caveat). In the case of the file channel, this speed difference is because all buffers are flushed and then synced to disk when each transaction is committed. A disk sync is a time-consuming operation (it may take several milliseconds), but it is required to ensure the durability of the data. Likewise, with the memory channel, there is a memory synchronization step when each transaction is committed. Of course, memory synchronization is much faster than a disk sync. For more information on the inner workings of the file channel, please see Brock Noland's article about the Apache Flume File Channel.

The downside of using a large batch size is that if there is some type of failure in the middle of a transaction, such as a downstream host or network failure, duplicates may be created. For example, if you set your batch size to 1,000 and the machine you are writing to goes offline, duplicates may be generated in groups of up to 1,000. This can happen if, say, the events were written to the downstream machine but the connection failed before it could acknowledge that it had received them, forcing the upstream agent to retry the whole batch. However, duplicate events will only appear in these exceptional circumstances.

Choosing a batch size

To squeeze all the performance possible out of a Flume system, batch sizes should be tuned with care through experimentation. While I will get into this in more detail in the follow-up to this post, here are some rules of thumb for selecting batch sizes.

  1. Start off with batch sizes equal to the sum of the batch sizes of the input streams coming into that tier. For example, if you have a downstream Flume agent running an Avro source with 10 upstream agents sending events via Avro sinks using a batch size of 100 each, consider starting that downstream agent with a batch size of 1,000. Tune / experiment from there.
  2. If you find yourself setting the batch size very high (say, higher than 10,000), then consider adding another sink instead, in order to increase the parallelism (each sink typically runs on its own thread). Say you were going to use one HDFS sink with a batch size of 20,000: experiment with using two HDFS sinks with batch sizes of 5,000 or 10,000 to see if that helps more (see the configuration sketch after this list).
  3. Prefer the lowest batch size that gives you acceptable performance.
  4. Monitor the steady-state channel sizes to get more tuning insight (more on this in the next article).
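
As an illustration of rule 2, here is a hypothetical configuration fragment that splits the work across two HDFS sinks draining the same channel, instead of one sink with a very large batch; the agent, channel, and path names are placeholders:

agent1.sinks = hdfsSink1 hdfsSink2
agent1.sinks.hdfsSink1.type = hdfs
agent1.sinks.hdfsSink1.channel = fileCh
agent1.sinks.hdfsSink1.hdfs.path = /flume/events
agent1.sinks.hdfsSink1.hdfs.batchSize = 10000
agent1.sinks.hdfsSink2.type = hdfs
agent1.sinks.hdfsSink2.channel = fileCh
agent1.sinks.hdfsSink2.hdfs.path = /flume/events
agent1.sinks.hdfsSink2.hdfs.batchSize = 10000

Because each sink runs on its own thread, the two sinks can often match the throughput of a single 20,000-event batch while keeping the window of possible duplicates per transaction smaller.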

Different batch sizes in different situations

Due to the performance / duplication tradeoff of the batch size parameter, I often see varying batch size settings depending on the use case. When using Flume to write to HBase with the HBase sink, for example to increment counters, a smaller batch size on the order of 100 is often used to reduce the impact of hiccups in the system. On the other hand, with an HDFS sink, in order to get maximum throughput, I see people running with batch sizes of 1,000 or even 10,000, since they can typically run MapReduce jobs to de-duplicate the data at processing time. Note that when writing large events with large batch sizes to HDFS, other parameters often need to be increased as well. One such parameter is hdfs.callTimeout, which may be increased to 60 seconds or more to account for the long tail of occasional higher-latency calls to HDFS.
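
As a sketch of those two extremes in one agent (all names and values here are illustrative starting points only):

# Counter increments to HBase: keep batches small to limit the damage from retries
agent1.sinks.hbaseSink.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.hbaseSink.batchSize = 100

# Bulk delivery to HDFS: large batches, with hdfs.callTimeout (milliseconds) raised
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.batchSize = 10000
agent1.sinks.hdfsSink.hdfs.callTimeout = 60000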

At the other end of the spectrum, in cases where batching events at the application (Flume client) tier is not possible, the memory channel is often used at the collector-tier Flume agent (or a localhost agent on the app servers) to get acceptable performance with a batch size of 1, while using larger batch sizes and file channels in the downstream agents in order to get most of the benefits of durability there. For the best performance, however, all tiers including the client/application tier would perform some level of batching. (Please see above diagram for an illustration of the tiers referenced in this scenario.)

Configuration parameters and gotchas

The actual parameter used to set the batch size varies between sinks, but for most sinks it’s simply called batchSize or batch-size. For the HDFS sink, it’s called hdfs.batchSize for historical reasons; I recommend setting hdfs.txnEventMax to the same value as hdfs.batchSize for simplicity. (Historically, in the HDFS sink, the number of events taken in a single transaction could differ from the number of events written to HDFS before a sync() operation; in practice, there is little reason not to set the two to the same value.)

One potentially confusing gotcha when tuning the batch size on a sink is that it must be less than or equal to the transactionCapacity set on the corresponding channel. The transactionCapacity should be set to the value of the largest batch size that will be used to store or remove events from that channel.
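
For example, if a hypothetical memory channel feeds an HDFS sink that uses a batch size of 1,000, the channel needs a transactionCapacity of at least 1,000 (and a capacity at least that large):

agent1.sinks.hdfsSink.hdfs.batchSize = 1000
agent1.channels.memCh.transactionCapacity = 1000
agent1.channels.memCh.capacity = 100000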

tl;dr: Below is a “cheat sheet” batch size tuning summary for your convenience. Please note that these are just starting points for tuning.

Sink Type        Config parameter                    Typical value
Avro             batch-size                          100
HDFS             hdfs.batchSize, hdfs.txnEventMax    1000
HBaseSink        batchSize                           100
AsyncHBaseSink   batchSize                           100

That’s all I have space for in one blog post. Please leave feedback and questions in the comments. Look for another post in the future with tips on using Flume’s monitoring capabilities to take advantage of important information which can aid you in your quest for optimum performance.

Mike Percy is a committer and PMC member on the Apache Flume project.

Tuesday Dec 04, 2012

Apache Flume 1.3.0 released!

The Apache Flume team is pleased to announce the release of Flume version 1.3.0.

Apache Flume 1.3.0 is the fourth release of the so-called “NG” codeline under the auspices of Apache, and our second release as a top-level Apache project! Flume 1.3.0 has been put through many stress and regression tests, is stable, production-ready software, and is backwards-compatible with Flume 1.2.0.

Four months of very active development went into this release: a whopping 221 patches were committed since 1.2.0, representing many features, enhancements, and bug fixes. While the full change log can be found in the link below, here are a few new feature highlights:

  • New HTTP Post Source
  • New Spool Directory Source
  • New Multi-port Syslog Source
  • New Elastic Search Sink
  • New Regex Extractor Interceptor
  • File Channel Encryption

This release can be downloaded from the Flume download page.

The change log and documentation are available on the 1.3.0 release page.

Your help and feedback is more than welcome!

Tuesday Nov 27, 2012

Streaming data into Apache HBase using Apache Flume

Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into HBase. You can read about the basic architecture of Apache Flume 1.x in this blog post. You can also read about how Flume’s File Channel persists events and still provides extremely high performance in an earlier blog post. In this article, we will explore how to configure Flume to write events into HBase, and write custom serializers to write events into HBase in a format of the user’s choice.

Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found in the HBase documentation.
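
To make those four coordinates concrete, here is a rough fragment using the HBase client API of that era (HTable, Put, and Bytes from the org.apache.hadoop.hbase packages); the table, column family, column, and row key values are made up to mirror the Flume configuration shown later in this post:

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "transactions");   // table
Put put = new Put(Bytes.toBytes("row-12345"));     // row key
put.add(Bytes.toBytes("clients"),                  // column family
        Bytes.toBytes("charges"),                  // column
        Bytes.toBytes("42.50"));                   // value
table.put(put);
table.close();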

Flume has two HBase sinks, the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other:

  • The AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase.
  • The HBaseSink will soon support secure HBase clusters (FLUME-1626) and the new HBase IPC which was introduced in HBase 0.96.


The configuration for both of these sinks is very similar. A sample configuration is shown below:

#Use the AsyncHBaseSink
host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
#Use the HBaseSink
#host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
host1.sinks.sink1.channel = ch1
host1.sinks.sink1.table = transactions
host1.sinks.sink1.columnFamily = clients
host1.sinks.sink1.column = charges
host1.sinks.sink1.batchSize = 5000
#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
#Use the SimpleHbaseEventSerializer that comes with Flume
#host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
host1.sinks.sink1.serializer.incrementColumn = icol
host1.channels.ch1.type = memory

In the above config, the “table” parameter specifies the HBase table the sink writes to - in this case, “transactions”; the “columnFamily” parameter specifies the column family in that table to insert the data into - in this case, “clients”; and the “column” parameter specifies the column in the column family to write to - in this case, “charges”. Apart from this, the sink requires a channel to be configured, like all other Flume sinks.

The other interesting configuration parameters are the “serializer” and the “serializer.*” parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume Event into an HBase-friendly format. This piece of code that “translates” the events is usually specific to the schema used by the user’s HBase cluster and is usually implemented by the user. All configuration parameters passed in as “serializer.*” are passed to the serializer, and can be used to set up any internal state the serializer needs.

In the case of the HBaseSink, the serializer converts a Flume Event into one or more HBase Puts and/or Increments. The serializer must implement the HbaseEventSerializer interface. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the initialize method on the serializer. The serializer must “translate” the Flume Event into HBase Puts and Increments, which are returned by the getActions and getIncrements methods. These Puts and Increments are then sent over the wire to the HBase cluster. When the sink stops, this instance of the serializer is closed by the HBaseSink.
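
As a sketch of what such a serializer can look like - assuming the HbaseEventSerializer interface exposes initialize, getActions, getIncrements, close, and the two configure methods, as described above - here is a trivial implementation that writes each event body into a single column. The column name and the “rowKey” header convention are arbitrary choices for this example:

import java.util.Collections;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

public class SingleColumnHbaseSerializer implements HbaseEventSerializer {
  private final byte[] column = "payload".getBytes();
  private byte[] columnFamily;
  private Event currentEvent;

  @Override
  public void configure(Context context) {
    // No serializer.* parameters are needed for this example.
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }

  @Override
  public void initialize(Event event, byte[] columnFamily) {
    // Called by the HBaseSink for each event it processes.
    this.currentEvent = event;
    this.columnFamily = columnFamily;
  }

  @Override
  public List<Row> getActions() {
    // Expect the row key in the "rowKey" header, as in the async example below.
    byte[] rowKey = currentEvent.getHeaders().get("rowKey").getBytes();
    Put put = new Put(rowKey);
    put.add(columnFamily, column, currentEvent.getBody());
    return Collections.<Row>singletonList(put);
  }

  @Override
  public List<Increment> getIncrements() {
    // This example does not update any counters.
    return Collections.<Increment>emptyList();
  }

  @Override
  public void close() {
    currentEvent = null;
  }
}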

The AsyncHBaseSink’s serializer must implement the AsyncHbaseEventSerializer interface. In this case, the initialize method is called once by the sink, when it starts up. For every event, the sink calls the setEvent method and then the getActions and getIncrements methods - similar to the HBaseSink. When the sink is stopped, the serializer’s cleanUp method is called. Notice that the methods do not return the standard HBase Puts and Increments, but PutRequest and AtomicIncrementRequest from the asynchbase API. These are roughly equivalent to the HBase Puts and Increments respectively, with some differences.

An example of such a serializer is below.


import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

/**
 * A serializer for the AsyncHBaseSink, which splits the event body into
 * multiple columns and inserts them into a row whose key is available in
 * the headers
 */
public class SplittingSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] colFam;
  private Event currentEvent;
  private byte[][] columnNames;
  private final List<PutRequest> puts = new ArrayList<PutRequest>();
  private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
  private byte[] currentRowKey;
  private final byte[] eventCountCol = "eventCount".getBytes();

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.colFam = cf;
  }

  @Override
  public void setEvent(Event event) {
    // Set the event and verify that the rowKey is not present
    this.currentEvent = event;
    String rowKeyStr = currentEvent.getHeaders().get("rowKey");
    if (rowKeyStr == null) {
      throw new FlumeException("No row key found in headers!");
    }
    currentRowKey = rowKeyStr.getBytes();
  }

  @Override
  public List<PutRequest> getActions() {
    // Split the event body and get the values for the columns
    String eventStr = new String(currentEvent.getBody());
    String[] cols = eventStr.split(",");
    puts.clear();
    for (int i = 0; i < cols.length; i++) {
      //Generate a PutRequest for each column.
      PutRequest req = new PutRequest(table, currentRowKey, colFam,
              columnNames[i], cols[i].getBytes());
      puts.add(req);
    }
    return puts;
  }

  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    incs.clear();
    //Increment the number of events received
    incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
    return incs;
  }

  @Override
  public void cleanUp() {
    table = null;
    colFam = null;
    currentEvent = null;
    columnNames = null;
    currentRowKey = null;
  }

  @Override
  public void configure(Context context) {
    // Get the comma-separated column names from the configuration
    // and store them in the columnNames field used by getActions().
    String cols = context.getString("columns");
    String[] names = cols.split(",");
    columnNames = new byte[names.length][];
    int i = 0;
    for (String name : names) {
      columnNames[i++] = name.getBytes();
    }
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }
}

This serializer splits the event body on a delimiter and inserts each resulting value into a different column. The row key is taken from the event header. For each event received, a counter is also incremented to keep track of the number of events processed.

This serializer can be configured as follows:

host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
host1.sinks.sink1.channel = ch1
host1.sinks.sink1.table = transactions
host1.sinks.sink1.columnFamily = clients
host1.sinks.sink1.batchSize = 5000
#The serializer to use
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer
#List of columns each event writes to.
host1.sinks.sink1.serializer.columns = charges,date,priority

Internals of the HBaseSink and AsyncHBaseSink

The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. Currently, the HBaseSink is single-threaded and calls the serializer to get the Puts and Increments once per event it processes. Puts and Increments are sent to HBase via blocking calls, which means the next event is read and passed to the serializer only once the current event has been successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if one of these events fails to get written successfully, the sink retries the entire transaction.

On the other hand, the AsyncHBaseSink uses the asynchbase API, and sends out events asynchronously to HBase. The AsyncHBaseSink, in the same way as the HBase sink, generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. If an error callback is received, the entire transaction is retried, in true Flume style.

A word of caution

As you can see, if HBase reports a failure to write even one Put or Increment, the entire transaction is retried; this is how Flume’s at-least-once semantics work, and most Flume sinks operate the same way. In the case of HBase Increments, this means the same event may cause a counter to be incremented more than once, which is something to keep in mind when using Flume to perform Increments. Also, if the serializer is not idempotent, the same event can cause multiple different Puts to be written to HBase. Imagine that each event represents a credit card transaction: if the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired.

The AsyncHBaseSink is known to give better performance than the HBaseSink primarily because of the non-blocking nature of the underlying API it uses. The HBase community is working on improving the HBase client API to improve its performance, which would vastly improve the HBaseSink performance.

Conclusion

Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema and allow the user to “map” the Flume event to HBase data.

Wednesday Sep 26, 2012

Apache Flume - FileChannel

This blog post is about Apache Flume’s File Channel. Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

FileChannel is a persistent Flume channel that supports writing to multiple disks in parallel and encryption.

Overview

When using Flume, each flow has a Source, Channel, and Sink. A typical example would be a webserver writing events to a source via RPC (e.g. an Avro source), the source writing to a MemoryChannel, and an HDFS sink consuming the events and writing them to HDFS.

MemoryChannel provides high throughput but loses data in the event of a crash or loss of power. As such, the development of a persistent channel was desired. FileChannel was implemented in FLUME-1085. The goal of FileChannel is to provide a reliable, high-throughput channel. FileChannel guarantees that when a transaction is committed, no data will be lost due to a subsequent crash or loss of power.

It's important to note that FileChannel does not do any replication of data itself. As such, it is only as reliable as the underlying disks. Users who use FileChannel because of its durability should take this into account when purchasing and configuring hardware. The underlying disks should be RAID, SAN, or similar.
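
For example, a file channel spreading its data directories across several physical disks might be configured along these lines (the agent name, paths, and capacities are placeholders; each entry in dataDirs should ideally live on a separate disk):

agent1.channels.fileCh.type = file
agent1.channels.fileCh.checkpointDir = /mnt/disk1/flume/checkpoint
agent1.channels.fileCh.dataDirs = /mnt/disk2/flume/data,/mnt/disk3/flume/data
agent1.channels.fileCh.capacity = 1000000
agent1.channels.fileCh.transactionCapacity = 10000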

Many systems trade a small amount of data loss (fsync from memory to disk every few seconds for example) for higher throughput. The Flume team decided on a different approach with FileChannel. Flume is a transactional system and multiple events can be either Put or Taken in a single transaction. The batch size can be used to control throughput. Using large batch sizes, Flume can move data through a flow with no data loss and high throughput. The batch size is completely controlled by the client. This is an approach users of RDBMS's will be familiar with.

A Flume transaction consists of either Puts or Takes, but not both, and either a commit or a rollback. Each transaction implements both a Put and Take method. Sources do Puts onto the channel and Sinks do Takes from the channel.

Design

FileChannel is based on a write ahead log or WAL in addition to an in-memory queue. Each transaction is written to the WAL based on the transaction type (Take or Put) and the queue is modified accordingly. Each time a transaction is committed, fsync is called on the appropriate file to ensure the data is actually on disk and a pointer to that event is placed on a queue. The queue serves just like any other queue: it manages what is yet to be consumed by the sink. During a take, a pointer is removed from the queue. The event is then read directly from the WAL. Due to the large amount of RAM available today, it's very common for that read to occur from the operating system file cache.

After a crash, the WAL can be replayed to place the queue in the same state it was in immediately preceding the crash, such that no committed transactions are lost. Replaying WALs can be time consuming, so the queue itself is written to disk periodically. Writing the queue to disk is called a checkpoint. After a crash, the queue is loaded from disk and then only transactions committed after the queue was saved to disk are replayed, significantly reducing the amount of WAL that must be read.
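
The basic replay idea (ignoring, for the moment, the in-progress transactions discussed later) can be pictured with a purely illustrative toy model; none of the class or method names here correspond to Flume’s real FileChannel code:

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplaySketch {
  enum Type { PUT, TAKE }

  // A committed WAL record: the operation, the event it points at, and its sequence number.
  static class Record {
    final long sequence;
    final Type type;
    final String eventPointer;
    Record(long sequence, Type type, String eventPointer) {
      this.sequence = sequence;
      this.type = type;
      this.eventPointer = eventPointer;
    }
  }

  // Rebuild the in-memory queue from a checkpoint plus the committed WAL records.
  static Set<String> replay(Set<String> checkpointedQueue, long checkpointSequence,
                            List<Record> committedRecords) {
    Set<String> queue = new LinkedHashSet<String>(checkpointedQueue);
    for (Record r : committedRecords) {
      if (r.sequence <= checkpointSequence) {
        continue;                      // already reflected in the checkpointed queue
      }
      if (r.type == Type.PUT) {
        queue.add(r.eventPointer);     // event becomes available for takes
      } else {
        queue.remove(r.eventPointer);  // event was already consumed
      }
    }
    return queue;
  }
}

In the worked example that follows, a checkpoint holding “a” and “b”, replayed against a later committed Take of “a”, yields a queue containing only “b”.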

For example, a channel that has two events will look like this:


The WAL contains three important items: the transaction id, sequence number, and event data. Each transaction has a unique transaction id, and each event has a unique sequence number. The transaction id is used simply to group events into a transaction while the sequence number is used when replaying logs. In the above example, the transaction id is 1 and the sequence numbers are 1, 2, and 3.

When the queue is saved to disk - a checkpoint - the sequence number is incremented and saved as well. At restart, the queue is first loaded from disk and then any WAL entries with a sequence number greater than the queue’s are replayed. During the checkpoint operation, the channel is locked so that no Put or Take operations can alter its state. Allowing modification of the queue during the checkpoint would result in an inconsistent snapshot of the queue stored on disk.

In the example queue above, a checkpoint occurs after the commit of transaction 1 resulting in the queue being saved to disk with both events ("a" and "b") and a sequence number of 4.

After that point, event a is Taken from the queue in transaction 2:


If a crash occurs, the queue checkpoint is read from disk. Note that since the checkpoint occurred before transaction 2, both events a and b currently exist on the queue. Then the WAL is read and any committed transaction with a sequence number greater than 4 is applied resulting in "a" being removed from the queue.

Two situations are not covered by the design described so far: Takes and Puts that are in progress at the time the checkpoint occurs would be lost. Assume the checkpoint occurred instead after the take of "a":


If a crash occurred at this point, under the design described above, event "b" would be on the queue and on replay any WAL entry with a sequence number greater than 5 would be replayed. The Rollback for transaction 2 would be replayed, but the Take for transaction 2 would not be replayed. As such, "a" would not be placed on the queue resulting in data loss. A similar scenario is played out for Puts. For this reason, when a queue checkpoint occurs, transactions which are still in progress are also written out so that this scenario can be handled appropriately.

Implementation

FileChannel is stored in the flume-file-channel module of the Flume project, and its Java package name is org.apache.flume.channel.file. The queue described above is named FlumeEventQueue and the WAL is named Log. The queue itself is a circular array backed by a memory-mapped file, while the WAL is a set of files written to and read from using the LogFile class and its subclasses.

Conclusion

FileChannel provides Flume users with durability in the face of hardware, software, and environmental failures while preserving high throughput. It is the recommended channel for most topologies where both aspects are important.

Tuesday Dec 20, 2011

Apache Flume Hackathon

On December 16, 2011, more than 40 people from various companies gathered at Cloudera's Headquarters to participate in Apache Flume Hackathon. This post summarizes the hackathon along with a picture and video from the session.

Friday Dec 09, 2011

Apache Flume - Architecture of Flume NG

Apache Flume is currently undergoing incubation at The Apache Software Foundation. Work related to its next major revision is informally referred to as Flume NG. This blog post covers the high-level architecture of Flume NG. This is the first in a series of blog posts that will drill into the detailed design and implementation of Flume NG.
