Apache HBase

Wednesday June 10, 2015

Scalable Distributed Transactional Queues on Apache HBase

This is the first in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Today's entry is a guest post by Terence Yim, a Software Engineer at Cask, responsible for designing and building realtime processing systems on Hadoop/HBase, originally published here on the Cask engineering blog.

- Andrew Purtell

A real-time stream processing framework usually involves two fundamental constructs: processors and queues. A processor reads events from a queue, executes user code to process them, and optionally writes events to another queue for additional downstream processors to consume. Queues are provided and managed by the framework. Queues transfer data and act as a buffer between processors, so that the processors can operate and scale independently. For example, a web server access log analytics application can look like this:


One key differentiator among various frameworks is the queue semantics, which commonly varies along these lines:

  • Delivery Guarantee: At least once, at most once, exactly-once.
  • Fault Tolerance: Failures are transparent to the user, with automatic recovery.
  • Durability: Data can survive failures and restarts.
  • Scalability: Characteristics and limitations when adding more producers/consumers.
  • Performance: Throughput and latency for queue operations.

In the open-source Cask Data Application Platform (CDAP), we wanted to provide a real-time stream processing framework that is dynamically scalable, strongly consistent, and has an exactly-once delivery guarantee. With such strong guarantees, developers are free to perform any form of data manipulation without worrying about inconsistency, potential reprocessing or failure. It helps developers build their big data applications even if they do not have a strong distributed systems background. Moreover, it is possible to relax these strong guarantees in exchange for higher performance if needed; that is always easier than doing it the other way around.

Queue Scalability

The basic operations that can be performed on a queue are enqueue and dequeue. Producers write data to the head of the queue (enqueue) and consumers read data from the tail of the queue (dequeue). We say a queue is scalable when you can enqueue faster as a whole by adding more producers and dequeue faster as a whole by adding more consumers. Ideally, the scaling is linear, meaning that doubling the number of producers/consumers doubles the rate of enqueue/dequeue, bounded only by the size of the cluster. In order to support linear scalability for producers, the queue needs to be backed by a storage system that scales linearly with the number of concurrent writers. For consumers to be linearly scalable, the queue can be partitioned such that each consumer only processes a subset of the queue data.

Another aspect of queue scalability is that it should scale horizontally. This means the upper bound of the queue performance can be increased by adding more nodes to the cluster. It is important because it makes sure that the queue can keep working regardless of cluster size and can keep up with the growth in data volume.

Partitioned HBase Queue

We chose Apache HBase as the storage layer for the queue. It is designed and optimized for strong row-level consistency and horizontal scalability. It provides very good concurrent write performance, and its support for ordered scans is a good fit for partitioned consumers. We use HBase Coprocessors for efficient scan filtering and queue cleanup. In order to have exactly-once semantics on the queue, we use Tephra’s transaction support for HBase.

Producers and consumers operate independently. Each producer enqueues by performing batch HBase Puts and each consumer dequeues by performing HBase Scans. There is no link between the number of producers and consumers and they can scale separately.

The queue has a notion of consumer groups. A consumer group is a collection of consumers partitioned by the same key such that each event published to the queue is consumed by exactly one consumer within the group. The use of consumer groups allows you to partition the same queue with different keys and to scale independently based on the operational characteristics of the data. Using the access log analytics example above, the producer and consumer groups might look like this:


There are two producers running for the Log Parser and they are writing to the queue concurrently. On the consuming side, there are two consumer groups. The Unique User Counter group has two consumers, using UserID as the partitioning key, and the Page View Counter group contains three consumers, using PageID as the partitioning key.
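As a rough illustration of how a partitioning key can pin each event to exactly one consumer per group, here is a minimal Python sketch. The group names, field names, and the use of a CRC32 hash modulo the group size are assumptions made for the example; the post does not say which hash function CDAP uses.

```python
import zlib

# Hypothetical group definitions mirroring the access log example above.
GROUPS = {
    "unique_user_counter": {"key_field": "user_id", "size": 2},
    "page_view_counter": {"key_field": "page_id", "size": 3},
}


def consumer_for(key: str, group_size: int) -> int:
    """Map a partitioning key to a consumer index in [0, group_size)."""
    # zlib.crc32 is stable across processes, unlike Python's built-in hash().
    return zlib.crc32(key.encode("utf-8")) % group_size


def route(event: dict) -> dict:
    """Return {group name: consumer index} for a single event."""
    return {
        name: consumer_for(str(event[cfg["key_field"]]), cfg["size"])
        for name, cfg in GROUPS.items()
    }


print(route({"user_id": "alice", "page_id": "/index.html"}))
```

Because the mapping depends only on the key and the group size, every event for the same user lands on the same Unique User Counter consumer, while the Page View Counter group partitions the very same events independently by page.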

Queue rowkey format

Since an event emitted by a producer can be consumed by one or more consumer groups, we write the event to one or more rows in an HBase table, with one row designated for each consumer group. The event payload and metadata are stored in separate columns, while the row key follows this format:


The two interesting parts of the row key are the Partition ID and the Entry ID. The Partition ID determines the row key prefix for a given consumer. This allows a consumer to read only the data it needs to process by using a prefix scan on the table during dequeue. The Partition ID consists of two parts: a Consumer Group ID and a Consumer ID. The producer computes one Partition ID per consumer group and writes to those rows on enqueue.

The Entry ID in the row key contains the transaction information. It consists of the producer transaction write pointer issued by Tephra and a monotonically increasing counter. The counter is generated locally by the producer and is needed to make the row key unique for the event, since a producer can enqueue more than one event within the same transaction.
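To make the row key structure concrete, the following Python sketch composes a key from the parts described above. The field widths (4-byte group and consumer IDs, an 8-byte write pointer, a 4-byte counter), the big-endian encoding, and the function names are all assumptions for illustration, not the actual CDAP layout.

```python
import struct


def partition_prefix(group_id: int, consumer_id: int) -> bytes:
    """Partition ID: Consumer Group ID followed by Consumer ID (assumed widths)."""
    return struct.pack(">II", group_id, consumer_id)


def row_key(group_id: int, consumer_id: int, write_pointer: int, counter: int) -> bytes:
    """Partition ID followed by Entry ID (write pointer + per-transaction counter)."""
    entry_id = struct.pack(">QI", write_pointer, counter)
    return partition_prefix(group_id, consumer_id) + entry_id


# Two events enqueued in the same transaction, then one in a later transaction.
keys = [
    row_key(1, 0, 100, 0),
    row_key(1, 0, 100, 1),
    row_key(1, 0, 101, 0),
]

# Unsigned big-endian fields sort bytewise in numeric order, so an ordered
# scan over the consumer's prefix returns entries in write-pointer order.
assert keys == sorted(keys)
assert all(k.startswith(partition_prefix(1, 0)) for k in keys)
```

With a layout like this, the consumer's dequeue reduces to a prefix scan on `partition_prefix(group_id, consumer_id)`, and two producers can never collide because their write pointers differ.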

On dequeue, the consumer uses the transaction write pointer to determine whether that queue entry has been committed and hence can be consumed. The row key is always unique because it includes a transaction write pointer and a counter. This lets producers operate independently and never have write conflicts.
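The visibility check on dequeue can be pictured with a simplified snapshot model. The `Snapshot` class and `is_visible` function below are hypothetical names for this sketch and not Tephra's API; the assumption is that a consumer's transaction carries a read pointer plus a set of write pointers that are still in progress or were invalidated.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Snapshot:
    """Simplified view of what a consumer's transaction is allowed to see."""
    read_pointer: int
    # Write pointers of in-progress or invalidated transactions (assumed model).
    excluded: frozenset = field(default_factory=frozenset)


def is_visible(write_pointer: int, snap: Snapshot) -> bool:
    """An entry is consumable only if its producer transaction has committed."""
    return write_pointer <= snap.read_pointer and write_pointer not in snap.excluded


snap = Snapshot(read_pointer=105, excluded=frozenset({103}))
for wp in (100, 103, 105, 106):
    print(wp, is_visible(wp, snap))
```

Entries written under pointer 103 stay hidden until that transaction commits, and entries from 106 are newer than the snapshot, so the consumer skips both without any coordination with the producers.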

In order to generate the Partition ID, a producer needs to know the size and the partitioning key of each consumer group. The consumer group information is recorded transactionally when the application starts, as well as whenever a group size changes.

Changing producers and consumers

It is straightforward to increase or decrease the number of producers since each producer operates independently. Adding or removing producer processes will do the job. However, when the size of a consumer group needs to change, coordination is needed to update the consumer group information correctly. The steps can be summarized by this diagram:


Pausing and resuming are fast operations as they are coordinated using Apache ZooKeeper and executed in parallel. For example, with the web access log analytics application we mentioned above, changing the consumer group information may look something like this:


With this queue design, the enqueue and dequeue performance is on par with batch HBase Puts and HBase Scans respectively, with some overhead for talking to the Tephra server. That overhead can be greatly reduced by batching multiple events in the same transaction.

Finally, to prevent “hotspotting”, we pre-split the HBase table based on the cluster size and apply salting to the row key to better distribute writes, which otherwise would have been sequential due to the monotonically increasing transaction write pointer.
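A generic way to picture salting and pre-splitting is sketched below in Python. The one-byte salt derived from a CRC32 of the unsalted key, the bucket count, and the function names are assumptions for illustration; the post does not describe how CDAP derives its salt or how readers account for it.

```python
import zlib


def salted(row_key: bytes, buckets: int) -> bytes:
    """Prefix the key with a one-byte bucket number derived from its hash."""
    if not 1 <= buckets <= 256:
        raise ValueError("a one-byte salt supports 1 to 256 buckets")
    return bytes([zlib.crc32(row_key) % buckets]) + row_key


def split_points(buckets: int) -> list:
    """Region split keys so that each salt bucket starts its own region."""
    return [bytes([b]) for b in range(1, buckets)]


# Sequential keys (like increasing write pointers) spread over several buckets.
keys = [i.to_bytes(8, "big") for i in range(1000)]
used = {salted(k, 8)[0] for k in keys}
print(sorted(used), split_points(4))
```

The trade-off with a hash-derived salt like this one is on the read side: a scan has to cover each bucket, so the number of buckets is usually kept close to the number of region servers.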

Performance Numbers

We’ve tested the performance on a small ten-node HBase cluster and the result is impressive. Using a 1 KB payload with a batch size of 500 events, we achieved a throughput of 100K events per second produced and consumed, running with three producers and ten consumers. We also observed that the throughput increases linearly when we add more producers and consumers: for example, it increased to 200K events per second when we doubled the number of producers and consumers.
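For a sense of scale, the figures above can be turned into data rates with a few lines of Python. The sketch assumes that "1K" means 1024 bytes and that load is spread evenly across producers and consumers, neither of which the post states explicitly.

```python
EVENTS_PER_SEC = 100_000
PAYLOAD_BYTES = 1024  # assumption: "1K" payload means 1024 bytes
BATCH_SIZE = 500
PRODUCERS = 3
CONSUMERS = 10

payload_mb_per_sec = EVENTS_PER_SEC * PAYLOAD_BYTES / 1e6  # payload only, no row key overhead
batches_per_sec = EVENTS_PER_SEC / BATCH_SIZE
events_per_producer = EVENTS_PER_SEC / PRODUCERS
events_per_consumer = EVENTS_PER_SEC / CONSUMERS

print(f"payload rate:       {payload_mb_per_sec:.1f} MB/s")
print(f"batches per second: {batches_per_sec:.0f}")
print(f"events/s/producer:  {events_per_producer:.0f}")
print(f"events/s/consumer:  {events_per_consumer:.0f}")
```

Under these assumptions the cluster moves roughly 100 MB of payload per second in each direction using only about 200 batches per second, which is why batching keeps the per-transaction overhead small.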

With the help of HBase and a combination of best practices, we successfully built a linearly scalable, distributed transactional queue system and used it to provide a real time stream processing framework in CDAP: dynamically scalable, strongly consistent, and with an exactly-once delivery guarantee.

Saturday June 06, 2015

Saving CPU! Using Native Hadoop Libraries for CRC computation in HBase

by Apekshit Sharma, HBase contributor and Cloudera Engineer

TL;DR: Use the Hadoop Native Library for calculating CRC and save CPU!

Checksums in HBase

Checksums are used to check data integrity. HDFS computes and stores checksums for all files on write. One checksum is written per chunk of data (size can be configured using bytes.per.checksum) in a separate, companion checksum file. When data is read back, the file with the corresponding checksums is read back as well and is used to ensure data integrity. However, having two files results in two disk seeks when reading any chunk of data. For HBase, the extra seek while reading an HFileBlock results in extra latency. To work around the extra seek, HBase inlines checksums. HBase calculates checksums for the data in an HFileBlock and appends them to the end of the block itself on write to HDFS (HDFS then checksums the HBase data+inline checksums). On read, by default HDFS checksum verification is turned off, and HBase itself verifies data integrity.

Can we then get rid of HDFS checksum altogether? Unfortunately no. While HBase can detect corruptions, it can’t fix them, whereas HDFS uses replication and a background process to detect and *fix* data corruptions if and when they happen. Since HDFS checksums generated at write-time are also available, we fall back to them when HBase verification fails for any reason. If the HDFS check fails too, the data is reported as corrupt.
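The idea of inlining checksums can be sketched in a few lines of Python. This is a toy model, not the HFileBlock format: it uses `zlib.crc32` (plain CRC32, not CRC32C), a made-up chunk size, and passes the data length explicitly, where a real block carries that information in its header.

```python
import struct
import zlib

BYTES_PER_CHECKSUM = 16 * 1024  # hypothetical chunk size for this sketch


def chunk_checksums(data: bytes, chunk: int = BYTES_PER_CHECKSUM) -> bytes:
    """One 4-byte CRC32 per chunk of data, concatenated."""
    sums = [zlib.crc32(data[i:i + chunk]) for i in range(0, len(data), chunk)]
    return struct.pack(">%dI" % len(sums), *sums)


def write_block(data: bytes, chunk: int = BYTES_PER_CHECKSUM) -> bytes:
    """Append the checksums to the end of the block itself."""
    return data + chunk_checksums(data, chunk)


def read_block(block: bytes, data_len: int, chunk: int = BYTES_PER_CHECKSUM) -> bytes:
    """Verify the inline checksums and return the data portion."""
    data, stored = block[:data_len], block[data_len:]
    if stored != chunk_checksums(data, chunk):
        # A real reader would retry the read with HDFS checksum verification on.
        raise IOError("inline checksum mismatch")
    return data


payload = bytes(range(256)) * 200
assert read_block(write_block(payload), len(payload)) == payload
```

Since data and checksums travel in the same block, a single read is enough to fetch and verify it, which is the seek that inlining is meant to save.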

The related HBase configurations are hbase.hstore.checksum.algorithm, hbase.hstore.bytes.per.checksum and hbase.regionserver.checksum.verify. HBase inline checksums are enabled by default.

Calculating checksums is computationally expensive and requires lots of CPU. When HDFS switched over to JNI + C for computing checksums, they witnessed big gains in CPU usage.

This post is about replicating those gains in HBase by using Native Hadoop Libraries (NHL). See HBASE-11927.


We switched to the Hadoop DataChecksum library, which uses NHL under the hood if it is available and otherwise falls back to the Java CRC implementation. Another alternative considered was the ‘Circe’ library. The following comparison highlights the differences between the two and makes the reasoning for our choice clear.

Hadoop Native Library:

  • Native code supports both crc32 and crc32c
  • Adds dependency on hadoop-common which is reliable and actively developed
  • Interface supports taking in stream of data, stream of checksums, chunk size as parameters and compute/verify checksums considering data in chunks.

Circe:

  • Native code supports only crc32c
  • Adds dependency on external project
  • Only supports calculation of single checksum for all input data.

Both libraries support use of the special x86 instruction for hardware calculation of CRC32C if available (defined in the SSE4.2 instruction set). In the case of NHL, hadoop-2.6.0 or a newer version is required for HBase to get the native checksum benefit.

However, based on the data layout of HFileBlock, which has ‘real data’ followed by checksums on the end, only NHL supported the interface we wanted. Implementing the same in Circe would have been significant effort. So we chose to go with NHL.


Since the metric to be evaluated was CPU usage, a simple configuration of two nodes was used. Node1 was configured to be the NameNode, Zookeeper and HBase master. Node2 was configured to be DataNode and RegionServer. All real computational work was done on Node2 while Node1 remained idle most of the time. This isolation of work on a single node made it easier to measure impact on CPU usage.


Ubuntu 14.04.2 LTS (GNU/Linux 3.13.0-24-generic x86_64)

CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz

Socket(s) : 1

Core(s) per socket : 1

Thread(s) per core : 4

Logical CPU(s) : 4

Number of disks : 1

Memory : 8 GB

HBase Version/Distro *: 1.0.0 / CDH 5.4.0

*Since trunk resolves to hadoop-2.5.1 which does not have HDFS-6865, it was easier to use a CDH distro which already has HDFS-6865 backported.


We chose to study the impact on major compactions, mainly because of the presence of CompactionTool, which a) can be used offline and b) allowed us to profile only the relevant workload. PerformanceEvaluation (bin/hbase pe) was used to build a test table, which was then copied to local disk for reuse.

% ./bin/hbase pe --nomapred --rows=150000 --table="t1" --valueSize=10240 --presplit=10 sequentialWrite 10

Table size: 14.4G

Number of rows: 1.5M

Number of regions: 10

Row size: 10K

Total store files across regions: 67

For profiling, Lightweight-java-profiler was used and FlameGraph was used to generate graphs.

For benchmarking, the Linux ‘time’ command was used. Profiling was disabled during these runs. A script repeatedly executed the following steps in order:

  1. delete hdfs:///hbase

  2. copy t1 from local disk to hdfs:///hbase/data/default

  3. run compaction tool on t1 and time it
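The loop above might be scripted along these lines. This is a sketch, not the script used for the benchmark: the local path is hypothetical, the extra mkdir step is an assumption (the parent directory disappears when hdfs:///hbase is deleted), and `os.times()` stands in for the ‘time’ command to collect real, user and sys seconds of the child process.

```python
import os
import subprocess
import time

HBASE_ROOT = "hdfs:///hbase"
TABLE_PARENT = "hdfs:///hbase/data/default"
LOCAL_COPY = "/tmp/t1"  # hypothetical location of the saved test table


def iteration_commands(local_copy: str = LOCAL_COPY) -> list:
    """Commands for one benchmark iteration; the last one is the timed step."""
    return [
        ["hdfs", "dfs", "-rm", "-r", "-f", HBASE_ROOT],
        ["hdfs", "dfs", "-mkdir", "-p", TABLE_PARENT],  # assumed extra step
        ["hdfs", "dfs", "-put", local_copy, TABLE_PARENT],
        ["hbase", "org.apache.hadoop.hbase.regionserver.CompactionTool",
         "-major", TABLE_PARENT + "/t1"],
    ]


def timed_run(cmd: list) -> dict:
    """Run cmd and report real, user and sys seconds spent in the child."""
    before, start = os.times(), time.perf_counter()
    subprocess.run(cmd, check=True)
    real, after = time.perf_counter() - start, os.times()
    return {
        "real": real,
        "user": after.children_user - before.children_user,
        "sys": after.children_system - before.children_system,
    }


def run_benchmark(runs: int = 5) -> list:
    """Repeat the iteration and collect timings of the compaction step only."""
    results = []
    for _ in range(runs):
        *setup, compaction = iteration_commands()
        for cmd in setup:
            subprocess.run(cmd, check=True)
        results.append(timed_run(compaction))
    return results
```

Calling `run_benchmark()` on a node with the hdfs and hbase commands on the PATH would return one timing dictionary per run; only the compaction step is timed, so setup cost does not pollute the numbers.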



CPU profiling of HBase not using NHL (figure 1) shows that about 22% of CPU is used for generating and validating checksums, whereas with NHL (figure 2) it takes only about 3%.


Figure 1: CPU profile - HBase not using NHL (svg)


Figure 2: CPU profile - HBase using NHL (svg)


Benchmarking was done for three different cases: (a) neither HBase nor HDFS use NHL, (b) HDFS uses NHL but not HBase, and (c) both HDFS and HBase use NHL. For each case, we did 5 runs. Observations from table 1:

  1. Within a case, while real time fluctuates across runs, user and sys times remain the same. This is expected as compactions are IO bound.

  2. Using NHL only for HDFS reduces CPU usage by about 10% (A vs B).

  3. Further, using NHL for HBase checksums reduces CPU usage by about 23% (B vs C).

All times are in seconds. This stackoverflow answer provides a good explanation of real, user and sys times.

Each cell lists real / user / sys.

run #   no native for HDFS and HBase (A)   no native for HBase (B)   native (C)
1       469.4 / 110.8 / 30.5               422.9 / 95.4 / 30.5       414.6 / 67.5 / 30.6
2       384.3 / 111.4 / 30.4               400.5 / 96.7 / 30.5       393.8 / 67.6 / 30.6
3       400.7 / 111.5 / 30.6               398.6 / 95.8 / 30.6       392.0 / 66.9 / 30.5
4       396.8 / 111.1 / 30.3               379.5 / 96.0 / 30.4       390.8 / 67.2 / 30.5
5       389.1 / 111.6 / 30.3               377.4 / 96.5 / 30.4       381.3 / 67.6 / 30.5

Table 1
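The quoted reductions can be cross-checked against Table 1 with a short Python calculation. It assumes that "CPU usage" means user plus sys time averaged over the five runs, which the post does not spell out.

```python
from statistics import mean

# (user, sys) seconds for runs 1 to 5, copied from Table 1.
A = [(110.8, 30.5), (111.4, 30.4), (111.5, 30.6), (111.1, 30.3), (111.6, 30.3)]
B = [(95.4, 30.5), (96.7, 30.5), (95.8, 30.6), (96.0, 30.4), (96.5, 30.4)]
C = [(67.5, 30.6), (67.6, 30.6), (66.9, 30.5), (67.2, 30.5), (67.6, 30.5)]


def cpu_seconds(runs: list) -> float:
    """Mean of user + sys across runs (assumed definition of CPU usage)."""
    return mean(user + sys for user, sys in runs)


def reduction(before: list, after: list) -> float:
    """Percentage drop in mean CPU seconds from one case to another."""
    return 100.0 * (cpu_seconds(before) - cpu_seconds(after)) / cpu_seconds(before)


print(f"A vs B: {reduction(A, B):.1f}% less CPU")
print(f"B vs C: {reduction(B, C):.1f}% less CPU")
```

Under that assumption the numbers come out near the "about 10%" and "about 23%" quoted in the observations, and since sys time barely moves between cases, essentially all of the saving is in user time.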



The Native Hadoop Library leverages the special processor instruction (if available) and applies pipelining and other low-level optimizations when performing CRC calculations. Using NHL in HBase for heavy checksum computation allows HBase to make use of this facility, saving significant amounts of CPU time spent checksumming.


