Entries tagged [hbase]

Tuesday October 05, 2021

An HBase & HDFS Short-Circuit Read Odyssey

An HBase & HDFS Short-Circuit Read Odyssey

Huaxiang Sun and Michael Stack

We were asked to consult on an Apache HBase cluster. This deployment had RegionServers that were ABORTing under high load. On occasion, a RegionServer would hang without crashing, holding its allotment of data temporarily offline. HDFS’s Short-Circuit Read (SCR) configuration turned out to be the ultimate culprit, but it took us a while to get there. Here is our odyssey, in the hope that it helps others who find themselves in a similar bind.

The workloads on this multi-tenant cluster included big Apache Spark full-table-scan batch jobs that run in the foreground, make transforms, and write the results back to the source table, as well as multiple Apache Kafka streaming applications reading, inserting, and updating. A multi-step, latency-sensitive Spark Streaming application in particular was backing up, and we were asked to help identify the root cause.

We assume the reader is familiar with HDFS’s Short-Circuit Read feature and its setup in HBase. If you are looking for a refresher, we suggest you read Sun Lisheng’s excellent writeup on how SCR works, HDFS短路读详解_Hadoop技术博文 (in case you do not read Chinese, machine translation tools do an adequate job of conveying the technical detail in English).

ABORT & Hang

We started with an investigation into the ABORTing RegionServers. Crash recovery moves Regions and this movement causes any Spark and MapReduce tasks that have already been scheduled to read remotely instead of locally. On this cluster, remote tasks take much longer to complete, which drastically slows data processing.

The ABORTs looked like this:

2021-07-24 02:14:21,613 ERROR [regionserver/host.example.com:16020.logRoller] regionserver.HRegionServer: ***** ABORTING region server host.example.com,16020,1626231398827: IOE in log roller *****
java.io.IOException: Connection to closed
  at org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput$AckHandler.lambda$channelInactive$2(FanOutOneBlockAsyncDFSOutput.java:286)…

The above stack trace shows an IOException thrown from the write-ahead log (WAL) subsystem. At the time, the WAL system was “rolling” the log file: flushing and closing the existing file in order to start a new one. This is a critical process for ensuring data durability, and the failure here brought on the ABORT. This is actually intentional behavior -- when a write fails at this point, ABORTing the process is the data-safest thing to do. Our observation was that these ABORTs happened far too frequently, at a rate of a few servers each day. Occasionally, a variant on the above had the RegionServer hang here in WAL roll (See HBASE-26042 WAL lockup on 'sync failed' org.apache.hbase.thirdparty.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer).

The WAL roll was provoked by instances of an IOException reported earlier in the RegionServer log:

2021-07-16 14:08:29,210 WARN [AsyncFSWAL-0-hdfs://mycluster:8020/hbase] wal.AsyncFSWAL: sync failed

This message means that the process failed to sync an edit to the HDFS-hosted WAL. After reporting the failure back to the client, HBase tries to roll the WAL that the sync failed against. This action resets the WAL subsystem by replacing the instance and associated HDFS client state. Most of the time, the reset ran smoothly. Occasionally, during the WAL roll, the RegionServer reported the following IOException, which caused the RegionServer to ABORT:

2021-07-16 14:08:29,210 WARN [AsyncFSWAL-0-hdfs://mycluster:8020/hbase] wal.AsyncFSWAL: sync failed
org.apache.hbase.thirdparty.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

Our ‘peer’ in this case is the DataNode. It reset the connection. Why?

Looking in the local DataNode log turned up the cause of the occasional sync fail and of the connection reset. The DataNode logs reported lots of:

BP-1200783253-]] datanode.DataNode: host.example.com:9866:DataXceiver error processing WRITE_BLOCK operation src: / dst: /
java.lang.NullPointerException
  at sun.nio.ch.EPollArrayWrapper.isEventsHighKilled(EPollArrayWrapper.java:174)

Where these NullPointerExceptions clumped together, the DataNode would exit:

2021-07-24 02:13:53,942 INFO [Socket Reader #1 for port 9867] util.ExitUtil: Exiting with status 1: Bug in read selector!

This DataNode process crash caused the RegionServer process to ABORT. Note that sometimes it was not the local DataNode peer but one of the remote nodes participating in the WAL “fanout” that had reset. For details on our WAL fanout implementation, see the release note on HBASE-14790 Implement a new DFSOutputStream for logging WAL only. DataNodes were crashing at a rate far in excess of the RegionServer crash rate, but usually the RegionServer would recover and continue on its merry way. It was when the crash broke the established connection (‘Connection reset by peer’) during a WAL roll that we’d see the RegionServer ABORT, or worse, on occasion, the process would hang.

The above NullPointerException in #isEventsHighKilled is an old JVM bug seen by other folks who rely on the Netty project for their networking. Looking for mitigations for that bug led us to discover that the DataNodes were running with a process file descriptor ulimit of 16k. This was not intentional; it was a misconfiguration. Usually, DataNodes in this cluster run with 2-4k active file descriptors, but something was causing the file descriptor count to spike in excess of the 16k ulimit. We upgraded the JDK from JDK8 to JDK11 and increased the file descriptor ulimit. These changes cured the chronic restarting of DataNode processes, which in turn cured the RegionServers of their crashing.

However, we had not yet addressed the cause of the file descriptor bloat.

HDFS SCR tuning

Curing the crashing servers improved the cluster’s overall health, but our multi-step Spark Streaming application was still backing up.

Thankfully, one of the application authors seemed to have a ‘nose’ for identifying ‘bad actors’; at a particular stage in the streaming job he would load the Master UI and sort the view by RegionServer hit rate. The server that was persistently riding with the highest hit rate, he claimed, was causing the backup.

An inspection of the identified RegionServer turned up some interesting traits: it exhibited a high hit rate but also suffered from high response latency, and its RPC queue was heavily backed up. The RegionServer and DataNode processes both reported bloated file descriptor counts in excess of 100k, as observed using lsof -p PID|wc -l. Thread dumps were also distinctive, with most reading handlers in this state:

"RpcServer.default.FPBQ.Fifo.handler=0,queue=0,port=16020" #64 daemon prio=5 os_prio=0 cpu=1209403.19ms elapsed=114238.14s tid=0x00007ed9466bfc30 nid=0x3c12 waiting on condition [0x00007ed943dfd000]

java.lang.Thread.State: WAITING (parking)

at jdk.internal.misc.Unsafe.park(java.base@11.0.12/Native Method)

- parking to wait for <0x00007edad0f9bad8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

at java.util.concurrent.locks.LockSupport.park(java.base@11.0.12/LockSupport.java:194)

at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(java.base@11.0.12/AbstractQueuedSynchronizer.java:2018)

at org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:245)

at org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:418)

at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1002)

at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:531)

at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:772)

at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:709)

at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:480)

at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:358)

at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:645)

at org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1050)

at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1002)

at org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1361)

at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1325)

at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:93)

at org.apache.hadoop.hbase.io.util.BlockIOUtils.preadWithExtra(BlockIOUtils.java:214)

at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readAtOffset(HFileBlock.java:1531)

at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockDataInternal(HFileBlock.java:1755)

at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockData(HFileBlock.java:1563)

at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.readBlock(HFileReaderImpl.java:1339)

at org.apache.hadoop.hbase.io.hfile.HFileBlockIndex$CellBasedKeyBlockIndexReader.loadDataBlockWithScanInfo(HFileBlockIndex.java:331)

at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:681)

at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:633)

at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seekAtOrAfter(StoreFileScanner.java:315)

at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seek(StoreFileScanner.java:216)

at org.apache.hadoop.hbase.regionserver.StoreScanner.seekScanners(StoreScanner.java:426)

at org.apache.hadoop.hbase.regionserver.StoreScanner.<init>(StoreScanner.java:263)

at org.apache.hadoop.hbase.regionserver.HStore.createScanner(HStore.java:2175)

at org.apache.hadoop.hbase.regionserver.HStore.getScanner(HStore.java:2166)

at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.initializeScanners(HRegion.java:6618)

at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.<init>(HRegion.java:6598)

at org.apache.hadoop.hbase.regionserver.HRegion.instantiateRegionScanner(HRegion.java:3028)

at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:3008)

at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:2990)

at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:2984)

at org.apache.hadoop.hbase.regionserver.RSRpcServices.get(RSRpcServices.java:2679)

at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:887)

at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2829)

at org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:44870)

at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:393)

at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:133)

at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:338)

at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:318)

These reads were waiting to allocate a slot in a SCR shared memory segment (see the HDFS短路读详解_Hadoop技术博文 blog for background, and see the SCR code in HDFS). Our HDFS version predated 3.3.1, so we suspected we were experiencing some of the issues identified in the Sun Lisheng blog post, many of which are fixed in HDFS 3.3.1. We were also suspicious of the coarse lock on the SCR cache noted in HDFS-15202, also fixed in HDFS 3.3.1.

To confirm our suspicions, we enabled TRACE-level logging in the org.apache.hadoop.hdfs.shortcircuit package on one of our production RegionServers. The output was profuse and hard to read. The pathological state occurred only at high load, which meant the rate of logging was high -- so high that it overwhelmed the logging subsystem and log lines were cut off. We had to infer what a log line was trying to say by correlating the thread name and context with the SCR code. As best as we could tell, within seconds, we were reloading file descriptors for the same blocks over and over again. How could this be? The file descriptors for the blocks were supposed to be cached. We fixated on this log line:

2021-07-31 08:26:24,723 DEBUG [ShortCircuitCache_Cleaner] shortcircuit.ShortCircuitCache: org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache$CacheCleaner@58c8bc43: finishing cache cleaner run started at 5052332994. Demoted 0 mmapped replicas; purged 0 replicas.

Repeatedly, this cache cleaner thread logged activity but in essence was performing no work on the cache: ‘purged 0 replicas’.

In this cluster, dfs.client.read.shortcircuit.streams.cache.size was set to 1000. The default value of this configuration is 256. This parameter controls the size of the HDFS SCR cache of local block file descriptors. dfs.client.read.shortcircuit.streams.cache.expiry.ms was set to the default of five minutes. The cache cleaner runs on a dfs.client.read.shortcircuit.streams.cache.expiry.ms/4 interval. We noticed that when a SCR file descriptor is returned to the SCR cache, the code will check the cache size and purge entries if the size is in excess of the configured limit. This code path seemed to run too often. We speculated that the cluster read rate was such that the cache was up against its limit; when the cache cleaner ran, there was nothing for it to evict because every check-back-in of an element brought on a cache purge to make space to accommodate the returned item.
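To see why a full cache defeats the cleaner, consider a toy model (ours, not the HDFS implementation): a bounded map where every check-in of a descriptor for a block outside the cached set must evict another entry inline to make room, so nothing survives long enough for the periodic cleaner to purge.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the SCR file-descriptor cache at its size limit (illustrative only).
// With a working set larger than the configured capacity, check-ins evict other
// entries inline; the periodic cleaner then finds nothing expired to purge.
public class ScrCacheModel {
  static int inlineEvictions(final int capacity, int workingSet, int checkIns) {
    final int[] evicted = {0};
    Map<Integer, Integer> cache = new LinkedHashMap<Integer, Integer>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
        if (size() > capacity) {
          evicted[0]++; // inline eviction on check-in, not a cleaner purge
          return true;
        }
        return false;
      }
    };
    for (int i = 0; i < checkIns; i++) {
      cache.put(i % workingSet, i); // "check-in" of a block's file descriptor
    }
    return evicted[0];
  }

  public static void main(String[] args) {
    // Cache limit 1000 (as in this cluster), working set of 4000 blocks:
    // after warm-up, every check-in of an uncached block evicts inline.
    System.out.println(inlineEvictions(1000, 4000, 10000));
  }
}
```

In this regime the inline evictions do all the evicting, which matches the ‘purged 0 replicas’ cleaner lines we saw.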


Despite the cache being full, nearly every read required an expensive inline HDFS block open.

Some data that helped with our analysis:

  1. From Sun Lisheng’s blog: under heavy load, the 75th-percentile latency of allocating the local HDFS block ReplicaInfo SCR subclass (the structure that hosts the local block file descriptors) is 50ms.

  2. On average this cluster has ~4 hfiles per region. Almost every file was greater than 256M in size. HDFS blocksize on this cluster was configured to be 256M.

To serve a read request, assume the RegionServer needs to open ~2 HDFS blocks per HFile: on average an HFile is bigger than the block size, so it comprises more than one HDFS block. On open of an HFile we must consult the HFile trailer, the data structure where the HFile metadata is stored. We then read the data, which could be in a different block from the one holding the trailer.

So, if we have approximately two HDFS blocks per HFile and each Region has four HFiles, then on average, we open roughly 8 HDFS blocks per Region. The overhead of setting up SCRs, at an extreme, could be ~50ms * 8 = 400ms, which is a huge portion of our read request budget!
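As a sanity check on that arithmetic, a back-of-envelope sketch (all figures are our rough, per-cluster observations, not constants):

```java
// Back-of-envelope estimate of SCR slot-allocation overhead for a read that must
// open every HDFS block behind a Region's HFiles (figures from the analysis above).
public class ScrOverheadEstimate {
  static double overheadMs(int blocksPerHFile, int hfilesPerRegion, double allocLatencyMs) {
    return blocksPerHFile * hfilesPerRegion * allocLatencyMs;
  }

  public static void main(String[] args) {
    // ~2 HDFS blocks per HFile, ~4 HFiles per Region, ~50ms p75 slot allocation
    System.out.println(overheadMs(2, 4, 50.0) + " ms"); // 400.0 ms
  }
}
```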

File descriptor counts in the DataNode and RegionServer process tended to bloat under high-load. This was happening because not only was file descriptor allocation slow, but so was the SCR file descriptor purge operation as noted in HDFS短路读详解_Hadoop技术博文 (fixed in HDFS 3.3.1). Open file descriptors stuck around while waiting on their clean-up to run.


We tried increasing dfs.client.read.shortcircuit.streams.cache.size to 4k from 1k, and then 16k, and saw no change in behavior. It was only when we hit 64k that we started to see cache cleaner log lines stating that the purge was non-zero. That is to say, file descriptors had been in the cache for at least dfs.client.read.shortcircuit.streams.cache.expiry.ms. The file descriptor cache in SCR had enough capacity to cache an active working set. The cache had started working!

We mentioned our discoveries in the Apache HBase #user slack channel and a long-time contributor, Bryan Beaudreault, mentioned that he’d run into a similar issue and offered the following heuristic (which makes sense to us):

...nowadays, we tune dfs.client.read.shortcircuit.streams.cache.size to approx totalHdfsBlocks * 3 / numRegionServers. We periodically re-tune.

totalHdfsBlocks * 3 (the HDFS replication factor) is an estimate of the total count of physical blocks in the HDFS cluster. Thus, totalHdfsBlocks * 3 / numRegionServers is an estimate of the average number of HDFS blocks that a RegionServer (or more precisely, a DataNode) hosts. With this configuration, the SCR cache size is such that it can cache all of the HDFS block file descriptors managed by one RegionServer.
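The heuristic is trivial to compute. A sketch (the helper name and the sample numbers are ours for illustration, not an HBase or HDFS API and not this cluster’s actual figures):

```java
// Sketch of the SCR cache sizing heuristic quoted above:
// size the cache to cover the physical blocks one DataNode hosts.
public class ScrCacheSizing {
  static long scrStreamsCacheSize(long totalHdfsBlocks, int replicationFactor, int numRegionServers) {
    // physical blocks in the cluster / servers hosting them
    return totalHdfsBlocks * replicationFactor / numRegionServers;
  }

  public static void main(String[] args) {
    // Illustrative numbers only: 2M logical blocks, 3x replication, 100 RegionServers
    System.out.println(scrStreamsCacheSize(2_000_000L, 3, 100)); // 60000
  }
}
```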

In our case, Bryan’s equation suggests that for our cluster, we set dfs.client.read.shortcircuit.streams.cache.size to ~60k. This configuration is set in the RegionServer process because it’s the RegionServer that has the instance of the HDFS client, and thus an instance of the ShortCircuitRead cache. We applied this configuration change to all RegionServers in the cluster and it fixed the slow read behavior seen in the multistep Spark Streaming application.
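For reference, the change amounts to an hbase-site.xml entry on the RegionServer side; the value below is illustrative rather than a recommendation -- derive yours from the block-count heuristic above:

```xml
<!-- hbase-site.xml on the RegionServers (it is the RegionServer's embedded HDFS
     client that owns the short-circuit read cache). Value is illustrative. -->
<property>
  <name>dfs.client.read.shortcircuit.streams.cache.size</name>
  <value>60000</value>
</property>
```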


We’d like to thank our colleagues Cesar Delgado, Clara Xiong, Nick Dimiduk, and Sean Busbey for their helpful feedback and input.

Monday September 09, 2019

Introducing "hbtop", a real-time monitoring tool for HBase modeled after Unix's 'top' command

by Toshihiro Suzuki, HBase Committer

hbtop is a real-time monitoring tool for HBase modeled after Unix's _top_ command. It can display summary information as well as metrics per Region/Namespace/Table/RegionServer. With this tool, you can see metrics sorted by a selected field and filter the metrics to see only the ones you are interested in. Also, with the drill-down feature, you can dig in on hot-spotting Regions.

hbtop (https://issues.apache.org/jira/browse/HBASE-11062) is available in the coming hbase 2.1.7, 2.2.2, and 2.3.0 releases and is being actively backported to branch-1.

Run hbtop with the following command:

$ hbase hbtop

In this case, the values of hbase.client.zookeeper.quorum and zookeeper.znode.parent in hbase-site.xml in the classpath or their default values are used to connect. Or, specify your own zookeeper quorum and znode parent as follows:

$ hbase hbtop -Dhbase.client.zookeeper.quorum=<zookeeper quorum> -Dzookeeper.znode.parent=<znode parent>

Run hbtop and you'll see something like the following:

Top screen

The top screen consists of a summary section and a metrics section. In the summary section, you can see the HBase version, cluster ID, number of region servers, region count, average cluster load and aggregated requests per second. In the metrics section, you can see metrics per Region/Namespace/Table/RegionServer depending on the selected mode. The top screen is refreshed on a period – 3 seconds by default.

You can scroll the metric records in the metrics section.

Scrolling metric records

Command line arguments

Argument          Description
-d,--delay <arg>  The refresh delay (in seconds); default is 3 seconds
-h,--help         Print usage; for help while the tool is running, press h key
-m,--mode <arg>   The mode: n (Namespace) | t (Table) | r (Region) | s (RegionServer); default is r (Region)

You can change mode by pressing the m key in the top screen.

Changing mode

Change the refresh rate by pressing d key in the top screen.

Changing the refresh delay

You can move to the fields screen by pressing f key in the top screen. In the fields screen, you can change the displayed fields by choosing a field and pressing d key or space key.

Changing the displayed fields

You can move to the fields screen by pressing f key in the top screen. In the fields screen, you can change the sort field by choosing a field and pressing s key. Also, you can change the sort order (ascending or descending) by pressing R key.

Changing the sort field

You can move to the fields screen by pressing f key in the top screen. In the fields screen, you can also change the order of the fields.

Changing the order of the fields


You can filter the metric records with the filter feature. You can add filters by pressing o key (case-insensitive) or O key (case-sensitive).

Adding filters

The syntax is as follows:


For example, we can add filters like the following:


The operators you can specify are as follows:

Operator  Description
=         Partial match
==        Exact match
>         Greater than
>=        Greater than or equal to
<         Less than
<=        Less than or equal to

You can see the current filters by pressing ^o key and clear them by pressing = key.

Showing and clearing filters

Drilling down

You can drill down on a metric record by pressing i key in the top screen. With this feature, you can find hot regions easily in a top-down manner.

Drilling down

Help screen

You can see the help screen by pressing h key in the top screen.

Help screen

How hbtop gets the metrics data

hbtop gets the metrics from ClusterMetrics which is returned as the result of a call to Admin#getClusterMetrics() on the current HMaster. To add metrics to hbtop, they will need to be exposed via ClusterMetrics.

Feedback, bugs, and enhancements are all welcome!

The Reference Guide - hbtop:


Friday August 31, 2018

Beijing in Summertime: HBaseConAsia2018

by Yu Li, Chair of the HBaseConAsia2018 Conference Committee and member of the HBase PMC and Michael Stack, HBase PMC-er.

A crowd of us gathered at the Gehua New Century Hotel, in North-Eastern Beijing to attend a lively HBaseConAsia2018. It was an all-day community-run event held on August 17th. Admission was free thanks to our generous host, the Alibaba Group. This was the second HBaseConAsia conference. The first was hosted by Huawei in Shenzhen, in 2017.

The majority of the talks were in Chinese [1] but slides were in English. Below are links that point into slideshare where you’ll find slides and video. There were three tracks -- internals, ops, and ecosystem -- of heavy-duty hbasing.

Your authors led off the conference with a keynote on the “State of Apache HBase”. It covered general state (“stable”), the effort getting hbase-2.0.0 out the door, and toward the end, a couple of features that would sit pretty in any hbase-3.0.0.

The second (dense) keynote was by Chunhui Shen (long-time HBase PMC member) and Long Cao. They talked of “Recent Developments” around HBase at Alibaba and at Alibaba Cloud (“Aliyun”). Alibaba has been using HBase since 0.20.6(!) and maintains its own version internally (“AliHB”), where they develop features that later get pushed upstream to Apache. The node and cluster counts were, ahem, impressive, as is the ecosystem built up over time to serve different workload types. Near-future projects include separating compute from storage and deploying HBase on Pangu, the Alibaba filesystem. This talk also served as an overview for several sessions later in the day, where topics mentioned in the morning were given a deep dive.

One such deep-dive was the first talk in track one, on “Using CCSMap to improve HBase YGC time” by Chance Li and Lijin Bin. CCSMap is a stripped-down ConcurrentSkipListMap for use in place of the native java implementation HBase currently uses. Look for it in an upcoming Apache HBase release.

Ramkrishna Vasudevan and Anoop Sam John, HBase PMCers from Intel described their recent development work, WALLess HBase with persistent memory devices.

A bunch of talks were focused on hbase deploys at various companies. The Xiaomi folks, heavyweight contributors to Apache HBase with lots of HBase deployed at scale, gave two talks, one by Xie Gang on “HDFS optimizations for HBase at Xiaomi“, and “HBase at Xiaomi” by HBase Committer, Guanghao Zhang. JingYi Yao talked on “HBase at Didi” (GeoMesa, JanusGraph, and Phoenix). Other interesting deploy-types were described in “HBase at China Telecom”, “@ China Life Insurance”, “@ Meituan”, and “@ Lianjia”.

The folks from Pinterest, Chenji Pan and Lianghong Xu, did “Improving HBase reliability at Pinterest with geo-replication and efficient backup”, and our WenLong (Allan) Yang described an interesting scheme for treating hot and cold data differently in “Separating hot-cold data into heterogeneous storage based on layered compaction”.

Apache Kylin and JanusGraph run on HBase and each had a dedicated session. AntsDB is a fun new project that puts a MySQL face on an Apache HBase cluster. It's easy to set up and works nicely.

Alibaba described how they do security, Phoenix, and Spark in their cloud HBase offering, ApsaraDB, and an interesting backup solution they use internally that requires “zero modification” to HBase. “The Application of HBase in New Energy Vehicle Monitoring System” was by Yan Yu, Chetankumar Jyestaram Khatri talked on “Scaling 30 TB's of Data lake with Apache HBase and Scala DSL at Production”, Biju Nair of Bloomberg described “Serving billions of queries in millisecond latency”, and Pankaj Kumar, Wei Zhi, and Chaoqiang Zhong of last year’s hosts Huawei presented on “HBase and OpenTSDB practice at Huawei” (Huawei also offer HBase in the cloud).

At the end of all sessions, a worn-out PMC took questions from a worn-out audience.

On the day after, HBase contributors came together for a fruitful dev meeting. Rough Notes were posted to the dev@hbase mailing list.

Let us finish this short report with a picture of all the HBaseConAsia2018 speakers taken at the end of the day after all was done.

  1. https://www.quora.com/Is-it-appropriate-to-refer-to-Mandarin-simply-as-Chinese

Friday July 14, 2017


(This is the second of a two-part post. The first part can be found at this spot).


Document
Using a technique called key-flattening, a document can be shredded by storing each value in the document according to the path from the root to the name of the element containing the value.  HDocDB uses this approach.

Column Family: default
Row Key        | Column: <property 1 path> | Column: <property 2 path>
<document ID>  | <property 1 value>        | <property 2 value>

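The key-flattening shredding described above can be sketched in a few lines (our illustration operating on nested Maps, not HDocDB's actual code, which works on JSON/BSON documents):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of key-flattening: each leaf value is stored under the dotted
// path from the document root, and that path becomes the column qualifier.
public class KeyFlatten {
  @SuppressWarnings("unchecked")
  static Map<String, Object> flatten(String prefix, Map<String, ?> doc) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, ?> e : doc.entrySet()) {
      String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
      if (e.getValue() instanceof Map) {
        out.putAll(flatten(path, (Map<String, ?>) e.getValue())); // recurse into sub-document
      } else {
        out.put(path, e.getValue()); // leaf: one column per path
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> doc = new LinkedHashMap<>();
    doc.put("name", "alice");
    Map<String, Object> addr = new LinkedHashMap<>();
    addr.put("city", "sf");
    doc.put("address", addr);
    System.out.println(flatten("", doc)); // {name=alice, address.city=sf}
  }
}
```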
The document can also be stored as a binary value, in which case support for Medium Objects (MOBs) can be used if the documents are large.  This approach is described in the book Architecting HBase Applications.

Column Family: default
Row Key        | Column: body
<document ID>  | <reference to MOB>


Graph
There are many ways to store a graph in HBase.  One method is to use an adjacency list, where each vertex stores its neighbors in the same row.  This is the approach taken in JanusGraph.

Column Family: default
Row Key      | Column: <edge 1 key> | Column: <edge 2 key> | Column: <property 1 name> | Column: <property 2 name>
<vertex ID>  | <edge 1 properties>  | <edge 2 properties>  | <property 1 value>        | <property 2 value>

In the table above, the edge key is actually comprised of a number of parts, including the label, direction, edge ID, and adjacent vertex ID.

Alternatively, a separate table to represent edges can be used, in which case the incident vertices are stored in the same row as an edge.   This may scale better if the adjacency list is large, such as in a social network.  This is the approach taken in both Zen and HGraphDB.

Column Family: default
Row Key      | Column: <property 1 name> | Column: <property 2 name>
<vertex ID>  | <property 1 value>        | <property 2 value>

Column Family: default
Row Key    | Column: fromVertex | Column: toVertex | Column: <property 1 name> | Column: <property 2 name>
<edge ID>  | <vertex ID>        | <vertex ID>      | <property 1 value>        | <property 2 value>

When storing edges in a separate table, additional index tables must be used to provide efficient access to the incident edges of a vertex.  For example, the full list of tables in HGraphDB can be viewed here.


Queue
A queue can be modeled by using a row key comprised of the consumer ID and a counter.  Both Cask and Box implement queues in this manner.

Column Family: default
Row Key                  | Column: metadata   | Column: body
<consumer ID + counter>  | <message metadata> | <message body>

Cask also uses coprocessors for efficient scan filtering and queue trimming, and Apache Tephra for transactional queue processing.
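The row-key scheme above is easy to sketch (the layout is illustrative, not Cask's or Box's exact encoding): the consumer ID keeps each consumer's messages together, and a zero-padded counter makes rows sort in enqueue order.

```java
// Sketch of a queue row key: consumer ID plus a monotonically increasing counter,
// zero-padded so lexicographic row order matches numeric enqueue order.
public class QueueRowKey {
  static String rowKey(String consumerId, long counter) {
    // 19 digits is enough for any non-negative long
    return consumerId + ":" + String.format("%019d", counter);
  }

  public static void main(String[] args) {
    // consumer-1 followed by the counter zero-padded to 19 digits
    System.out.println(rowKey("consumer-1", 42L));
  }
}
```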


Metrics
The Metrics archetype is a variant of the Entity archetype in which the column values are counters or some other aggregate.

Column Family: default
Row Key      | Column: <property 1 name> | Column: <property 2 name>
<entity ID>  | <property 1 counter>      | <property 2 counter>

HGraphDB is actually a combination of the Graph and Metrics archetypes, as arbitrary counters can be stored on either vertices or edges.

(This is the second of a two-part article. The first part can be found over here). 


by Robert Yokota, HBase Contributor

(This post originally appeared on Robert's personal blog. It is reposted here as a two-parter. The second-part can be found here.)

At Yammer, we’ve transitioned away from polyglot persistence to persistence consolidation. In a microservice architecture, the principle that each microservice should be responsible for its own data had led to a proliferation of different types of data stores at Yammer. This in turn led to multiple efforts to make sure that each data store could be easily used, monitored, operationalized, and maintained. In the end, we decided it would be more efficient, both architecturally and organizationally, to reduce the number of data store types in use at Yammer to as few as possible.

Today HBase is the primary data store for non-relational data at Yammer (we use PostgreSQL for relational data).  Microservices are still responsible for their own data, but the data is segregated by cluster boundaries or mechanisms within the data store itself (such as HBase namespaces or PostgreSQL schemas).

HBase was chosen for a number of reasons, including its performance, scalability, reliability, its support for strong consistency, and its ability to support a wide variety of data models.  At Yammer we have a number of services that rely on HBase for persistence in production:

  • Feedie, a feeds service
  • RoyalMail, an inbox service
  • Ocular, for tracking messages that a user has viewed
  • Streamie, for storing activity streams
  • Prankie, a ranking service with time-based decay
  • Authlog, for authorization audit trails
  • Spammie, for spam monitoring and blocking
  • Graphene, a generic graph modeling service

HBase is able to satisfy the persistence needs of several very different domains. Of course, there are some use cases for which HBase is not recommended, for example, when using raw HDFS would be more efficient, or when ad-hoc querying via SQL is preferred (although projects like Apache Phoenix can provide SQL on top of HBase).

Previously, Lars George and Jonathan Hsieh from Cloudera attempted to survey the most commonly occurring use cases for HBase, which they referred to as application archetypes.  In their presentation, they categorized archetypes as either “good”, “bad”, or “maybe” when used with HBase. Below I present an augmented listing of their “good” archetypes, along with pointers to projects that implement them.


Entity
The Entity archetype is the most natural of the archetypes.  HBase, being a wide column store, can represent the entity properties with individual columns.  Projects like Apache Gora and HEntityDB support this archetype.

Column Family: default
Row Key      | Column: <property 1 name> | Column: <property 2 name>
<entity ID>  | <property 1 value>        | <property 2 value>

Entities can also be stored in the same manner as with a key-value store.  In this case the entity would be serialized as a binary or JSON value in a single column.

Column Family: default
Row Key      | Column: body
<entity ID>  | <entity blob>


Sorted Collection
The Sorted Collection archetype is a generalization of the original Messaging archetype that was presented.  In this archetype the entities are stored as binary or JSON values, with the column qualifier being the value of the sort key to use.  For example, in a messaging feed, the column qualifier would be a timestamp or a monotonically increasing counter of some sort.  The column qualifier can also be “inverted” (such as by subtracting a numeric ID from the maximum possible value) so that entities are stored in descending order.

                 Column Family: default
Row Key          Column: <sort key 1 value>   Column: <sort key 2 value>
<collection ID>  <entity 1 blob>              <entity 2 blob>

Alternatively, each entity can be stored as a set of properties.  This is similar to how Cassandra implements CQL.  HEntityDB supports storing entity collections in this manner.

                 Column Family: default
Row Key          Column: <sort key 1 value + property 1 name>   Column: <sort key 1 value + property 2 name>   Column: <sort key 2 value + property 1 name>   Column: <sort key 2 value + property 2 name>
<collection ID>  <property 1 of entity 1>                       <property 2 of entity 1>                       <property 1 of entity 2>                       <property 2 of entity 2>
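The composite-qualifier layout above can be sketched as follows. The fixed-width sort-key encoding here is an illustrative assumption, not necessarily the encoding HEntityDB uses:

```python
# Sketch: store each property of each entity under a composite qualifier of
# <fixed-width sort key> + <property name>, so a byte-wise sorted scan groups
# all properties of one entity together, in sort-key order.
def composite_qualifier(sort_key: int, prop: str) -> bytes:
    return sort_key.to_bytes(8, "big") + prop.encode("utf-8")

row = {}  # qualifier -> value, standing in for one HBase row in the "default" family
row[composite_qualifier(1, "subject")] = b"hello"
row[composite_qualifier(1, "body")] = b"first message"
row[composite_qualifier(2, "subject")] = b"re: hello"
row[composite_qualifier(2, "body")] = b"second message"

# A sorted scan of the qualifiers yields all of entity 1's properties, then entity 2's.
scan = sorted(row)
print([q[8:].decode() for q in scan])  # ['body', 'subject', 'body', 'subject']
```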

In order to access entities by some other value than the sort key, additional column families representing indices can be used.

                 Column Family: sorted                                      Column Family: index
Row Key          Column: <sort key 1 value>   Column: <sort key 2 value>   Column: <index 1 value>   Column: <index 2 value>
<collection ID>  <entity 1 blob>              <entity 2 blob>              <entity 1 blob>           <entity 2 blob>

To prevent the collection from growing unbounded, a coprocessor can be used to trim the sorted collection during compactions.  If index column families are used, the coprocessor would also remove corresponding entries from the index column families when trimming the sorted collection.  At Yammer, both the Feedie and RoyalMail services use this technique.  Both services also use server-side filters for efficient pagination of the sorted collection during queries.


Sunday April 09, 2017

Accordion: Developer View of In-Memory Compaction

by Anastasia Braginsky (HBase Committer), Eshcar Hillel (HBase Committer) and Edward Bortnikov (Contributor) of Yahoo! Research

In-memory compaction (Accordion project) demonstrated sizable improvement in HBase’s write amplification and read/write performance. In this post, we describe the design behind Accordion’s algorithms, and how it fits within the HBase internals.

What’s New

Accordion affects the regionserver package. Its centerpiece component is the CompactingMemStore class, which inherits from AbstractMemStore, and is sibling to DefaultMemStore. In contrast with DefaultMemStore, which maintains a monolithic dynamic (mutable) index to cell storage, CompactingMemStore manages multiple indexes, ordered by creation time. The youngest index is mutable, whereas the rest are immutable.

Cell indexes are implemented as descendants of the CellSet class that provides the basic NavigableMap access to cells. In addition to the traditional ConcurrentSkipListMap mutable index, Accordion introduces an immutable CellArrayMap index - a space-efficient ordered array that uses binary search. CellArrayMap is allocated on heap.
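The difference between the two index kinds can be sketched as follows. This is an illustrative Python analogue of a CellArrayMap-style flat index; the real class is Java and stores Cell references, and the names below are invented for the sketch:

```python
import bisect

class FlatCellIndex:
    """Sketch of an immutable, space-efficient sorted-array index (a la CellArrayMap).
    Built once from pre-sorted (key, cell) pairs; lookups use binary search over two
    flat arrays instead of chasing skip-list node objects."""
    def __init__(self, sorted_cells):
        self._keys = [k for k, _ in sorted_cells]
        self._cells = [c for _, c in sorted_cells]

    def get(self, key):
        i = bisect.bisect_left(self._keys, key)
        if i < len(self._keys) and self._keys[i] == key:
            return self._cells[i]
        return None

index = FlatCellIndex([(b"row1", "cell-a"), (b"row2", "cell-b"), (b"row3", "cell-c")])
print(index.get(b"row2"))  # cell-b
```

The flat layout trades mutability for a much smaller per-entry object overhead, which is exactly the trade an immutable segment can afford to make.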

Accordion introduces the Segment abstraction, which encapsulates the combination of the CellSet and associated metadata (time range tracker, MSLAB reference, size counters, etc.). Beforehand, these (gory) details were managed directly by the MemStore. The abstract Segment class manages a single CellSet and its metadata. It has two subclasses:  MutableSegment and ImmutableSegment. The latter can either manage an immutable CellSet, or provide a read-only wrapper to a mutable CellSet. The CompositeImmutableSegment class extends ImmutableSegment; it provides a similar API for a fixed set of segments.

Segment’s are scannable. The traversal is provided by the SegmentScanner class that implements the KeyValueScanner interface. SegmentScanner exploits the NavigableMap API implemented by the CellSet encapsulated by the segment.

CompactingMemStore manages one MutableSegment (in what follows, active) and multiple ImmutableSegment’s. It supports the top-level scan mechanism via a list of SegmentScanner’s, each referring to one segment. In this context, the MemStoreScanner class became deprecated and was eliminated in HBase 2.0.

Figure 1 depicts the Segment and cell index (NavigableMap) class hierarchies.


Figure 1. Segment and cell index (NavigableMap) class hierarchies.

Immutable segments are created upon in-memory flush. Following this, they travel through an interim pipeline (CompactionPipeline class) to the snapshot buffer from where they are flushed to disk, and finally released. Pipeline is accessed in parallel by multiple tasks; in what follows, we discuss how its thread-safety and correctness are guaranteed. The snapshot is simpler because its content never changes; it is implemented as CompositeImmutableSegment.  

In-memory flushes trigger in-memory compactions. The latter replace one or more segments in pipeline with semantically equivalent but more memory-efficient representations. The MemStoreCompactor class is an algorithmic tool that implements the in-memory compaction policies. It uses the MemStoreSegmentsIterator helper class to traverse the segments. Figure 2 depicts the classes that implement in-memory compaction.


Figure 2. Classes that implement in-memory compaction.

The  StoreScanner class implements a consistent scan mechanism for HRegion. It maintains a heap of KeyValueScanner’s to merge the MemStore data with the on-disk HFile data. CompactingMemStore returns a subset of these scanners (list of SegmentScanner instances) for all its Segment’s.

MemStoreCompactor exploits the same mechanism, via the MemStoreSegmentsIterator helper; it only iterates through immutable segments. Figure 3 depicts the classes involved in scan.


Figure 3. Classes involved in scan.

Managing the Compacting Memstore State

MemStore’s in HBase run processing tasks - for example, flushing data from RAM to disk - concurrently with serving normal read and write requests. In CompactingMemStore, there are more concurrent scenarios, with in-memory flushes and compactions introducing more complexity. Here, pipeline is the most complex since it is accessed by multiple tasks in parallel.

Our guiding principles are:

  1. Correctness. Data retrieval semantics are preserved - in particular, data is never lost.

  2. Performance. Infrequent flushes and compactions, which happen in the background, do not affect the datapath operations, namely scans.

Let us give a quick look at how these principles manifest in the CompactionPipeline design.

Data Structures and Synchronization

Pipeline contains a double-ended queue of ImmutableSegment’s ordered by segment creation time. It is accessed by scans (read) as well as flushes and compactions (update). Since the segments are immutable, it is sufficient to provide the reader with a clone of the queue. One way to go would be to clone upon each scan, under the protection of a reentrant shared lock. We chose a more efficient copy-on-write approach. Namely, only the update operations synchronize on the pipeline. Each update creates a new read-only copy of the queue and installs it via a volatile reference. The subsequent scans retrieve their clone lock-free. Note that if some segments are removed from the queue by in-memory compaction or disk flush in parallel with an ongoing scan, correctness is not affected because the data does not disappear. Rather, it may be referenced from multiple locations (for instance, both pipeline and snapshot). The scan algorithm filters the duplicates.
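The copy-on-write scheme can be sketched as follows. This is an illustrative Python analogue, not the Java implementation: attribute assignment stands in for the volatile reference, and the class and method names are invented for the sketch:

```python
import threading

class CopyOnWritePipeline:
    """Sketch of CompactionPipeline's copy-on-write scheme: updates synchronize and
    install a fresh immutable copy; readers just grab the current reference, lock-free."""
    def __init__(self):
        self._lock = threading.Lock()  # taken only by update operations
        self._segments = ()            # immutable tuple behind a "volatile" reference

    def push(self, segment):           # update path (in-memory flush)
        with self._lock:
            self._segments = (segment,) + self._segments

    def remove_tail(self):             # update path (disk flush)
        with self._lock:
            self._segments = self._segments[:-1]

    def snapshot(self):                # read path (scan) - no lock taken
        return self._segments

p = CopyOnWritePipeline()
p.push("seg1"); p.push("seg2")
view = p.snapshot()        # a scan's stable, lock-free view
p.remove_tail()            # a concurrent flush does not disturb that view
print(view, p.snapshot())  # ('seg2', 'seg1') ('seg2',)
```

Because each update replaces the tuple rather than mutating it, an in-flight scan keeps seeing a consistent queue even while segments are being removed.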

In-memory compaction swaps one or more segments in the queue with new (compacted) segments. Similarly to scan, it is a long-running operation, which should not disrupt the concurrent datapath operations. In order to achieve this, we implemented compaction in a non-blocking way. CompactionPipeline maintains a version that is promoted each time the queue tail is modified. When the compaction starts, it records this version. Upon completion, it atomically checks whether the version changed in the meantime, and atomically swaps the segments if it did not. This opportunistic approach succeeds in most cases. Since in-memory compaction is an optimization, it is fine for it to fail on rare occasions. The version counter (long) is volatile - that is, changes to it are atomic and immediately observable.
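The optimistic version check can be sketched like so. Again an illustrative analogue with invented names; in the real code the check-and-swap runs inside a short synchronized block:

```python
class VersionedPipeline:
    """Sketch of the optimistic in-memory compaction swap: record the pipeline
    version at the start; at the end, swap only if the version is unchanged."""
    def __init__(self, segments):
        self.segments = list(segments)  # newest first
        self.version = 0                # bumped whenever the queue tail changes

    def try_swap(self, seen_version, old_count, compacted):
        if self.version != seen_version:
            return False                # tail changed mid-compaction; give up
        self.segments = self.segments[:-old_count] + [compacted]
        return True

p = VersionedPipeline(["seg3", "seg2", "seg1"])
v = p.version                      # compaction starts, records the version
p.version += 1                     # a concurrent disk flush modified the tail
print(p.try_swap(v, 2, "merged"))  # False - the compaction result is discarded
```

Discarding the occasional result is cheap because compaction is only an optimization; the uncompacted segments remain valid.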

Detailed Scenarios

Scan Operation (in particular, Get). The SegmentScanner’s are created (non-atomically) in the order of data movement between the MemStore segments, to preserve correctness. For example, in the course of scanner set creation a segment can move from active to pipeline, in which case it will be referenced by two scanners - however, no data is lost. The merge algorithm eliminates the redundant results that stem from the overlap.

In-Memory Flush (happens when active overflows). A dedicated worker (1) blocks updates for the region (via RegionServicesForStores), (2) creates a new ImmutableSegment that wraps active, (3) atomically inserts it into pipeline, (4) creates a new MutableSegment and flips the active reference to it, (5) unblocks the updates, and (6) calls MemStoreCompactor.

Disk Flush (happens when the region overflows, and decides to free up space in RAM). A dedicated worker (1) forces in-memory flush (to guarantee there is at least one segment in the pipeline), (2) creates a new CompositeImmutableSegment from all segments in the read-only clone of pipeline and flips the snapshot reference, (3) atomically removes references to segments in snapshot from CompactionPipeline, and (4) scans snapshot (merge across multiple segments) and flushes the results to disk.

In-Memory Compaction (triggered by in-memory flush, except in the disk flush case). (1) Retrieves a versioned copy of pipeline, (2) builds a new (compacted) ImmutableSegment, (3) atomically, if the version did not change, swaps one or more segments in pipeline with the new segment (the swap target depends on the compaction policy, see below).

Note that all the atomic sections are extremely lightweight. They only include manipulation of a few references, and avoid any computation and copy.

In-Memory Compaction Policies

MemStoreCompactor provides two compaction policies: BASIC and EAGER.

The BASIC policy is a low-cost/low-overhead alternative that merges the indexes of all segments in pipeline into a single flat index. It does not eliminate redundancies, in order to avoid cell data copy. Namely, once the number of segments in pipeline exceeds N, the algorithm scans the CellSet’s of N+1 youngest segments in pipeline, and copies the KeyValue references to a new CellArrayMap. The scan retrieves all the KeyValue’s in the original CellSet’s ordered by key and version (non-SQM matcher).

The EAGER policy is a high-cost/high-reward alternative that both flattens the index and eliminates redundancies across segments. It scans all the segments in pipeline, and merges them into one segment encapsulating a new CellArrayMap index. Redundant data versions are eliminated in the course of scan (SQM matcher). If the MemStore uses MSLAB cell storage, then the data is copied to new (compact) MSLAB’s under the new index. This policy trades extra data copy and GC overhead for maximal memory efficiency.
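The two policies can be contrasted on toy cells as follows. This is an illustrative sketch: real cells carry types and sequence ids, and the SQM matcher also honors retained versions and TTLs, which the sketch ignores:

```python
import heapq

# Toy (row key, sequence id, value) cells, one list per segment, each sorted
# by (key, descending seqid) - the order the merge expects.
seg_new = [("r1", 7, "v2"), ("r3", 6, "x")]
seg_old = [("r1", 3, "v1"), ("r2", 4, "y")]

def basic(*segments):
    """BASIC: merge the segment indexes into one sorted array; keep every
    version, so no cell data needs to be copied or discarded."""
    return list(heapq.merge(*segments, key=lambda c: (c[0], -c[1])))

def eager(*segments):
    """EAGER: same merge, but keep only the newest version of each key."""
    out, last_key = [], None
    for cell in basic(*segments):
        if cell[0] != last_key:
            out.append(cell)
            last_key = cell[0]
    return out

print(len(basic(seg_new, seg_old)))  # 4 cells - redundancy retained
print(eager(seg_new, seg_old))       # [('r1', 7, 'v2'), ('r2', 4, 'y'), ('r3', 6, 'x')]
```

BASIC pays only for reference shuffling; EAGER additionally walks every cell to drop the stale `('r1', 3, 'v1')`, which is the extra compute the text describes.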

Disk Flush Policy and WAL Truncation

HBase 2.0 introduces a notion of sloppy MemStore’s - that is, MemStore implementations that dynamically expand and contract their RAM footprint over time. CompactingMemStore is currently the only sloppy MemStore implementation. When a region triggers a flush to disk to free up memory, sloppy stores are the last candidates for flush. The rationale is that they manage their memory more efficiently than DefaultMemStore over time, and therefore should be prioritized for remaining in RAM.

Disk flushes trigger WAL truncation (archiving), as the WAL entries corresponding to persisted data versions become obsolete. Region maintains the estimate of the lower bound (minimum sequence id) of non-flushed data among all its stores; the log entries below this bound can be safely removed. Prior to Accordion, this maintenance was simple. Since DefaultMemStore dumps the whole in-memory content to disk, the store-level minimum sequence id was reset when flush was scheduled, and re-installed by the first put operation to occur after the flush.

Since sloppy stores can flush in-memory data to disk partially (for example, CompactingMemStore can flush any suffix of CompactionPipeline) the minimum sequence id maintenance becomes more subtle, to avoid data loss. Namely, every segment maintains its own minimum sequence id, and therefore, the CompactingMemStore lower bound is the minimum among all segments. Note that this is just a conservative estimate. For example, an eager in-memory compaction that happens concurrently to a disk flush might eliminate redundant cells and thereby lift the lower bound. However, this estimate is safe because the value can only monotonically grow over time. It can be safely computed anytime; no atomicity is required while retrieving the segment lower bounds.
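The lower-bound computation itself is simple and can be sketched as follows (illustrative values; the segment names are invented):

```python
# Sketch: the store-level WAL lower bound is the minimum of the per-segment
# minimum sequence ids - a conservative estimate that only rises over time.
segments = {"active": 120, "pipeline-1": 95, "pipeline-2": 87}

def wal_lower_bound(per_segment_min):
    return min(per_segment_min.values())

print(wal_lower_bound(segments))  # 87 - WAL entries below this can be truncated

# After the oldest pipeline segment is flushed to disk, the bound can only rise,
# allowing more of the WAL to be archived.
del segments["pipeline-2"]
print(wal_lower_bound(segments))  # 95
```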

If the WAL grows too big despite the truncation efforts, the periodic LogRoller process kicks in and forces a full flush to disk. This generic mechanism guarantees that the recovery after crash does not need to replay the entire history, and also trims the WAL. In other words, however efficient, in-memory compaction does not eliminate disk flushes entirely - rather, it pushes them further into the future. Note that when EAGER compaction is adopted, periodic flushing is even more important because the WAL stores all the data redundancies that are eliminated by the compaction algorithm.


In this blog post, we covered Accordion’s internals - new classes, relationships, and execution flows. We also zoomed in on the synchronization scheme that guarantees thread-safety, and shed light on the compaction policy implementations.

We thank Michael Stack, Anoop Sam John and Ramkrishna Vasudevan for their continuous support that made this project happen.

Accordion: HBase Breathes with In-Memory Compaction

by Anastasia Braginsky (HBase Committer), Eshcar Hillel (HBase Committer) and Edward Bortnikov (Contributor) of Yahoo! Research

Modern products powered by HBase exhibit ever-increasing  expectations from its read and write performance. Ideally, HBase applications would like to enjoy the speed of in-memory databases without giving up on the reliable persistent storage guarantees. We introduce a new algorithm in HBase 2.0, named Accordion, which takes a significant step towards this goal.

HBase partitions the data into regions controlled by a cluster of RegionServer’s. The internal (vertical) scalability of RegionServer is crucial for end-user performance as well as for the overall system utilization. Accordion improves the RegionServer scalability via a better use of RAM. It accommodates more data in memory and writes to disk less frequently. This manifests in multiple desirable phenomena. First, HBase’s disk occupancy and write amplification are reduced. Second, more reads and writes get served from RAM, and less are stalled by disk I/O - in other words, HBase’s performance is increased. Traditionally, these different metrics were considered at odds, and tuned at each other’s expense. With Accordion, they all get improved simultaneously.

Accordion is inspired by the Log-Structured-Merge (LSM) tree design pattern that governs the HBase storage organization. An HBase region is stored as a sequence of searchable key-value maps. The topmost is a mutable in-memory store, called MemStore, which absorbs the recent write (put) operations. The rest are immutable HDFS files, called HFiles. Once a MemStore overflows, it is flushed to disk, creating a new HFile. HBase adopts multi-versioned concurrency control, that is, MemStore stores all data modifications as separate versions. Multiple versions of one key may therefore reside in MemStore and the HFile tier. A read (get) operation, which retrieves the value by key, first scans the MemStore and then the HFile data (via the BlockCache), seeking the latest version. To reduce the number of disk accesses, HFiles are merged in the background. This process, called compaction, removes the redundant cells and creates larger files.
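The LSM read path described in the paragraph above can be sketched as follows (illustrative only; real reads go through scanners and the BlockCache rather than dicts):

```python
# Sketch of an LSM-style read: check the mutable MemStore first, then the
# immutable HFiles from newest to oldest, returning the first (latest) version.
memstore = {"r2": "v9"}
hfiles = [{"r1": "v5", "r2": "v7"},  # newest HFile
          {"r1": "v2"}]              # oldest HFile

def get(key):
    if key in memstore:
        return memstore[key]
    for hfile in hfiles:             # newest to oldest
        if key in hfile:
            return hfile[key]
    return None

print(get("r2"), get("r1"))  # v9 v5
```

The stale versions ("v7", "v2") are exactly what background compaction eventually removes.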

LSM trees deliver superior write performance by transforming random application-level I/O to sequential disk I/O. However, their traditional design makes no attempt to compact the in-memory data. This stems from historical reasons: LSM trees were designed in an age when RAM was a scarce resource, and therefore the MemStore capacity was small. With recent changes in the hardware landscape, the overall MemStore capacity managed by a RegionServer can be multiple gigabytes, leaving a lot of headroom for optimization.

Accordion reapplies the LSM principle to MemStore, in order to eliminate redundancies and other overhead while the data is still in RAM. Doing so decreases the frequency of flushes to HDFS, thereby reducing the write amplification and the overall disk footprint. With less flushes, the write operations are stalled less frequently as the MemStore overflows, therefore the write performance is improved. Less data on disk also implies less pressure on the block cache, higher hit rates, and eventually better read response times. Finally, having less disk writes also means having less compaction happening in the background, i.e., less cycles are stolen from productive (read and write) work. All in all, the effect of in-memory compaction can be envisioned as a catalyst that enables the system to move faster as a whole.

Accordion currently provides two levels of in-memory compaction - basic and eager. The former applies generic optimizations that are good for all data update patterns. The latter is most useful for applications with high data churn, like producer-consumer queues, shopping carts, shared counters, etc. All these use cases feature frequent updates of the same keys, which generate multiple redundant versions that the algorithm takes advantage of to provide more value. On the flip side, eager optimization may incur compute overhead (more memory copies and garbage collection), which may affect response times under intensive write loads. The overhead is high if the MemStore uses on-heap MemStore-Local Allocation Buffer (MSLAB) allocation; this configuration is not advised in conjunction with eager compaction. See more details about Accordion’s compaction algorithms in the next sections.

Future implementations may tune the optimal compaction policy automatically, based on the observed workload.

How To Use

The in-memory compaction level can be configured both globally and per column family. The supported levels are none (legacy implementation), basic, and eager.

By default, all tables apply basic in-memory compaction. This global configuration can be overridden in hbase-site.xml, as follows:

<property>
  <name>hbase.hregion.compacting.memstore.type</name>
  <value><none|basic|eager></value>
</property>
The level can also be configured in the HBase shell per column family, as follows:  

create '<tablename>', {NAME => '<cfname>', IN_MEMORY_COMPACTION => '<NONE|BASIC|EAGER>'}


Performance Gains, or Why You Should Care

We stress-tested HBase extensively via the popular Yahoo! Cloud Serving Benchmark (YCSB). Our experiments used 100-200 GB datasets, and exercised a variety of representative workloads. The results demonstrate significant performance gains delivered by Accordion.

Heavy-tailed (Zipf) distribution. The first experiment exercises a workload in which the key popularities follow the Zipf distribution that arises in most of the real-life scenarios. In this context, when 100% of the operations are writes, Accordion achieves up to 30% reduction of write amplification, 20% increase of write throughput, and 22% reduction of GC. When 50% of the operations are reads, the tail read latency is reduced by 12%.

Uniform distribution. The second experiment exercises a workload in which all keys are equally popular. In this context, under 100% writes, Accordion delivers up to 25% reduction of write amplification, 50% increase of write throughput, and 36% reduction of GC. The tail read latencies are not impacted (which is expected, due to complete lack of locality).

How Accordion Works

High Level Design. Accordion introduces CompactingMemStore - a MemStore implementation that applies compaction internally. In contrast to the default MemStore, which maintains all data in one monolithic data structure, Accordion manages it as a sequence of segments. The youngest segment, called active, is mutable; it absorbs the put operations. Upon overflow (by default, 32MB - 25% of the MemStore size bound), the active segment is moved to an in-memory pipeline, and becomes immutable. We call this in-memory flush. Get operations scan through these segments and the HFiles (the latter are accessed via the block cache, as usual in HBase).

CompactingMemStore may merge multiple immutable segments in the background from time to time, creating larger and leaner segments. The pipeline is therefore “breathing” (expanding and contracting), similar to accordion bellows.

When RegionServer decides to flush one or more MemStore’s to disk to free up memory, it considers the CompactingMemStore’s last, after the rest have overflowed. The rationale is to prolong the lifetime of MemStore’s that manage their memory efficiently, in order to reduce the overall I/O. When such a flush does happen, all pipeline segments are moved to a composite snapshot, merged, and streamed to a new HFile.

Figure 1 illustrates the structure of CompactingMemStore versus the traditional design.

Figure 1. CompactingMemStore vs DefaultMemStore

Segment Structure. Similarly to the default MemStore, CompactingMemStore maintains an index on top of cell storage, to allow fast search by key. Traditionally, this index was implemented as a Java skiplist (ConcurrentSkipListMap) - a dynamic but wasteful data structure that manages a lot of small objects. CompactingMemStore uses a space-efficient flat layout for immutable segment indexes. This universal optimization helps all compaction policies reduce the RAM overhead, even when the data has little-to-no redundancy. Once a segment is added to the pipeline, the store serializes its index into a sorted array named CellArrayMap that is amenable to fast binary search.

CellArrayMap supports both direct allocation of cells from the Java heap and custom allocation from MSLAB’s - either on-heap or off-heap. The implementation differences are abstracted away via the helper KeyValue objects that are referenced from the index (Figure 2). CellArrayMap itself is always allocated on-heap.

Figure 2. Immutable segment with a flat CellArrayMap index and MSLAB cell storage.

Compaction Algorithms. The in-memory compaction algorithms maintain a single flat index on top of the pipelined segments. This saves space, especially when the data items are small, and therefore pushes the disk flush further off in time. A single index allows searching in one place, therefore bounding the tail read latency.

When an active segment is flushed in memory, it is queued to the compaction pipeline, and a background merge task is immediately scheduled. The latter simultaneously scans all the segments in the pipeline (similarly to on-disk compaction) and merges their indexes into one. The differences between the basic and eager compaction policies manifest in how they handle the cell data. Basic compaction does not eliminate the redundant data versions in order to avoid physical copy; it just rearranges the references to the KeyValue objects. Eager compaction, on the contrary, filters out the duplicates. This comes at the cost of extra compute and data migration - for example, with MSLAB storage the surviving cells are copied to the newly created MSLAB(s). The compaction overhead pays off when the data is highly redundant.

Future implementations of compaction may automate the choice between the basic and eager compaction policies. For example, the algorithm might try eager compaction once in a while, and schedule the next compaction based on the value delivered (i.e., fraction of data eliminated). Such an approach could relieve the system administrator from deciding a priori, and adapt to changing access patterns.


In this blog post, we covered Accordion’s basic principles, configuration, performance gains, and some details of the in-memory compaction algorithms. The next post will focus on system internals for HBase developers.

We thank Michael Stack, Anoop Sam John and Ramkrishna Vasudevan for their continuous support that made this project happen.

Monday March 27, 2017

HBase on Azure: Import/Export snapshots from/to ADLS

by Apekshit Sharma, HBase Committer.


Azure Data Lake Store (ADLS) is Microsoft’s cloud alternative for Apache HDFS. In this blog, we’ll see how to use it as backup for storing snapshots of Apache HBase tables. You can export snapshots to ADLS for backup; and for recovery, import the snapshot back to HDFS and use it to clone/restore the table. In this post, we’ll go over the configuration changes needed to make HDFS client talk to ADLS, and commands to copy HBase table snapshots from HDFS to ADLS and vice-versa.


“The Azure Data Lake store is an Apache Hadoop file system compatible with Hadoop Distributed File System (HDFS) and works with the Hadoop ecosystem.”

ADLS can be treated as any HDFS service, except that it’s in the cloud. But then how do applications talk to it? That’s where the hadoop-azure-datalake module comes into the picture. It enables an HDFS client to talk to ADLS whenever the following access path syntax is used:

adl://<Account Name>.azuredatalakestore.net/

For example:
hdfs dfs -mkdir adl://<Account Name>.azuredatalakestore.net/test_dir

However, before it can access any data in ADLS, the module needs to be able to authenticate to Azure. That requires a few configuration changes. These we describe in the next section.

Configuration changes

ADLS requires an OAuth2 bearer token to be present as part of the request’s HTTPS header. Users who have access to an ADLS account can obtain this token from the Azure Active Directory (Azure AD) service. To allow an HDFS client to authenticate to ADLS and access data, you’ll need to specify these tokens in core-site.xml using the following four configurations:

<property>
  <name>dfs.adls.oauth2.access.token.provider.type</name>
  <value>ClientCredential</value>
</property>
<property>
  <name>dfs.adls.oauth2.refresh.url</name>
  <value><refresh url></value>
</property>
<property>
  <name>dfs.adls.oauth2.client.id</name>
  <value><client id></value>
</property>
<property>
  <name>dfs.adls.oauth2.credential</name>
  <value><client secret></value>
</property>
To find the values for dfs.adls.oauth2.* configurations, refer to this document.

Since all files/folders in ADLS are owned by the account owner, its ACL environment doesn’t mesh well with that of HDFS, which can have multiple users. Since the user issuing commands via the HDFS client will be different from what’s in Azure’s AD, any operation which checks for ACLs will fail. To work around this issue, use the following configuration, which tells the HDFS client to assume that the current user owns all files in the case of ADLS requests.

<property>
  <name>adl.debug.override.localuserasfileowner</name>
  <value>true</value>
</property>
Make sure to deploy the above configuration changes to the cluster.

Export snapshot to ADLS

Here are the steps to export a snapshot from HDFS to ADLS.

  1. Create a new directory in ADLS to store snapshots.

$ hdfs dfs -mkdir adl://appy.azuredatalakestore.net/hbase

$ hdfs dfs -ls adl://appy.azuredatalakestore.net/

Found 1 items

drwxr-xr-x   - systest hdfs          0 2017-03-21 23:43 adl://appy.azuredatalakestore.net/hbase

  2. Create the snapshot. To know more about this feature and how to create/list/restore snapshots, refer to HBase Snapshots section in the HBase reference guide.

  3. Export snapshot to ADLS

$ sudo -u hbase hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot <snapshot_name> -copy-to adl://appy.azuredatalakestore.net/hbase


17/03/21 23:50:24 INFO snapshot.ExportSnapshot: Copy Snapshot Manifest

17/03/21 23:50:48 INFO snapshot.ExportSnapshot: Export Completed: snapshot_1

  4. Verify that the snapshot was copied to ADLS.

$ hbase snapshotinfo -snapshot <snapshot_name> -remote-dir adl://appy.azuredatalakestore.net/hbase

Snapshot Info


  Name: snapshot_1

  Type: FLUSH

 Table: t

Format: 2

Created: 2017-03-21T23:42:56

  5. It’s now safe to delete the local snapshot (the one in HDFS).

Restore/Clone table from a snapshot in ADLS

If you have a snapshot in ADLS which you want to use either to restore an original table to a previous state, or create a new table by cloning, follow the steps below.

  1. Copy the snapshot back from ADLS to HDFS. Make sure to copy to ‘hbase’ directory on HDFS, because that’s where HBase service will look for snapshots.

$ sudo -u hbase hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot <snapshot_name> -copy-from adl://appy.azuredatalakestore.net/hbase -copy-to hdfs:///hbase

  2. Verify that the snapshot exists in HDFS. (Note that there is no -remote-dir parameter)

$ hbase snapshotinfo -snapshot snapshot_1

Snapshot Info


  Name: snapshot_1

  Type: FLUSH

 Table: t

Format: 2

Created: 2017-03-21T23:42:56

  3. Follow the instructions in HBase Snapshots section of HBase reference guide to restore/clone from the snapshot.


The Azure module in HDFS makes it easy to interact with ADLS. We can keep using the commands we already know, and our applications that use the HDFS client just need a few configuration changes. What a seamless integration! In this blog, we got a glimpse of the HBase integration with Azure - using ADLS as a backup for storing snapshots. Let’s see what the future has in store for us. Maybe an HBase cluster fully backed by ADLS!

Thursday March 09, 2017

Offheap Read-Path in Production - The Alibaba story

By Yu Li (HBase Committer/Alibaba), Yu Sun (Alibaba), Anoop Sam John (HBase PMC/Intel), and Ramkrishna S Vasudevan (HBase PMC/Intel)


HBase is the core storage system in Alibaba’s Search Infrastructure. Critical e-commerce data about products, sellers and promotions etc. are all synced into HBase from various online databases. We query HBase to build and provide real time updates on the search index. In addition, user behavior data, such as impressions, clicks and transactions will also be streamed into HBase. They serve as feature data for our online machine learning system, which optimizes the personalized search result in real time. The whole system produces mixed workloads on HBase that includes bulkload/snapshot for full index building, batch mutation for real time index updates and streaming/continuous query for online machine learning. Our biggest HBase cluster has reached more than 1500 nodes and 200,000 regions. It routinely serves tens of millions QPS.

Both latency and throughput are important for our HBase deploy. From the latency perspective, it directly affects how quickly users can search an item after it has been posted as well as how ‘real-time’ we can run our inventory accounting. From the throughput perspective, it decides the speed of machine learning program processing, and thus the accuracy of recommendations made. What’s more, since data is distributed through the cluster and accesses are balanced, applications are sensitive to latency spikes on a single node, which makes GC a critical factor in our system servicing capability.

By caching more data in memory, the read latency (and throughput) can be greatly improved. If we can get our data from local cache, we save having to make a trip to HDFS. Apache HBase has two layers of data caching. There is what we call “L1” caching, our first caching tier – which caches data in an on heap Least Recently Used (LRU) cache -- and then there is an optional, “L2” second cache tier (aka Bucket Cache).

Bucket Cache can be configured to keep its data in a file -- i.e. caching data in a local file on disk -- or in memory. File mode usually is able to cache more data but there will be more attendant latency reading from a file vs reading from memory. Bucket Cache can also be configured to use memory outside of the Java heap space (‘offheap’) so users generally configure a large L2 cache with offheap memory along with a smaller on heap L1 cache.

At Alibaba we use an offheap L2 cache dedicating 12GB to Bucket Cache on each node. We also backported a patch currently in master branch only (to be shipped in the coming hbase-2.0.0) which makes it so the hbase read path runs offheap end-to-end. This combination improved our average throughput significantly. In the below sections, we’ll first talk about why the off-heaping has to be end-to-end, then introduce how we backported the feature from master branch to our customized 1.1.2, and at last show the performance with end-to-end read-path offheap in an A/B test and on Singles’ Day (11/11/2016).

Necessity of End-to-end Off-heaping

Before offheaping, the QPS curve from our A/B test cluster looked like the one below:


We could see dips in average throughput, and average latency would spike during those same periods.

Checking RegionServer logs, we could see long GC pauses. Further analysis indicated that when disk IO is fast enough, as on PCIe-SSD, blocks are evicted from cache quite frequently even with a high cache hit ratio. The eviction rate was so high that garbage collection could not keep up, bringing on frequent long GC pauses that hurt throughput.

Looking to improve throughput, we tried the existing Bucket Cache in 1.1.2 but found GC was still heavy. In other words, although Bucket Cache in branch-1 (the branch for current stable releases) already supports keeping its data offheap, it still generates lots of garbage. To understand why end-to-end off-heaping is necessary, let’s look at how reads from Bucket Cache work in branch-1. But first, let’s understand how the Bucket Cache itself is organized.

The allocated offheap memory is reserved as DirectByteBuffers, each 4 MB in size, so physically the entire memory area is split into many 4 MB buffers. On top of this physical layout, we impose a logical division: each logical area is sized to accommodate a different HFile block size (recall that HFiles are read block by block, and it is blocks that get cached in the L1 or L2 cache). The logical splits accommodate HFile block sizes from 4 KB to 512 KB (this is the default; sizes are configurable). Each split contains more than one slot into which a block can be inserted. When caching, we find an appropriately sized split, then an empty slot within it, and insert the block there. Remember, all slots are offheap. For more details on Bucket Cache, refer here [4]. Refer to the HBase Reference Guide [5] for how to set up Bucket Cache.
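The slot-selection logic described above amounts to picking the smallest size class that fits the block. A simplified sketch follows; the size-class list here is illustrative, and the real BucketAllocator uses a different list plus far more bookkeeping (free lists, per-bucket occupancy, etc.).

```java
public class BucketSizing {
    // Hypothetical size classes mirroring the 4 KB .. 512 KB range described
    // above; HBase's actual defaults differ.
    static final int[] SIZE_CLASSES = {
        4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024,
        64 * 1024, 128 * 1024, 256 * 1024, 512 * 1024
    };

    // Pick the smallest size class whose slots can hold a block of blockSize bytes.
    static int sizeClassFor(int blockSize) {
        for (int s : SIZE_CLASSES) {
            if (blockSize <= s) {
                return s;
            }
        }
        throw new IllegalArgumentException("block too large to cache: " + blockSize);
    }
}
```

So a block slightly under 64 KB lands in a 64 KB slot, while a 65 KB block would need a 128 KB slot, wasting the difference as internal fragmentation.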

In branch-1, when a read is served out of the L2 cache, we have to copy the entire block into a temporary onheap area, because the HBase read path assumes block data is backed by an onheap byte array. Also, given the physical and logical split described above, a single HFile block’s data may be spread across two physical ByteBuffers.

When a random row read happens in our system, even if the data is available in L2 cache, we will end up reading the entire block -- usually ~64k in size -- into a temporary onheap allocation for every row read. This creates lots of garbage (and please note that without the HBASE-14463 fix, this copy from offheap to onheap reduced read performance a lot). Our read workload is so high that this copy produces lots of GCs, so we had to find a way to avoid the need of copying block data from offheap cache into temp onheap arrays.

How was it achieved? - Our Story

The HBASE-11425 Cell/DBB end-to-end read-path work in the master branch avoids the need to copy offheap block data back onheap when reading. The entire read path was changed to work directly against the offheap Bucket Cache area and serve data from there to clients (see the details of this work and its performance numbers in [1] and [2]). So we decided to try this work in our custom HBase version based on 1.1.2, backporting it from the master branch.

The backport cost us about two person-months, including getting familiar with and analyzing the JIRAs to port, fixing unit-test failures, fixing problems found in functional testing (HBASE-16609/16704), and resolving compatibility issues (HBASE-16626). We have listed the full set of backported JIRAs here [3]; please refer to it for more details if interested.

As for configuration, since tables for different applications use different block sizes -- from 4KB to 512KB -- the default bucket splits just worked for our use case. After careful testing, and further tuning in production, we also kept the default values for the other configurations. Our configs are listed below:

Alibaba’s Bucket Cache related configuration


How did it work? - A/B Test and Singles’ Day

We tested performance on our A/B test cluster (450 physical machines, each with 256GB memory and 64 cores) after the backport and saw better throughput, as illustrated below:


Note that the average throughput curve is now much flatter, with no more dips over time.

The version with the offheap read path feature was released on October 10th and has been online ever since (more than four months). Together with the NettyRpcServer patch (HBASE-15756), we successfully made it through our 2016 Singles’ Day, with peaks of 100K QPS on a single RegionServer.



[1] https://blogs.apache.org/hbase/entry/offheaping_the_read_path_in

[2] http://www.slideshare.net/HBaseCon/offheaping-the-apache-hbase-read-path

[3] https://issues.apache.org/jira/browse/HBASE-17138

[4] https://issues.apache.org/jira/secure/attachment/12562209/Introduction%20of%20Bucket%20Cache.pdf

[5] http://hbase.apache.org/book.html#offheap.blockcache

Thursday December 17, 2015

Offheaping the Read Path in Apache HBase: Part 1 of 2

Detail on the work involved in making the Apache HBase read path work against offheap memory (without copying). [Read More]

Saturday June 06, 2015

Saving CPU! Using Native Hadoop Libraries for CRC computation in HBase

by Apekshit Sharma, HBase contributor and Cloudera Engineer

TL;DR Use the Hadoop Native Library to calculate CRCs and save CPU!

Checksums in HBase

Checksums are used to check data integrity. HDFS computes and stores checksums for all files on write. One checksum is written per chunk of data (size can be configured using bytes.per.checksum) in a separate, companion checksum file. When data is read back, the file with the corresponding checksums is read back as well and used to ensure data integrity. However, having two files means two disk seeks when reading any chunk of data. For HBase, the extra seek while reading an HFileBlock means extra latency. To work around the extra seek, HBase inlines checksums: it calculates checksums for the data in an HFileBlock and appends them to the end of the block itself on write to HDFS (HDFS then checksums the HBase data plus the inline checksums). On read, HDFS checksum verification is turned off by default, and HBase itself verifies data integrity.

Can we then get rid of HDFS checksum altogether? Unfortunately no. While HBase can detect corruptions, it can’t fix them, whereas HDFS uses replication and a background process to detect and *fix* data corruptions if and when they happen. Since HDFS checksums generated at write-time are also available, we fall back to them when HBase verification fails for any reason. If the HDFS check fails too, the data is reported as corrupt.

The related HBase configurations are hbase.hstore.checksum.algorithm, hbase.hstore.bytes.per.checksum and hbase.regionserver.checksum.verify. HBase inline checksums are enabled by default.
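The inline-checksum layout described above can be sketched as follows. This is a simplified illustration using java.util.zip.CRC32; the real HFileBlock format, header fields, and checksum-type handling differ.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class InlineChecksum {
    // Build a "block": data bytes followed by one CRC32 (4 bytes) per chunk.
    static byte[] withInlineChecksums(byte[] data, int bytesPerChecksum) {
        int chunks = (data.length + bytesPerChecksum - 1) / bytesPerChecksum;
        ByteBuffer out = ByteBuffer.allocate(data.length + 4 * chunks);
        out.put(data);
        CRC32 crc = new CRC32();
        for (int off = 0; off < data.length; off += bytesPerChecksum) {
            int len = Math.min(bytesPerChecksum, data.length - off);
            crc.reset();
            crc.update(data, off, len);
            out.putInt((int) crc.getValue());   // append checksum for this chunk
        }
        return out.array();
    }

    // Recompute each chunk's CRC and compare with the appended values;
    // false means the block is corrupt (and HBase would fall back to HDFS).
    static boolean verify(byte[] block, int dataLen, int bytesPerChecksum) {
        ByteBuffer sums = ByteBuffer.wrap(block, dataLen, block.length - dataLen);
        CRC32 crc = new CRC32();
        for (int off = 0; off < dataLen; off += bytesPerChecksum) {
            int len = Math.min(bytesPerChecksum, dataLen - off);
            crc.reset();
            crc.update(block, off, len);
            if (sums.getInt() != (int) crc.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```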

Calculating checksums is computationally expensive and requires lots of CPU. When HDFS switched over to JNI + C for computing checksums, they witnessed big gains in CPU usage.

This post is about replicating those gains in HBase by using Native Hadoop Libraries (NHL). See HBASE-11927


We switched to the Hadoop DataChecksum library, which under the hood uses NHL if available and otherwise falls back to the Java CRC implementation. Another alternative considered was the ‘Circe’ library. The following table highlights the differences between the two and makes the reasoning for our choice clear.

Hadoop Native Library

  • Native code supports both crc32 and crc32c

  • Adds a dependency on hadoop-common, which is reliable and actively developed

  • Interface takes a stream of data, a stream of checksums, and a chunk size as parameters, and computes/verifies checksums chunk by chunk

Circe

  • Native code supports only crc32c

  • Adds a dependency on an external project

  • Only supports calculating a single checksum over all input data

Both libraries support use of the special x86 instruction for hardware calculation of CRC32C where available (defined in the SSE4.2 instruction set). In the case of NHL, hadoop-2.6.0 or newer is required for HBase to get the native checksum benefit.
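For a sense of the checksum in question: modern JDKs (9+) ship java.util.zip.CRC32C, which computes the same Castagnoli polynomial that the SSE4.2 crc32 instruction accelerates (and which the JIT intrinsifies on supporting CPUs). This is just a demonstration of the checksum, not HBase's or Hadoop's actual code path.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

public class Crc32cDemo {
    // Compute the CRC-32C (Castagnoli) checksum of a byte array.
    static long crc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        // "123456789" is the standard CRC check input; its CRC-32C is 0xE3069283.
        System.out.printf("%08X%n",
            crc32c("123456789".getBytes(StandardCharsets.US_ASCII)));
    }
}
```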

However, based on the data layout of HFileBlock, which has ‘real data’ followed by checksums on the end, only NHL supported the interface we wanted. Implementing the same in Circe would have been significant effort. So we chose to go with NHL.


Since the metric to be evaluated was CPU usage, a simple configuration of two nodes was used. Node1 was configured to be the NameNode, Zookeeper and HBase master. Node2 was configured to be DataNode and RegionServer. All real computational work was done on Node2 while Node1 remained idle most of the time. This isolation of work on a single node made it easier to measure impact on CPU usage.


Ubuntu 14.04.2 LTS (GNU/Linux 3.13.0-24-generic x86_64)

CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz                                          

Socket(s) : 1

Core(s) per socket : 1

Thread(s) per core : 4

Logical CPU(s) : 4

Number of disks : 1

Memory : 8 GB

HBase Version/Distro *: 1.0.0 / CDH 5.4.0

*Since trunk resolves to hadoop-2.5.1 which does not have HDFS-6865, it was easier to use a CDH distro which already has HDFS-6865 backported.


We chose to study the impact on major compactions, mainly because of the presence of CompactionTool which a) can be used offline, b) allowed us to profile only the relevant workload. PerformanceEvaluation (bin/hbase pe) was used to build a test table which was then copied to local disk for reuse.

% ./bin/hbase pe --nomapred --rows=150000 --table="t1" --valueSize=10240 --presplit=10 sequentialWrite 10

Table size: 14.4G

Number of rows: 1.5M

Number of regions: 10

Row size: 10K

Total store files across regions: 67

For profiling, Lightweight-java-profiler was used and FlameGraph was used to generate graphs.

For benchmarking, the linux ‘time’ command was used. Profiling was disabled during these runs. A script repeatedly executed the following steps in order:

  1. delete hdfs:///hbase

  2. copy t1 from local disk to hdfs:///hbase/data/default

  3. run compaction tool on t1 and time it



CPU profiling of HBase not using NHL (figure 1) shows that about 22% cpu is used for generating and validating checksums, whereas, while using NHL (figure 2) it takes only about 3%.


Figure 1: CPU profile - HBase not using NHL (svg)


Figure 2: CPU profile - HBase using NHL (svg)


Benchmarking was done for three different cases: (a) neither HBase nor HDFS use NHL, (b) HDFS uses NHL but not HBase, and (c) both HDFS and HBase use NHL. For each case, we did 5 runs. Observations from table 1:

  1. Within a case, while real time fluctuates across runs, user and sys times remain the same. This is expected since compactions are IO bound.

  2. Using NHL only for HDFS reduces CPU usage by about 10% (A vs B)

  3. Further, using NHL for HBase checksums reduces CPU usage by about 23% (B vs C).

All times are in seconds. This stackoverflow answer provides a good explanation of real, user and sys times.

run #   no native for HDFS and HBase (A)    no native for HBase (B)             native (C)

1       real 469.4  user 110.8  sys 30.5    real 422.9  user 95.4  sys 30.5     real 414.6  user 67.5  sys 30.6

2       real 384.3  user 111.4  sys 30.4    real 400.5  user 96.7  sys 30.5     real 393.8  user 67.6  sys 30.6

3       real 400.7  user 111.5  sys 30.6    real 398.6  user 95.8  sys 30.6     real 392.0  user 66.9  sys 30.5

4       real 396.8  user 111.1  sys 30.3    real 379.5  user 96.0  sys 30.4     real 390.8  user 67.2  sys 30.5

5       real 389.1  user 111.6  sys 30.3    real 377.4  user 96.5  sys 30.4     real 381.3  user 67.6  sys 30.5

Table 1



The Native Hadoop Library leverages the special processor instruction (where available) that does pipelining and other low-level optimizations when performing CRC calculations. Using NHL for HBase’s heavy checksum computation allows HBase to make use of this facility, saving a significant amount of CPU time spent checksumming.

Tuesday May 12, 2015

The HBase Request Throttling Feature

Govind Kamat, HBase contributor and Cloudera Performance Engineer

(Edited on 5/14/2015 -- changed ops/sec/RS to ops/sec.)

Running multiple workloads on HBase has always been challenging, especially when trying to execute real-time workloads while concurrently running analytical jobs. One possible way to address this issue is to throttle analytical MR jobs so that real-time workloads are less affected.

A new QoS (quality of service) feature that Apache HBase 1.1 introduces is request throttling, which controls the rate at which requests get handled by an HBase cluster. HBase typically treats all requests identically; however, the new throttling feature can be used to specify a maximum rate or bandwidth to override this behavior. The limit may be applied to requests originating from a particular user, or alternatively, to requests directed to a given table or a specified namespace.

The objective of this post is to evaluate the effectiveness of this feature and the overhead it might impose on a running HBase workload.  The performance runs carried out showed that throttling works very well, by redirecting resources from a user whose workload is throttled to the workloads of other users, without incurring a significant overhead in the process.

Enabling Request Throttling

It is straightforward to enable the request-throttling feature -- all that is necessary is to set the HBase configuration parameter hbase.quota.enabled to true. The related parameter hbase.quota.refresh.period specifies the time interval in milliseconds at which the regionserver re-checks for any new restrictions that have been added.
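For example, an hbase-site.xml fragment along these lines enables the feature (the refresh period shown is illustrative, lowered so new quotas take effect quickly, as described below):

```xml
<property>
  <name>hbase.quota.enabled</name>
  <value>true</value>
</property>
<property>
  <!-- Re-check for new/changed quotas every 60 seconds. -->
  <name>hbase.quota.refresh.period</name>
  <value>60000</value>
</property>
```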

The throttle can then be set from the HBase shell, like so:

hbase> set_quota TYPE => THROTTLE, USER => 'uname', LIMIT => '100req/sec'

hbase> set_quota TYPE => THROTTLE, TABLE => 'tbl', LIMIT => '10M/sec'

hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns', LIMIT => 'NONE'

Test Setup

To evaluate how effectively HBase throttling worked, a YCSB workload was imposed on a 10 node cluster.  There were 6 regionservers and 2 master nodes.  YCSB clients were run on the 4 nodes that were not running regionserver processes.  The client processes were initiated by two separate users and the workload issued by one of them was throttled.

More details on the test setup follow.

HBase version: HBase 0.98.6-cdh5.3.0-SNAPSHOT (HBASE-11598 was backported to this version)


CentOS release 6.4 (Final)                                                

CPU sockets: 2                                                            

Physical cores per socket: 6

Total number of logical cores: 24

Number of disks: 12

Memory: 64 GB

Number of RS: 6

Master nodes: 2  (for the Namenode, Zookeeper and HBase master)

Number of client nodes: 4

Number of rows: 1080M

Number of regions: 180

Row size: 1K

Threads per client: 40

Workload: read-only and scan

Key distribution: Zipfian

Run duration: 1 hour


An initial data set was first generated by running YCSB in its data generation mode. An HBase table was created with the table specifications above and pre-split. After all the data was inserted, the table was flushed, compacted and saved as a snapshot. This data set was used to prime the table for each run. Read-only and scan workloads were used to evaluate performance; this eliminates effects such as memstore flushes and compactions. One long run was carried out first to ensure the caches were warmed and that the runs yielded repeatable results.

For the purpose of these tests, the throttle was applied to the workload emanating from one user in a two-user scenario. There were four client machines used to impose identical read-only workloads.  The client processes on two machines were run by the user “jenkins”, while those on the other two were run as a different user.   The throttle was applied to the workload issued by this second user.  There were two sets of runs, one with both users running read workloads and the second where the throttled user ran a scan workload.  Typically, scans are long running and it can be desirable on occasion to de-prioritize them in favor of more real-time read or update workloads.  In this case, the scan was for sets of 100 rows per YCSB operation.

For each run, the following steps were carried out:

  • Any existing YCSB-related table was dropped.

  • The initial data set was cloned from the snapshot.

  • The desired throttle setting was applied.

  • The desired workloads were imposed from the client machines.

  • Throughput and latency data was collected and is presented in the table below.

The throttle was applied at the start of the job (the command used was the first in the list shown in the “Enabling Request Throttling” section above).  The hbase.quota.refresh.period property was set to under a minute so that the throttle took effect by the time test setup was finished.

The throttle option specifically tested here was the one to limit the number of requests (rather than the one to limit bandwidth).

Observations and Results

The throttling feature appears to work quite well. When applied interactively in the middle of a running workload, it goes into effect immediately after the quota refresh period and can be observed clearly in the throughput numbers put out by YCSB while the test is progressing. The table below has performance data from test runs indicating the impact of the throttle. For each row, the throughput and latency numbers are shown in separate columns, one set for the “throttled” user (indicated by “T” for throttled) and the other for the “non-throttled” user (represented by “U” for un-throttled).

Read + Read Workload

[Table: Read + Read results for throttle settings of 2500, 2000, 1500, 1000 and 500 rps. Columns: Throttle (req/sec), Avg Total Thruput (ops/sec), Thruput_U (ops/sec), Thruput_T (ops/sec), Latency_U (ms), Latency_T (ms). The cell values were not preserved in this copy.]

As can be seen, when the throttle pressure is increased (by reducing the permitted throughput for user “T” from 2500 req/sec to 500 req/sec, as shown in column 1), the total throughput (column 2) stays around the same.  In other words, the cluster resources get redirected to benefit the non-throttled user, with the feature consuming no significant overhead.  One possible outlier is the case where the throttle parameter is at its most restrictive (500 req/sec), where the total throughput is about 10% less than the maximum cluster throughput.

Correspondingly, the latency for the non-throttled user improves while that for the throttled user degrades.  This is shown in the last two columns in the table.

The charts above show that the change in throughput is linear with the amount of throttling, for both the throttled and non-throttled user.  With regard to latency, the change is generally linear, until the throttle becomes very restrictive; in this case, latency for the throttled user degrades substantially.

One point that should be noted is that, while the throttle parameter in req/sec is indeed correlated to the actual restriction in throughput as reported by YCSB (ops/sec) as seen by the trend in column 4, the actual figures differ.  As user “T”’s throughput is restricted down from 2500 to 500 req/sec, the observed throughput goes down from 2500 ops/sec to 1136 ops/sec.  Therefore, users should calibrate the throttle to their workload to determine the appropriate figure to use (either req/sec or MB/sec) in their case.

Read + Scan Workload

[Table: Read + Scan results for throttle settings of 3000, 1000, 500, 250 and 50 Krps. Columns: Throttle (req/sec), Thruput_U (ops/sec), Thruput_T (ops/sec), Latency_U (ms), Latency_T (ms). The cell values were not preserved in this copy.]

With the read/scan workload, we observed results similar to those of the read/read workload. As the extent of throttling is increased for the long-running scan workload, the observed throughput decreases and latency increases. Conversely, the read workload benefits, displaying better throughput and improved latency. Again, the specific numeric value used to specify the throttle needs to be calibrated to the workload at hand. Since scans break down into a large number of read requests, the throttle parameter needs to be much higher than in the read-workload case. Shown above is a log-linear chart of the impact on throughput of the two workloads as the extent of throttling is adjusted.


HBase request throttling is an effective and useful technique to handle multiple workloads, or even multi-tenant workloads on an HBase cluster.  A cluster administrator can choose to throttle long-running or lower-priority workloads, knowing that regionserver resources will get re-directed to the other workloads, without this feature imposing a significant overhead.  By calibrating the throttle to the cluster and the workload, the desired performance can be achieved on clusters running multiple concurrent workloads.

Friday May 01, 2015

Scan Improvements in HBase 1.1.0

Jonathan Lawlor, Apache HBase Contributor

Over the past few months there have been a variety of nice changes made to scanners in HBase. This post focuses on two such changes, namely RPC chunking (HBASE-11544) and scanner heartbeat messages (HBASE-13090). Both of these changes address long-standing issues in the client-server scan protocol. Specifically, RPC chunking deals with how a server handles the scanning of very large rows, and scanner heartbeat messages allow scan operations to make progress even when aggressive server-side filtering makes result returns infrequent.


In order to discuss these issues, let’s first gain a general understanding of how scans currently work in HBase.

From an application's point of view, a ResultScanner is the client side source of all of the Results that an application asked for. When a client opens a scan, it’s a ResultScanner that is returned and it is against this object that the client invokes next to fetch more data. ResultScanners handle all communication with the RegionServers involved in a scan and the ResultScanner decides which Results to make visible to the application layer. While there are various implementations of the ResultScanner interface, all implementations use basically the same protocol to communicate with the server.

In order to retrieve Results from the server, the ResultScanner issues ScanRequests via RPCs to the different RegionServers involved in a scan. A client configures a ScanRequest by passing an appropriately set Scan instance when opening the scan, setting start/stop rows, caching limits, the maximum result size limit, and the filters to apply.

On the server side, there are three main components involved in constructing the ScanResponse that will be sent back to the client in answer to a ScanRequest:


The RSRpcServices is a service that lives on RegionServers and responds to incoming RPC requests, such as ScanRequests. During a scan, the RSRpcServices is the server-side component responsible for constructing the ScanResponse that will be sent back to the client. The RSRpcServices continues to scan in order to accumulate Results until the region is exhausted, the table is exhausted, or a scan limit is reached (such as the caching limit or max result size limit). In order to retrieve these Results, the RSRpcServices must talk to a RegionScanner.


The RegionScanner is the server-side component responsible for scanning the different rows in the region. In order to scan through the rows in the region, the RegionScanner talks to one or more StoreScanner instances (one per column family in the row). If the row passes all filtering criteria, the RegionScanner returns the Cells for that row to the RSRpcServices so that they can be used to form a Result to include in the ScanResponse.


The StoreScanner is the server-side component responsible for scanning through the Cells in each column family.

When the client (i.e. ResultScanner) receives the ScanResponse back from the server, it can then decide whether or not scanning should continue. Since the client and the server communicate back and forth in chunks of Results, the client-side ResultScanner will cache all the Results it receives from the server. This allows the application to perform scans faster than the case where an RPC is required for each Result that the application sees.

RPC Chunking (HBASE-11544)

Why is it necessary?

Currently, the server sends back whole rows to the client (each Result contains all of the cells for that row). The max result size limit is only applied at row boundaries. After each full row is scanned, the size limit will be checked.

The problem with this approach is that it does not provide a granular enough restriction. Consider, for example, the scenario where each row being scanned is 100 MB. This means that 100 MB worth of data will be read between checks for the size limit. So, even in the case that the client has specified that the size limit is 1 MB, 100 MB worth of data will be read and then returned to the client.

This approach for respecting the size limit is problematic. First of all, it means that it is possible for the server to run out of memory and crash in the case that it must scan a large row. At the application level, you simply want to perform a scan without having to worry about crashing the region server.

This scenario is also problematic because it means that we do not have fine grained control over the network resources. The most efficient use of the network is to have the client and server talk back and forth in fixed sized chunks.

Finally, this approach for respecting the size limit is a problem because it can lead to large, erratic allocations server side, playing havoc with GC.

Goal of the RPC Chunking solution

The goal of the RPC Chunking solution was to:

- Create a workflow that is more ‘regular’, less likely to cause Region Server distress

- Use the network more efficiently

- Avoid large garbage collections caused by allocating large blocks

Furthermore, we wanted the RPC chunking mechanism to be invisible to the application. The communication between the client and the server should not be a concern of the application. All the application wants is a Result for their scan. How that Result is retrieved is completely up to the protocol used between the client and the server.

Implementation Details

The first step in implementing this proposed RPC chunking method was to move the max result size limit to a place where it could be respected at a more granular level than on row boundaries. Thus, the max result size limit was moved down to the cell level within StoreScanner. This meant that now the max result size limit could be checked at the cell boundaries, and if in excess, the scan can return early.

The next step was to consider what would occur if the size limit was reached in the middle of a row. In this scenario, the Result sent back to the client will be marked as a "partial" Result. By marking the Result with this flag, the server communicates to the client that the Result is not a complete view of the row. Further Scan RPC's must be made in order to retrieve the outstanding parts of the row before it can be presented to the application. This allows the server to send back partial Results to the client and then the client can handle combining these partials into "whole" row Results. This relieves the server of the burden of having to read the entire row at a time.
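The client-side reassembly described above can be sketched like this. The types are simplified stand-ins (cells as strings), not HBase's Result/Cell API; the point is the buffer-until-complete behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class PartialResultAssembler {
    // Cells buffered for the row currently being assembled.
    private final List<String> buffer = new ArrayList<>();

    // Feed one server chunk. If the chunk is flagged "partial", the row is
    // incomplete and the client must issue another Scan RPC; once a
    // non-partial chunk arrives, the whole row is surfaced to the application.
    public List<String> onChunk(List<String> cells, boolean partial) {
        buffer.addAll(cells);
        if (partial) {
            return null;                       // more chunks outstanding
        }
        List<String> wholeRow = new ArrayList<>(buffer);
        buffer.clear();                        // ready for the next row
        return wholeRow;
    }
}
```

With Scan#setAllowPartialResults enabled, this buffering would be skipped and each chunk handed straight to the application instead.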

Finally, these changes were performed in a backwards compatible manner. The client must indicate in its ScanRequest that partial Results are supported. If that flag is not seen server side, the server will not send back partial results. Note that the configuration hbase.client.scanner.max.result.size can be used to change the default chunk size. By default this value is 2 MB in HBase 1.1+.

An option (Scan#setAllowPartialResults) was also added so that an application can ask to see partial results as they are returned rather than wait on the aggregation of complete rows.

A consistent view of the row is maintained even though it is assembled from multiple partial RPCs, because the running server-side context keeps track of the outstanding mvcc read point and will not include Cells written later in the results.

Note that this feature will be available starting in HBase 1.1. All 1.1+ clients will chunk their responses.

Heartbeat messages for scans (HBASE-13090)

What is a heartbeat message?

A heartbeat message is a message used to keep the client-server connection alive. In the context of scans, a heartbeat message allows the server to communicate back to the client that scan progress has been made. On receipt, the client resets the connection timeout. Since the client and the server communicate back and forth in chunks of Results, it is possible that a single progress update will contain ‘enough’ Results to satisfy the requesting application’s purposes. So, beyond simply keeping the client-server connection alive and preventing scan timeouts, heartbeat messages also give the calling application an opportunity to decide whether or not more RPC's are even needed.

Why are heartbeat messages necessary in Scans?

Currently, scans execute server side until the table is exhausted, the region is exhausted, or a limit is reached. As such there is no way of influencing the actual execution time of a Scan RPC. The server continues to fetch Results with no regard for how long that process is taking. This is particularly problematic since each Scan RPC has a defined timeout. Thus, in the case that a particular scan is causing timeouts, the only solution is to increase the timeout so it spans the long-running requests (hbase.client.scanner.timeout.period). You may have encountered such problems if you have seen exceptions such as OutOfOrderScannerNextException.

Goal of heartbeat messages

The goal of the heartbeating message solution was to:

- Incorporate a time limit concept into the client-server Scan protocol

- The time limit must be enforceable at a granular level (between cells)

- Time limit enforcement should be tunable so that checks do not occur too often

Beyond simply meeting these goals, it is also important that, similar to RPC chunking, this mechanism is invisible to the application layer. The application simply wants to perform its scan and get a Result back on a call to ResultScanner.next(). It does not want to worry about whether or not the scan is going to take too long and if it needs to adjust timeouts or scan sizings.

Implementation details

The first step in implementing heartbeat messages was to incorporate a time limit concept into the Scan RPC workflow. This time limit was based on the configured value of the client scanner timeout.

Once a time limit was defined, we had to decide where it should be respected. It was decided that this time limit should be enforced within the StoreScanner, because that allows it to be checked at cell boundaries. It is important that the time limit be checked at a fine-grained location because, in the case of restrictive filtering or time ranges, large portions of time can be spent filtering out and skipping cells. If we waited to check the time limit at row boundaries, a wide row could cause us to time out before a single check occurs.

A new configuration was introduced (hbase.cells.scanned.per.heartbeat.check) to control how often these time limit checks occur. Its default value is 10,000, meaning that the time limit is checked every 10,000 cells.
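The cadence can be sketched as follows. This is a simulation with a fake clock and illustrative names, not HBase's internals; it shows why checking only every N cells keeps the check cheap while still bounding how far past the limit a scan can run.

```java
public class TimeLimitedScan {
    // "Scan" totalCells cells, spending timePerCell units on each, and
    // consult the elapsed-time limit only every cellsPerCheck cells.
    // Returns the number of cells scanned before stopping.
    static int scanUntilLimit(int totalCells, int cellsPerCheck,
                              long timeLimit, long timePerCell) {
        long elapsed = 0;   // simulated clock
        int scanned = 0;
        while (scanned < totalCells) {
            elapsed += timePerCell;   // scan (or filter out) one cell
            scanned++;
            if (scanned % cellsPerCheck == 0 && elapsed >= timeLimit) {
                return scanned;       // stop early; a heartbeat ScanResponse goes out
            }
        }
        return scanned;               // finished without hitting the limit
    }
}
```

Note the overshoot: with the limit reached at cell 25,000 but checks every 10,000 cells, the scan only notices at cell 30,000, which is the price of keeping the check infrequent.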

Finally, if this time limit is reached, the ScanResponse sent back to the client is marked as a heartbeat message. It is important that the ScanResponse be marked in this way because it communicates to the client the reason the message was sent back from the server. Since a time limit may be reached before any Results are retrieved, it is possible that the heartbeat message's list of Results will be empty. We do not want the client to interpret this as meaning that the region was exhausted.

Note that similar to RPC chunking, this feature was implemented in a fully backwards compatible manner. In other words, heartbeat messages will only be sent back to the client in the case that the client has specified that it can recognize when a ScanResponse is a heartbeat message. Heartbeat messages will be available starting in HBase 1.1. All HBase 1.1+ clients will heartbeat.

Cleaning up the Scan API (HBASE-13441)

Following these recent improvements to the client-server Scan protocol, there is now an effort to clean up the Scan API. Some APIs have grown stale and no longer make much sense, especially in light of the above changes; others could use some love with regard to documentation.

For example, the caching and max result size APIs deal with how data is transferred between the client and the server. Both now seem misplaced in the Scan API. These are details that should likely be controlled by the RegionServer rather than the application. It seems much more appropriate to give the RegionServer control over these parameters so that it can tune them based on the current state of the RPC pipeline and server load.
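Concretely, these are the knobs in question; today the application sets them per Scan (values illustrative):

```java
Scan scan = new Scan();
// Number of rows fetched per Scan RPC -- a transport detail the
// application is currently asked to guess at.
scan.setCaching(100);
// Cap on the number of bytes returned per Scan RPC.
scan.setMaxResultSize(2 * 1024 * 1024);
```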

HBASE-13441 Scan API Improvements is the open umbrella issue covering ideas for Scan API improvements. If you have some time, check it out. Let us know if you have some ideas for improvements that you would like to see.

Other recent Scanner improvements

It’s important to note that these are not the only improvements made to Scanners recently. Many other impressive improvements have landed, each deserving of its own blog post. For those who are interested, I have listed two:

  • HBASE-13109: Make better SEEK vs SKIP decisions during scanning

  • HBASE-13262: Fixed a data loss issue in Scans


A spate of Scan improvements should make for a smoother Scan experience in HBase 1.1.

Thursday April 16, 2015

Come to HBaseCon2015!

Come to hbasecon.com and learn about:

  • A highly-trafficked HBase cluster with an uptime of sixteen months
  • An HBase deploy that spans three datacenters doing master-master replication between thousands of HBase nodes in each
  • Some nuggets on how Bigtable does it (and HBase could too)
  • How HBase is being used to record patient telemetry 24/7 on behalf of the Michael J. Fox Foundation to enable breakthroughs in Parkinson's disease research
  • How Pinterest and Microsoft are doing it in the cloud, how FINRA and Bloomberg are doing it out east, and how Rocketfuel, Yahoo! and Flipboard, etc., are representing the west

HBaseCon 2015 is the fourth annual edition of the community event for Apache HBase contributors, developers, admins, and users of all skill levels. The event is hosted and organized by Cloudera, with the Program Committee including leaders from across the HBase community. The conference will be one full day comprising general sessions in the morning, breakout sessions in the morning and afternoon, and networking opportunities throughout the day.

The agenda has been posted here: hbasecon2015 agenda. You can register here.


The HBaseCon Program Committee 

Thursday March 05, 2015

HBase ZK-less Region Assignment

ZK-less region assignment allows us to achieve greater scale as well as faster startup and assignment. It is simpler, has less code, and speeds up assignment, which in turn enables faster rolling restarts. This feature will be on by default in HBase 2.0.0.[Read More]


