Apache HBase

Saturday April 23, 2016

HDFS HSM and HBase: Experiment (continued) (Part 5 of 7)

This is part 5 of a 7 part report by HBase Contributor, Jingcheng Du and HDFS contributor, Wei Zhou (Jingcheng and Wei are both Software Engineers at Intel)

  1. Introduction
  2. Cluster Setup
  3. Tuning
  4. Experiment
  5. Experiment (continued)
  6. Issues
  7. Conclusions

1TB Dataset in a Single Storage

The performance for the 1TB dataset on HDD and SSD is shown in Figure 6 and Figure 7. Because of limited memory capacity, the 1TB dataset is not tested on RAMDISK.

Figure 6. YCSB throughput of a single storage type with 1TB dataset

Figure 7. YCSB latency of a single storage type with 1TB dataset

The throughput and latency on SSD are both better than on HDD (134% of HDD throughput and 35% of HDD latency). This is consistent with the 50GB test.

The throughput gain from SSD differs between the 50GB and 1TB datasets (128% versus 134% of HDD throughput); SSD gains more in the 1TB test. This is because far more I/O-intensive events, such as compactions, occur with the 1TB dataset than with 50GB, which shows the advantage of SSD in large-data scenarios. Figure 8 shows how the network throughput changes during the tests.

Figure 8. Network throughput measured for case 1T_HDD and 1T_SSD

In the 1T_HDD case the network throughput is lower than 10Gbps, while in the 1T_SSD case it can be much larger than 10Gbps. This means that with a 10Gbps switch, the network would be the bottleneck in the 1T_SSD case.

Figure 9. Disk throughput measured for case 1T_HDD and 1T_SSD

In Figure 9, we can see the bottleneck for these two cases is disk bandwidth.

  • In 1T_HDD, the throughput is almost 1000 MB/s at the beginning of the test, but after a while it drops because slow flushes cause regions to hit their memstore limits.

  • In the 1T_SSD case, the throughput appears to be capped at around 1300 MB/s, which is nearly the same as the bandwidth limit of the SATA controllers. To improve the throughput further, more SATA controllers (e.g. an HBA card) are needed rather than more SSDs.

During the 1T_SSD test, we observed that the operation latencies of the eight SSDs on each node differ widely, as shown in Figure 10. The figure includes only two disks: sdb represents the disks with high latency and sdf represents the disks with low latency.

Figure 10. I/O await time measured for different disks

Four of the eight SSDs have lower latency than the others. This is caused by a hardware design issue; you can find the details in Disk I/O Bandwidth and Latency Varies for Ports. With the existing VolumeChoosingPolicy, a disk with higher latency might receive the same workload as the disks with lower latency, which slows down the overall performance. We suggest implementing a latency-aware VolumeChoosingPolicy in HDFS.
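As a rough illustration of what "latency-aware" could mean, the sketch below weights each volume by the inverse of its observed I/O await time, so slower disks receive proportionally fewer new blocks. It is written in Python for brevity and does not use the HDFS VolumeChoosingPolicy interface; the function names and the await times are hypothetical.

```python
import random


def latency_weights(await_ms):
    """Return selection weights inversely proportional to each volume's await time."""
    inverse = {volume: 1.0 / ms for volume, ms in await_ms.items()}
    total = sum(inverse.values())
    return {volume: value / total for volume, value in inverse.items()}


def choose_volume(await_ms, rng=random):
    """Pick one volume at random, favoring volumes with lower latency."""
    weights = latency_weights(await_ms)
    volumes = list(weights)
    return rng.choices(volumes, weights=[weights[v] for v in volumes], k=1)[0]


# Hypothetical await times in milliseconds: sdb is a slow disk, sdf a fast one.
sample_await_ms = {"sdb": 8.0, "sdf": 2.0}
weights = latency_weights(sample_await_ms)
chosen = choose_volume(sample_await_ms, random.Random(0))
```

With these sample values the fast disk would be chosen about four times as often as the slow one. A real policy would also have to account for free space and for how the latency measurements are collected and aged.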

Performance Estimation for RAMDISK with 1TB Dataset

We cannot measure the performance of RAMDISK with the 1TB dataset because of the limited capacity of RAMDISK. Instead, we estimate its performance by analyzing the results of the HDD and SSD cases.

The performance with the 1TB and 50GB datasets is pretty close for both HDD and SSD.

The throughput difference between 50GB and 1TB dataset for HDD is


While for SSD the value is


If we take the average of the above values as the throughput difference for RAMDISK between the 50GB and 1TB datasets, it is around 2.15% ((2.89%+1.41%)/2=2.15%). The throughput for RAMDISK with the 1TB dataset should therefore be

406577×(1+2.15%)=415318 (ops/sec)
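The arithmetic of this estimate can be checked with a few lines of Python. This is only a restatement of the formula above: the variable names are ours, and 406577 ops/sec is taken from the formula as the RAMDISK baseline.

```python
# Values taken from the text above; variable names are illustrative.
ramdisk_baseline_ops = 406577        # ops/sec used as the base in the formula
differences_percent = [2.89, 1.41]   # the two throughput differences quoted above

average_percent = sum(differences_percent) / len(differences_percent)
estimate_ops = ramdisk_baseline_ops * (1 + average_percent / 100)

print(f"average difference: {average_percent:.2f}%")
print(f"estimated RAMDISK throughput: {round(estimate_ops)} ops/sec")
```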

Figure 11.  YCSB throughput estimation for RAMDISK with 1TB dataset

Please note: the throughput doesn’t drop much in 1 TB dataset cases compared to 50 GB dataset cases because they do not use the same number of pre-split regions. The table is pre-split to 18 regions in 50 GB dataset cases, and it is pre-split to 210 regions in the 1 TB dataset.

Performance for Tiered Storage

In this section, we study HBase write performance on tiered storage (i.e. different storage types mixed together in one test). This shows what performance can be achieved by mixing fast and slow storage, and helps us find the best balance between performance and cost.

Figure 12 and Figure 13 show the performance for tiered storage. You can find the description of each case in Table 1.

Most of the cases that introduce fast storage have better throughput and latency. Unsurprisingly, 1T_RAM_SSD has the best performance among them. The real surprise is that, after introducing RAMDISK, the throughput of 1T_RAM_HDD is worse than 1T_HDD (-11%) and 1T_RAM_SSD_All_HDD is worse than 1T_SSD_All_HDD (-2%); in addition, 1T_SSD is worse than 1T_SSD_HDD (-2%).

Figure 12.  YCSB throughput data for tiered storage

Figure 13.  YCSB latency data for tiered storage

We also investigate how much data is written to different storage types by collecting information from one DataNode.

Figure 14. Distribution of data blocks on each storage of HDFS in one DataNode

As shown in Figure 14, test cases with higher throughput generally write more data to disk. Fast storage accelerates flushes and compactions, which leads to more flushes and compactions and therefore more data written to disk. In some RAMDISK-related cases, only the WAL can be written to RAMDISK, and 1216 GB of WALs are written to one DataNode.

For tests without SSD (1T_HDD and 1T_RAM_HDD), we purposely limit the number of flush and compaction actions by using fewer flushers and compactors. This is because of the limited IOPS capability of HDD: too many concurrent reads and writes hurt HDD performance, which eventually slows down the overall performance.

Many DataNode threads can be BLOCKED for up to tens of seconds in 1T_RAM_HDD. We observe this in other cases as well, but it happens most often in 1T_RAM_HDD. This is because each DataNode holds one big lock when creating or finalizing HDFS blocks, and these methods can sometimes take tens of seconds (see Long-time BLOCKED threads in DataNode). The more these methods are used (in HBase they are used by the flusher, the compactor, and the WAL), the more often threads are BLOCKED. Writing the WAL in HBase requires creating and finalizing blocks, which can be blocked, and consequently user writes are blocked. Multiple WALs with a large number of groups, or a WAL per region, might also encounter this problem, especially on HDD.

With the written data distribution in mind, let’s look back at the performance results in Figure 12 and Figure 13. They lead to the following observations:

  1. Mixing SSD and HDD can greatly improve the performance (136% throughput and 35% latency) compared to pure HDD. But fully replacing HDD with SSD doesn’t show an improvement (98% throughput and 99% latency) over mixing SSD/HDD. This is because the hardware design cannot evenly split the I/O bandwidth across all eight disks, and 94% of the data is written to SSD while only 6% is written to HDD in the SSD/HDD mixing case. This strongly suggests that a mix of SSD and HDD can achieve the best balance between performance and cost. More information is in Disk Bandwidth Limitation and Disk I/O Bandwidth and Latency Varies for Ports.

  2. Including RAMDISK in SSD/HDD tiered storage gives different results for 1T_RAM_SSD_All_HDD and 1T_RAM_SSD_HDD. The case 1T_RAM_SSD_HDD, in which only a small amount of data is written to HDD, improves the performance over the SSD/HDD mixing cases. The case 1T_RAM_SSD_All_HDD, in which a large amount of data is written to HDD, is worse than the SSD/HDD mixing cases. This means that if we distribute the data appropriately between SSD and HDD in HBase, we can achieve good performance when mixing RAMDISK/SSD/HDD.

  3. The RAMDISK/SSD tiered storage is the winner of both throughput and latency (109% throughput and 67% latency of pure SSD case). So, if cost is not an issue and maximum performance is needed, RAMDISK/SSD should be chosen.

The throughput of 1T_RAM_HDD is 11% lower than that of 1T_HDD. First, this is because 1T_RAM_HDD uses RAMDISK, which consumes part of the RAM and leaves the OS buffer with less memory to cache data.

Second, with 1T_RAM_HDD the YCSB client can push data at very high speed, so cells accumulate very quickly in the memstore while flush and compaction on HDD are slow. As a result, RegionTooBusyException occurs more often (Figure 15 shows a much larger memstore in 1T_RAM_HDD than in 1T_HDD), and we observe much longer GC pauses in 1T_RAM_HDD than in 1T_HDD, up to 20 seconds per minute.

Figure 15. Memstore size in 1T_RAM_HDD and 1T_HDD

Finally, when we try to increase the number of flushers and compactors, the performance gets even worse, for the reasons given when explaining why we use fewer flushers and compactors in HDD-related tests (see Long-time BLOCKED threads in DataNode).

The lower performance of 1T_RAM_SSD_All_HDD compared with 1T_SSD_All_HDD (-2%) is due to the same reasons.

We suggest:

  1. Use reasonable configurations for flusher and compactor, especially in HDD-related cases.

  2. Don’t use the storage that has large performance gaps, such as directly mixing RAMDISK and HDD together.

  3. In many cases, we observe long GC pauses of around 10 seconds per minute. An off-heap memstore in HBase is needed to solve the long GC pause issue.

  4. Implement a finer grained lock mechanism in DataNode.

Go to part 6, Issues

HDFS HSM and HBase: Experiment (Part 4 of 7)

This is part 4 of a 7 part report by HBase Contributor, Jingcheng Du and HDFS contributor, Wei Zhou (Jingcheng and Wei are both Software Engineers at Intel) 



Performance for Single Storage Type

First, we study HBase write performance on each single storage type (i.e. no mix of storage types). This test shows the maximum performance each storage type can achieve and provides a guide for the hierarchical storage analysis that follows.

We study the single-storage-type performance with two datasets: 50GB and 1TB of inserted data. We believe 1TB is a reasonable data size in practice, and 50GB is used to evaluate the upper limit of performance, as HBase performance is typically higher when the data size is small. The 50GB size was chosen because we need to avoid running out of space during the test. Further, due to the limited capacity of RAMDISK, we have to use a small data size when storing all data in RAMDISK.

50GB Dataset in a Single Storage

The throughput and latency by YCSB for 50GB dataset are listed in Figure 2 and Figure 3. As expected, storing 50GB data in RAMDISK has the best performance whereas storing data in HDD has the worst.

Figure 2. YCSB throughput of a single storage with 50GB dataset

Note: Performance data for SSD and HDD may be better than their real capability because the OS can buffer/cache data in memory.

Figure 3. YCSB latency of a single storage with 50GB dataset

Note: Performance data for SSD and HDD may be better than their real capability because the OS can buffer/cache data in memory.

For throughput, RAMDISK and SSD are higher than HDD (163% and 128% of HDD throughput, respectively). Their average latencies, however, are dramatically lower than HDD (only 11% and 32% of HDD latency). This is expected, as HDD has long seek latency. The latency improvements of RAMDISK and SSD over HDD are bigger than the throughput improvements because of the huge access latency gap between the storage types. The latencies we measured for accessing 4KB of raw data from RAM, SSD and HDD are 89ns, 80µs and 6.7ms, respectively (RAM and SSD are about 75000x and 84x faster than HDD).
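The speedup factors quoted above follow directly from the three measured latencies. A small Python check, using only the numbers in the paragraph (the variable names are ours):

```python
# Measured 4KB raw access latencies from the text, converted to seconds.
ram_latency_s = 89e-9    # 89 ns
ssd_latency_s = 80e-6    # 80 µs
hdd_latency_s = 6.7e-3   # 6.7 ms

ram_speedup = hdd_latency_s / ram_latency_s   # roughly 75000x
ssd_speedup = hdd_latency_s / ssd_latency_s   # roughly 84x

print(f"RAM is about {ram_speedup:.0f}x faster than HDD")
print(f"SSD is about {ssd_speedup:.0f}x faster than HDD")
```

The raw latency gap is far larger than the end-to-end gap seen by YCSB (11% and 32% of HDD latency), since the end-to-end path includes much more than a single device access.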

Now consider the hardware data (disk, network and CPU on each DataNode) collected during the tests.

Disk Throughput

Case | Throughput theoretically (MB/s) | Throughput measured (MB/s)
50G_HDD | 127 x 8 = 1016 |
50G_SSD | 447 x 8 = 3576 |


Table 10. Disk utilization of test cases

Note: The data listed in the table for 50G_HDD and 50G_SSD are the total throughput of 8 disks.

The disk throughput is decided by factors such as access model, block size and I/O depth. The theoretical disk throughput is measured by using performance-friendly factors; in real cases they won’t be that friendly and would limit the data to lower values. In fact we observe that the disk utilization usually goes up to 100% for HDD.

Network Throughput

Each DataNode is connected by a 20Gbps (2560MB/s) full duplex network, which means both receive and transmit speed can reach 2560MB/s simultaneously. In our tests, the receive and transmit throughput are almost identical, so only the receive throughput data is listed in Table 11.

Throughput theoretically (MB/s)

Throughput measured (MB/s)














Table 11. Network utilization of test cases

CPU Utilization








Table 12. CPU utilization of test cases

Clearly, the performance on HDD (50G_HDD) is bottlenecked on disk throughput. For SSD and RAMDISK, however, neither disk, network nor CPU is the bottleneck, so the bottlenecks must be somewhere else: either in software (e.g. HBase compaction, memstore in regions, etc.) or in hardware other than disk, network and CPU.

To further understand why the utilization of SSD is so low and the throughput is not as high as expected (only 128% of HDD, even though SSD has a much higher theoretical throughput: 447 MB/s vs. 127 MB/s per disk), we ran an additional test that writes 50GB of data to four SSDs per node instead of eight SSDs per node.

Figure 4. YCSB throughput of a single storage type with 50 GB dataset stored in different number of SSDs

Figure 5. YCSB latency of a single storage type with 50 GB dataset stored in different number of SSDs

We can see that although the number of SSDs doubled, the throughput and latency of the eight-SSDs-per-node case (50G_8SSD) improve only slightly (104% throughput and 79% latency) compared with the four-SSDs-per-node case (50G_4SSD). This means that the SSDs are far from fully utilized.

We dug further and found that this is caused by current mainstream server hardware design. A mainstream server currently has two SATA controllers that support up to 12 Gbps of bandwidth in total, which limits the total disk bandwidth to around 1.5GB/s. That is the bandwidth of approximately 3.4 SSDs. You can find the details in the section Disk Bandwidth Limitation. The SATA controllers therefore become the bottleneck with eight SSDs per node, which explains why there is almost no improvement in YCSB throughput for eight SSDs per node. In the 50G_8SSD test, disk bandwidth is the bottleneck.
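The controller ceiling can be worked out from the figures already given. The sketch below assumes decimal units (1 Gbps = 1000 Mbps) and treats 12 Gbps as the combined limit of the controllers; the variable names are ours.

```python
# Figures from the text; decimal units are an assumption of this sketch.
controller_limit_gbps = 12     # combined SATA controller bandwidth
ssd_throughput_mb_s = 447      # theoretical throughput of one SSD
ssds_per_node = 8

ceiling_mb_s = controller_limit_gbps * 1000 / 8          # bits to bytes
ssd_equivalent = ceiling_mb_s / ssd_throughput_mb_s       # SSDs the ceiling can feed
aggregate_ssd_mb_s = ssds_per_node * ssd_throughput_mb_s  # what eight SSDs could deliver

print(f"controller ceiling: {ceiling_mb_s:.0f} MB/s")
print(f"equivalent to about {ssd_equivalent:.1f} SSDs")
print(f"eight SSDs could deliver {aggregate_ssd_mb_s} MB/s in theory")
```

Eight SSDs could in theory deliver more than twice what the controllers can carry, which is consistent with the small gain from four to eight SSDs.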

Go to part 5, Experiment (continued)

HDFS HSM and HBase: Cluster setup (Part 2 of 7)

This is part 2 of a 7 part report by HBase Contributor, Jingcheng Du and HDFS contributor, Wei Zhou (Jingcheng and Wei are both Software Engineers at Intel)


Cluster Setup

In all, five nodes are used in the testing. Figure 1 shows the topology of these nodes; the services on each node are listed in Table 3.

Figure 1. Cluster topology

  1. For HDFS, one node serves as NameNode, three nodes as DataNodes.

  2. For HBase, one HMaster is collocated together with NameNode. Three RegionServers are collocated with DataNodes.

  3. All the nodes in the cluster are connected to the full duplex 10Gbps DELL N4032 Switch. Network bandwidth for client-NameNode, client-DataNode and NameNode-DataNode is 10Gbps. Network bandwidth between DataNodes is 20Gbps (20Gbps is achieved by network bonding).







YCSB client






Table 3. Services run on each node


Hardware configurations of the nodes in the cluster are listed in the following tables.

CPU | Intel® Xeon® CPU E5-2695 v3 @ 2.3GHz, dual sockets
Memory | Micron 16GB DDR3-2133MHz, 384GB in total
NIC | Intel 10-Gigabit X540-AT2
SSD | Intel S3500 800G
HDD | Seagate Constellation™ ES ST2000NM0011 2T 7200RPM

Table 4. Hardware for DataNode/RegionServer

Note: The OS is stored on an independent SSD (Intel® SSD DC S3500 240GB) on both DataNodes and NameNode. The number of SSDs or HDDs (OS SSD not included) in a DataNode varies between test cases. See Section ‘Methodology’ for details.

CPU | Intel® Xeon® CPU E5-2697 v2 @ 2.70GHz, dual sockets
Memory | Micron 16GB DDR3-2133MHz, 260GB in total
NIC | Intel 10-Gigabit X540-AT2
SSD | Intel S3500 800G

Table 5. Hardware for NameNode/HBase MASTER



Intel Hyper-Threading Tech


Intel Virtualization


Intel Turbo Boost Technology


Energy Efficient Turbo


Table 6. Processor configuration





CentOS release 6.5













JVM Heap
NameNode: 32GB
DataNode: 4GB
HMaster: 4GB
RegionServer: 64GB



Table 7. Software stack version and configuration

NOTE: As mentioned in Methodology, HDFS and HBase have been enhanced to support this test.


We use YCSB 0.3.0 as the benchmark and use one YCSB client in the tests.

This is the workload configuration for 1T dataset:

# cat ../workloads/1T_workload












We use the following command to start the YCSB client:

./ycsb load hbase-10 -P ../workloads/1T_workload -threads 200 -p columnfamily=family -p clientbuffering=true -s > 1T_workload.dat

Go to part 3, Tuning

HDFS HSM and HBase: Introduction (Part 1 of 7)

This is part 1 of a 7 part report by HBase Contributor, Jingcheng Du and HDFS contributor, Wei Zhou (Jingcheng and Wei are both Software Engineers at Intel)



As more and more fast storage types (SSD, NVMe SSD, etc.) emerge, a methodology is needed for achieving better throughput and latency in big data workloads. However, these fast storage types are still expensive and limited in capacity. This study provides a guide for setting up a cluster with different storage media.

In general, this guide considers the following questions:

  1. What is the maximum performance a user can achieve by using fast storage?

  2. Where are the bottlenecks?

  3. How to achieve the best balance between performance and cost?

  4. How to predict what kind of performance a cluster can have with different storage combinations?

In this study, we examine HBase write performance on different storage media. We leverage the hierarchical storage management (HSM) support in HDFS to store different categories of HBase data on different media.

Three different types of storage (HDD, SSD and RAMDISK) are evaluated. HDD is the most popular storage in current use, and SATA SSD is a faster storage that is becoming more and more popular. RAMDISK is used to emulate extremely high-performance PCIe NVMe-based SSDs and upcoming faster storage (e.g. Intel 3D XPoint® based SSD). Due to hardware unavailability, we have to use RAMDISK to perform this emulation, and we believe our results hold for PCIe SSDs and other fast storage types.

Note: RAMDISK is a logical device emulated with memory. Files stored in RAMDISK are held only in memory.


We test the write performance in HBase with a tiered storage in HDFS and compare the performance when storing different HBase data into different storages. YCSB (Yahoo! Cloud Serving Benchmark, a widely used open source framework for evaluating the performance of data-serving systems) is used as the test workload.

Eleven test cases are evaluated in this study. We pre-split the table into 210 regions in the 1 TB dataset cases to avoid region splits at runtime, and into 18 regions in the 50 GB dataset cases.

The format of case names is <dataset size>_<storage type>.

Case Name | Storage | Description
50G_RAM | RAMDISK | Store all the files in RAMDISK. We have to limit data size to 50GB in this case due to the capacity limitation of RAMDISK.
50G_SSD | 8 SSDs | Store all the files in SATA SSD. Compare the performance by 50GB data with the 1st case.
50G_HDD | 8 HDDs | Store all the files in HDD.
1T_SSD | 8 SSDs | Store all files in SATA SSD. Compare the performance by 1TB data with cases in tiered storage.
1T_HDD | 8 HDDs | Store all files in HDD. Use 1TB in this case.
1T_RAM_SSD | RAMDISK, 8 SSDs | Store files in a tiered storage (i.e. different storage mixed together in one test). WAL is stored in RAMDISK, and all the other files are stored in SSD.
1T_RAM_HDD | RAMDISK, 8 HDDs | Store files in a tiered storage. WAL is stored in RAMDISK, and all the other files are stored in HDD.
1T_SSD_HDD | 4 SSDs, 4 HDDs | Store files in a tiered storage. WAL is stored in SSD, some smaller files (not larger than 1.5GB) are stored in SSD, and all the other files are stored in HDD, including all archived files.
1T_SSD_All_HDD | 4 SSDs, 4 HDDs | Store files in a tiered storage. WAL and flushed HFiles are stored in SSD, and all the other files are stored in HDD, including all archived files and compacted files.
1T_RAM_SSD_HDD | RAMDISK, 4 SSDs, 4 HDDs | Store files in a tiered storage. WAL is stored in RAMDISK, some smaller files (not larger than 1.5GB) are stored in SSD, and all the other files are stored in HDD, including all archived files.
1T_RAM_SSD_All_HDD | RAMDISK, 4 SSDs, 4 HDDs | Store files in a tiered storage. WAL is stored in RAMDISK, flushed HFiles are stored in SSD, and all the other files are stored in HDD, including all archived files and compacted files.

Table 1. Test Cases

NOTE: In all 1TB test cases, we pre-split the HBase table into 210 regions to avoid the region split at runtime.
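When post-processing results, the <dataset size>_<storage type> naming convention can be split mechanically. The helper below is a hypothetical convenience, not part of the test harness; it only assumes that the first underscore separates the dataset size from the storage part.

```python
def parse_case_name(case_name):
    """Split a case name such as '1T_RAM_SSD' into dataset size and storage tokens."""
    dataset_size, _, storage_part = case_name.partition("_")
    return {"dataset_size": dataset_size, "storage": storage_part.split("_")}


# Case names that appear in this report.
single = parse_case_name("50G_HDD")
tiered = parse_case_name("1T_RAM_SSD_All_HDD")
```

Note that a token such as "All" is a qualifier rather than a storage type, so any further interpretation of the storage tokens is left to the caller.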

The metrics in Table 2 are collected during the test for performance evaluation.

Storage media level | IOPS, throughput (sequential/random R/W)
OS level | CPU usage, network IO, disk IO, memory usage
YCSB Benchmark level | Throughput, latency

Table 2. Metrics collected during the tests

Go to Part 2, Cluster Setup

Friday April 22, 2016

HDFS HSM and HBase: Tuning (Part 3 of 7)

This is part 3 of a 7 part report by HBase Contributor, Jingcheng Du and HDFS contributor, Wei Zhou (Jingcheng and Wei are both Software Engineers at Intel) 


Stack Enhancement and Parameter Tuning

Stack Enhancement

To perform the study, we made a set of enhancements in the software stack:

  • HDFS:

    • Support a new storage RAMDISK

    • Add file-level mover support, so that a user can move blocks per file without scanning all metadata in NameNode

  • HBase:

    • WAL, flushed HFiles, HFiles generated in compactions, and archived HFiles can be stored in different storage

    • When renaming HFiles across storage, the blocks of that file would be moved to the target storage asynchronously

HDFS/HBase Tuning

This step is to find the best configurations for HDFS and HBase.

Known Key Performance Factors in HBase

These are the key performance factors in HBase:

  1. WAL: The write-ahead log guarantees the non-volatility and consistency of the data. Each record inserted into HBase must be written to the WAL, which can slow down user operations. It is latency-sensitive.

  2. Memstore and Flush: The records inserted into HBase are cached in the memstore, and when the memstore reaches a threshold it is flushed to a store file. Slow flushes can lead to long GC (Garbage Collection) pauses and make memory usage reach the thresholds in regions and the region server, which can block user operations.

  3. Compaction and Number of Store Files: HBase compaction merges small store files into a larger one, which reduces the number of store files and accelerates reads, but it can generate heavy I/O and consume disk bandwidth at runtime. Less compaction accelerates writes but produces too many store files, which slows down reads. When there are too many store files, the memstore flush can slow down, which leads to a large memstore and further slows user operations.
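The interaction between ingest rate, flush rate and the memstore limit described in item 2 can be illustrated with a toy model. This is a sketch only: the function name, the one-second time step and all the rates are hypothetical, and it does not reproduce HBase's actual flush or blocking logic.

```python
def simulate_memstore(ingest_mb_s, flush_mb_s, block_limit_mb, seconds):
    """Return (seconds_blocked, peak_memstore_mb) for one region in a toy model."""
    memstore_mb = 0.0
    peak_mb = 0.0
    blocked_seconds = 0
    for _ in range(seconds):
        # Flushing drains the memstore at the rate the storage can absorb.
        memstore_mb = max(0.0, memstore_mb - flush_mb_s)
        if memstore_mb >= block_limit_mb:
            # Memstore is over its limit: writes are rejected for this second.
            blocked_seconds += 1
        else:
            memstore_mb += ingest_mb_s
        peak_mb = max(peak_mb, memstore_mb)
    return blocked_seconds, peak_mb


# Hypothetical rates: the same client load against slow and fast flush targets.
slow_blocked, slow_peak = simulate_memstore(100, 40, 512, 60)
fast_blocked, fast_peak = simulate_memstore(100, 200, 512, 60)
```

When the flush rate is below the ingest rate, the memstore climbs to its limit and writes start being rejected; when the flush rate exceeds the ingest rate, the memstore stays small and nothing is blocked.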

Based on this understanding, the following are the tuned parameters we finally used.







Table 8. HDFS configuration




3 for non-SSD test cases.

8 for all SSD related test cases.


5 for non-SSD test cases.

15 for all SSD related test cases.













Table 9. HBase configuration

Go to part 4, Experiment

Wednesday April 13, 2016

Investing In Big Data: Apache HBase

This is the fourth in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Lars Hofhansl and Andrew Purtell are HBase Committers and PMC members. At Salesforce.com, Lars is a Vice President and Software Engineering Principal Architect, and Andrew is a Cloud Storage Architect.

An earlier version of the discussion in this post was published here on the Salesforce + Open Source = ❤ blog.

- Andrew Purtell

The world is good at making data. You can see it in every corner of commerce and industry: everything we interact with is getting smarter, and producing a massive stream of readings, geolocations, images, and more. From medical devices to jet engines, it’s transforming every part of the modern world. And it’s accelerating!

Salesforce’s products are all about helping our customers connect with their customers. So obviously, a big part of that is equipping them to interact with all of the data a customer’s interactions might generate, in whatever quantities it shows up in. We do that in many ways, across all of our products. 

In this post, we’d like to zoom in on one particular Open Source data system we use and contribute heavily to: Apache HBase. If you’re not familiar with HBase, this post will start with a few high level concepts about the system, and then go into how it fits in at Salesforce. 

What IS HBase? 

HBase is an open source distributed database. It’s designed to store record-oriented data across a scalable cluster of machines. We sometimes refer to it as a “sparse, consistent, distributed, multi-dimensional, persistent, sorted map”. This usually makes people say “Wat??”. So, to break that down a little bit:

  • distributed :  rows are spread over many machines;
  • consistent :  it’s strongly consistent (at a per-row level);
  • persistent :  it stores data on disk with a log, so it sticks around;
  • sorted : rows are stored in sorted order, making seeks very fast;
  • sparse :  if a row has no value in a column, it doesn’t use any space;
  • multi-dimensional :  data is addressable in several dimensions: tables, rows, columns, versions, etc.

Think of it as a giant list of key / value pairs, spread over a large cluster, sorted for easy access.
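To make the "sparse, sorted map" idea concrete, here is a toy single-machine model in Python. It is only an illustration of the data model: the class and method names are invented for this sketch and are not the HBase client API, and it ignores distribution, persistence and versions.

```python
import bisect


class TinySortedTable:
    """Toy model: row key -> {column -> value}, with row keys kept in sorted order."""

    def __init__(self):
        self._keys = []   # sorted list of row keys
        self._rows = {}   # row key -> sparse dict holding only the columns that exist

    def put(self, row, column, value):
        if row not in self._rows:
            bisect.insort(self._keys, row)
            self._rows[row] = {}
        self._rows[row][column] = value

    def scan(self, start, stop):
        """Yield (row, columns) for start <= row < stop, in sorted key order."""
        low = bisect.bisect_left(self._keys, start)
        high = bisect.bisect_left(self._keys, stop)
        for key in self._keys[low:high]:
            yield key, self._rows[key]


table = TinySortedTable()
table.put("user3", "name", "Cy")
table.put("user1", "name", "Ann")
table.put("user2", "city", "Oslo")   # sparse: this row has no "name" column

scanned_keys = [key for key, _ in table.scan("user1", "user3")]
```

Because the keys are sorted, a range scan is a binary search followed by a sequential read, which is why sorted storage makes seeks and scans cheap.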

People often refer to HBase as a “NoSQL” store–a term coined back in 2009 to refer to a big cohort of similar systems that were doing data storage without SQL (Structured Query Language). Contributors to the HBase project will tell you they have always disliked that term, though; a toaster is also NoSQL. It’s better to say HBase is architected differently than a typical relational database engine, and can scale out better for some use cases.

Google was among the first companies to move in this direction, because they were operating at the scale of the entire web. So, they long ago built their infrastructure on top of a new kind of system (BigTable, which is the direct intellectual ancestor of HBase). It grew from there, with dozens of Open Source variants on the theme emerging in the space of a few years: Cassandra, MongoDB, Riak, Redis, CouchDB, etc. 

Why use a NoSQL store, if you already have a relational database?

Let’s be clear: relational databases are terrific. They’ve been the dominant form of data storage on Earth for nearly three decades, and that’s not by accident. In particular, they give you one really killer feature: the ability to decompose the physical storage of data into different conceptual buckets (entities, aka tables), with relationships to each other … and to modify the state of many related values atomically (transactions). This incurs a cost (for finding and re-assembling the decomposed data when you want to read it) but relational database query planners have gotten so shockingly good that this is actually a perfectly good trade-off in most cases.

Salesforce is deeply dependent on relational databases. A majority of our row-oriented data still lives there, and they are integral to the basic functionality of the system. We’ve been able to scale them to massive load, both via our unique data storage architecture, and also by sharding our entire infrastructure (more on that in The Crystal Shard, a post by Ian Varley on the Salesforce blog). They’re not going anywhere. On the contrary, we do constant and active research into new designs for scaling and running relational databases. 

But, it turns out there is a subset of use cases that have very different requirements from relational data. In particular, less emphasis on webs of relationships that require complex transactions for correctness, and more emphasis on large streams of data that accrue over time, and need linear access characteristics. You certainly can store these in an RDBMS. But, when you do, you’re essentially paying a penalty (performance and scale limitations) for features you don’t need; a simpler design can scale more powerfully.

It’s for those new use cases–and all the customer value they unlock–that we’ve added HBase to our toolkit.

Of all the NoSQL stores, why HBase?

This question comes up a lot: people say, “I heard XYZ-database is web scale, you should use that!”. The world of “polyglot persistence” has produced a boatload of choices, and they all have their merits. In fact, we use almost all of them somewhere in the Salesforce product suite: Cassandra, Redis, CouchDB, MongoDB, etc.

To choose HBase as a key area of investment for Salesforce Core, we went through a pretty intense “bake-off” process that included evaluations of several different systems, with experiments, spikes, POCs, etc. The decision ultimately came down to three big points for us:

  1. HBase is a strongly consistent store. In the CAP Theorem, that means it’s a (CP) store, not an (AP) store. Eventual consistency is great when used for the right purpose, but it can tend to push challenges up to the application developer. We didn’t think we’d be able to absorb that extra complexity, for general use in a product with such a large surface area. 

  2. It’s a high quality project. It did well in our benchmarks and tests, and is well respected in the community. Facebook built their entire Messaging infrastructure on HBase (as well as many other things), and the Open Source community is active and friendly.

  3. The Hadoop ecosystem already had an operational presence at Salesforce. We’ve been using Hadoop in the product for ages, and we already had a pretty good handle on how to deploy and operate it. HBase can use Hadoop’s distributed filesystem for persistence and offers first class integration with MapReduce (and, coming soon, Spark), so is a way to level up existing Hadoop deployments with modest incremental effort.

Our experience was (and still is) that HBase wasn’t the easiest to get started with, but it was the most dependable at scale; we sometimes refer to it as the “industrial strength” scalable store, ready for use in demanding enterprise situations, and taking issues like data durability and security extremely seriously. 

HBase (and its API) is also broadly used in the industry. HBase is an option on Amazon’s EMR, and is also available as part of Microsoft’s Azure offerings. Google Cloud includes a hosted BigTable service sporting the de-facto industry standard HBase client API. (It’s fair to say the HBase client API has widespread if not universal adoption for Hadoop and Cloud storage options, and will likely live on beyond the HBase engine proper.) 

When does Salesforce use HBase?

There are three indicators we look at when deciding to run some functionality on HBase. We want things that are big, record-oriented, and transactionally independent.

By “big”, we mean things in the ballpark of hundreds of millions of rows per tenant, or more. HBase clusters in the wild have been known to grow to thousands of compute nodes, and hundreds of Petabytes, which is substantially bigger than we’d ever grow a single one of our instance relational databases to. (None of our individual clusters are that big yet, mind you.) Data size is not only not a challenge for HBase, it’s a desired feature!

By “record-oriented”, we mean that it “looks like” a database, not a file store. If your data can be modeled as big BLOBs, and you don’t need to independently read or write small bits of data in the middle of those big blobs, then you should use a file store.

Why does it matter if things are record-oriented? After all, HBase is built on top of HDFS, which is … a file store. The difference is that the essential function that HBase plays is specifically to let you treat big immutable files as if they were a mutable database. To do this magic, it uses something called a Log Structured Merge Tree, which provides for both fast reads and fast writes. (If that seems impossible, read up on LSMs; they’re legitimately impressive.)
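If it helps to see the idea in miniature, here is a toy sketch of a log structured merge store. It is not HBase's implementation (which uses memstores, HFiles, and background compactions); the class name `TinyLSM` and the `flush_threshold` parameter are made up for illustration. Writes go to an in-memory table, full tables are frozen into immutable sorted runs, and reads check the newest data first.

```python
import bisect


class TinyLSM:
    """Toy LSM store: a mutable memtable plus immutable sorted runs."""

    def __init__(self, flush_threshold=3):
        self.flush_threshold = flush_threshold  # hypothetical memtable size limit
        self.memtable = {}  # recent writes, mutable, in memory
        self.runs = []      # immutable sorted runs, oldest first

    def put(self, key, value):
        # Writes only touch memory, which keeps them fast.
        self.memtable[key] = value
        if len(self.memtable) >= self.flush_threshold:
            self.flush()

    def flush(self):
        # Freeze the memtable into an immutable sorted run (a stand-in for a file).
        if self.memtable:
            self.runs.append(tuple(sorted(self.memtable.items())))
            self.memtable = {}

    def get(self, key):
        # Newest data wins: check the memtable, then runs from newest to oldest.
        if key in self.memtable:
            return self.memtable[key]
        for run in reversed(self.runs):
            i = bisect.bisect_left(run, (key,))
            if i < len(run) and run[i][0] == key:
                return run[i][1]
        return None

    def compact(self):
        # Merge all runs into one, keeping only the newest value per key.
        merged = {}
        for run in self.runs:  # oldest first, so later runs overwrite earlier ones
            merged.update(run)
        self.runs = [tuple(sorted(merged.items()))] if merged else []


store = TinyLSM(flush_threshold=2)
store.put("a", "1")
store.put("b", "2")   # memtable is full, so it is flushed to a run
store.put("a", "3")   # a newer value for "a" now lives in the memtable
print(store.get("a"), store.get("b"))
```

Nothing on "disk" is ever modified in place: an update is just a newer entry that shadows the older one until a compaction merges them.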

And, what do we mean by “transactionally independent”? Above, we described that a key feature of relational databases is their transactional consistency: you can modify records in many different tables and have those modifications either commit or roll back as a unit. HBase, as a separate data store, doesn’t participate in these transactions, so for any data that spans both stores, it requires application developers to reason about consistency on their own. This is doable, but it is tricky. So, we prefer to emphasize those cases where that reasoning is simple by design.

(For the hair-splitters out there, note that HBase does offer consistent “transactions”, but only on a single-row basis, not across different rows, objects, or (most importantly) different databases.)

One criterion for using HBase that you’ll notice we didn’t mention is that as a “NoSQL” store, you wouldn’t use it for something that called for using SQL. The reason we didn’t mention that is that it isn’t true! We built and open sourced a library called “Phoenix” (which later became Apache Phoenix) which brings a SQL access layer to HBase. So, as they say, “We put the SQL back in NoSQL”.

We treat HBase as a “System Of Record”, which means that we depend on it to be every bit as secure, durable and available as our other data stores. Getting it there required a lot of work: accounting for site switching, data movement, authenticated access between subsystems, and more. But we’re glad we did!

What features does Salesforce use HBase for?

To give you a concrete sense of some of what HBase is used for at Salesforce, we’ll mention a couple of use cases briefly. We’ll also talk about more in future posts.

The main place you may have seen HBase in action in Salesforce to date is what we call Salesforce Shield. Shield is a connected suite of product features that enable enterprise businesses to encrypt data and track user actions. This is a requirement in certain protected sectors of business, like Health and Government, where compliance laws dictate the level of retention and oversight of access to data.

One dimension of this feature is called Field Audit Trail (FAT). In Salesforce, there’s an option that allows you to track changes made to fields (either directly by users through a web UI or mobile device, or via other applications through the API). This historical data is composed of “before” and “after” values for every tracked field of an object. These stick around for a long time, as they’re a matter of historical record. That means that if you have data that changes very frequently, this data set can grow rapidly, and without any particular bound. So we use HBase as a destination for moving older sets of audit data over, so it’s still accessible via the API, but doesn’t have any cost to the relational database optimizer. The same principle applies to other data that behaves the same way; we can archive this data into HBase as a cold store.

Another part of Shield that uses HBase is the Event Monitoring feature. The initial deployment was based on Hadoop, and culls a subset of application log lines, making them available to customers directly as downloadable files. New work is in progress to capture some event data into HBase, like Login Forensics, in real time, so it can be queried interactively. This could be useful for security and audit, limit monitoring, and general usage tracking. We’re also introducing an asynchronous way to query this data so you can work with the potentially large resulting data sets in sensible ways.

More generally, the Platform teams at Salesforce have been cooking up a general way for customers to use HBase. It’s called BigObjects (more here). BigObjects behave in most (but not all) ways like standard relational-database-backed objects, and are ideally suited for use cases that require ingesting large amounts of read-only data from external systems: log files, POS data, event data, clickstreams, etc.

We’re adding more features that use HBase in every release, like improving mention relevancy in Chatter, tracking Apex unit test results, caching report data, and more. We’ll post follow-ups here and on the Salesforce Engineering Medium Channel about these as they evolve.

Contributing To HBase

So, we’ve got HBase clusters running in data centers all over the world, powering new features in the product, and running on commodity hardware that can scale linearly.

But perhaps more importantly, Salesforce has put a premium on not just being a user of HBase, but also being a contributor. The company employs multiple committers on the Apache project, including both of us (Lars is a committer and PMC member, and Andrew is an Apache VP and PMC Chair). We’ve written and committed hundreds of patches, including some major features. As committers, we’ve reviewed and committed thousands of patches, and served as release managers for the 0.94 release (Lars) and 0.98 release (Andrew).

We’ve also presented dozens of talks at conferences about HBase, including HBaseCon and OSCON.

We’re committed to doing our work in the community, not in a private fork except where temporarily needed for critical issues. That’s a key tenet of the team’s OSS philosophy — no forking! — that we’ll talk about more in an upcoming post on the Salesforce Engineering Medium Channel.

By the way, outside of the Core deployment of HBase that we’ve described here, there are actually a number of other places in the larger Salesforce universe where HBase is deployed as well, including Data.com, the Marketing Cloud (more here), and Argus, an internal time-series monitoring service. More on these uses soon, too!


We’re happy to have been a contributing part of the HBase community these last few years, and we’re looking forward to even more good stuff to come.

Thursday December 17, 2015

Offheaping the Read Path in Apache HBase: Part 1 of 2

Details on the work involved in making the Apache HBase read path work against off heap memory (without copying).

Offheaping the Read Path in Apache HBase: Part 2 of 2

by HBase Committers Anoop Sam John, Ramkrishna S Vasudevan, and Michael Stack

This is part two of a two part blog. Herein we compare before and after off heaping. See part one for preamble and detail on work done.

Performance Results

There were two parts to our performance measurement.  

  1. Using HBase’s built-in Performance Evaluation (PE) tool.  

  2. Using YCSB to measure the throughput.

The PE test was conducted on a single node machine. A table was created and loaded with 100 GB of data. The table has one CF and one column per row. Each cell value size is 1K. Configuration of the node:

System configuration

CPU : Intel(R) Xeon(R) CPU with 8 cores.
RAM : 150 GB
JDK : 1.8

HBase configuration

  <value> 104448 </value>

20% of the heap is allocated to the L1 cache (LRU cache). When L2 is enabled, L1 holds no data, just index and bloom filter blocks. 102 GB off heap space is allocated to the L2 cache (BucketCache)

Before performing the read tests, we made sure that all data was loaded into BucketCache so there is no i/o. The read workloads of the PE tool run with multiple threads. We consider the average completion time for each of the threads to do the required reads.

1. Each thread performs a 100 row multi get operation 1000000 times. We can see that there is a 55 – 82 % reduction in average run time. See the graph below for the test with 5 to 75 threads reading. Y axis shows average completion time, in seconds, for one thread.

Here each thread is doing 100000000 row gets in total. Converting this to throughput numbers, we can see:

[Table: Throughput Without Patch, Throughput With HBASE-11425, and Throughput gain, at 5, 10, 20, 25, 50, and 75 threads]

So without the patch, the system reaches peak load at around 20 threads and throughput starts to fall off. With HBASE-11425 this is not the case, even with 75 threads: throughput scales mostly linearly as load increases. The major factor which helps us here is reduced GC activity.
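As a rough illustration of how completion times relate to the throughput numbers discussed above, here is a small sketch of the arithmetic. The row counts come from the test description; the 1000 second completion time in the usage line is a made-up placeholder, not a measured result.

```python
ROWS_PER_MULTI_GET = 100           # rows fetched by one multi get (from the test setup)
MULTI_GETS_PER_THREAD = 1_000_000  # multi gets issued by each thread (from the test setup)


def rows_per_thread():
    """Total rows read by one thread over the whole test."""
    return ROWS_PER_MULTI_GET * MULTI_GETS_PER_THREAD


def throughput(threads, avg_completion_seconds):
    """Aggregate rows/sec, assuming every thread finishes in the average time."""
    return threads * rows_per_thread() / avg_completion_seconds


def speedup_from_runtime_reduction(reduction):
    """A fractional run time reduction r multiplies throughput by 1 / (1 - r)."""
    return 1.0 / (1.0 - reduction)


print(rows_per_thread())                     # 100000000
print(throughput(25, 1000.0))                # hypothetical 1000 s average completion time
print(speedup_from_runtime_reduction(0.55))  # about 2.2x
print(speedup_from_runtime_reduction(0.82))  # about 5.6x
```

Read this way, the reported 55 – 82 % reduction in average run time corresponds to roughly 2.2x to 5.6x the throughput for the same amount of work.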

2. Each thread is doing a range scan of 10000 rows with filtering of all the data on the server side. The filtering is done to see the server side gain alone and avoid any impact of network and/or client app side bottleneck. Each thread is doing the scan operation 10000 times. We can see that there is a 55 – 74 % reduction in the average run time of each thread. See the graph below for the test with 5 to 75 threads reading. Y axis shows average completion time, in seconds, for one thread.

3. Another range scan test is performed with part of the data returned to the client. The test returns 10% of total rows to the client and the remaining rows are filtered out at the server. The graph below is for a test with 10, 20, 25 and 50 threads. Again Y axis gives the average thread completion time, in seconds. The gain is a 28 – 39% reduction in latency.

The YCSB test is done in the same cluster with 90 GB of data. We had similar system configuration and HBase configuration as for the PE tests.

The YCSB setup involves creating a table with a single column family with around 90 GB of data. There are 10 columns each with 100 bytes of data (each row having 1k of data). All the readings are taken after ensuring that the entire data set is in the BucketCache. The multi get test includes each thread doing 100 row gets and 5000000 such operations. Range scan does random range scans with 1000000 operations.


[Table: YCSB throughput, With HBASE-11425 (Ops/sec)]

[Table: YCSB throughput, With HBASE-11425 (Ops/sec)]
For multi get there is 20 – 160 % throughput gain whereas for range scan it is 20 - 80%.

With HBASE-11425 we can see there is a linear throughput increase with more threads, whereas the old code starts performing badly as threads are added (see the results for 50 and 75 threads).

GC graphs

With HBASE-11425, we serve data directly from the off heap cache rather than copy each of the blocks on heap for each of the reads. So we should be doing much better with respect to GC on the RegionServer side. Below are the GC graph samples taken on one RegionServer during the PE test run. We can clearly see that with the changes in HBASE-11425, we are doing much less (better) GC.

MultiGets – without HBASE-11425 (25 threads)


MultiGets – with HBASE-11425 (25 threads)


ScanRange10000 – without HBASE-11425 (20 threads)


ScanRange10000 – with HBASE-11425 (20 threads)


Future Work

One implication of the findings above is that we should always run with the off heap cache on. We were reluctant to do this in the past when reads from the off heap cache took longer. This is no longer the case. We will look into making this option on by default. Also, this posting has described our conversion of the read pipeline to make it run with offheap buffers. Next up, naturally, would be making the write pipeline offheap.


Some parts of this work made it into branch-1 but to run with a fully off heap read path, you will have to wait on the HBase 2.0 release which should be available early next year (2016). Enabling L2 (BucketCache) in off heap mode will automatically turn on the off heap mechanisms described above. We hope you enjoy these improvements made to the Apache HBase read path.

Thursday October 08, 2015

Imgur Notifications: From MySQL to HBase

This is the third in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Carlos J. Espinoza is an engineer at Imgur.

An earlier version of the discussion in this post was published here on the Imgur Engineering blog.

- Andrew Purtell

Imgur Notifications: From MySQL to HBase

Imgur is a heavy user of MySQL. It has been a part of our stack since our beginning. However, with our scale it has become increasingly difficult to throw more features at it. For our latest feature upgrade, we re-implemented our notifications system and migrated it over from MySQL to HBase. In this post, I will talk about how HBase solved our use case and the features we exploited.

To add some context, previously we supported two types of notifications: messages and comment replies, all stored in MySQL. For this upgrade, we decided to support several additional notification types. We also introduced rules around when a notification should be delivered. This change in spec made it challenging to continue with our previous model, so we started from scratch.

Early in the design phase, we envisioned a world where MySQL remained the primary store. We put together some schemas, some queries and, for the better, stopped ourselves from making a huge mistake. We had to create a couple columns for every type of notification. Creating a new notification type afterwards would mean a schema change. Our select queries would require us to join against other application tables. We designed an architecture that could work, but we would sacrifice decoupling, simplicity, scalability, and extensibility.

Some of our notifications require that they only be delivered to the user once a milestone is crossed. For instance, if a post reaches 100 points, Imgur will notify the user at the 100 point mark. We don’t want to bother users with 99 notifications in between. So, at scale, how do we know that the post has reached 100 points?

A notification could have multiple milestones. We only want to deliver once the milestone is hit.

Considering MySQL is our primary store for posts, one way to do it is to increment the points counter in the posts table, then execute another query fetching the row and check if the points reached the threshold of 100. This approach has a few issues. Increment and then fetch is a race condition. Two clients could think they reached 100 points, delivering two notifications for the same event. Another problem is the extra query. For every increment, we must now fetch the volatile column, adding more stress to MySQL.
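To make the race concrete, here is a small deterministic sketch of the interleaving described above. The `PostsTable` class is a hypothetical in-memory stand-in for the posts table, not real MySQL code; the point is only that when the increment and the read are separate steps, two clients can both observe the milestone.

```python
class PostsTable:
    """In-memory stand-in for a posts table with a points counter per post."""

    def __init__(self):
        self.points = {}

    def increment(self, post_id):
        # First query: bump the counter. Nothing is returned to the caller.
        self.points[post_id] = self.points.get(post_id, 0) + 1

    def fetch(self, post_id):
        # Second query: read the counter back.
        return self.points.get(post_id, 0)


def notifications_sent(start_points, milestone):
    """Replay the bad interleaving: both clients increment before either reads."""
    table = PostsTable()
    table.points["post"] = start_points
    table.increment("post")           # client A votes
    table.increment("post")           # client B votes
    seen_by_a = table.fetch("post")   # client A reads the counter
    seen_by_b = table.fetch("post")   # client B reads the counter
    return sum(1 for seen in (seen_by_a, seen_by_b) if seen == milestone)


print(notifications_sent(start_points=98, milestone=100))  # 2: the same event is delivered twice
```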

Though it is technically possible to do this in MySQL using transactions and consistent read locks, lock contention could make it very expensive with votes, as voting is one of the most frequent operations on our site. Seeing as we already use HBase in other parts of our stack, we switched gears and built our system on top of it. Here is how we use it to power notifications in real time and at scale.

Sparse Columns

At Imgur, each notification is composed of one or more events. The following image of our notifications dropdown illustrates how our notifications are modeled.

As illustrated, each notification is composed of one or more events. A notification maps to a row in HBase, and each event maps to multiple columns, one of which is a counter. This model makes our columns very sparse as different types of notifications have different types of events. In MySQL, this would mean a lot of NULL values. We are not tied to a strict table schema, so we can easily add other notification types using the same model.
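A rough way to picture this model is a dictionary of dictionaries, where a row only stores the columns it actually has. The sketch below is illustrative only: the row key format and the column names (`upvote:count`, `reply:last_actor`, and so on) are hypothetical, not Imgur's actual schema.

```python
from collections import defaultdict

# row key -> {column name: value}; columns that were never written take no space.
notifications = defaultdict(dict)


def record_event(row_key, event_type, actor):
    """Update the columns for one event type on a notification row."""
    row = notifications[row_key]
    count_column = f"{event_type}:count"
    row[count_column] = row.get(count_column, 0) + 1   # the counter column
    row[f"{event_type}:last_actor"] = actor            # another column for the same event
    return row


record_event("user1:post9", "upvote", "alice")
record_event("user1:post9", "upvote", "bob")
record_event("user1:comment4", "reply", "carol")
print(dict(notifications))
```

The two rows end up with different column sets and neither stores placeholders for the other's columns, which is the part that would turn into NULL values in a fixed relational schema.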

Atomic Increments

HBase has an atomic increment operation that returns the new value in the same call. It’s a simple feature, but our implementation depends on this operation. This allows our notification delivery logic on the client to be lightweight: increment, and deliver the notification if and only if a milestone is crossed. No extra calls. In some cases, this means we now keep two counters. For instance, the points count in the MySQL table for posts, and the points count in HBase for notifications. Technically, they could get out of sync, but this is an edge case we optimize for.
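The delivery logic can be sketched as follows. This is a simulation, not HBase client code: the `CounterStore` class uses a lock to stand in for HBase's atomic increment (exposed in the Java client as `Table.incrementColumnValue`, which returns the new value), and the names `MILESTONES`, `vote`, and `run_votes` are made up for the example.

```python
import threading


class CounterStore:
    """Stand-in for a store with an atomic increment that returns the new value."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counters = {}

    def increment_and_get(self, key, amount=1):
        # The increment and the read happen as one indivisible step.
        with self._lock:
            self._counters[key] = self._counters.get(key, 0) + amount
            return self._counters[key]


MILESTONES = {100}  # hypothetical milestone values


def vote(store, post_id, deliveries):
    new_value = store.increment_and_get(post_id)
    # Each caller sees a distinct value, so exactly one caller sees the milestone.
    if new_value in MILESTONES:
        deliveries.append((post_id, new_value))


def run_votes(total_votes, workers=8):
    """Cast votes from several threads and return the notifications delivered."""
    store = CounterStore()
    deliveries = []

    def worker(count):
        for _ in range(count):
            vote(store, "post", deliveries)

    threads = [threading.Thread(target=worker, args=(total_votes // workers,))
               for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return deliveries


print(run_votes(200))  # [('post', 100)]
```

Because every increment returns a unique value, no second query is needed and the milestone check cannot fire twice for the same crossing.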

Another benefit of doing increments in HBase is that it allows us to decouple the notifications logic from the application logic. All we need to know to deliver a notification is whether its counter has crossed a pre-defined threshold. For instance, we no longer need to know how to fetch a post and get its point count. HBase has allowed us to solve our problem in a more generic fashion.

Fast Table Scans

We also maintain a secondary order table. It stores notification references ordered by when they were last delivered using reversed timestamps. When users open their notifications dropdown, we fetch their most recent notifications by performing a table scan limited by their user ID. We can also support scanning for different types of notifications by using scan filters.
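The reversed timestamp trick relies on rows being stored in sorted key order: subtracting the timestamp from a large constant makes the newest entry sort first. Below is a minimal sketch of the idea with a sorted list standing in for the table. The key layout (`user_id:reversed_timestamp`), the `MAX_TS` constant, and the class and function names are assumptions for illustration, not Imgur's actual design.

```python
import bisect

MAX_TS = 2**63 - 1  # largest signed 64-bit value, used to reverse timestamps


def order_key(user_id, delivered_at_ms):
    # Zero-pad to 19 digits so that string order matches numeric order.
    return f"{user_id}:{MAX_TS - delivered_at_ms:019d}"


class OrderTable:
    """Toy sorted table: keys are kept in order, like rows in HBase."""

    def __init__(self):
        self._keys = []
        self._values = {}

    def put(self, user_id, delivered_at_ms, notification_ref):
        key = order_key(user_id, delivered_at_ms)
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = notification_ref

    def most_recent(self, user_id, limit):
        # Seek to the first key for this user, then read forward until the
        # prefix no longer matches or the limit is reached.
        prefix = f"{user_id}:"
        start = bisect.bisect_left(self._keys, prefix)
        results = []
        for key in self._keys[start:]:
            if not key.startswith(prefix) or len(results) == limit:
                break
            results.append(self._values[key])
        return results


table = OrderTable()
table.put("user1", 1000, "n1")
table.put("user1", 3000, "n3")
table.put("user1", 2000, "n2")
table.put("user2", 2500, "x1")
print(table.most_recent("user1", 2))  # ['n3', 'n2']
```

The scan never sorts anything at read time; the ordering is paid for once, when the key is written.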

Other Benefits

With HBase we gain many other benefits, like linear scalability and replication. We can forward data to another cluster and run batch jobs on that data. We also get strong consistency. It is extremely important for notifications to be delivered exactly once when a milestone is crossed. We can make that guarantee knowing that HBase has strong row level consistency. It’s likely that we’ll use versioning in the future, but even without use of it, HBase is a great choice for our use case.

Imgur notifications is a young project, and we’ll continue to make improvements to it. As it matures and we learn more from it, I hope to share what we’ve built with the open source community.

Thursday September 03, 2015

Medium Data and Universal Data Systems

This is the second in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Matthew Hunt is an architect at Bloomberg who works on Portfolio Analytics and Bloomberg infrastructure.

An earlier version of the discussion in this post was published here on the High Scalability blog.

- Andrew Purtell


Why Bloomberg uses HBase

The Bloomberg Terminal is a leading news, information and analytics platform in finance, found on hundreds of thousands of traders' desks around the world. We are actively working to consolidate around fewer, faster, and simpler systems and see HBase as part of a broader whole, along with computation frameworks such as Spark, resource schedulers like Mesos, virtualization and containerization in OpenStack and Docker, and storage with Ceph and HDFS. This is a substantial undertaking: Bloomberg has tens of thousands of databases and applications built over 30 years, and a team of 4000 engineers.

While HBase was designed for a specific purpose originally, we believe it will grow to become a general purpose database - in fact, a universal one.

What is a universal database?

The great Michael Stonebraker wrote a few years ago that the era of "one size fits all" databases was over. That relational, time series, and hierarchical systems have long co-existed makes it questionable whether there ever was a one size fits all period. In fact, the era of one size fits all is, if anything, dawning. Stonebraker's central point - purpose built databases are often significantly better at their purpose than a general alternative - is true. But in the broader landscape, it doesn't matter. The universal database is one that is good enough to subsume nearly all tasks in practice. The distinction between data warehouse, offline batch reporting, and OLTP is already blurry and will soon vanish for most use cases. Slightly more formally, the universal database is one that is fast and low latency for small and medium problems, scales elastically to larger sizes transparently, has clear well understood access semantics as in MPP SQL systems, and connects to standard computation engines such as Spark. Many systems are converging towards this already, and we think HBase is one of the future contenders in this space.

In a strong vote of confidence, HBase has seen increasing adoption and endorsement from major players such as Apple and Google, and Spark integration from Huawei, Cloudera, and Hortonworks, continuing to build and foster the large community that success requires. At the end of the day, the technology projects that succeed are the ones that attract a critical mass. Longevity and future direction are essential considerations in large scale design decisions.

But the future can be cloudy. We also use HBase because, with minor modifications, it is a good fit for practical applications we had initially, once it received some buffing for what I call the medium data problem.

Why don't we use HBase?

No honest engineering writeup is complete without a perusal of alternatives. Many fine and more mature commercial systems exist, along with good open alternatives such as Cassandra. While we think HBase has critical mass and a broad and growing community, other systems have strengths too. HBase can be cumbersome to set up and configure, lacks a standard high level access language incorporated by default, and still has work to do for efficient interfaces to Spark. But the pace of development is swift, and we believe this is just scratching the surface of what is to come as it matures. I look forward to a subsequent blog posting detailing these future improvements and our next generation of systems.

To the community

All real projects stem from the combined efforts of many. I would like to thank the incredible community of people who have participated in moving things forward, from the strong leadership of Michael Stack and Andrew Purtell to the fine people at Cloudera and Hortonworks and Databricks. Strong managerial support inside of Bloomberg has impressively cleared the path. Carter Page at Google and Andrew Morse at Apple offered early encouragement. And in particular, Sudarshan Kadambi has been a true partner in crime for shared dreams, vision and execution. Thank you all.

What follows is a deep dive into our original HBase projects and the thinking that lay behind them.

MBH, 8/2015

Search engines and large web vendors serve hundreds of millions of requests for straightforward web lookups. Bloomberg serves hundreds of thousands of sophisticated financial users where each request has tight tolerances and may be computationally expensive. Can the systems originally designed for the former work for the latter?

"Big Data" systems continue to attract substantial funding, attention, and excitement. As with many new technologies, they are neither a panacea, nor even a good fit for many common uses. Yet they also hold great promise, and Bloomberg has been contributing behind the scenes to strengthen their use for what we term medium data – high terabytes on modest clusters - and real world scenarios as we build towards simpler universal systems. The easiest way to understand these changes is to start with a bit of background on the origin of both these technologies and our company.

[This was taken from a talk called "Hadoop at Bloomberg", but it’s important to understand that we see this as part of a broader landscape that encompasses tools such as Spark, Mesos, and Docker which are working to provide frameworks for making many commodity machines work together.]

Bloomberg Products and Challenges

Bloomberg is the world’s premier provider of information and analytics to the financial world. Walk into nearly any trading floor in the world, and you will find "Bloomberg Terminals" in use. Simply put, data is our business, and our 4,000 engineers have built many systems over the last 30 years to address the stringent performance and reliability needs of the financial industry. We are always looking at ways to improve our core infrastructure, including consolidating many purpose built disparate systems to reduce complexity.

Our main product is the Bloomberg Professional service, which many refer to simply as “The Terminal”, shown below.


While the name of the Bloomberg Terminal is carried over from when we built hardware, it is analogous to a browser in the cloud: there is an "address bar" where a function name can be typed in, display areas where the results are shown, and the logic and heavy lifting is done server side in our data centers. Of course we don’t refer to them as browsers and clouds because we’ve been doing it since long before those words existed, but since the browser is ubiquitous, this should give everyone a good idea of the basics. There are also mobile versions available on the iPhone, iPad, and Android. There are perhaps 100,000 functions and tens of thousands of databases. Below is a sample of a few of them.


Chart of a single security
Storm track showing oil wells and shipping and companies likely to be affected
News and sentiment analysis of twitter feed
Portfolio and risk analytics

Bloomberg’s origins were in providing electronic analytics for bonds and other securities. "Security" is just a fancy word for instruments you can trade like stocks or bonds. It transformed a world that was paper driven and information poor into one in which prices were available at the touch of a button. As nearly every school, road, airport, and other piece of modern infrastructure is financed by bond issuance, this was an important and useful innovation that reduced the cost of financing. Prior to that time, the cost of a bond was more or less what your sales guy said it was.

In the early 1980s when the company began, there were few off the shelf computer technologies adequate to the task. Commercial databases were weak, and personal business computers nonexistent. Even the IBM PC itself was only announced two months before Bloomberg LP was founded. The name of our main product, the Bloomberg Terminal, reflects our history of needing to build hardware at our outset, along with developing the software and database systems in house. And so out of necessity we did.

As the company expanded, its infrastructure grew around powerful Unix machines and fast reliable services designed to furnish almost any kind of analysis there is about a company or security. The company grew and prospered, becoming one of the largest private companies in the United States, and entering news and other lines of business. Owning our own systems has conferred substantial benefits aside from simply making our existence possible. Independent of the flexibility, not paying vendors such as Oracle billions of dollars for our tens of thousands of databases is good business.

As time has passed, the sophistication of our usage has grown. Interactive functions are frequently geared towards complicated risk and other multi-security analytics, which require far more data and computation at a time per user and request. This puts a strain on systems architected when interactive requests were for charting of the price of a small number of instruments. Many groups within Bloomberg have addressed this by copying systems and caching large quantities of data. There are also different data systems for different kinds of usage – one for streaming pricing data, another for historical data, and so on. Complexity kills. The more pieces and moving parts there are, the harder a system is to build and maintain. Can we consolidate around a small number of open platforms? We believe we can.

Big Data Origins

Modern era big data technologies are a solution to an economics problem faced by Google and others a decade ago. Storing, indexing, and responding to searches against all web pages required tremendous amounts of disk space and computer power. Very powerful machines, fast SAN storage, and data center space were prohibitively expensive. The solution was to pack cheap commodity machines as tightly together as possible with local disks.

This addressed the space and hardware cost problem, but introduced a software challenge. Writing distributed code is hard, and with many machines come many failures, and so a framework was also required to take care of such problems automatically for the system to be viable. Google pioneered one of the first viable commodity machine platforms to do this, which is part of why they went on to dominance rather than bankruptcy. Fortunately for the rest of us, they published how they did it, and while there are more sophisticated means available today, it’s a great place to start an understanding of concepts.

The essential elements of this system are a place to put files, and a way to perform computations. The landmark Google papers introduced GFS for storage and MapReduce for computation. MapReduce had high reliability and throughput, owning records for benchmarks like sorting a petabyte of data at the time. However, its response time is far too long for interactive tasks, and so BigTable, a low latency database, was also introduced. Google published papers detailing these systems, but did not release their software. This in a nutshell explains Hadoop, which began as the open source implementation of these Google systems, supported by Yahoo and other large web vendors facing the same challenges at the time.

Understanding why and how these systems were created also offers insight into some of their weaknesses. Their goal was to make crunching hundreds of petabytes of data circa 2004 on thousands of machines both cost effective and reliable, especially for batch computations. A system that ropes together thousands of machines for an overnight computation is no good if it gets most of the way there but never finishes.

And therein lies the problem. Most people don’t have petabytes of data to crunch at a time. The number of companies who have faced this has been restricted to a handful of internet giants. Similarly, these solutions are architected around the limitations of commodity machines of the day, which had single core processors, spinning rust disks, and far less memory. A decade later, high core counts, SSDs or better, and large RAM footprints are common. And Hadoop in particular is also written in Java, which creates its own challenges for low latency performance.

Why Hadoop? It’s not just about Hadoop

Choosing the right system is more than a strictly technical decision. Is it well supported? Will it be around for the future? Does it address enough problems to be worthwhile? X86 chips didn’t begin as the best server CPU. Linux was a toylike operating system at its outset. What has made them dominant is sustained effort, talent, and money from an array of companies and people. We believe the same will happen with some variant of these tools, immature though they may be. Intel has presumably invested for the same reason – if systems of the future are built around a distributed framework, they want to be in on the game.

Big data systems are widely used and money and talent have been pouring in. Recently, Cloudera raised $950 million, of which $750 million came from Intel. Hortonworks has also had some impressive fundraising. More than a billion dollars into just two companies goes a long way. Skilled talent helps the ecosystem too: Marcel Kornacker, who architected the F1 system at Google; Jeff Hammerbacher, the founder of the data science team at Facebook; Mike Olson, an old database hand; and members of the Yahoo data sciences team such as Arun Murthy.

It’s not just about Hadoop for us. We group Spark, Mesos, Docker, Hadoop, and other tools into the broader group of "tools for making many machines work together seamlessly". Hadoop has some useful properties that make it a good starting point for discussion in addition to being useful in its own right.

We’d like to consolidate our systems around an open commodity framework for storage and computation that has broad support. Hadoop and its ilk provide potential but are immature, with weaknesses in taking advantage of modern hardware, in delivering consistently fast performance and high resiliency, and in the number of people who know them well.

So that’s the background. Now on to a specific use case and how we’ve tackled it with HBase.

HBase Overview


HBase is short for the "Hadoop database", and it’s the implementation of Google’s BigTable that was designed for interactive requests that MapReduce wasn’t suited for. Many of us think of databases as things with many tables and indexes that support SQL and relational semantics. HBase has a simpler model designed to spread across many machines.

The easiest analogy is with printed encyclopedias. Each volume of the encyclopedia lists the range of words that can be found inside, such as those beginning with the letters A to C. In HBase, the equivalent of a volume is called a region. A region server is a process that hosts some number of regions, much as a bookcase can hold several books. There is typically one region server process per physical computer. The diagram above shows three machines, each with one region server process, each of which has two regions.

A table in HBase can have many columns and rows but only one primary key. The table is always physically sorted by that key, just as the encyclopedia is, and many of its characteristics follow from this. For example, writes are naturally parallel. A-C words go to one machine, and C-F to another. Elastic scalability follows too: drop another machine in, and you can move some of the regions to it. And if a region gets too big, it splits into two new ones, and so very large datasets can be accommodated. These are all positive properties.
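To make the sorted-key model concrete, here is a minimal sketch of how a row key could be routed to a region by its position in the sorted key space. The region boundaries and server names are made up for illustration, and real HBase clients locate regions through the cluster's own metadata rather than a hard-coded list.

```python
import bisect

# Hypothetical region boundaries: each entry is the first key a region holds.
# The first region starts at the empty string, so every key lands somewhere.
REGION_START_KEYS = ["", "c", "f", "m", "t"]
# Hypothetical placement of those five regions on three region servers.
REGION_SERVERS = ["server-1", "server-2", "server-3", "server-1", "server-2"]


def find_region(row_key: str) -> int:
    """Return the index of the region whose key range contains row_key."""
    # bisect_right finds the first start key greater than row_key;
    # the region just before that position owns the key.
    return bisect.bisect_right(REGION_START_KEYS, row_key) - 1


def find_server(row_key: str) -> str:
    """Return the (hypothetical) region server hosting row_key's region."""
    return REGION_SERVERS[find_region(row_key)]
```

Splitting a region in this picture amounts to inserting a new start key into the sorted list, and moving a region to a new machine only changes the server entry for that index.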

But there are downsides, including the loss of things taken for granted in a relational database. HBase is less efficient per machine than a single server database. Updates of individual rows are atomic, but there aren’t all-or-nothing complex transactions that can be rolled back. There are no secondary indexes, and lookups not based on the primary key are much slower. Joins between multiple tables are cumbersome and inefficient. These systems are simply much less mature than relational databases, and understood by far fewer people. While HBase is growing in sophistication and maturity, if your problem fits in a relational database on a single machine, then HBase is almost certainly a poor choice.

When considering broad architectural decisions, it is important to consider alternate solutions. There are many strong alternatives beyond known relational systems. Products such as Greenplum, Vertica, SAP Hana, Sybase IQ and many others are robust, commercially supported, and considerably more efficient per node, with distributed MPP SQL or pure columnar stores. Cassandra offers a similar model to HBase and is completely open, so why not use it?

Our answer is that we have big picture concerns. We prefer open systems rather than being locked in to a single vendor. Hadoop and other systems are part of an expanding ecosystem of other services, such as distributed machine learning and batch processing, that we also want to benefit from. While the alternatives are excellent data stores, they only offer a smaller piece of the puzzle. And finally, we have actual use cases that happen to fit reasonably well and are able to address many of the downsides.

So on to one of our specific use cases.

Time series data and the Portfolio analytics challenge



Much of financial data is time series data. While the systems to manage this data have often been highly sophisticated and tuned, the data itself is very simple, and the problem has been one of speed and volume. Time series data for a security is generally just a security, a field, a date, and a value. The security could be a stock such as IBM, the field the price, and the date and value self-explanatory.

The reason there is a speed and volume issue is that there are thousands of possible fields – volume traded, for example – and tens of millions of securities. The universe of bonds alone is far larger than stocks, which we call equities. These systems have also historically been separated into intraday and historical data – intraday captures all the ticks during the day, while historical systems have end of day values that go back much farther in time. Unifying them was impractical in the past owing to different performance tradeoffs: intraday must manage a very high volume of incoming writes, while end of day has batch updates on market close but more data and searches in total.

The Bloomberg end of day historical system is called PriceHistory. This is a slight misnomer since price is just one of thousands of fields, but price is obviously an important one. This system was designed for the high volume of requests for single securities and quite impressive for its day. The response time for a year’s data for a field on a security is 5ms, and it receives billions of hits a day, around half a million a second at peak. It is a critical system for Bloomberg; failures there have the potential to disrupt the capital markets.

The challenge comes from applications like Portfolio Analytics that request far more data at a time. A bond portfolio can easily have tens of thousands of bonds in it, and a common calculation like attribution requires 40 fields per day per instrument. Performing attribution for a bond portfolio over a few years of data requires tens of millions of datapoints instead of a handful. When requesting tens of millions of datapoints, even a high cache hit rate of 99.9% will still have thousands of cache misses, and if the underlying system is backed by magnetic media, this means thousands of disk seeks. Multiply across a large user base making many requests and it’s clear to see why this created a problem for the existing price history system.
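The cache arithmetic can be checked with a short sketch. The figure of 20 million datapoints is an assumed stand-in for "tens of millions"; only the 99.9% hit rate comes from the text.

```python
def expected_cache_misses(datapoints: int, hit_rate: float) -> int:
    """Expected number of lookups that miss the cache and go to disk."""
    return round(datapoints * (1.0 - hit_rate))


# Assumed request size; the article only says "tens of millions".
misses = expected_cache_misses(20_000_000, 0.999)
print(misses)
```

On spinning disks each miss costs roughly one seek, so the miss count rather than the hit rate is what sets the response time for a request of this size.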

The solution a few years back was to move to all memory and solid state machines with highly compacted blobs split into separate databases. This approach worked and was 2-3 orders of magnitude faster – a giant leap – but less than ideal. Compacted blobs split across many databases are cumbersome.

Enter HBase.

HBase Suitability and Issues

Time series data is both very simple and lends itself to embarrassing parallelism. When fetching tens of millions of datapoints for a portfolio, the work can be arbitrarily divided among tens of millions of machines if need be. It doesn’t get any better than that in terms of potential parallelism. With a single main table trivially distributed, HBase should be a good fit – in theory.

Of course, in theory, theory and practice agree, but in practice, they don’t always. The data sets may work, but what are the obstacles? Performance, efficiency, maturity, and resiliency. What follows is how we went about tackling them.

Failure and recovery


The first and most critical problem was failure behavior. When an individual machine fails, data is not lost – it is replicated on several machines via HDFS – but there is still a serious drawback. At any given time, all reads and writes to any given region are managed by one and only one region server. If a region server dies, the failure will be detected and a replacement automatically brought back up, but until it does, no reads and writes are possible to the regions it managed.

The HBase community has made great strides in speeding up recovery time after a failure is detected, but this still leaves a crucial hole. The crux of the problem is that failure detection in a distributed system is ultimately accomplished by timeout – some period of time that elapses where a heartbeat or other mechanism fails. If this timeout is made too short, there will be false positives, which are also problematic.

In Bloomberg’s case, our SLAs are on the order of milliseconds. The shortest that timeouts can be made are on the order of many seconds. This is a fundamental mismatch even if recovery after failure detection becomes infinitely fast.

Discussions with Hadoop vendors resulted in several possible solutions, most of which involved running multiple copies of each database. Since Bloomberg’s operational policies already dictate that systems can survive the loss of entire datacenters and then several additional machines, this would have meant many running copies of each database, which we deemed too operationally complex. Ultimately, we were unhappy with the alternatives, and so we worked to change them.

If failover detection and recovery can never be fast enough, then the alternative is there must be standby nodes that can be queried in the event a region server goes down. The insight that made this practical in the short term is that the data already exists in multiple places, albeit with a possible slight delay in propagation. This can be used for standby region servers that at worst lag slightly behind but receive events in the same order, known as timeline consistency. This works for end of day systems like PriceHistory that are updated in batch.

Timeline consistent standby region servers were added to HBase as part of HBASE-10070.

Performance I: Distribution and parallelism


With failures addressed, though, there are still issues of raw performance and consistency. Here we’ll detail three experiments and outcomes on performance. Tests were run on a modest cluster of 11 machines with SSDs, dual Xeon E5’s, and 128 GB of RAM each. In the top right hand corner of the slide, two numbers are shown. The first is the average response time at the outset of the experiment, and the second the new time after improvements were made. The average request is for around 200,000 records that are random key-value lookups.

Part of the premise of HBase is that large Portfolio requests can be divided up and tackled among existing machines in a cluster. This is part of why standby region servers to address failures are mandated – if a request hits every server, and one doesn’t respond for a minute or more, it’s bad news. The failure of any one server would stall every such request.

So the next thing to tackle was parallelism and distribution. The first issue was in dividing work evenly: given a large request, did each machine in the cluster get the same size chunk of work? Ideally, requesting 1000 rows from ten machines would get 100 rows from each. With a naive schema, the answer is no: if the row key is security + something else, then everything from IBM will be in one region, and IBM is hit more often than other securities. This phenomenon is known as hotspotting.

The solution is to take advantage of the properties of HBase as a fix for the problem. Data in HBase is always kept physically sorted by the row key. If the row key itself is hashed, and that hash used as a prefix, then the rows will be distributed differently. Imagine the row key is security+year+month+field. Take an MD5 hash of that, and use that as a prefix, and any one record for IBM is equally likely to be on any server. This resulted in almost perfect distribution of work load. We don’t use the MD5 algorithm exactly, because it’s too slow and large, but the concept is the same.
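A minimal sketch of the hashed-prefix idea follows. It uses MD5 only because the text uses it as the illustration (the authors say they use a different, faster hash); the 4-byte prefix length, the key separator, and the field names are assumptions.

```python
import hashlib
from collections import Counter

NUM_SERVERS = 11  # cluster size used in the experiments described here


def hashed_row_key(security: str, year: int, month: int, field: str) -> bytes:
    """Prefix the natural key with a short hash so rows scatter across the key space."""
    natural = f"{security}+{year:04d}+{month:02d}+{field}".encode("utf-8")
    prefix = hashlib.md5(natural).digest()[:4]  # 4 bytes is an arbitrary choice
    return prefix + natural


def bucket_for(row_key: bytes, buckets: int = NUM_SERVERS) -> int:
    """Map the hash prefix onto one of `buckets` equal slices of the key space."""
    prefix_value = int.from_bytes(row_key[:4], "big")
    return prefix_value * buckets // 2**32


# Spread ten years of monthly records for 40 hypothetical fields of one security.
counts = Counter(
    bucket_for(hashed_row_key("IBM", year, month, f"field{f}"))
    for year in range(2005, 2015)
    for month in range(1, 13)
    for f in range(40)
)
print(sorted(counts.items()))
```

Because the prefix depends on the whole natural key, records for a single busy security land in every slice instead of one. The cost is that range scans over one security are no longer contiguous, which is acceptable when the workload is random key-value lookups.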

With perfect distribution, we have perfect parallelism: With 11 servers each one was doing the same amount of work not just on average, but for every request. And if parallelism is good, then more parallelism is better, right? Yes - up to a point.

As mentioned earlier a general Hadoop weakness is that it was designed for older and less powerful machines. One of the ways this manifests itself is through low resource consumption on the server. At maximum throughput, none of the hardware was breathing hard – low CPU and disk and network, no matter how many configuration options were changed.

One way to address this is to run more region servers per machine, provided the machine has the capacity to do so. This will increase the number of total region servers and hence the level of fan out for parallelism, and if more parallelism is better, then response times will drop.

The chart from the slide shows the results. With 11 machines and 1 region server each – 11 in total – average response time was 260 ms. Going to 3 per machine, for 33 region servers in total, dropped the average time to 185 ms. 5 region servers per machine further improved to 160 ms, but at 10 per machine times got worse. Why?

The first answer that will probably spring to mind is that the capacity of the machine was exceeded; with 10 region server processes on a box, each with many threads, perhaps there weren’t enough cores. As it turns out, this is not the cause, and the actual answer is more subtle and interesting and resulted in another leap in performance that is applicable to many systems of this type.

But before we get into that, let’s talk about in place computation.

Performance II: In place computation


One of the tenets of big data is that the computation should be moved to the data and not the other way around. The idea here is straightforward. Imagine you need to know how many rows are in a large database table. You could fetch every row to your local client, and then iterate through and count – or you could issue a “select count(*) from ... “ if it’s a relational database. The latter is much more efficient.

It’s also much more easily said than done. Many problems can’t be solved this way at all, and many frameworks fall short even where the problem is known to be amenable. In the 2013 report on massive data analysis the National Research Council proposed seven computational giants for parallelism in an attempt to categorize the capabilities of a distributed system, a relevant and interesting read, and Hadoop can’t handle all of them. Fortunately for us, we have a simple use case that does make sense that let us again cut response times in half.

The situation is that there are multiple "sources" for data. Barclays and Merrill Lynch and many other companies provide data for bond pricing. The same bond for the same dates is often in more than one source, but the values don’t agree. Customers differ on the relative precedence of each, and so with each request include a waterfall, which is just the order in which the sources should be tried. Try to get all the data from the first source, and then for anything that’s missing, try the next source, and so on. On average, a request must go through 5 such sources until all the data is found.

In the world of separate databases, different sources are in different physical locations, and so this means trying the first database, pulling all the records back, seeing what’s missing, constructing a new request, and issuing that.

For HBase, we can again take advantage of physical sorting, support for millions of columns, and in-place computation through coprocessors, which are the HBase equivalent of classic stored procedures. By making the source and field columns with the security and date still the row key, they will all be collocated on the same region server. The coprocessor logic then becomes "find the first source for this row that matches for this security on this date, and return that." Instead of five round trips, there will always be one and only one. A small coprocessor of a few lines worked as expected, and average response time dropped to 85ms.
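The waterfall logic itself is small. The sketch below shows the selection rule in plain Python rather than as an actual HBase coprocessor (which would be written in Java against the coprocessor API); the row representation, source names, and values are invented for illustration.

```python
from typing import Dict, List, Optional, Tuple

# One row holds every source's value for a security/date; columns are keyed
# by (source, field). The names and numbers below are made up for illustration.
Row = Dict[Tuple[str, str], float]


def first_available(row: Row, field: str, waterfall: List[str]) -> Optional[Tuple[str, float]]:
    """Return (source, value) from the highest-precedence source that has the field."""
    for source in waterfall:
        value = row.get((source, field))
        if value is not None:
            return source, value
    return None  # no source in the waterfall has this field


row = {("sourceB", "price"): 101.25, ("sourceC", "price"): 101.40}
result = first_available(row, "price", ["sourceA", "sourceB", "sourceC"])
print(result)
```

Running this next to the data is what removes the round trips: the client sends the waterfall once and gets back a single value per row, instead of fetching from one source, computing what is missing, and asking again.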

Now, on to the mystery from before.

Performance III: Synchronized disruption


The results from increasing the number of region servers left a mystery: Why did response times improve at first, and then worsen? The answer involves both Java and an important property of any high fan out distributed system.

A request that goes to more than one machine "fans out". The more machines hit, the higher the degree of fan out. In a request that fans out, how long does it take to get a response? The answer is that it takes as long as the slowest responder. With ten region servers per machine and 11 machines, each request fanned out to 110 processes. If 109 respond in a microsecond and 1 in 170 ms, the response time is 170 ms.
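As a minimal sketch of that rule, the latency of a fanned-out request is the maximum over its responders, not the average. The numbers are the ones from the example above.

```python
def fan_out_latency_ms(responder_latencies_ms):
    """A fanned-out request completes only when its slowest responder does."""
    return max(responder_latencies_ms)


# 109 fast responders at one microsecond (0.001 ms) and one laggard at 170 ms.
latencies = [0.001] * 109 + [170.0]
print(fan_out_latency_ms(latencies))
```

The mean of these latencies is under 2 ms, which is why averages per server can look healthy while every user-visible request is slow.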

The immediate culprit turned out to be Java’s garbage collection, which freezes a region server process until it completes. The higher the degree of fan-out, the greater the likelihood that any one region server process was garbage collecting at the time. At 110 region servers, the odds were close to 1.
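The effect of fan-out on those odds can be sketched with a simple independence model. The 3% pause fraction is an assumed value chosen for illustration, not a measurement from these experiments, and real collectors do not pause independently or uniformly.

```python
def p_any_paused(fan_out: int, pause_fraction: float) -> float:
    """Probability that at least one of `fan_out` independent processes is paused."""
    return 1.0 - (1.0 - pause_fraction) ** fan_out


# pause_fraction = 0.03 is an assumed value, not a measurement from the article.
for n in (11, 33, 55, 110):
    print(n, round(p_any_paused(n, 0.03), 2))
```

Under this model the chance of hitting at least one paused process climbs steeply with fan-out, which matches the shape of the results: more region servers help until almost every request includes a laggard.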

The solution here is devilishly simple. As long as the penalty must be paid if any one Java process garbage collects, why not just make them do it all at the same time? That way, more time will elapse where requests are issued and no garbage collection takes place. We wrote the simplest, dumbest test for this: a coprocessor with one line – System.gc() – and a timer that called it every 30 seconds. This dropped our average response time from 85 ms to 60 ms on the first go.
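One way to get independent processes to collect at the same moment is to align each timer to a shared wall-clock boundary. This is a sketch of that idea in Python, with `gc.collect()` standing in for `System.gc()`; the wall-clock alignment and the reliance on reasonably synchronized clocks are assumptions, since the text only says the timer fired every 30 seconds.

```python
import gc
import time


def seconds_until_next_boundary(now: float, period: float = 30.0) -> float:
    """Time left until the next wall-clock multiple of `period` seconds."""
    return period - (now % period)


def run_synchronized_gc(period: float = 30.0, cycles: int = 1) -> None:
    """Sleep to each shared boundary, then force a collection."""
    for _ in range(cycles):
        time.sleep(seconds_until_next_boundary(time.time(), period))
        gc.collect()  # Python stand-in for the article's System.gc()
```

Every process running this loop wakes at the same instants (:00 and :30 of each minute with the default period), so the pauses overlap rather than spreading across the interval.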

Many system developers and C programmers will roll their eyes, since this looks like a Java-specific problem. While Java GC is an issue, this has wider applicability. It is a general case of what Google’s Jeff Dean calls Synchronized Disruption. Requests in any parallel system that fan out suffer from the laggard, and so things that slow down individual machines should be done at the same time. For more details, see the excellent writeup "Achieving Rapid Response Times in Large Online Services" covering this and other properties of high fan out systems. That said, Java is the basis now of quite a number of high fan out computing systems, including Hadoop, HBase, Spark, and Solr. Synchronizing garbage collection holds potential for them as well.

Conclusions And Next Steps

As these experiments have shown, performance from HBase can be improved. The changes described here alone cut average runtime response times to a quarter of what they were, with plenty of room left for further improvements. Faster machines are available that would halve response times again. (Incidentally, there is a myth that the speed and performance of an individual machine is irrelevant since it’s better to scale horizontally with more machines. Individual machine speed still matters immensely: the length of a garbage collection cycle, for example, is linearly proportional to the single threaded CPU performance, and even if your system doesn’t use Java, the laggard responder may well be affected by machine speed.) HBase produces far more garbage than it ought; this too can be fixed. And reissuing requests to an alternate server after a delay – a technique also mentioned in Dean’s paper – can also lower not just average response times, but reduce or eliminate outliers too, which is equally important.

We’ll do some of these, but we are nearing the point of diminishing returns. Below a threshold of some tens of milliseconds, the difference is imperceptible for a person.

On the flip side, there are applications for which this would never make sense. High frequency trading, where the goal is to shave microseconds away, is a poor fit.

More importantly, our goal is more than performance alone. While this setup with HBase provides a thousandfold improvement on write performance and 3x faster reads, it is a connector to a broader distributed platform of tools and ideas.

Wednesday June 10, 2015

Scalable Distributed Transactional Queues on Apache HBase

This is the first in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Today's entry is a guest post by Terence Yim, a Software Engineer at Cask, responsible for designing and building realtime processing systems on Hadoop/HBase, originally published here on the Cask engineering blog.

- Andrew Purtell

A real time stream processing framework usually involves two fundamental constructs: processors and queues. A processor reads events from a queue, executes user code to process them, and optionally writes events to another queue for additional downstream processors to consume. Queues are provided and managed by the framework. Queues transfer data and act as a buffer between processors, so that the processors can operate and scale independently. For example, a web server access log analytics application can look like this:


One key differentiator among various frameworks is the queue semantics, which commonly varies along these lines:

  • Delivery Guarantee: At least once, at most once, exactly-once.
  • Fault Tolerance: Failures are transparent to the user, with automatic recovery.
  • Durability: Data can survive failures and restarts.
  • Scalability: Characteristics and limitations when adding more producers/consumers.
  • Performance: Throughput and latency for queue operations.

In the open-source Cask Data Application Platform (CDAP), we wanted to provide a real-time stream processing framework that is dynamically scalable, strongly consistent and with an exactly-once delivery guarantee. With such strong guarantees, developers are free to perform any form of data manipulation without worrying about inconsistency, potential reprocessing or failure. It helps developers build their big data applications even if they do not have a strong distributed systems background. Moreover, it is possible to relax these strong guarantees in exchange for higher performance if needed; that is always easier than going the other way around.

Queue Scalability

The basic operations that can be performed on a queue are enqueue and dequeue. Producers write data to the head of the queue (enqueue) and consumers read data from the tail of the queue (dequeue). We say a queue is scalable when you can enqueue faster as a whole by adding more producers and dequeue faster as a whole by adding more consumers. Ideally, the scaling is linear, meaning doubling the number of producers/consumers will double the rate of enqueue/dequeue and is only bounded by the size of the cluster. In order to support linear scalability for producers, the queue needs to be backed by a storage system that scales linearly with the number of concurrent writers. For consumers to be linearly scalable, the queue can be partitioned such that each consumer only processes a subset of queue data.

Another aspect of queue scalability is that it should scale horizontally. This means the upper bound of the queue performance can be increased by adding more nodes to the cluster. It is important because it makes sure that the queue can keep working regardless of cluster size and can keep up with the growth in data volume.

Partitioned HBase Queue

We chose Apache HBase as the storage layer for the queue. It is designed and optimized for strong row-level consistency and horizontal scalability. It provides very good concurrent write performance and its support of ordered scans fits well for a partitioned consumer. We use the HBase Coprocessors for efficient scan filtering and queue cleanup. In order to have the exactly-once semantics on the queue, we use Tephra’s transaction support for HBase.

Producers and consumers operate independently. Each producer enqueues by performing batch HBase Puts and each consumer dequeues by performing HBase Scans. There is no link between the number of producers and consumers and they can scale separately.

The queue has a notion of consumer groups. A consumer group is a collection of consumers partitioned by the same key such that each event published to the queue is consumed by exactly one consumer within the group. The use of consumer groups allows you to partition the same queue with different keys and to scale independently based on the operational characteristics of the data. Using the access log analytics example above, the producer and consumer groups might look like this:


There are two producers running for the Log Parser and they are writing to the queue concurrently. On the consuming side, there are two consumer groups. The Unique User Counter group has two consumers, using UserID as the partitioning key and the Page View Counter group contains three consumers, using PageID as the partitioning key.
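The routing described here can be sketched as a hash of the partitioning key modulo the group size. The group sizes and keys follow the access log example; the field names, the use of CRC32 as the hash, and the dictionary layout are assumptions, not CDAP's actual implementation.

```python
import zlib


def consumer_for(partition_key: str, group_size: int) -> int:
    """Pick the consumer in a group that should receive an event."""
    # crc32 gives a hash that is stable across processes, unlike built-in hash().
    return zlib.crc32(partition_key.encode("utf-8")) % group_size


# Group names and sizes follow the access log example; the hash choice is assumed.
GROUPS = {
    "unique_user_counter": ("user_id", 2),
    "page_view_counter": ("page_id", 3),
}


def route(event: dict) -> dict:
    """Return {group name: consumer id} for one event."""
    return {
        group: consumer_for(str(event[key_field]), size)
        for group, (key_field, size) in GROUPS.items()
    }
```

Every event goes to exactly one consumer in each group, and all events for the same user reach the same Unique User Counter consumer regardless of which page they refer to.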

Queue rowkey format

Since an event emitted by a producer can be consumed by one or more consumer groups, we write the event to one or more rows in an HBase table, with one row designated for each consumer group. The event payload and metadata are stored in separate columns, while the row key follows this format:


The two interesting parts of the row key are the Partition ID and the Entry ID. The Partition ID determines the row key prefix for a given consumer. This allows a consumer to read only the data it needs to process using a prefix scan on the table during dequeue. The Partition ID consists of two parts: a Consumer Group ID and a Consumer ID. The producer computes one partition ID per consumer group and writes to those rows on enqueue.

The Entry ID in the row key contains the transaction information. It consists of the producer transaction write pointer issued by Tephra and a monotonically increasing counter. The counter is generated locally by the producer and is needed to make the row key unique for the event since a producer can enqueue more than one event within the same transaction.

On dequeue, the consumer will use the transaction write pointer to determine if that queue entry has been committed and hence can be consumed. The row key is always unique because of the inclusion of a transaction write pointer and counter. This makes producers operate independently and never have write conflicts.
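A sketch of how such a row key could be assembled is shown below. The field widths, their order, and the absence of any other prefix are hypothetical; the text names the components (Consumer Group ID, Consumer ID, write pointer, counter) but not their byte layout.

```python
import struct

# Hypothetical layout: four big-endian fields packed back to back.
#   consumer group ID (8 bytes) | consumer ID (4 bytes)   -> Partition ID
#   write pointer (8 bytes)     | counter (4 bytes)       -> Entry ID
ROW_KEY = struct.Struct(">qiqi")


def make_row_key(group_id: int, consumer_id: int, write_pointer: int, counter: int) -> bytes:
    """Build one queue row key from its Partition ID and Entry ID parts."""
    return ROW_KEY.pack(group_id, consumer_id, write_pointer, counter)


def scan_prefix(group_id: int, consumer_id: int) -> bytes:
    """Prefix a consumer would scan to see only its own partition."""
    return struct.pack(">qi", group_id, consumer_id)
```

With big-endian, non-negative fields, byte order matches numeric order, so a prefix scan returns a consumer's entries sorted by write pointer and then by counter. Two events in the same transaction differ only in the counter, which is what keeps their keys distinct.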

In order to generate the Partition ID, a producer needs to know the size and the partitioning key of each consumer group. The consumer groups information is recorded transactionally when the application starts as well as when there are any changes in group size.

Changing producers and consumers

It is straightforward to increase or decrease producers since each producer operates independently. Adding or removing producer processes will do the job. However, when the size of a consumer group needs to change, coordination is needed to update the consumer group information correctly. The steps can be summarized by this diagram:


Pausing and resuming are fast operations as they are coordinated using Apache ZooKeeper and executed in parallel. For example, with the web access log analytics application we mentioned above, changing the consumer groups information may look something like this:


With this queue design, the enqueue and dequeue performance is on par with batch HBase Puts and HBase Scans respectively, with some overhead for talking to the Tephra server. That overhead can be greatly reduced by batching multiple events in the same transaction.

Finally, to prevent “hotspotting”, we pre-split the HBase table based on the cluster size and apply salting on the row key to better distribute writes, which otherwise would have been sequential due to the monotonically increasing transaction write pointer.

Performance Numbers

We’ve tested the performance on a small ten-node HBase cluster and the results are impressive. Using a 1 KB payload with a batch size of 500 events, we achieved a throughput of 100K events per second produced and consumed, running with three producers and ten consumers. We also observed that the throughput increases linearly when we add more producers and consumers: for example, it increased to 200K events per second when we doubled the number of producers and consumers.

With the help of HBase and a combination of best practices, we successfully built a linearly scalable, distributed transactional queue system and used it to provide a real time stream processing framework in CDAP: dynamically scalable, strongly consistent, and with an exactly-once delivery guarantee.

Saturday June 06, 2015

Saving CPU! Using Native Hadoop Libraries for CRC computation in HBase

by Apekshit Sharma, HBase contributor and Cloudera Engineer

TL;DR Use the Hadoop Native Library to calculate CRCs and save CPU!

Checksums in HBase

Checksums are used to check data integrity. HDFS computes and stores checksums for all files on write. One checksum is written per chunk of data (size can be configured using bytes.per.checksum) in a separate, companion checksum file. When data is read back, the file with corresponding checksums is read back as well and is used to ensure data integrity. However, having two files results in two disk seeks when reading any chunk of data. For HBase, the extra seek while reading an HFileBlock results in extra latency. To work around the extra seek, HBase inlines checksums. HBase calculates checksums for the data in an HFileBlock and appends them to the end of the block itself on write to HDFS (HDFS then checksums the HBase data+inline checksums). On read, by default HDFS checksum verification is turned off, and HBase itself verifies data integrity.
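The inline layout can be illustrated with a simplified sketch: one CRC per fixed-size chunk, appended after the data. This is not the real HFileBlock format (which also has a header and other fields), and it uses Python's `zlib.crc32` for convenience; the function names and the chunk size used in any call are assumptions.

```python
import struct
import zlib


def append_checksums(data: bytes, bytes_per_checksum: int) -> bytes:
    """Return data followed by one 4-byte CRC32 per chunk."""
    crcs = [
        zlib.crc32(data[i:i + bytes_per_checksum])
        for i in range(0, len(data), bytes_per_checksum)
    ]
    return data + struct.pack(f">{len(crcs)}I", *crcs)


def verify(block: bytes, data_len: int, bytes_per_checksum: int) -> bool:
    """Recompute the chunk CRCs and compare with the stored trailer."""
    return append_checksums(block[:data_len], bytes_per_checksum) == block
```

Because the checksums travel in the same block as the data, a single read brings back everything needed to verify it, which is the point of inlining: no second seek to a companion checksum file.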

Can we then get rid of HDFS checksum altogether? Unfortunately no. While HBase can detect corruptions, it can’t fix them, whereas HDFS uses replication and a background process to detect and *fix* data corruptions if and when they happen. Since HDFS checksums generated at write-time are also available, we fall back to them when HBase verification fails for any reason. If the HDFS check fails too, the data is reported as corrupt.

The related hbase configurations are hbase.hstore.checksum.algorithm, hbase.hstore.bytes.per.checksum and hbase.regionserver.checksum.verify. HBase inline checksums are enabled by default.

Calculating checksums is computationally expensive and requires lots of CPU. When HDFS switched over to JNI + C for computing checksums, it saw a large reduction in CPU usage.

This post is about replicating those gains in HBase by using Native Hadoop Libraries (NHL). See HBASE-11927


We switched to use the Hadoop DataChecksum library which under-the-hood uses NHL if available, else we fall back to use the Java CRC implementation. Another alternative considered was the ‘Circe’ library. The following table highlights the differences with NHL and makes the reasoning for our choice clear.

Hadoop Native Library | Circe
Native code supports both crc32 and crc32c | Native code supports only crc32c
Adds dependency on hadoop-common, which is reliable and actively developed | Adds dependency on an external project
Interface supports taking in a stream of data, a stream of checksums, and a chunk size as parameters, and computes/verifies checksums considering data in chunks | Only supports calculation of a single checksum for all input data

Both libraries support the special x86 instruction for hardware calculation of CRC32C when it is available (defined in the SSE4.2 instruction set). In the case of NHL, hadoop-2.6.0 or newer is required for HBase to get the native checksum benefit.

However, based on the data layout of HFileBlock, which has ‘real data’ followed by checksums on the end, only NHL supported the interface we wanted. Implementing the same in Circe would have been significant effort. So we chose to go with NHL.


Since the metric to be evaluated was CPU usage, a simple configuration of two nodes was used. Node1 was configured to be the NameNode, Zookeeper and HBase master. Node2 was configured to be DataNode and RegionServer. All real computational work was done on Node2 while Node1 remained idle most of the time. This isolation of work on a single node made it easier to measure impact on CPU usage.


Ubuntu 14.04.2 LTS (GNU/Linux 3.13.0-24-generic x86_64)

CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz                                          

Socket(s) : 1

Core(s) per socket : 1

Thread(s) per core : 4

Logical CPU(s) : 4

Number of disks : 1

Memory : 8 GB

HBase Version/Distro *: 1.0.0 / CDH 5.4.0

*Since trunk resolves to hadoop-2.5.1 which does not have HDFS-6865, it was easier to use a CDH distro which already has HDFS-6865 backported.


We chose to study the impact on major compactions, mainly because of the presence of CompactionTool which a) can be used offline, b) allowed us to profile only the relevant workload. PerformanceEvaluation (bin/hbase pe) was used to build a test table which was then copied to local disk for reuse.

% ./bin/hbase pe --nomapred --rows=150000 --table="t1" --valueSize=10240 --presplit=10 sequentialWrite 10

Table size: 14.4G

Number of rows: 1.5M

Number of regions: 10

Row size: 10K

Total store files across regions: 67
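As a rough sanity check on these figures, the arithmetic can be sketched as follows. It assumes that --rows is a per-client count (which agrees with the 1.5M rows reported) and counts only value bytes, so keys and HFile overhead are ignored.

```python
# Parameters taken from the PerformanceEvaluation command line above.
rows_per_client = 150_000
clients = 10
value_size = 10_240  # bytes per value

total_rows = rows_per_client * clients
raw_value_bytes = total_rows * value_size
raw_value_gib = raw_value_bytes / 2**30

print(total_rows, round(raw_value_gib, 1))
```

The value bytes alone come to a little over 14 GiB, which is consistent with the reported table size of 14.4G once key and file-format overhead are added.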

For profiling, Lightweight-java-profiler was used and FlameGraph was used to generate graphs.

For benchmarking, the Linux ‘time’ command was used. Profiling was disabled during these runs. A script repeatedly executed the following steps in order:

  1. delete hdfs:///hbase

  2. copy t1 from local disk to hdfs:///hbase/data/default

  3. run compaction tool on t1 and time it



CPU profiling of HBase without NHL (figure 1) shows that about 22% of CPU time is spent generating and validating checksums, whereas with NHL (figure 2) it is only about 3%.


Figure 1: CPU profile - HBase not using NHL (svg)


Figure 2: CPU profile - HBase using NHL (svg)


Benchmarking was done for three different cases: (a) neither HBase nor HDFS use NHL, (b) HDFS uses NHL but not HBase, and (c) both HDFS and HBase use NHL. For each case, we did 5 runs. Observations from table 1:

  1. Within a case, while real time fluctuates across runs, user and sys times remain the same. This is expected as compactions are IO bound.

  2. Using NHL only for HDFS reduces CPU usage by about 10% (A vs B)

  3. Further, using NHL for HBase checksums reduces CPU usage by about 23% (B vs C).

All times are in seconds. This stackoverflow answer provides a good explanation of real, user and sys times.

| run # | no native for HDFS and HBase (A) | no native for HBase (B) | native (C) |
|---|---|---|---|
| 1 | real 469.4, user 110.8, sys 30.5 | real 422.9, user 95.4, sys 30.5 | real 414.6, user 67.5, sys 30.6 |
| 2 | real 384.3, user 111.4, sys 30.4 | real 400.5, user 96.7, sys 30.5 | real 393.8, user 67.6, sys 30.6 |
| 3 | real 400.7, user 111.5, sys 30.6 | real 398.6, user 95.8, sys 30.6 | real 392.0, user 66.9, sys 30.5 |
| 4 | real 396.8, user 111.1, sys 30.3 | real 379.5, user 96.0, sys 30.4 | real 390.8, user 67.2, sys 30.5 |
| 5 | real 389.1, user 111.6, sys 30.3 | real 377.4, user 96.5, sys 30.4 | real 381.3, user 67.6, sys 30.5 |

Table 1
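The percentages quoted in the observations can be reproduced from Table 1. The sketch below assumes that "CPU usage" means user + sys time averaged over the five runs; that definition is an inference, but it is the one that matches the quoted figures.

```python
from statistics import mean

# (user, sys) seconds for runs 1..5, copied from Table 1.
runs = {
    "A": [(110.8, 30.5), (111.4, 30.4), (111.5, 30.6), (111.1, 30.3), (111.6, 30.3)],
    "B": [(95.4, 30.5), (96.7, 30.5), (95.8, 30.6), (96.0, 30.4), (96.5, 30.4)],
    "C": [(67.5, 30.6), (67.6, 30.6), (66.9, 30.5), (67.2, 30.5), (67.6, 30.5)],
}


def mean_cpu(case: str) -> float:
    """Average user + sys seconds across all runs of one case."""
    return mean(user + sys for user, sys in runs[case])


def reduction_pct(before: str, after: str) -> float:
    """Percentage drop in mean CPU seconds going from one case to another."""
    return 100.0 * (mean_cpu(before) - mean_cpu(after)) / mean_cpu(before)


print(round(reduction_pct("A", "B"), 1), round(reduction_pct("B", "C"), 1))
```

Under that assumption the reductions come out at roughly 10.7% for A vs B and 22.6% for B vs C, in line with the "about 10%" and "about 23%" stated above.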



The Native Hadoop Library leverages the special processor instruction (if available) and does pipelining and other low level optimizations when performing CRC calculations. Using NHL in HBase for heavy checksum computation allows HBase to make use of this facility, saving a significant amount of CPU time spent checksumming.

Tuesday May 12, 2015

The HBase Request Throttling Feature

Govind Kamat, HBase contributor and Cloudera Performance Engineer

(Edited on 5/14/2015 -- changed ops/sec/RS to ops/sec.)

Running multiple workloads on HBase has always been challenging, especially when trying to execute real-time workloads while concurrently running analytical jobs. One possible way to address this issue is to throttle analytical MR jobs so that real-time workloads are less affected.

A new QoS (quality of service) feature that Apache HBase 1.1 introduces is request-throttling, which controls the rate at which requests get handled by an HBase cluster. HBase typically treats all requests identically; however, the new throttling feature can be used to specify a maximum rate or bandwidth to override this behavior. The limit may be applied to requests originating from a particular user, or alternatively, to requests directed to a given table or a specified namespace.

The objective of this post is to evaluate the effectiveness of this feature and the overhead it might impose on a running HBase workload.  The performance runs carried out showed that throttling works very well, by redirecting resources from a user whose workload is throttled to the workloads of other users, without incurring a significant overhead in the process.

Enabling Request Throttling

It is straightforward to enable the request-throttling feature -- all that is necessary is to set the HBase configuration parameter hbase.quota.enabled to true. The related parameter hbase.quota.refresh.period specifies the time interval, in milliseconds, at which each regionserver re-checks for any new restrictions that have been added.

The throttle can then be set from the HBase shell, like so:

hbase> set_quota TYPE => THROTTLE, USER => 'uname', LIMIT => '100req/sec'

hbase> set_quota TYPE => THROTTLE, TABLE => 'tbl', LIMIT => '10M/sec'

hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns', LIMIT => 'NONE'
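To make the effect of a request limit such as '100req/sec' concrete, here is a minimal fixed-window rate limiter. It is an illustrative sketch only and is not HBase's implementation; the class name, the one-second window, and the explicit `now` argument (used so the behavior is deterministic) are all assumptions.

```python
class FixedWindowThrottle:
    """Allow at most `limit` requests per one-second window (illustrative only)."""

    def __init__(self, limit: int):
        self.limit = limit
        self.window_start = None  # start time of the current window, in seconds
        self.used = 0             # requests admitted in the current window

    def try_acquire(self, now: float) -> bool:
        """Return True if a request arriving at time `now` is admitted."""
        # Open a fresh window when none exists or the current one has elapsed.
        if self.window_start is None or now - self.window_start >= 1.0:
            self.window_start = now
            self.used = 0
        if self.used < self.limit:
            self.used += 1
            return True
        return False
```

With `FixedWindowThrottle(100)`, the first 100 calls to `try_acquire` inside one second are admitted and further calls are rejected until the window rolls over. A real limiter also has to decide what a rejected client does next (wait, retry, or fail), which this sketch leaves out.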

Test Setup

To evaluate how effectively HBase throttling worked, a YCSB workload was imposed on a 10 node cluster.  There were 6 regionservers and 2 master nodes.  YCSB clients were run on the 4 nodes that were not running regionserver processes.  The client processes were initiated by two separate users and the workload issued by one of them was throttled.

More details on the test setup follow.

HBase version: HBase 0.98.6-cdh5.3.0-SNAPSHOT (HBASE-11598 was backported to this version)


CentOS release 6.4 (Final)                                                

CPU sockets: 2                                                            

Physical cores per socket: 6

Total number of logical cores: 24

Number of disks: 12

Memory: 64 GB

Number of RS: 6

Master nodes: 2  (for the Namenode, Zookeeper and HBase master)

Number of client nodes: 4

Number of rows: 1080M

Number of regions: 180

Row size: 1K

Threads per client: 40

Workload: read-only and scan

Key distribution: Zipfian

Run duration: 1 hour


An initial data set was first generated by running YCSB in its data generation mode. An HBase table was created with the table specifications above and pre-split. After all the data was inserted, the table was flushed, compacted and saved as a snapshot. This data set was used to prime the table for each run. Read-only and scan workloads were used to evaluate performance; this eliminates effects such as memstore flushes and compactions. One run with a long duration was carried out first to ensure the caches were warmed and that the runs yielded repeatable results.

For the purpose of these tests, the throttle was applied to the workload emanating from one user in a two-user scenario. There were four client machines used to impose identical read-only workloads.  The client processes on two machines were run by the user “jenkins”, while those on the other two were run as a different user.   The throttle was applied to the workload issued by this second user.  There were two sets of runs, one with both users running read workloads and the second where the throttled user ran a scan workload.  Typically, scans are long running and it can be desirable on occasion to de-prioritize them in favor of more real-time read or update workloads.  In this case, the scan was for sets of 100 rows per YCSB operation.

For each run, the following steps were carried out:

  • Any existing YCSB-related table was dropped.

  • The initial data set was cloned from the snapshot.

  • The desired throttle setting was applied.

  • The desired workloads were imposed from the client machines.

  • Throughput and latency data was collected and is presented in the table below.

The throttle was applied at the start of the job (the command used was the first in the list shown in the “Enabling Request Throttling” section above).  The hbase.quota.refresh.period property was set to under a minute so that the throttle took effect by the time test setup was finished.

The throttle option specifically tested here was the one to limit the number of requests (rather than the one to limit bandwidth).

Observations and Results

The throttling feature appears to work quite well. When applied interactively in the middle of a running workload, it goes into effect immediately after the quota refresh period and can be observed clearly in the throughput numbers put out by YCSB while the test is progressing. The table below has performance data from test runs indicating the impact of the throttle. For each row, the throughput and latency numbers are also shown in separate columns, one set for the “throttled” user (indicated by “T” for throttled) and the other for the “non-throttled” user (represented by “U” for un-throttled).

Read + Read Workload

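| Throttle (req/sec) | Avg Total Thruput (ops/sec) | Thruput_U (ops/sec) | Thruput_T (ops/sec) | Latency_U (ms) | Latency_T (ms) |
|---|---|---|---|---|---|
| 2500 rps | | | | | |
| 2000 rps | | | | | |
| 1500 rps | | | | | |
| 1000 rps | | | | | |
| 500 rps | | | | | |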

As can be seen, when the throttle pressure is increased (by reducing the permitted throughput for user “T” from 2500 req/sec to 500 req/sec, as shown in column 1), the total throughput (column 2) stays around the same.  In other words, the cluster resources get redirected to benefit the non-throttled user, with the feature consuming no significant overhead.  One possible outlier is the case where the throttle parameter is at its most restrictive (500 req/sec), where the total throughput is about 10% less than the maximum cluster throughput.

Correspondingly, the latency for the non-throttled user improves while that for the throttled user degrades.  This is shown in the last two columns in the table.

The charts above show that the change in throughput is linear with the amount of throttling, for both the throttled and non-throttled user.  With regard to latency, the change is generally linear, until the throttle becomes very restrictive; in this case, latency for the throttled user degrades substantially.

One point that should be noted is that, while the throttle parameter in req/sec is indeed correlated to the actual restriction in throughput as reported by YCSB (ops/sec) as seen by the trend in column 4, the actual figures differ.  As user “T”’s throughput is restricted down from 2500 to 500 req/sec, the observed throughput goes down from 2500 ops/sec to 1136 ops/sec.  Therefore, users should calibrate the throttle to their workload to determine the appropriate figure to use (either req/sec or MB/sec) in their case.

Read + Scan Workload

| Throttle (req/sec) | Thruput_U (ops/sec) | Thruput_T (ops/sec) | Latency_U (ms) | Latency_T (ms) |
|---|---|---|---|---|
| 3000 Krps | | | | |
| 1000 Krps | | | | |
| 500 Krps | | | | |
| 250 Krps | | | | |
| 50 Krps | | | | |


With the read/scan workload, the results are similar to those of the read/read workload. As the extent of throttling is increased for the long-running scan workload, its observed throughput decreases and its latency increases. Conversely, the read workload benefits, displaying better throughput and improved latency. Again, the specific numeric value used to specify the throttle needs to be calibrated to the workload at hand. Since scans break down into a large number of read requests, the throttle parameter needs to be much higher than in the case with the read workload. Shown above is a log-linear chart of the impact on throughput of the two workloads when the extent of throttling is adjusted.


HBase request throttling is an effective and useful technique to handle multiple workloads, or even multi-tenant workloads on an HBase cluster.  A cluster administrator can choose to throttle long-running or lower-priority workloads, knowing that regionserver resources will get re-directed to the other workloads, without this feature imposing a significant overhead.  By calibrating the throttle to the cluster and the workload, the desired performance can be achieved on clusters running multiple concurrent workloads.

Friday May 01, 2015

Scan Improvements in HBase 1.1.0

Jonathan Lawlor, Apache HBase Contributor

Over the past few months there have been a variety of nice changes made to scanners in HBase. This post focuses on two such changes, namely RPC chunking (HBASE-11544) and scanner heartbeat messages (HBASE-13090). Both of these changes address long-standing issues in the client-server scan protocol. Specifically, RPC chunking deals with how a server handles the scanning of very large rows, and scanner heartbeat messages allow scan operations to progress even when aggressive server-side filtering makes result returns infrequent.


In order to discuss these issues, let's first gain a general understanding of how scans currently work in HBase.

From an application's point of view, a ResultScanner is the client side source of all of the Results that an application asked for. When a client opens a scan, it’s a ResultScanner that is returned and it is against this object that the client invokes next to fetch more data. ResultScanners handle all communication with the RegionServers involved in a scan and the ResultScanner decides which Results to make visible to the application layer. While there are various implementations of the ResultScanner interface, all implementations use basically the same protocol to communicate with the server.

In order to retrieve Results from the server, the ResultScanner will issue ScanRequests via RPCs to the different RegionServers involved in a scan. A client configures a ScanRequest by passing an appropriately set Scan instance when opening the scan, setting start/stop rows, caching limits, the maximum result size limit, and the filters to apply.

On the server side, there are three main components involved in constructing the ScanResponse that will be sent back to the client in answer to a ScanRequest:


RSRpcServices is a service that lives on RegionServers and responds to incoming RPC requests, such as ScanRequests. During a scan, RSRpcServices is the server side component responsible for constructing the ScanResponse that will be sent back to the client. It continues to scan in order to accumulate Results until the region is exhausted, the table is exhausted, or a scan limit is reached (such as the caching limit or max result size limit). In order to retrieve these Results, RSRpcServices must talk to a RegionScanner.


The RegionScanner is the server side component responsible for scanning the different rows in the region. In order to scan through the rows in the region, the RegionScanner will talk to one or more StoreScanner instances (one per column family in the row). If the row passes all filtering criteria, the RegionScanner will return the Cells for that row to the RSRpcServices so that they can be used to form a Result to include in the ScanResponse.


The StoreScanner is the server side component responsible for scanning through the Cells in each column family.

When the client (i.e. ResultScanner) receives the ScanResponse back from the server, it can then decide whether or not scanning should continue. Since the client and the server communicate back and forth in chunks of Results, the client-side ResultScanner will cache all the Results it receives from the server. This allows the application to perform scans faster than the case where an RPC is required for each Result that the application sees.
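The benefit of client-side caching can be shown with a toy model of the exchange. Nothing here is the HBase API: `FakeServer` and `CachingScanner` are hypothetical stand-ins for a RegionServer and a ResultScanner, and rows are addressed by list index for simplicity.

```python
from collections import deque


class FakeServer:
    """Stand-in for a RegionServer: hands out rows in batches and counts RPCs."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.rpc_count = 0

    def scan(self, start, caching):
        """Return up to `caching` rows beginning at index `start`."""
        self.rpc_count += 1
        return self.rows[start:start + caching]


class CachingScanner:
    """Stand-in for a ResultScanner: serves next() from a local cache."""

    def __init__(self, server, caching):
        self.server = server
        self.caching = caching
        self.cache = deque()
        self.next_index = 0
        self.exhausted = False

    def next(self):
        """Return the next row, or None once the scan is finished."""
        if not self.cache and not self.exhausted:
            batch = self.server.scan(self.next_index, self.caching)
            self.next_index += len(batch)
            # A short batch means the server has no more rows to give.
            self.exhausted = len(batch) < self.caching
            self.cache.extend(batch)
        return self.cache.popleft() if self.cache else None
```

Scanning ten rows with a caching value of 4 takes three round trips in this model, whereas fetching one row per call would take at least ten.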

RPC Chunking (HBASE-11544)

Why is it necessary?

Currently, the server sends back whole rows to the client (each Result contains all of the cells for that row). The max result size limit is only applied at row boundaries. After each full row is scanned, the size limit will be checked.

The problem with this approach is that it does not provide a granular enough restriction. Consider, for example, the scenario where each row being scanned is 100 MB. This means that 100 MB worth of data will be read between checks for the size limit. So, even in the case that the client has specified that the size limit is 1 MB, 100 MB worth of data will be read and then returned to the client.

This approach for respecting the size limit is problematic. First of all, it means that it is possible for the server to run out of memory and crash in the case that it must scan a large row. At the application level, you simply want to perform a scan without having to worry about crashing the region server.

This scenario is also problematic because it means that we do not have fine grained control over the network resources. The most efficient use of the network is to have the client and server talk back and forth in fixed sized chunks.

Finally, this approach for respecting the size limit is a problem because it can lead to large, erratic allocations server side playing havoc with GC.

Goal of the RPC Chunking solution

The goal of the RPC Chunking solution was to:

- Create a workflow that is more ‘regular’, less likely to cause Region Server distress

- Use the network more efficiently

- Avoid large garbage collections caused by allocating large blocks

Furthermore, we wanted the RPC chunking mechanism to be invisible to the application. The communication between the client and the server should not be a concern of the application. All the application wants is a Result for their scan. How that Result is retrieved is completely up to the protocol used between the client and the server.

Implementation Details

The first step in implementing this proposed RPC chunking method was to move the max result size limit to a place where it could be respected at a more granular level than on row boundaries. Thus, the max result size limit was moved down to the cell level within StoreScanner. This meant that now the max result size limit could be checked at the cell boundaries, and if in excess, the scan can return early.

The next step was to consider what would occur if the size limit was reached in the middle of a row. In this scenario, the Result sent back to the client will be marked as a "partial" Result. By marking the Result with this flag, the server communicates to the client that the Result is not a complete view of the row. Further Scan RPCs must be made in order to retrieve the outstanding parts of the row before it can be presented to the application. This allows the server to send back partial Results to the client and then the client can handle combining these partials into "whole" row Results. This relieves the server of the burden of having to read the entire row at a time.

Finally, these changes were performed in a backwards compatible manner. The client must indicate in its ScanRequest that partial Results are supported. If that flag is not seen server side, the server will not send back partial results. Note that the configuration hbase.client.scanner.max.result.size can be used to change the default chunk size. By default this value is 2 MB in HBase 1.1+.

An option (Scan#setAllowPartialResults) was also added so that an application can ask to see partial results as they are returned rather than wait on the aggregation of complete rows.

A consistent view of the row is maintained even though the row is assembled from multiple partial RPC responses: the server-side scanner context keeps track of the outstanding mvcc read point and excludes from the results any Cells written later.

Note that this feature will be available starting in HBase 1.1. All 1.1+ clients will chunk their responses.
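The chunking and reassembly described in this section can be sketched in a few lines. This is a simplified model rather than the HBase implementation: cells are plain byte strings, cell size is just `len(cell)`, and the function names are hypothetical.

```python
def chunk_row(cells, max_result_size):
    """Server side: split one row's cells, checking the size limit after each cell.

    Yields (cells_in_chunk, is_partial). is_partial is False only for the
    chunk that completes the row.
    """
    chunk, size = [], 0
    for i, cell in enumerate(cells):
        chunk.append(cell)
        size += len(cell)
        is_last_cell = i == len(cells) - 1
        if size >= max_result_size and not is_last_cell:
            yield chunk, True   # limit reached mid-row: send a partial Result
            chunk, size = [], 0
    yield chunk, False          # final piece of the row


def assemble_rows(chunks):
    """Client side: buffer partial chunks and emit a row once it is complete."""
    pending = []
    for cells, is_partial in chunks:
        pending.extend(cells)
        if not is_partial:
            yield pending
            pending = []
```

In this model the server never holds more than roughly `max_result_size` bytes of one row at a time, and the application still sees whole rows. An application that opts in to partial results would simply consume the output of `chunk_row` directly instead of going through `assemble_rows`.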

Heartbeat messages for scans (HBASE-13090)

What is a heartbeat message?

A heartbeat message is a message used to keep the client-server connection alive. In the context of scans, a heartbeat message allows the server to communicate back to the client that scan progress has been made. On receipt, the client resets the connection timeout. Since the client and the server communicate back and forth in chunks of Results, it is possible that a single progress update will contain ‘enough’ Results to satisfy the requesting application’s purposes. So, beyond simply keeping the client-server connection alive and preventing scan timeouts, heartbeat messages also give the calling application an opportunity to decide whether or not more RPCs are even needed.

Why are heartbeat messages necessary in Scans?

Currently, scans execute server side until the table is exhausted, the region is exhausted, or a limit is reached. As such there is no way of influencing the actual execution time of a Scan RPC. The server continues to fetch Results with no regard for how long that process is taking. This is particularly problematic since each Scan RPC has a defined timeout. Thus, in the case that a particular scan is causing timeouts, the only solution is to increase the timeout so it spans the long-running requests (hbase.client.scanner.timeout.period). You may have encountered such problems if you have seen exceptions such as OutOfOrderScannerNextException.

Goal of heartbeat messages

The goal of the heartbeating message solution was to:

- Incorporate a time limit concept into the client-server Scan protocol

- The time limit must be enforceable at a granular level (between cells)

- Time limit enforcement should be tunable so that checks do not occur too often

Beyond simply meeting these goals, it is also important that, similar to RPC chunking, this mechanism is invisible to the application layer. The application simply wants to perform its scan and get a Result back on a call to ResultScanner.next(). It does not want to worry about whether or not the scan is going to take too long and if it needs to adjust timeouts or scan sizings.

Implementation details

The first step in implementing heartbeat messages was to incorporate a time limit concept into the Scan RPC workflow. This time limit was based on the configured value of the client scanner timeout.

Once a time limit was defined, we had to decide where this time limit should be respected. It was decided that this time limit should be enforced within the StoreScanner. The reason for enforcing this limit inside the StoreScanner was that it allowed the time limit to be enforced at the cell boundaries. It is important that the time limit be checked at a fine-grained location because, in the case of restrictive filtering or time ranges, it is possible that large portions of time will be spent filtering out and skipping cells. If we wait to check the time limit at the row boundaries, it is possible when the row is wide that we may time out before a single check occurs.

A new configuration was introduced (hbase.cells.scanned.per.heartbeat.check) to control how often these time limit checks occur. The default value of this configuration is 10,000 meaning that we check the time limit every 10,000 cells.

Finally, if this time limit is reached, the ScanResponse sent back to the client is marked as a heartbeat message. It is important that the ScanResponse be marked in this way because it communicates to the client the reason the message was sent back from the server. Since a time limit may be reached before any Results are retrieved, it is possible that the heartbeat message's list of Results will be empty. We do not want the client to interpret this as meaning that the region was exhausted.

Note that similar to RPC chunking, this feature was implemented in a fully backwards compatible manner. In other words, heartbeat messages will only be sent back to the client in the case that the client has specified that it can recognize when a ScanResponse is a heartbeat message. Heartbeat messages will be available starting in HBase 1.1. All HBase 1.1+ clients will heartbeat.
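The time-limit check can be modeled as follows. This is a sketch of the idea only, not HBase code: the function name, the injected `clock` callable, and the tuple it returns are assumptions made so the example is self-contained and deterministic.

```python
def scan_with_heartbeat(cells, matches, deadline, clock, cells_per_check=10_000):
    """Scan `cells` and return (results, cells_scanned, is_heartbeat).

    The time limit is checked once every `cells_per_check` cells. If the
    deadline has passed at a check, the scan returns early with
    is_heartbeat=True, even when `results` is still empty.
    """
    results = []
    for i, cell in enumerate(cells):
        if matches(cell):
            results.append(cell)
        scanned = i + 1
        if scanned % cells_per_check == 0 and clock() >= deadline:
            return results, scanned, True
    return results, len(cells), False
```

When a filter rejects every cell, the function can return an empty result list with `is_heartbeat=True`. The client uses the flag to tell "no matches yet, keep going from `cells_scanned`" apart from "the region is exhausted". Checking the clock only every `cells_per_check` cells keeps the overhead of the check itself low.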

Cleaning up the Scan API (HBASE-13441)

Following these recent improvements to the client-server Scan protocol, there is now an effort to try and clean up the Scan API. There are some APIs that are getting stale and don’t make much sense anymore, especially in light of the above changes. There are also some APIs that could use some love with regards to documentation.

For example, the caching and max result size APIs deal with how data is transferred between the client and the server. Both of these APIs now seem misplaced in the Scan API. These are details that should likely be controlled by the RegionServer rather than the application. It seems much more appropriate to give the RegionServer control over these parameters so that it can tune them based on the current state of the RPC pipeline and server loadings.

HBASE-13441 Scan API Improvements is the open umbrella issue covering ideas for Scan API improvements. If you have some time, check it out. Let us know if you have some ideas for improvements that you would like to see.

Other recent Scanner improvements

It’s important to note that these are not the only improvements that have been made to Scanners recently. Many other impressive improvements have come through deserving of their own blog post. For those that are interested, I have listed two:

  • HBASE-13109: Make better SEEK vs SKIP decisions during scanning

  • HBASE-13262: Fixed a data loss issue in Scans


A spate of Scan improvements should make for a smoother Scan experience in HBase 1.1.

Thursday April 16, 2015

Come to HBaseCon2015!

Come learn about:

  • A highly-trafficked HBase cluster with an uptime of sixteen months
  • An HBase deploy that spans three datacenters doing master-master replication between thousands of HBase nodes in each
  • Some nuggets on how Bigtable does it (and HBase could too)
  • How HBase is being used to record patient telemetry 24/7 on behalf of the Michael J. Fox Foundation to enable breakthroughs in Parkinson Disease research
  • How Pinterest and Microsoft are doing it in the cloud, how FINRA and Bloomberg are doing it out east, and how Rocketfuel, Yahoo! and Flipboard, etc., are representing the west

HBaseCon 2015 is the fourth annual edition of the community event for Apache HBase contributors, developers, admins, and users of all skill levels. The event is hosted and organized by Cloudera, with the Program Committee including leaders from across the HBase community. The conference will be one full day comprising general sessions in the morning, breakout sessions in the morning and afternoon, and networking opportunities throughout the day.

The agenda has been posted here: hbasecon2015 agenda. You can register here.


The HBaseCon Program Committee 


