Apache HBase

Thursday Oct 08, 2015

Imgur Notifications: From MySQL to HBase

This is the third in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Carlos J. Espinoza is an engineer at Imgur.

An earlier version of the discussion in this post was published here on the Imgur Engineering blog.

- Andrew Purtell

Imgur Notifications: From MySQL to HBase

Imgur is a heavy user of MySQL. It has been a part of our stack since our beginning. However, with our scale it has become increasingly difficult to throw more features at it. For our latest feature upgrade, we re-implemented our notifications system and migrated it over from MySQL to HBase. In this post, I will talk about how HBase solved our use case and the features we exploited.

To add some context, previously we supported two types of notifications: messages and comment replies, all stored in MySQL. For this upgrade, we decided to support several additional notification types. We also introduced rules around when a notification should be delivered. This change in spec made it challenging to continue with our previous model, so we started from scratch.

Early in the design phase, we envisioned a world where MySQL remained the primary store. We put together some schemas and some queries and, for the better, stopped ourselves from making a huge mistake. We would have had to create a couple of columns for every type of notification, and creating a new notification type afterwards would have meant a schema change. Our select queries would have required joins against other application tables. We designed an architecture that could work, but we would have sacrificed decoupling, simplicity, scalability, and extensibility.

Some of our notifications require that they only be delivered to the user once a milestone is crossed. For instance, if a post reaches 100 points, Imgur will notify the user at the 100 point mark. We don’t want to bother users with 99 notifications in between. So, at scale, how do we know that the post has reached 100 points?

A notification could have multiple milestones. We only want to deliver once the milestone is hit.

Considering MySQL is our primary store for posts, one way to do it is to increment the points counter in the posts table, then execute another query fetching the row and check if the points reached the threshold of 100. This approach has a few issues. Increment and then fetch is a race condition. Two clients could think they reached 100 points, delivering two notifications for the same event. Another problem is the extra query. For every increment, we must now fetch the volatile column, adding more stress to MySQL.

Though it is technically possible to do this in MySQL using transactions and consistent read locks, lock contention would make it very expensive for votes, as voting is one of the most frequent operations on our site. Seeing as we already use HBase in other parts of our stack, we switched gears and built our system on top of it. Here is how we use it to power notifications in real time and at scale.

Sparse Columns

At Imgur, each notification is composed of one or more events. The following image of our notifications dropdown illustrates how our notifications are modeled.

As illustrated, each notification is composed of one or more events. A notification maps to a row in HBase, and each event maps to multiple columns, one of which is a counter. This model makes our columns very sparse as different types of notifications have different types of events. In MySQL, this would mean a lot of NULL values. We are not tied to a strict table schema, so we can easily add other notification types using the same model.
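
To make the model concrete, here is a minimal sketch of what writing such a notification row might look like with the standard HBase Java client. The table name, column family, and qualifiers are hypothetical; Imgur's actual schema is not described in this post.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class NotificationWriteSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
         Table notifications = conn.getTable(TableName.valueOf("notifications"))) {
      // One row per notification; each event contributes its own columns, so
      // different notification types naturally produce different (sparse) column sets.
      Put put = new Put(Bytes.toBytes("user123:post456:milestone"));
      put.addColumn(Bytes.toBytes("e"), Bytes.toBytes("type"), Bytes.toBytes("post_points"));
      put.addColumn(Bytes.toBytes("e"), Bytes.toBytes("threshold"), Bytes.toBytes(100L));
      notifications.put(put);
    }
  }
}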

Atomic Increments

HBase has an atomic increment operation that returns the new value in the same call. It’s a simple feature, but our implementation depends on this operation. This allows our notification delivery logic on the client to be lightweight: increment and only deliver the notification if and only if a milestone is crossed. No extra calls. In some cases, this means we now keep two counters. For instance, the points count in the MySQL table for posts, and the points count in HBase for notifications. Technically, they could get out of sync, but this is an edge case we optimize for.
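
A rough sketch of that delivery check, assuming a Table handle like the one in the previous sketch (the row layout, family, and milestone handling are again illustrative):

// Requires org.apache.hadoop.hbase.client.Table and org.apache.hadoop.hbase.util.Bytes.
static void voteAndMaybeNotify(Table notifications, byte[] row, long milestone)
    throws java.io.IOException {
  // incrementColumnValue is atomic and returns the new value in the same call,
  // so exactly one caller ever observes the counter crossing the milestone.
  long newPoints = notifications.incrementColumnValue(
      row, Bytes.toBytes("e"), Bytes.toBytes("points"), 1L);
  if (newPoints == milestone) {
    deliver(row, milestone);
  }
}

static void deliver(byte[] row, long milestone) {
  // Hypothetical delivery hook: push the notification to the user here.
}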

Another benefit of doing increments in HBase is that it allows us to decouple the notifications logic from the application logic. All we need to know to deliver a notification is whether its counter has crossed a pre-defined threshold. For instance, we no longer need to know how to fetch a post and get its point count. HBase has allowed us to solve our problem in a more generic fashion.

Fast Table Scans

We also maintain a secondary order table. It stores notification references ordered by when they were last delivered using reversed timestamps. When users open their notifications dropdown, we fetch their most recent notifications by performing a table scan limited by their user ID. We can also support scanning for different types of notifications by using scan filters.
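
One way such an order table can be keyed and scanned, shown as a sketch (the key layout, family, and page size are assumptions, not the actual Imgur schema):

// Requires org.apache.hadoop.hbase.client.{Scan, ResultScanner, Table}
// and org.apache.hadoop.hbase.util.Bytes.
static byte[] orderRowKey(String userId, long deliveredAtMillis) {
  // Reversed timestamp: newer deliveries sort first under the user's prefix.
  byte[] reversedTs = Bytes.toBytes(Long.MAX_VALUE - deliveredAtMillis);
  return Bytes.add(Bytes.toBytes(userId + ":"), reversedTs);
}

static ResultScanner recentNotifications(Table orderTable, String userId)
    throws java.io.IOException {
  Scan scan = new Scan();
  scan.setRowPrefixFilter(Bytes.toBytes(userId + ":"));  // limit the scan to one user
  scan.setCaching(50);                                   // fetch a page of results at a time
  return orderTable.getScanner(scan);
}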

Other Benefits

With HBase we gain many other benefits, like linear scalability and replication. We can forward data to another cluster and run batch jobs on that data. We also get strong consistency. It is extremely important for notifications to be delivered exactly once when a milestone is crossed. We can make that guarantee knowing that HBase has strong row level consistency. It’s likely that we’ll use versioning in the future, but even without it, HBase is a great choice for our use case.

Imgur notifications is a young project, and we’ll continue to make improvements to it. As it matures and we learn more from it, I hope to share what we’ve built with the open source community.

Thursday Sep 03, 2015

Medium Data and Universal Data Systems

This is the second in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Matthew Hunt is an architect at Bloomberg who works on Portfolio Analytics and Bloomberg infrastructure.

An earlier version of the discussion in this post was published here on the High Scalability blog.

- Andrew Purtell


Why Bloomberg uses HBase

The Bloomberg terminal is the dominant news and analytics platform in finance, found on nearly every trader's desk around the world. We are actively working to consolidate around fewer, faster, and simpler systems and see HBase as part of a broader whole, along with computation frameworks such as Spark, resource schedulers like Mesos, virtualization and containerization in OpenStack and Docker, and storage with Ceph and HDFS. This is a substantial undertaking: Bloomberg has tens of thousands of databases and applications built over 30 years, and a team of 4000 engineers.

While HBase was designed for a specific purpose originally, we believe it will grow to become a general purpose database - in fact, a universal one.

What is a universal database?

The great Michael Stonebraker wrote a few years ago that the era of "one size fits all" databases was over. The fact that relational, time series, and hierarchical systems have long co-existed makes it questionable whether there ever was a one size fits all period. In fact, the era of one size fits all is, if anything, dawning. Stonebraker's central point - purpose built databases are often significantly better at their purpose than a general alternative - is true. But in the broader landscape, it doesn't matter. The universal database is one that is good enough to subsume nearly all tasks in practice. The distinction between data warehouse, offline batch reporting, and OLTP is already blurry and will soon vanish for most use cases. Slightly more formally, the universal database is one that is fast and low latency for small and medium problems, scales elastically to larger sizes transparently, has clear, well understood access semantics as in MPP SQL systems, and connects to standard computation engines such as Spark. Many systems are converging towards this already, and we think HBase is one of the future contenders in this space.

In a strong vote of confidence, HBase has seen increasing adoption and endorsement from major players such as Apple and Google, and Spark integration from Huawei, Cloudera, and Hortonworks, continuing to build and foster the large community that success requires. At the end of the day, the technology projects that succeed are the ones that attract a critical mass. Longevity and future direction are essential considerations in large scale design decisions.

But the future can be cloudy. We also use HBase because, with minor modifications, it is a good fit for practical applications we had initially, once it received some buffing for what I call the medium data problem.

Why don't we use HBase?

No honest engineering writeup is complete without a perusal of alternatives. Many fine and more mature commercial systems exist, along with good open alternatives such as Cassandra. While we think HBase has critical mass and a broad and growing community, other systems have strengths too. HBase can be cumbersome to set up and configure, lacks a standard high level access language incorporated by default, and still has work to do for efficient interfaces to Spark. But the pace of development is swift, and we believe this is just scratching the surface of what is to come as it matures. I look forward to a subsequent blog post detailing these future improvements and our next generation of systems.

To the community

All real projects stem from the combined efforts of many. I would like to thank the incredible community of people who have participated in moving things forward, from the strong leadership of Michael Stack and Andrew Purtell to the fine people at Cloudera and Hortonworks and Databricks. Strong managerial support inside of Bloomberg has impressively cleared the path. Carter Page at Google and Andrew Morse at Apple offered early encouragement. And in particular, Sudarshan Kadambi has been a true partner in crime for shared dreams, vision and execution. Thank you all.

What follows is a deep dive into our original HBase projects and the thinking that lay behind them.

MBH, 8/2015

Search engines and large web vendors serve hundreds of millions of requests for straightforward web lookups. Bloomberg serves hundreds of thousands of sophisticated financial users where each request has tight tolerances and may be computationally expensive. Can the systems originally designed for the former work for the latter?

"Big Data" systems continue to attract substantial funding, attention, and excitement. As with many new technologies, they are neither a panacea, nor even a good fit for many common uses. Yet they also hold great promise, and Bloomberg has been contributing behind the scenes to strengthen their use for what we term medium data – high terabytes on modest clusters - and real world scenarios as we build towards simpler universal systems. The easiest way to understand these changes is to start with a bit of background on the origin of both these technologies and our company.

[This was taken from a talk called "Hadoop at Bloomberg", but it’s important to understand that we see this as part of a broader landscape that encompasses tools such as Spark, Mesos, and Docker which are working to provide frameworks for making many commodity machines work together.]

Bloomberg Products and Challenges

Bloomberg is the world’s premier provider of information and analytics to the financial world. Walk into nearly any trading floor in the world, and you will find "Bloomberg Terminals" in use. Simply put, data is our business, and our 4,000 engineers have built many systems over the last 30 years to address the stringent performance and reliability needs of the financial industry. We are always looking at ways to improve our core infrastructure, including consolidating many purpose built disparate systems to reduce complexity.

Our main product is the Bloomberg Professional service, which many refer to simply as “The Terminal”, shown below.


While the name of the Bloomberg Terminal is carried over from when we built hardware, it is analogous to a browser in the cloud: there is an "address bar" where a function name can be typed in, display areas where the results are shown, and the logic and heavy lifting is done server side in our data centers. Of course we don’t refer to them as browsers and clouds because we’ve been doing it since long before those words existed, but since the browser is ubiquitous, this should give everyone a good idea of the basics. There are also mobile versions available on the iPhone, iPad, and Android. There are perhaps 100,000 functions and tens of thousands of databases. Below is a sample of a few of them.


Chart of a single security
Storm track showing oil wells and shipping and companies likely to be affected
News and sentiment analysis of twitter feed
Portfolio and risk analytics

Bloomberg’s origins were in providing electronic analytics for bonds and other securities. "Security" is just a fancy word for instruments you can trade like stocks or bonds. It transformed a world that was paper driven and information poor into one in which prices were available at the touch of a button. As nearly every school, road, airport, and other piece of modern infrastructure is financed by bond issuance, this was an important and useful innovation that reduced the cost of financing. Prior to that time, the cost of a bond was more or less what your sales guy said it was.

In the early 1980s when the company began, there were few off the shelf computer technologies adequate to the task. Commercial databases were weak, and personal business computers nonexistent. Even the IBM PC itself was only announced two months before Bloomberg LP was founded. The name of our main product, the Bloomberg Terminal, reflects our history of needing to build our own hardware at the outset, along with developing the software and database systems in house. And so out of necessity we did.

As the company expanded, its infrastructure grew around powerful Unix machines and fast reliable services designed to furnish almost any kind of analysis there is about a company or security. The company grew and prospered, becoming one of the largest private companies in the United States, and entering news and other lines of business. Owning our own systems has conferred substantial benefits aside from simply making our existence possible. Independent of the flexibility, not paying vendors such as Oracle billions of dollars for our tens of thousands of databases is good business.

As time has passed, the sophistication of our usage has grown. Interactive functions are frequently geared towards complicated risk and other multi-security analytics, which require far more data and computation at a time per user and request. This puts a strain on systems architected when interactive requests were for charting of the price of a small number of instruments. Many groups within Bloomberg have addressed this by copying systems and caching large quantities of data. There are also different data systems for different kinds of usage – one for streaming pricing data, another for historical data, and so on. Complexity kills. The more pieces and moving parts there are, the harder it is to build and maintain. Can we consolidate around a small number of open platforms? We believe we can.

Big Data Origins

Modern era big data technologies are a solution to an economics problem faced by Google and others a decade ago. Storing, indexing, and responding to searches against all web pages required tremendous amounts of disk space and computer power. Very powerful machines, fast SAN storage, and data center space were prohibitively expensive. The solution was to pack cheap commodity machines as tightly together as possible with local disks.

This addressed the space and hardware cost problem, but introduced a software challenge. Writing distributed code is hard, and with many machines come many failures, and so a framework was also required to take care of such problems automatically for the system to be viable. Google pioneered one of the first viable commodity machine platforms to do this, which is part of why they went on to dominance rather than bankruptcy. Fortunately for the rest of us, they published how they did it, and while there are more sophisticated means available today, it’s a great place to start an understanding of concepts.

The essential elements of this system are a place to put files, and a way to perform computations. The landmark Google papers introduced GFS for storage and MapReduce for computation. MapReduce had high reliability and throughput, owning records for benchmarks like sorting a petabyte of data at the time. However, its response time is far too long for interactive tasks, and so BigTable, a low latency database, was also introduced. Google published papers detailing these systems, but did not release their software. This in a nutshell explains Hadoop, which began as the open source implementation of these Google systems, supported by Yahoo and other large web vendors facing the same challenges at the time.

Understanding why and how these systems were created also offers insight into some of their weaknesses. Their goal was to make crunching hundreds of petabytes of data circa 2004 on thousands of machines both cost effective and reliable, especially for batch computations. A system that ropes together thousands of machines for an overnight computation is no good if it gets most of the way there but never finishes.

And therein lies the problem. Most people don’t have petabytes of data to crunch at a time. The number of companies who have faced this has been restricted to a handful of internet giants. Similarly, these solutions are architected around the limitations of commodity machines of the day, which had single core processors, spinning rust disks, and far less memory. A decade later, high core counts, SSDs or better, and large RAM footprints are common. And Hadoop in particular is also written in Java, which creates its own challenges for low latency performance.

Why Hadoop? It’s not just about Hadoop

Choosing the right system is more than a strictly technical decision. Is it well supported? Will it be around for the future? Does it address enough problems to be worthwhile? X86 chips didn’t begin as the best server CPU. Linux was a toylike operating system at its outset. What has made them dominant is sustained effort, talent, and money from an array of companies and people. We believe the same will happen with some variant of these tools, immature though they may be. Intel has presumably invested for the same reason – if systems of the future are built around a distributed framework, they want to be in on the game.

Big data systems are widely used and money and talent have been pouring in. Recently, Cloudera raised $950 million, of which $750 million came from Intel. Hortonworks has also had some impressive fundraising. More than a billion dollars into just two companies goes a long way. Skilled talent such as Marcel Kornacker, who architected the F1 system at Google, Jeff Hammerbacher, the founder of the data science team at Facebook, Mike Olson, an old database hand, and members of the Yahoo data sciences team such as Arun Murthy help the ecosystem.

It’s not just about Hadoop for us. We group Spark, Mesos, Docker, Hadoop, and other tools into the broader group of "tools for making many machines work together seamlessly". Hadoop has some useful properties that make it a good starting point for discussion in addition to being useful in its own right.

We’d like to consolidate our systems around an open commodity framework for storage and computation that has broad support. Hadoop and its ilk show potential but are immature, with weaknesses in taking advantage of modern hardware, in delivering consistently fast performance, in resiliency, and in the number of people who know them well.

So that’s the background. Now on to a specific use case and how we’ve tackled it with HBase.

HBase Overview


HBase is short for the "Hadoop database", and it’s the implementation of Google’s BigTable that was designed for interactive requests that MapReduce wasn’t suited for. Many of us think of databases as things with many tables and indexes that support SQL and relational semantics. HBase has a simpler model designed to spread across many machines.

The easiest analogy is with printed encyclopedias. Each volume of the encyclopedia lists the range of words that can be found inside, such as those beginning with the letters A to C. In HBase, the equivalent of a volume is called a region. A region server is a process that hosts some number of regions, much as a bookcase can hold several books. There is typically one region server process per physical computer. The diagram above shows three machines, each with one region server process, each of which has two regions.

A table in HBase can have many columns and rows but only one primary key. The table is always physically sorted by that key, just as the encyclopedia is, and many of its characteristics follow from this. For example, writes are naturally parallel. A-C words go to one machine, and C-F to another. Elastic scalability follows too: drop another machine in, and you can move some of the regions to it. And if a region gets too big, it splits into two new ones, and so very large datasets can be accommodated. These are all positive properties.

But there are downsides, including the loss of things taken for granted in a relational database. HBase is less efficient per machine than a single server database. Updates of individual rows are atomic, but there aren’t all-or-nothing complex transactions that can be rolled back. There are no secondary indexes, and lookups not based on the primary key are much slower. Joins between multiple tables are cumbersome and inefficient. These systems are simply much less mature than relational databases, and understood by far fewer people. While HBase is growing in sophistication and maturity, if your problem fits in a relational database on a single machine, then HBase is almost certainly a poor choice.

When considering broad architectural decisions, it is important to consider alternate solutions. There are many strong alternatives beyond known relational systems. Products such as Greenplum, Vertica, SAP Hana, Sybase IQ and many others are robust, commercially supported, and considerably more efficient per node, with distributed MPP SQL or pure columnar stores. Cassandra offers a similar model to HBase and is completely open, so why not use it?

Our answer is that we have big picture concerns. We prefer open systems rather than being locked in to a single vendor. Hadoop and other systems are part of an expanding ecosystem of other services, such as distributed machine learning and batch processing, that we also want to benefit from. While the alternatives are excellent data stores, they only offer a smaller piece of the puzzle. And finally, we have actual use cases that happen to fit reasonably well and are able to address many of the downsides.

So on to one of our specific use cases.

Time series data and the Portfolio analytics challenge



Much of financial data is time series data. While the systems to manage this data have often been highly sophisticated and tuned, the data itself is very simple, and the problem has been one of speed and volume. Time series data for a security is generally just a security, a field, a date, and a value. The security could be a stock such as IBM, the field the price, and the date and value self-explanatory.

The reason there is a speed and volume issue is that there are thousands of possible fields – volume traded, for example – and tens of millions of securities. The universe of bonds alone is far larger than stocks, which we call equities. These systems have also historically been separated into intraday and historical data – intraday captures all the ticks during the day, while historical systems have end of day values that go back much farther in time. Unifying them was impractical in the past owing to different performance tradeoffs: intraday must manage a very high volume of incoming writes, while end of day has batch updates on market close but more data and searches in total.

The Bloomberg end of day historical system is called PriceHistory. This is a slight misnomer since price is just one of thousands of fields, but price is obviously an important one. This system was designed for the high volume of requests for single securities and was quite impressive for its day. The response time for a year’s data for a field on a security is 5ms, and it receives billions of hits a day, around half a million a second at peak. It is a critical system for Bloomberg; failures there have the potential to disrupt the capital markets.

The challenge comes from applications like Portfolio Analytics that request far more data at a time. A bond portfolio can easily have tens of thousands of bonds in it, and a common calculation like attribution requires 40 fields per day per instrument. Performing attribution for a bond portfolio over a few years of data requires tens of millions of datapoints instead of a handful. When requesting tens of millions of datapoints, even a high cache hit rate of 99.9% will still have thousands of cache misses, and if the underlying system is backed by magnetic media, this means thousands of disk seeks. Multiply across a large user base making many requests and it’s clear to see why this created a problem for the existing price history system.

The solution a few years back was to move to all memory and solid state machines with highly compacted blobs split into separate databases. This approach worked and was 2-3 orders of magnitude faster – a giant leap – but less than ideal. Compacted blobs split across many databases is cumbersome.

Enter HBase.

HBase Suitability and Issues

Time series data is both very simple and lends itself to embarrassing parallelism. When fetching tens of millions of datapoints for a portfolio, the work can be arbitrarily divided among tens of millions of machines if need be. It doesn’t get any better than that in terms of potential parallelism. With a single main table trivially distributed, HBase should be a good fit – in theory.

Of course, in theory, theory and practice agree, but in practice, they don’t always. The data sets may work, but what are the obstacles? Performance, efficiency, maturity, and resiliency. What follows is how we went about tackling them.

Failure and recovery


The first and most critical problem was failure behavior. When an individual machine fails, data is not lost – it is replicated on several machines via HDFS – but there is still a serious drawback. At any given time, all reads and writes to any given region are managed by one and only one region server. If a region server dies, the failure will be detected and a replacement automatically brought back up, but until that happens, no reads or writes are possible to the regions it managed.

The HBase community has made great strides in speeding up recovery time after a failure is detected, but this still leaves a crucial hole. The crux of the problem is that failure detection in a distributed system is ultimately accomplished by timeout – some period of time that elapses where a heartbeat or other mechanism fails. If this timeout is made too short, there will be false positives, which are also problematic.

In Bloomberg’s case, our SLAs are on the order of milliseconds. The shortest that timeouts can be made are on the order of many seconds. This is a fundamental mismatch even if recovery after failure detection becomes infinitely fast.

Discussions with Hadoop vendors resulted in several possible solutions, most of which involved running multiple copies of each database. Since Bloomberg’s operational policies already dictate that systems can survive the loss of entire datacenters and then several additional machines, this would have meant many running copies of each database, which we deemed too operationally complex. Ultimately, we were unhappy with the alternatives, and so we worked to change them.

If failover detection and recovery can never be fast enough, then the alternative is to have standby nodes that can be queried in the event a region server goes down. The insight that made this practical in the short term is that the data already exists in multiple places, albeit with a possible slight delay in propagation. This can be used for standby region servers that at worst lag slightly behind but receive events in the same order, known as timeline consistency. This works for end of day systems like PriceHistory that are updated in batch.

Timeline consistent standby region servers were added to HBase as part of HBASE-10070.
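
From the client side, opting into this looks roughly like the sketch below: a read marked as timeline-consistent may be answered by a standby replica, and the result can be checked for staleness. The table handle and row key here are illustrative only.

// Requires org.apache.hadoop.hbase.client.{Consistency, Get, Result, Table}
// and org.apache.hadoop.hbase.util.Bytes.
static Result timelineRead(Table priceHistory, String rowKey) throws java.io.IOException {
  Get get = new Get(Bytes.toBytes(rowKey));
  get.setConsistency(Consistency.TIMELINE);   // allow a standby region replica to answer
  Result result = priceHistory.get(get);
  boolean servedBySecondary = result.isStale();  // if true, may lag the primary slightly
  return result;
}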

Performance I: Distribution and parallelism


With failures addressed, though, there are still issues of raw performance and consistency. Here we’ll detail three experiments and outcomes on performance. Tests were run on a modest cluster of 11 machines with SSDs, dual Xeon E5’s, and 128 GB of RAM each. In the top right hand corner of the slide, two numbers are shown. The first is the average response time at the outset of the experiment, and the second the new time after improvements were made. The average request is for around 200,000 records that are random key-value lookups.

Part of the premise of HBase is that large Portfolio requests can be divided up and tackled among existing machines in a cluster. This is part of why standby region servers to address failures are mandated – if a request hits every server, and one doesn’t respond for a minute or more, it’s bad news. The failure of any one server would stall every such request.

So the next thing to tackle was parallelism and distribution. The first issue was in dividing work evenly: given a large request, did each machine in the cluster get the same size chunk of work? Ideally, requesting 1000 rows from ten machines would get 100 rows from each. With a naive schema, the answer is no: if the row key is security + something else, then everything from IBM will be in one region, and IBM is hit more often than other securities. This phenomenon is known as hotspotting.

The solution is to take advantage of the properties of HBase as a fix for the problem. Data in HBase is always kept physically sorted by the row key. If the row key itself is hashed, and that hash used as a prefix, then the rows will be distributed differently. Imagine the row key is security+year+month+field. Take an MD5 hash of that, and use that as a prefix, and any one record for IBM is equally likely to be on any server. This resulted in almost perfect distribution of work load. We don’t use the MD5 algorithm exactly, because it’s too slow and large, but the concept is the same.
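
A sketch of the hash-prefix technique follows. The post is explicit that the production system does not use MD5 itself, so treat the hash choice and prefix length here purely as illustration.

// Requires java.security.MessageDigest and org.apache.hadoop.hbase.util.Bytes.
static byte[] hashedRowKey(String security, int year, int month, String field)
    throws java.security.NoSuchAlgorithmException {
  byte[] natural = Bytes.toBytes(security + ":" + year + ":" + month + ":" + field);
  byte[] digest = MessageDigest.getInstance("MD5").digest(natural);
  // A short hash prefix spreads any one security's rows evenly across regions,
  // while keeping the full natural key so the row remains self-describing.
  return Bytes.add(Bytes.head(digest, 2), natural);
}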

With perfect distribution, we have perfect parallelism: With 11 servers each one was doing the same amount of work not just on average, but for every request. And if parallelism is good, then more parallelism is better, right? Yes - up to a point.

As mentioned earlier a general Hadoop weakness is that it was designed for older and less powerful machines. One of the ways this manifests itself is through low resource consumption on the server. At maximum throughput, none of the hardware was breathing hard – low CPU and disk and network, no matter how many configuration options were changed.

One way to address this is to run more region servers per machine, provided the machine has the capacity to do so. This will increase the number of total region servers and hence the level of fan out for parallelism, and if more parallelism is better, then response times will drop.

The chart from the slide shows the results. With 11 machines and 1 region server each – 11 in total – average response time was 260 ms. Going to 3 per machine, for 33 region servers in total, dropped the average time to 185 ms. 5 region servers per machine further improved to 160 ms, but at 10 per machine times got worse. Why?

The first answer that will probably spring to mind is that the capacity of the machine was exceeded; with 10 region server processes on a box, each with many threads, perhaps there weren’t enough cores. As it turns out, this is not the cause, and the actual answer is more subtle and interesting and resulted in another leap in performance that is applicable to many systems of this type.

But before we get into that, let’s talk about in place computation.

Performance II: In place computation


One of the tenets of big data is that the computation should be moved to the data and not the other way around. The idea here is straightforward. Imagine you need to know how many rows are in a large database table. You could fetch every row to your local client, and then iterate through and count – or you could issue a “select count(*) from ... “ if it’s a relational database. The latter is much more efficient.

It’s also much more easily said than done. Many problems can’t be solved this way at all, and many frameworks fall short even where the problem is known to be amenable. In its 2013 report on massive data analysis, the National Research Council proposed seven computational giants of parallelism in an attempt to categorize the capabilities of a distributed system (a relevant and interesting read), and Hadoop can’t handle all of them. Fortunately for us, we have a simple use case that is amenable, and it let us again cut response times in half.

The situation is that there are multiple "sources" for data. Barclays and Merrill Lynch and many other companies provide data for bond pricing. The same bond for the same dates is often in more than one source, but the values don’t agree. Customers differ on the relative precedence of each, and so each request includes a waterfall, which is just an ordering of which sources should be used. Try to get all the data from the first source, then for anything that’s missing, try the next source, and so on. On average, a request must go through 5 such sources until all the data is found.

In the world of separate databases, different sources are in different physical locations, and so this means trying the first database, pulling all the records back, seeing what’s missing, constructing a new request, and issuing that.

For HBase, we can again take advantage of physical sorting, support for millions of columns, and in-place computation through coprocessors, which are the HBase equivalent of classic stored procedures. By making the source and field columns with the security and date still the row key, they will all be collocated on the same region server. The coprocessor logic then becomes "find the first source for this row that matches for this security on this date, and return that." Instead of five round trips, there will always be one and only one. A small coprocessor of a few lines worked as expected, and average response time dropped to 85ms.
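
The server-side coprocessor itself is beyond the scope of this post, but the per-row waterfall selection it performs boils down to something like the following sketch (the column family, the source:field qualifier layout, and the source names are assumptions for illustration):

// Requires org.apache.hadoop.hbase.client.Result and org.apache.hadoop.hbase.util.Bytes.
static byte[] resolveWaterfall(Result row, String field, String[] waterfall) {
  byte[] family = Bytes.toBytes("d");
  for (String source : waterfall) {            // client-supplied source ordering
    byte[] value = row.getValue(family, Bytes.toBytes(source + ":" + field));
    if (value != null) {
      return value;                            // first source that has the field wins
    }
  }
  return null;                                 // no source supplied this field
}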

Now, on to the mystery from before.

Performance III: Synchronized disruption


The results from increasing the number of region servers left a mystery: Why did response times improve at first, and then worsen? The answer involves both Java and an important property of any high fan out distributed system.

A request that goes to more than one machine "fans out". The more machines hit, the higher the degree of fan out. In a request that fans out, how long does it take to get a response? The answer is that it takes as long as the slowest responder. With ten region servers per machine and 11 machines, each request fanned out to 110 processes. If 109 respond in a microsecond and 1 in 170 ms, the response time is 170 ms.

The immediate culprit turned out to be Java’s garbage collection, which freezes a machine until it completes. The higher the degree of fan-out, the greater the likelihood that any one region server process was garbage collecting at the time. At 110 region server processes, the odds were close to 1.

The solution here is devilishly simple. As long as the penalty must be paid if any one Java process garbage collects, why not just make them do it all at the same time? That way, more time will elapse where requests are issued and no garbage collection takes place. We wrote the simplest, dumbest test for this: a coprocessor with one line – System.gc() – and a timer that called it every 30 seconds. This dropped our average response time from 85 ms to 60 ms on the first go.
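
A stripped-down standalone version of that idea might look like the following; the real change lived in a coprocessor, so the wiring shown here is illustrative, with only the 30-second schedule taken from the description above.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SynchronizedGcSketch {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // Ask the JVM for a full collection every 30 seconds. When every region
    // server does this on the same schedule, fewer requests catch a server mid-GC.
    timer.scheduleAtFixedRate(System::gc, 30, 30, TimeUnit.SECONDS);
    Thread.currentThread().join();  // keep the demo process alive
  }
}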

Many system developers’ and C programmers’ eyeballs will collectively roll, as this is a Java-specific problem. While Java GC is an issue, this has wider applicability. It is a general case of what Google’s Jeff Dean calls Synchronized Disruption. Requests in any parallel system that fan out suffer from the laggard, and so things that slow down individual machines should be done at the same time. For more details, see the excellent writeup "Achieving Rapid Response Times in Large Online Services", which covers this and other properties of high fan out systems. That said, Java is now the basis of quite a number of high fan out computing systems, including Hadoop, HBase, Spark, and Solr. Synchronizing garbage collection holds potential for them as well.

Conclusions And Next Steps

As these experiments have shown, performance from HBase can be improved. The changes described here alone cut average runtime response times to a quarter of what they were, with plenty of room left for further improvements. Faster machines are available that would halve response times again. (Incidentally, there is a myth that the speed and performance of an individual machine is irrelevant since it’s better to scale horizontally with more machines. Individual machine speed still matters immensely: the length of a garbage collection cycle, for example, is linearly proportional to the single threaded CPU performance, and even if your system doesn’t use Java, the laggard responder may well be affected by machine speed.) HBase produces far more garbage than it ought; this too can be fixed. And reissuing requests to an alternate server after a delay – a technique also mentioned in Dean’s paper – can also lower not just average response times, but reduce or eliminate outliers too, which is equally important.

We’ll do some of these, but we are nearing the point of diminishing returns. Below a threshold of some tens of milliseconds, the difference is imperceptible for a person.

On the flip side, there are applications for which this would never make sense. High frequency trading, where the goal is to shave microseconds away, is a poor fit.

More importantly, our goal is more than performance alone. While this setup with HBase provides a thousandfold improvement on write performance and 3x faster reads, it is a connector to a broader distributed platform of tools and ideas.

Wednesday Jun 10, 2015

Scalable Distributed Transactional Queues on Apache HBase

This is the first in a series of posts on "Why We Use Apache HBase", in which we let HBase users and developers borrow our blog so they can showcase their successful HBase use cases, talk about why they use HBase, and discuss what worked and what didn't.

Today's entry is a guest post by Terence Yim, a Software Engineer at Cask, responsible for designing and building realtime processing systems on Hadoop/HBase, originally published here on the Cask engineering blog.

- Andrew Purtell

A real time stream processing framework usually involves two fundamental constructs: processors and queues. A processor reads events from a queue, executes user code to process them, and optionally writes events to another queue for additional downstream processors to consume. Queues are provided and managed by the framework. Queues transfer data and act as a buffer between processors, so that the processors can operate and scale independently. For example, a web server access log analytics application can look like this:


One key differentiator among various frameworks is the queue semantics, which commonly varies along these lines:

  • Delivery Guarantee: At least once, at most once, exactly-once.
  • Fault Tolerance: Failures are transparent to user and automatic recovery.
  • Durability: Data can survive failures and restarts.
  • Scalability: Characteristics and limitations when adding more producers/consumers.
  • Performance: Throughput and latency for queue operations.

In the open-source Cask Data Application Platform (CDAP), we wanted to provide a real-time stream processing framework that is dynamically scalable, strongly consistent and with an exactly-once delivery guarantee. With such strong guarantees, developers are free to perform any form of data manipulation without worrying about inconsistency, potential reprocessing or failure. It helps developers build their big data application even if they do not have strong distributed systems background. Moreover, it is possible to relax these strong guarantees to trade-off for higher performance if needed; it is always easier than doing it the other way around.

Queue Scalability

The basic operations that can be performed on a queue are enqueue and dequeue. Producers write data to the head of the queue (enqueue) and consumers read data from the tail of the queue (dequeue). We say a queue is scalable when you enqueue faster as a whole by adding more producers and dequeue faster as a whole by adding more consumers. Ideally, the scaling is linear, meaning doubling the amount of producers/consumers will double the rate of enqueue/dequeue and is only bounded by the size of the cluster. In order to support linear scalability for producers, the queue needs to be backed by a storage system that scales linearly with the number of concurrent writers. For consumers to be linearly scalable, the queue can be partitioned such that each consumer only processes a subset of queue data.

Another aspect of queue scalability is that it should scale horizontally. This means the upper bound of the queue performance can be increased by adding more nodes to the cluster. It is important because it makes sure that the queue can keep working regardless of cluster size and can keep up with the growth in data volume.

Partitioned HBase Queue

We chose Apache HBase as the storage layer for the queue. It is designed and optimized for strong row-level consistency and horizontal scalability. It provides very good concurrent write performance and its support of ordered scans fits well for a partitioned consumer. We use the HBase Coprocessors for efficient scan filtering and queue cleanup. In order to have the exactly-once semantics on the queue, we use Tephra’s transaction support for HBase.

Producers and consumers operate independently. Each producer enqueues by performing batch HBase Puts and each consumer dequeues by performing HBase Scans. There is no link between the number of producers and consumers and they can scale separately.

The queue has a notion of consumer groups. A consumer group is a collection of consumers partitioned by the same key such that each event published to the queue is consumed by exactly one consumer within the group. The use of consumer groups allows you to partition the same queue with different keys and to scale independently based on the operational characteristics of the data. Using the access log analytics example above, the producer and consumer groups might look like this:


There are two producers running for the Log Parser and they are writing to the queue concurrently. On the consuming side, there are two consumer groups. The Unique User Counter group has two consumers, using UserID as the partitioning key and the Page View Counter group contains three consumers, using PageID as the partitioning key.

Queue rowkey format

Since an event emitted by a producer can be consumed by one or more consumer groups, we write the event to one or more rows in an HBase table, with one row designated for each consumer group. The event payload and metadata are stored in separate columns, while the row key follows this format:


The two interesting parts of the row key are the Partition ID and the Entry ID. The Partition ID determines the row key prefix for a given consumer. This allows a consumer to read only the data it needs to process, using a prefix scan on the table during dequeue. The Partition ID consists of two parts: a Consumer Group ID and a Consumer ID. The producer computes one partition ID per consumer group and writes to those rows on enqueue.

The Entry ID in the row key contains the transaction information. It consists of the producer transaction write pointer issued by Tephra and a monotonically increasing counter. The counter is generated locally by the producer and is needed to make the row key unique for the event, since a producer can enqueue more than one event within the same transaction.

On dequeue, the consumer will use the transaction write pointer to determine if that queue entry has been committed and hence can be consumed. The row key is always unique because of the inclusion of a transaction write pointer and counter. This allows producers to operate independently and never have write conflicts.
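
A sketch of how such a row key might be assembled is shown below; the field widths and ordering are simplified assumptions, and the actual CDAP encoding differs in detail.

// Requires org.apache.hadoop.hbase.util.Bytes.
static byte[] queueRowKey(long consumerGroupId, int consumerId,
                          long txWritePointer, int producerCounter) {
  byte[] partitionId = Bytes.add(Bytes.toBytes(consumerGroupId), Bytes.toBytes(consumerId));
  byte[] entryId = Bytes.add(Bytes.toBytes(txWritePointer), Bytes.toBytes(producerCounter));
  // Partition ID first, so a consumer can dequeue with a prefix scan;
  // Entry ID second, so entries under a partition stay in transaction order.
  return Bytes.add(partitionId, entryId);
}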

In order to generate the Partition ID, a producer needs to know the size and the partitioning key of each consumer group. The consumer groups information is recorded transactionally when the application starts as well as when there are any changes in group size.

Changing producers and consumers

It is straightforward to increase or decrease the number of producers since each producer operates independently. Adding or removing producer processes will do the job. However, when the size of a consumer group needs to change, coordination is needed to update the consumer group information correctly. The steps can be summarized by this diagram:


Pausing and resuming are fast operations as they are coordinated using Apache ZooKeeper and executed in parallel. For example, with the web access log analytics application we mentioned above, changing the consumer groups information may look something like this:


With this queue design, the enqueue and dequeue performance is on par with batch HBase Puts and HBase Scans respectively, with some overhead for talking to the Tephra server. That overhead can be greatly reduced by batching multiple events in the same transaction.

Finally, to prevent "hotspotting", we pre-split the HBase table based on the cluster size and apply salting on the row key to better distribute writes, which would otherwise have been sequential due to the monotonically increasing transaction write pointer.
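
The salting step can be as simple as the following sketch, assuming the table was pre-split into a known number of buckets; the bucket count and hash choice are illustrative.

// Requires org.apache.hadoop.hbase.util.Bytes.
static byte[] saltedRowKey(byte[] rowKey, int numBuckets) {
  // Derive a deterministic one-byte salt from the key itself, so reads can
  // recompute it; this spreads otherwise sequential keys across the pre-splits.
  int salt = (java.util.Arrays.hashCode(rowKey) & 0x7fffffff) % numBuckets;
  return Bytes.add(new byte[] { (byte) salt }, rowKey);
}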

Performance Numbers

We’ve tested the performance on a small ten-node HBase cluster and the result is impressive. Using a 1K bytes payload with batch size of 500 events, we achieved a throughput of 100K events per second produced and consumed, running with three producers and ten consumers. We also observed the throughput increases linearly when we add more producers and consumers: for example, it increased to 200K events per second when we doubled the number of producers and consumers.

With the help of HBase and a combination of best practices, we successfully built a linearly scalable, distributed transactional queue system and used it to provide a real time stream processing framework in CDAP: dynamically scalable, strongly consistent, and with an exactly-once delivery guarantee.

Saturday Jun 06, 2015

Saving CPU! Using Native Hadoop Libraries for CRC computation in HBase

by Apekshit Sharma, HBase contributor and Cloudera Engineer

TL;DR: Use the Hadoop Native Library for CRC calculation and save CPU!

Checksums in HBase

Checksums are used to check data integrity. HDFS computes and stores checksums for all files on write. One checksum is written per chunk of data (the size can be configured using bytes.per.checksum) in a separate, companion checksum file. When data is read back, the file with the corresponding checksums is read back as well and is used to ensure data integrity. However, having two files results in two disk seeks for reading any chunk of data. For HBase, the extra seek while reading an HFileBlock results in extra latency. To work around the extra seek, HBase inlines checksums: it calculates checksums for the data in an HFileBlock and appends them to the end of the block itself on write to HDFS (HDFS then checksums the HBase data plus the inline checksums). On read, HDFS checksum verification is turned off by default, and HBase itself verifies data integrity.

Can we then get rid of HDFS checksum altogether? Unfortunately no. While HBase can detect corruptions, it can’t fix them, whereas HDFS uses replication and a background process to detect and *fix* data corruptions if and when they happen. Since HDFS checksums generated at write-time are also available, we fall back to them when HBase verification fails for any reason. If the HDFS check fails too, the data is reported as corrupt.

The related hbase configurations are hbase.hstore.checksum.algorithm, hbase.hstore.bytes.per.checksum and hbase.regionserver.checksum.verify. HBase inline checksums are enabled by default.
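
For reference, a hypothetical hbase-site.xml fragment touching these settings might look like the following; the values shown are purely illustrative, not a recommendation, so check the defaults that ship with your HBase version.

<!-- Illustrative values only; verify against your HBase version's defaults. -->
<property>
  <name>hbase.hstore.checksum.algorithm</name>
  <value>CRC32C</value>
</property>
<property>
  <name>hbase.hstore.bytes.per.checksum</name>
  <value>16384</value>
</property>
<property>
  <name>hbase.regionserver.checksum.verify</name>
  <value>true</value>
</property>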

Calculating checksums is computationally expensive and requires lots of CPU. When HDFS switched over to JNI + C for computing checksums, they witnessed big gains in CPU usage.

This post is about replicating those gains in HBase by using Native Hadoop Libraries (NHL). See HBASE-11927


We switched to the Hadoop DataChecksum library, which under the hood uses NHL if it is available and otherwise falls back to the Java CRC implementation. Another alternative considered was the ‘Circe’ library. The following comparison highlights the differences between the two and makes the reasoning for our choice clear.

Hadoop Native Library:

  • Native code supports both crc32 and crc32c
  • Adds a dependency on hadoop-common, which is reliable and actively developed
  • Interface supports taking a stream of data, a stream of checksums, and a chunk size as parameters, and computing/verifying checksums considering the data in chunks

Circe:

  • Native code supports only crc32c
  • Adds a dependency on an external project
  • Only supports calculation of a single checksum over all input data

Both libraries support use of the special x86 instruction for hardware calculation of CRC32C, if available (defined in the SSE4.2 instruction set). In the case of NHL, hadoop-2.6.0 or a newer version is required for HBase to get the native checksum benefit.

However, based on the data layout of HFileBlock, which has ‘real data’ followed by checksums on the end, only NHL supported the interface we wanted. Implementing the same in Circe would have been significant effort. So we chose to go with NHL.


Since the metric to be evaluated was CPU usage, a simple configuration of two nodes was used. Node1 was configured to be the NameNode, Zookeeper and HBase master. Node2 was configured to be DataNode and RegionServer. All real computational work was done on Node2 while Node1 remained idle most of the time. This isolation of work on a single node made it easier to measure impact on CPU usage.


Ubuntu 14.04.2 LTS (GNU/Linux 3.13.0-24-generic x86_64)

CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz                                          

Socket(s) : 1

Core(s) per socket : 1

Thread(s) per core : 4

Logical CPU(s) : 4

Number of disks : 1

Memory : 8 GB

HBase Version/Distro *: 1.0.0 / CDH 5.4.0

*Since trunk resolves to hadoop-2.5.1 which does not have HDFS-6865, it was easier to use a CDH distro which already has HDFS-6865 backported.


We chose to study the impact on major compactions, mainly because of the presence of CompactionTool which a) can be used offline, b) allowed us to profile only the relevant workload. PerformanceEvaluation (bin/hbase pe) was used to build a test table which was then copied to local disk for reuse.

% ./bin/hbase pe --nomapred --rows=150000 --table="t1" --valueSize=10240 --presplit=10 sequentialWrite 10

Table size: 14.4G

Number of rows: 1.5M

Number of regions: 10

Row size: 10K

Total store files across regions: 67

For profiling, Lightweight-java-profiler was used and FlameGraph was used to generate graphs.

For benchmarking, the linux ‘time’ command was used. Profiling was disabled during these runs. A script repeatedly executed the following steps in order:

  1. delete hdfs:///hbase

  2. copy t1 from local disk to hdfs:///hbase/data/default

  3. run compaction tool on t1 and time it



CPU profiling of HBase not using NHL (figure 1) shows that about 22% of CPU is used for generating and validating checksums, whereas with NHL (figure 2) it takes only about 3%.


Figure 1: CPU profile - HBase not using NHL (svg)


Figure 2: CPU profile - HBase using NHL (svg)


Benchmarking was done for three different cases: (a) neither HBase nor HDFS use NHL, (b) HDFS uses NHL but not HBase, and (c) both HDFS and HBase use NHL. For each case, we did 5 runs. Observations from table 1:

  1. Within a case, while real time fluctuates across runs, user and sys times remain the same. This is expected, as compactions are IO bound.

  2. Using NHL only for HDFS reduces CPU usage by about 10% (A vs B)

  3. Further, using NHL for HBase checksums reduces CPU usage by about 23% (B vs C).

All times are in seconds. This stackoverflow answer provides a good explanation of real, user and sys times.

run #    no native for HDFS and HBase (A)      no native for HBase (B)              native (C)

1        real 469.4, user 110.8, sys 30.5      real 422.9, user 95.4, sys 30.5      real 414.6, user 67.5, sys 30.6
2        real 384.3, user 111.4, sys 30.4      real 400.5, user 96.7, sys 30.5      real 393.8, user 67.6, sys 30.6
3        real 400.7, user 111.5, sys 30.6      real 398.6, user 95.8, sys 30.6      real 392.0, user 66.9, sys 30.5
4        real 396.8, user 111.1, sys 30.3      real 379.5, user 96.0, sys 30.4      real 390.8, user 67.2, sys 30.5
5        real 389.1, user 111.6, sys 30.3      real 377.4, user 96.5, sys 30.4      real 381.3, user 67.6, sys 30.5

Table 1



The Native Hadoop Library leverages the special processor instruction (if available) that does pipelining and other low level optimizations when performing CRC calculations. Using NHL in HBase for heavy checksum computation allows HBase to make use of this facility, saving a significant amount of CPU time spent checksumming.

Tuesday May 12, 2015

The HBase Request Throttling Feature

Govind Kamat, HBase contributor and Cloudera Performance Engineer

(Edited on 5/14/2015 -- changed ops/sec/RS to ops/sec.)

Running multiple workloads on HBase has always been challenging, especially when trying to execute real-time workloads while concurrently running analytical jobs. One possible way to address this issue is to throttle analytical MR jobs so that real-time workloads are less affected.

A new QoS (quality of service) feature that Apache HBase 1.1 introduces is request throttling, which controls the rate at which requests get handled by an HBase cluster. HBase typically treats all requests identically; however, the new throttling feature can be used to specify a maximum rate or bandwidth to override this behavior. The limit may be applied to requests originating from a particular user, or alternatively, to requests directed to a given table or a specified namespace.

The objective of this post is to evaluate the effectiveness of this feature and the overhead it might impose on a running HBase workload.  The performance runs carried out showed that throttling works very well, by redirecting resources from a user whose workload is throttled to the workloads of other users, without incurring a significant overhead in the process.

Enabling Request Throttling

It is straightforward to enable the request-throttling feature -- all that is necessary is to set the HBase configuration parameter hbase.quota.enabled to true. The related parameter hbase.quota.refresh.period specifies the time interval, in milliseconds, at which the regionserver re-checks for any new restrictions that have been added.

The throttle can then be set from the HBase shell, like so:

hbase> set_quota TYPE => THROTTLE, USER => 'uname', LIMIT => '100req/sec'

hbase> set_quota TYPE => THROTTLE, TABLE => 'tbl', LIMIT => '10M/sec'

hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns', LIMIT => 'NONE'
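
The same quotas can also be set programmatically through the client quota API introduced alongside this feature. A rough Java equivalent of the first shell command above might look like this, with the user name and limit reusing the illustrative values shown in the shell examples:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory;
import org.apache.hadoop.hbase.quotas.ThrottleType;

public class SetUserThrottle {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
         Admin admin = conn.getAdmin()) {
      // Limit user 'uname' to 100 requests per second, as in the shell example.
      QuotaSettings quota = QuotaSettingsFactory.throttleUser(
          "uname", ThrottleType.REQUEST_NUMBER, 100, TimeUnit.SECONDS);
      admin.setQuota(quota);
    }
  }
}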

Test Setup

To evaluate how effectively HBase throttling worked, a YCSB workload was imposed on a 10 node cluster.  There were 6 regionservers and 2 master nodes.  YCSB clients were run on the 4 nodes that were not running regionserver processes.  The client processes were initiated by two separate users and the workload issued by one of them was throttled.

More details on the test setup follow.

HBase version: HBase 0.98.6-cdh5.3.0-SNAPSHOT (HBASE-11598 was backported to this version)


  • OS: CentOS release 6.4 (Final)
  • CPU sockets: 2
  • Physical cores per socket: 6
  • Total number of logical cores: 24
  • Number of disks: 12
  • Memory: 64 GB
  • Number of RS: 6
  • Master nodes: 2 (for the Namenode, Zookeeper and HBase master)
  • Number of client nodes: 4
  • Number of rows: 1080M
  • Number of regions: 180
  • Row size: 1K
  • Threads per client: 40
  • Workload: read-only and scan
  • Key distribution: Zipfian
  • Run duration: 1 hour


An initial data set was first generated by running YCSB in its data generation mode.  An HBase table was created with the table specifications above and pre-split.  After all the data was inserted, the table was flushed, compacted and saved as a snapshot.  This data set was used to prime the table for each run.  Read-only and scan workloads were used to evaluate performance; this eliminates effects such as memstore flushes and compactions.  One run with a long duration was carried out first to ensure the caches were warmed and that the runs yielded repeatable results.
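For reference, that priming workflow maps onto standard shell commands roughly as follows (the table and snapshot names here are hypothetical):

hbase> flush 'usertable'
hbase> major_compact 'usertable'
hbase> snapshot 'usertable', 'usertable_pristine'

and then, before each run:

hbase> disable 'usertable'
hbase> drop 'usertable'
hbase> clone_snapshot 'usertable_pristine', 'usertable'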

For the purpose of these tests, the throttle was applied to the workload emanating from one user in a two-user scenario. There were four client machines used to impose identical read-only workloads.  The client processes on two machines were run by the user “jenkins”, while those on the other two were run as a different user.   The throttle was applied to the workload issued by this second user.  There were two sets of runs, one with both users running read workloads and the second where the throttled user ran a scan workload.  Typically, scans are long running and it can be desirable on occasion to de-prioritize them in favor of more real-time read or update workloads.  In this case, the scan was for sets of 100 rows per YCSB operation.

For each run, the following steps were carried out:

  • Any existing YCSB-related table was dropped.

  • The initial data set was cloned from the snapshot.

  • The desired throttle setting was applied.

  • The desired workloads were imposed from the client machines.

  • Throughput and latency data was collected and is presented in the table below.

The throttle was applied at the start of the job (the command used was the first in the list shown in the “Enabling Request Throttling” section above).  The hbase.quota.refresh.period property was set to under a minute so that the throttle took effect by the time test setup was finished.

The throttle option specifically tested here was the one to limit the number of requests (rather than the one to limit bandwidth).

Observations and Results

The throttling feature appears to work quite well.  When applied interactively in the middle of a running workload, it goes into effect immediately after the quota refresh period and can be observed clearly in the throughput numbers put out by YCSB while the test is progressing.  The table below has performance data from test runs indicating the impact of the throttle.  For each row, the throughput and latency numbers are also shown in separate columns, one set for the “throttled” user (indicated by “T” for throttled) and the other for the “non-throttled” user (represented by “U” for un-throttled).

Read + Read Workload

(Table: for each throttle setting applied to user "T" -- 2500, 2000, 1500, 1000 and 500 rps -- the columns report Avg Total Thruput (ops/sec), Thruput_U (ops/sec), Thruput_T (ops/sec), Latency_U (ms) and Latency_T (ms).)

As can be seen, when the throttle pressure is increased (by reducing the permitted throughput for user “T” from 2500 req/sec to 500 req/sec, as shown in column 1), the total throughput (column 2) stays around the same.  In other words, the cluster resources get redirected to benefit the non-throttled user, with the feature consuming no significant overhead.  One possible outlier is the case where the throttle parameter is at its most restrictive (500 req/sec), where the total throughput is about 10% less than the maximum cluster throughput.

Correspondingly, the latency for the non-throttled user improves while that for the throttled user degrades.  This is shown in the last two columns in the table.

The charts above show that the change in throughput is linear with the amount of throttling, for both the throttled and non-throttled user.  With regard to latency, the change is generally linear, until the throttle becomes very restrictive; in this case, latency for the throttled user degrades substantially.

One point that should be noted is that, while the throttle parameter in req/sec is indeed correlated to the actual restriction in throughput as reported by YCSB (ops/sec) as seen by the trend in column 4, the actual figures differ.  As user “T”’s throughput is restricted down from 2500 to 500 req/sec, the observed throughput goes down from 2500 ops/sec to 1136 ops/sec.  Therefore, users should calibrate the throttle to their workload to determine the appropriate figure to use (either req/sec or MB/sec) in their case.

Read + Scan Workload

(Table: for each throttle setting applied to the scanning user "T" -- 3000, 1000, 500, 250 and 50 Krps -- the columns report Thruput_U (ops/sec), Thruput_T (ops/sec), Latency_U (ms) and Latency_T (ms).)

With the read/scan workload, similar results are observed as in the read/read workload.  As the extent of throttling is increased for the long-running scan workload, the observed throughput decreases and latency increases.  Conversely, the read workload benefits, displaying better throughput and improved latency.  Again, the specific numeric value used to specify the throttle needs to be calibrated to the workload at hand.  Since scans break down into a large number of read requests, the throttle parameter needs to be much higher than in the case with the read workload.  Shown above is a log-linear chart of the impact on throughput of the two workloads when the extent of throttling is adjusted.


HBase request throttling is an effective and useful technique to handle multiple workloads, or even multi-tenant workloads on an HBase cluster.  A cluster administrator can choose to throttle long-running or lower-priority workloads, knowing that regionserver resources will get re-directed to the other workloads, without this feature imposing a significant overhead.  By calibrating the throttle to the cluster and the workload, the desired performance can be achieved on clusters running multiple concurrent workloads.

Friday May 01, 2015

Scan Improvements in HBase 1.1.0

Jonathan Lawlor, Apache HBase Contributor

Over the past few months there have been a variety of nice changes made to scanners in HBase. This post focuses on two such changes, namely RPC chunking (HBASE-11544) and scanner heartbeat messages (HBASE-13090). Both of these changes address long standing issues in the client-server scan protocol. Specifically, RPC chunking deals with how a server handles the scanning of very large rows and scanner heartbeat messages allow scan operations to make progress even when aggressive server-side filtering means results are returned infrequently.


In order to discuss these issues, let's first gain a general understanding of how scans currently work in HBase.

From an application's point of view, a ResultScanner is the client side source of all of the Results that an application asked for. When a client opens a scan, it’s a ResultScanner that is returned and it is against this object that the client invokes next() to fetch more data. ResultScanners handle all communication with the RegionServers involved in a scan and the ResultScanner decides which Results to make visible to the application layer. While there are various implementations of the ResultScanner interface, all implementations use basically the same protocol to communicate with the server.

In order to retrieve Results from the server, the ResultScanner will issue ScanRequests via RPC's to the different RegionServers involved in a scan. A client configures a ScanRequest by passing an appropriately set Scan instance when opening the scan, setting the start/stop rows, caching limits, the maximum result size limit, and the filters to apply.
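As a rough sketch (assuming an open Connection named connection; the table name, rows and filter are placeholders), the client-side configuration looks something like:

Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("startRow"));    // where the scan begins
scan.setStopRow(Bytes.toBytes("stopRow"));      // where the scan ends (exclusive)
scan.setCaching(100);                           // limit on the number of rows returned per RPC
scan.setMaxResultSize(2 * 1024 * 1024);         // limit on the bytes accumulated per RPC
scan.setFilter(new KeyOnlyFilter());            // any server-side filter to apply
try (Table table = connection.getTable(TableName.valueOf("myTable"));
     ResultScanner scanner = table.getScanner(scan)) {
  for (Result result : scanner) {
    // each Result is one row made visible by the ResultScanner
  }
}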

On the server side, there are three main components involved in constructing the ScanResponse that will be sent back to the client in answer to a ScanRequest:


The RSRpcServices is a service that lives on RegionServers that can respond to incoming RPC requests, such as ScanRequests. During a scan, the RSRpcServices is the server side component that is responsible for constructing the ScanResponse that will be sent back to the client. The RSRpcServices continues to scan in order to accumulate Results until the region is exhausted, the table is exhausted, or a scan limit is reached (such as the caching limit or max result size limit). In order to retrieve these Results, the RSRpcServices must talk to a RegionScanner.


The RegionScanner is the server side component responsible for scanning the different rows in the region. In order to scan through the rows in the region, the RegionScanner will talk to one or more StoreScanner instances (one per column family in the row). If the row passes all filtering criteria, the RegionScanner will return the Cells for that row to the RSRpcServices so that they can be used to form a Result to include in the ScanResponse.


The StoreScanner is the server side component responsible for scanning through the Cells in each column family.

When the client (i.e. ResultScanner) receives the ScanResponse back from the server, it can then decide whether or not scanning should continue. Since the client and the server communicate back and forth in chunks of Results, the client-side ResultScanner will cache all the Results it receives from the server. This allows the application to perform scans faster than the case where an RPC is required for each Result that the application sees.

RPC Chunking (HBASE-11544)

Why is it necessary?

Currently, the server sends back whole rows to the client (each Result contains all of the cells for that row). The max result size limit is only applied at row boundaries. After each full row is scanned, the size limit will be checked.

The problem with this approach is that it does not provide a granular enough restriction. Consider, for example, the scenario where each row being scanned is 100 MB. This means that 100 MB worth of data will be read between checks for the size limit. So, even in the case that the client has specified that the size limit is 1 MB, 100 MB worth of data will be read and then returned to the client.

This approach for respecting the size limit is problematic. First of all, it means that it is possible for the server to run out of memory and crash in the case that it must scan a large row. At the application level, you simply want to perform a scan without having to worry about crashing the region server.

This scenario is also problematic because it means that we do not have fine grained control over the network resources. The most efficient use of the network is to have the client and server talk back and forth in fixed sized chunks.

Finally, this approach for respecting the size limit is a problem because it can lead to large, erratic allocations server side playing havoc with GC.

Goal of the RPC Chunking solution

The goal of the RPC Chunking solution was to:

- Create a workflow that is more ‘regular’, less likely to cause Region Server distress

- Use the network more efficiently

- Avoid large garbage collections caused by allocating large blocks

Furthermore, we wanted the RPC chunking mechanism to be invisible to the application. The communication between the client and the server should not be a concern of the application. All the application wants is a Result for their scan. How that Result is retrieved is completely up to the protocol used between the client and the server.

Implementation Details

The first step in implementing this proposed RPC chunking method was to move the max result size limit to a place where it could be respected at a more granular level than on row boundaries. Thus, the max result size limit was moved down to the cell level within StoreScanner. This meant that now the max result size limit could be checked at the cell boundaries, and if in excess, the scan can return early.

The next step was to consider what would occur if the size limit was reached in the middle of a row. In this scenario, the Result sent back to the client will be marked as a "partial" Result. By marking the Result with this flag, the server communicates to the client that the Result is not a complete view of the row. Further Scan RPC's must be made in order to retrieve the outstanding parts of the row before it can be presented to the application. This allows the server to send back partial Results to the client and then the client can handle combining these partials into "whole" row Results. This relieves the server of the burden of having to read the entire row at a time.

Finally, these changes were performed in a backwards compatible manner. The client must indicate in its ScanRequest that partial Results are supported. If that flag is not seen server side, the server will not send back partial results. Note that the configuration hbase.client.scanner.max.result.size can be used to change the default chunk size. By default this value is 2 MB in HBase 1.1+.

An option (Scan#setAllowPartialResults) was also added so that an application can ask to see partial results as they are returned rather than wait on the aggregation of complete rows.
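A minimal sketch of opting in (assuming an open Table named table):

Scan scan = new Scan();
scan.setAllowPartialResults(true);   // ask to see partial Results as they arrive
try (ResultScanner scanner = table.getScanner(scan)) {
  for (Result result : scanner) {
    if (result.isPartial()) {
      // this Result carries only some of its row's cells; more will follow
    }
  }
}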

A consistent view of the row is maintained even though the row is assembled from multiple partial RPC results, because the server-side scanning context keeps track of the outstanding MVCC read point and will not include Cells written after that point in the results.

Note that this feature will be available starting in HBase 1.1. All 1.1+ clients will chunk their responses.

Heartbeat messages for scans (HBASE-13090)

What is a heartbeat message?

A heartbeat message is a message used to keep the client-server connection alive. In the context of scans, a heartbeat message allows the server to communicate back to the client that scan progress has been made. On receipt, the client resets the connection timeout. Since the client and the server communicate back and forth in chunks of Results, it is possible that a single progress update will contain ‘enough’ Results to satisfy the requesting application’s purposes. So, beyond simply keeping the client-server connection alive and preventing scan timeouts, heartbeat messages also give the calling application an opportunity to decide whether or not more RPC's are even needed.

Why are heartbeat messages necessary in Scans?

Currently, scans execute server side until the table is exhausted, the region is exhausted, or a limit is reached. As such there is no way of influencing the actual execution time of a Scan RPC. The server continues to fetch Results with no regard for how long that process is taking. This is particularly problematic since each Scan RPC has a defined timeout. Thus, in the case that a particular scan is causing timeouts, the only solution is to increase the timeout so it spans the long-running requests (hbase.client.scanner.timeout.period). You may have encountered such problems if you have seen exceptions such as OutOfOrderScannerNextException.

Goal of heartbeat messages

The goal of the heartbeat message solution was to:

- Incorporate a time limit concept into the client-server Scan protocol

- The time limit must be enforceable at a granular level (between cells)

- Time limit enforcement should be tunable so that checks do not occur too often

Beyond simply meeting these goals, it is also important that, similar to RPC chunking, this mechanism is invisible to the application layer. The application simply wants to perform its scan and get a Result back on a call to ResultScanner.next(). It does not want to worry about whether or not the scan is going to take too long and if it needs to adjust timeouts or scan sizings.

Implementation details

The first step in implementing heartbeat messages was to incorporate a time limit concept into the Scan RPC workflow. This time limit was based on the configured value of the client scanner timeout.

Once a time limit was defined, we had to decide where this time limit should be respected. It was decided that this time limit should be enforced within the StoreScanner. The reason for enforcing this limit inside the StoreScanner was that it allowed the time limit to be enforced at the cell boundaries. It is important that the time limit be checked at a fine grained location because, in the case of restrictive filtering or time ranges, it is possible that large portions of time will be spent filtering out and skipping cells. If we wait to check the time limit at the row boundaries, it is possible when the row is wide that we may time out before a single check occurs.

A new configuration was introduced (hbase.cells.scanned.per.heartbeat.check) to control how often these time limit checks occur. The default value of this configuration is 10,000 meaning that we check the time limit every 10,000 cells.
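If the checks need to happen more or less often, the property can be tuned in hbase-site.xml; a sketch, keeping the default value explicit:

<property>
  <name>hbase.cells.scanned.per.heartbeat.check</name>
  <value>10000</value>  <!-- check the time limit after every 10,000 cells -->
</property>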

Finally, if this time limit is reached, the ScanResponse sent back to the client is marked as a heartbeat message. It is important that the ScanResponse be marked in this way because it communicates to the client the reason the message was sent back from the server. Since a time limit may be reached before any Results are retrieved, it is possible that the heartbeat message's list of Results will be empty. We do not want the client to interpret this as meaning that the region was exhausted.

Note that similar to RPC chunking, this feature was implemented in a fully backwards compatible manner. In other words, heartbeat messages will only be sent back to the client in the case that the client has specified that it can recognize when a ScanResponse is a heartbeat message. Heartbeat messages will be available starting in HBase 1.1. All HBase 1.1+ clients will heartbeat.

Cleaning up the Scan API (HBASE-13441)

Following these recent improvements to the client-server Scan protocol, there is now an effort to try and clean up the Scan API. There are some API’s that are getting stale and don’t make much sense anymore, especially in light of the above changes. There are also some API’s that could use some love with regard to documentation.

For example the caching and max result size API’s deal with how data is transferred between the client and the server. Both of these API’s now seem misplaced in the Scan API. These are details that should likely be controlled by the RegionServer rather than the application. It seems much more appropriate to give the RegionServer control over these parameters so that it can tune them based on the current state of the RPC pipeline and server loadings.

HBASE-13441 Scan API Improvements is the open umbrella issue covering ideas for Scan API improvements. If you have some time, check it out. Let us know if you have some ideas for improvements that you would like to see.

Other recent Scanner improvements

It’s important to note that these are not the only improvements that have been made to Scanners recently. Many other impressive improvements have come through, each deserving of its own blog post. For those that are interested, I have listed two:

  • HBASE-13109: Make better SEEK vs SKIP decisions during scanning

  • HBASE-13262: Fixed a data loss issue in Scans


A spate of Scan improvements should make for a smoother Scan experience in HBase 1.1.

Thursday Apr 16, 2015

Come to HBaseCon2015!

Come to hbasecon.com and learn about:

  • A highly-trafficked HBase cluster with an uptime of sixteen months
  • An HBase deploy that spans three datacenters doing master-master replication between thousands of HBase nodes in each
  • Some nuggets on how Bigtable does it (and HBase could too)
  • How HBase is being used to record patient telemetry 24/7 on behalf of the Michael J. Fox Foundation to enable breakthroughs in Parkinson's disease research
  • How Pinterest and Microsoft are doing it in the cloud, how FINRA and Bloomberg are doing it out east, and how Rocketfuel, Yahoo! and Flipboard, etc., are representing the west

HBaseCon 2015 is the fourth annual edition of the community event for Apache HBase contributors, developers, admins, and users of all skill levels. The event is hosted and organized by Cloudera, with the Program Committee including leaders from across the HBase community. The conference will be one full day comprising general sessions in the morning, breakout sessions in the morning and afternoon, and networking opportunities throughout the day.

The agenda has been posted here, hbasecon2015 agenda. You can register here.


The HBaseCon Program Committee 

Thursday Mar 05, 2015

HBase ZK-less Region Assignment

ZK-less region assignment allows us to achieve greater scale as well as do faster startups and assignment. It is simpler and has less code, improves the speed at which assignments run so we can do faster rolling restarts. This feature will be on by default in HBase 2.0.0.[Read More]

Tuesday Feb 24, 2015

Start of a new era: Apache HBase 1.0

Past, present and future state of the community

Author: Enis Söztutar, Apache HBase PMC member and HBase-1.0.0 release manager

The Apache HBase community has released Apache HBase 1.0.0. Seven years in the making, it marks a major milestone in the Apache HBase project’s development, offers some exciting features and new API’s without sacrificing stability, and is both on-wire and on-disk compatible with HBase 0.98.x.

In this blog, we look at the past, present and future of Apache HBase project. 

Versions, versions, versions 

Before enumerating feature details of this release let’s take a journey into the past and how release numbers emerged. HBase started its life as a contrib project in a subdirectory of Apache Hadoop, circa 2007, and released with Hadoop. Three years later, HBase became a standalone top-level Apache project. Because HBase depends on HDFS, the community ensured that HBase major versions were identical and compatible with Hadoop’s major version numbers. For example, HBase 0.19.x worked with Hadoop 0.19.x, and so on.

However, the HBase community wanted to ensure that an HBase version can work with multiple Hadoop versions—not only with its matching major release numbers. Thus, a new naming scheme was invented where the releases would start at the close-to-1.0 major version of 0.90, as shown above in the timeline. We also took on an even-odd release number convention where releases with odd version numbers were “developer previews” and even-numbered releases were “stable” and ready for production. The stable release series included 0.90, 0.92, 0.94, 0.96 and 0.98 (See HBase Versioning for an overview).

After 0.98, we named the trunk version 0.99-SNAPSHOT, but we officially ran out of numbers! Levity aside, last year, the HBase community agreed that the project had matured and stabilized enough such that a 1.0.0 release was due. After three releases in the 0.99.x series of “developer previews” and six Apache HBase 1.0.0 release candidates, HBase 1.0.0 has now shipped! See the above diagram, courtesy of Lars George, for a timeline of releases. It shows each release line together with its support lifecycle, and any previous developer preview releases (0.99->1.0.0 for example).

HBase-1.0.0, start of a new era

The 1.0.0 release has three goals:

1) to lay a stable foundation for future 1.x releases;

2) to stabilize running HBase clusters and their clients; and

3) to make versioning and compatibility dimensions explicit

Including previous 0.99.x releases, 1.0.0 contains over 1500 resolved JIRAs. Some of the major changes are: 

API reorganization and changes

HBase’s client level API has evolved over the years. To simplify the semantics and to make it extensible and easier to use in the future, we revisited the API before 1.0. To that end, 1.0.0 introduces new APIs, and deprecates some of the commonly-used client side APIs (HTableInterface, HTable and HBaseAdmin).

We advise you to update your application to use the new style of APIs, since deprecated APIs will be removed in the future 2.x series of releases. For further guidance, please visit these two decks: http://www.slideshare.net/xefyr/apache-hbase-10-release and http://s.apache.org/hbase-1.0-api.
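As a minimal sketch of the new style (the table, family and qualifier names are placeholders), the entry point is now ConnectionFactory, with Table and Admin taking over from HTable and HBaseAdmin:

Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
     Table table = connection.getTable(TableName.valueOf("my_table"));
     Admin admin = connection.getAdmin()) {
  Put put = new Put(Bytes.toBytes("row1"));
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
  table.put(put);
  boolean exists = admin.tableExists(TableName.valueOf("my_table"));  // Admin replaces HBaseAdmin
}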

All Client side APIs are marked with the InterfaceAudience.Public annotation, indicating whether a class/method is an official "client API" for HBase (See “11.1.1. HBase API Surface” in the HBase Refguide for more details on the Audience annotations). Going forward, all 1.x releases are planned to be API compatible for classes annotated as client public.

Read availability using timeline consistent region replicas

As part of phase 1, this release contains an experimental "Read availability using timeline consistent region replicas" feature. That is, a region can be hosted in multiple region servers in read-only mode. One of the replicas for the region will be primary, accepting writes, and other replicas will share the same data files. Read requests can be done against any replica for the region with backup RPCs for high availability with timeline consistency guarantees. See JIRA HBASE-10070 for more details.
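For example, a client can opt an individual read into timeline consistency like this (a sketch, assuming an open Table named table):

Get get = new Get(Bytes.toBytes("row1"));
get.setConsistency(Consistency.TIMELINE);   // allow any replica, not just the primary, to serve this read
Result result = table.get(get);
if (result.isStale()) {
  // served by a secondary replica; the data may lag the primary
}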

Online config change and other forward ports from 0.89-fb branch

The 0.89-fb branch in Apache HBase was where Facebook used to post their changes. HBASE-12147 JIRA forward ported the patches which enabled reloading a subset of the server configuration without having to restart the region servers.

Apart from the above, there are hundreds of improvements, performance gains (an improved WAL pipeline using the Disruptor, multi-WAL, more off-heap usage, etc.), bug fixes and other goodies that are too long to list here. Check out the official release notes for a detailed overview. The release notes and the book also cover binary, source and wire compatibility requirements, supported Hadoop and Java versions, upgrading from 0.94, 0.96 and 0.98 versions, and other important details.

HBase-1.0.0 is also the start of using “semantic versioning” for HBase releases. In short, future HBase releases will have MAJOR.MINOR.PATCH version with the explicit semantics for compatibility. The HBase book contains all the dimensions for compatibility and what can be expected between different versions.

What’s next

We have marked HBase-1.0.0 as the next stable version of HBase, meaning that all new users should start using this version. However, as a database, we understand that switching to a newer version might take some time. We will continue to maintain and make 0.98.x releases until the user community is ready for its end of life. 1.0.x releases as well as 1.1.0, 1.2.0, etc line of releases are expected to be released from their corresponding branches, while 2.0.0 and other major releases will follow when their time arrives.

Read replicas phase 2, per column family flush, procedure v2, SSD for WAL or column family data, etc are some of the upcoming features in the pipeline. 


Finally, the HBase 1.0.0 release has come a long way, with contributions from a very large group of awesome people and hard work from committers and contributors. We would like to extend our thanks to our users and all who have contributed to HBase over the years.

Keep HBase’ing!

Friday Aug 08, 2014

Comparing BlockCache Deploys

Comparing BlockCache Deploys

St.Ack on August 7th, 2014

A report comparing BlockCache deploys in Apache HBase for issue HBASE-11323 BucketCache all the time! It attempts to roughly equate five different deploys and compare how well they do under four different loading types that vary from no cache-misses through to missing cache most of the time. It makes recommendations on when to use which deploy.


In Nick Dimiduk's BlockCache 101 blog post, he details the different options available in HBase. We test what remains after purging SlabCache and the caching policy implementation DoubleBlockCache, which have been deprecated in HBase 0.98 and removed in trunk because of Nick's findings and those of others.

Nick varies the JVM Heap+BlockCache sizes AND dataset size.  This report keeps JVM Heap+BlockCache size constant and varies the dataset size only. Nick looks at the 99th percentile only.  This article looks at that, as well as GC, throughput and loadings. Cell sizes in the following tests also vary between 1 byte and 256k in size.


If the dataset fits completely in cache, the default configuration, which uses the onheap LruBlockCache, performs best.  GC is half that of the next most performant deploy type, CombinedBlockCache:Offheap with at least 20% more throughput.

Otherwise, if your cache is experiencing churn running a steady stream of evictions, move your block cache offheap using CombinedBlockCache in the offheap mode. See the BlockCache section in the HBase Reference Guide for how to enable this deploy. Offheap mode requires only one third to one half of the GC of LruBlockCache when evictions are happening. There is also less variance as the cache misses climb when offheap is deployed. CombinedBlockCache:file mode has a better GC profile but less throughput than CombinedBlockCache:Offheap. Latency is slightly higher on CBC:Offheap -- heap objects to cache have to be serialized in and out of the offheap memory -- than with the default LruBlockCache but the 95/99th percentiles are slightly better.  CBC:Offheap uses a bit more CPU than LruBlockCache. CombinedBlockCache:Offheap is limited only by the amount of RAM available.  It is not subject to GC.


The graphs to follow show results from five different deploys each run through four different loadings.

Five Deploy Variants

  1. LruBlockCache The default BlockCache in place when you start up an unconfigured HBase install. With LruBlockCache, all blocks are loaded into the java heap. See BlockCache 101 and LruBlockCache for detail on the caching policy of LruBlockCache.
  2. CombinedBlockCache:Offheap CombinedBlockCache deploys two tiers; an L1 which is an LruBlockCache instance to hold META blocks only (i.e. INDEX and BLOOM blocks), and an L2 tier which is an instance of BucketCache. In this offheap mode deploy, the BucketCache uses DirectByteBuffers to host a BlockCache outside of the JVM heap to host cached DATA blocks.
  3. CombinedBlockCache:Onheap In this onheap ('heap' mode), the L2 cache is hosted inside the JVM heap, and appears to the JVM to be a single large allocation. Internally it is managed by an instance of BucketCache. The L1 cache is an instance of LruBlockCache.
  4. CombinedBlockCache:file In this mode, an L2 BucketCache instance puts DATA blocks into a file (hence 'file' mode) on a mounted tmpfs in this case.
  5. CombinedBlockCache:METAonly No caching of DATA blocks (no L2 instance). DATA blocks are fetched every time. The INDEX blocks are loaded into an L1 LruBlockCache instance.

Memory is fixed for each deploy. The java heap is a small 8G to bring on distress earlier. For deploy type 1., the LruBlockCache is given 4G of the 8G JVM heap. For deploy types 2.-5., the L1 LruHeapCache is 0.1 * 8G (~800MB), which is more than enough to host the dataset META blocks. This was confirmed by inspecting the Block Cache vitals displayed in the RegionServer UI. For deploy types 2., 3., and 4., the L2 bucket cache is 4G. Deploy type 5.'s L2 is 0G (uses HBASE-11581 Add option so CombinedBlockCache L2 can be null (fscache)).

Four Loading Types

  1. All cache hits all the time.
  2. A small percentage of cache misses, with all misses inside the fscache soon after the test starts.
  3. Lots of cache misses, all still inside the fscache soon after the test starts.
  4. Mostly cache misses where many misses by-pass the fscache.

For each loading, we ran 25 clients reading randomly over 10M cells of zipfian varied cell sizes from 1 byte to 256k bytes over 21 regions hosted by a single RegionServer for 20 minutes. Clients would stay inside the cache size for loading 1., miss the cache at just under ~50% for loading 2., and so on.

The dataset was hosted on a single RegionServer on a small HDFS cluster of 5 nodes.  See below for detail on the setup.


For each attribute -- GC, Throughput, i/o -- we have five charts across; one for each deploy.  Each graph can be divided into four parts; the first quarter has the all-in-cache loading running for 20 minutes, followed by the loading that has some cache misses (for twenty minutes), through to the loading with mostly cache misses in the final quarter.

To find out what each color represents, read the legend.  The legend is small in the images below. To see a larger version, browse to this non-apache-roller version of this document and click on the images over there.

Concentrate on the first four graph types.  The BlockCache Stats and I/O graphs add little other than confirming that the loadings ran the same across the different BlockCache deploys with fscache cutting in to soak up the seeks soon after start for all but the last mostly-cache-miss loading cases.


Be sure to check the y-axis in the graphs below (you can click on chart to get a larger version). All profiles look basically the same but a check of the y-axis will show that for all but the no-cache-misses case, CombinedBlockCache:Offheap, the second deploy type, has the best GC profile (less GC is better!).

The GC for the CombinedBlockCache:Offheap deploy mode looks to be climbing as the test runs.  See the Notes section at the end for further comment.



CombinedBlockCache:Offheap is better unless there are few to no cache misses.  In that case, LruBlockCache shines (I missed why there is the step in the LruBlockCache all-in-cache section of the graph).




Latency when in-cache

Same as above except that we do not show the last loading -- we show the first three only -- since the last loading of mostly cache misses skews the diagrams such that it is hard to compare latency when we are inside cache (BlockCache and fscache).


Try aggregating the system and user CPUs when comparing.

BlockCache Stats

Curves are mostly the same except for the cache-no-DATA-blocks case. It has no L2 deployed.



Read profiles are about the same across all deploys and tests with a spike at first until the fscache comes around and covers. The exception is the final mostly-cache-misses case.


Master branch. 2.0.0-SNAPSHOT, r69039f8620f51444d9c62cfca9922baffe093610.  Hadoop 2.4.1-SNAPSHOT.

5 nodes all the same with 48G and six disks.  One master and one regionserver, each on distinct nodes, with 21 regions of 10M rows of zipf varied size -- 0 to 256k -- created as follows:

$ export HADOOP_CLASSPATH=/home/stack/conf_hbase:`./hbase-2.0.0-SNAPSHOT/bin/hbase classpath`
$ nohup  ./hadoop-2.4.1-SNAPSHOT/bin/hadoop --config /home/stack/conf_hadoop org.apache.hadoop.hbase.PerformanceEvaluation --valueSize=262144 --valueZipf --rows=100000 sequentialWrite 100 &

Here is my loading script.  It performs 4 loadings: All in cache, just out of cache, a good bit out of cache and then mostly out of cache:

[stack@c2020 ~]$ more bin/bc_in_mem.sh
#!/bin/bash
# Expects $testtype, $clients, $runtime and $cycles to be set in the environment.
date=`date -u +"%Y-%m-%dT%H:%M:%SZ"`
echo "testtype=$testtype $date" >> nohup.out
#for i in 38 76 304 1000; do
for i in 32 72 144 1000; do
  echo "`date` run size=${i}, clients=$clients ; $testtype time=$runtime size=$i" >> nohup.out
  timeout $runtime nohup ${HBASE_HOME}/bin/hbase --config /home/stack/conf_hbase org.apache.hadoop.hbase.PerformanceEvaluation --nomapred --valueSize=110000 --size=$i --cycles=$cycles randomRead $clients
done
Java version:
[stack@c2020 ~]$ java -version
java version "1.7.0_45"
Java(TM) SE Runtime Environment (build 1.7.0_45-b18)
Java HotSpot(TM) 64-Bit Server VM (build 24.45-b08, mixed mode)

I enabled caching options by uncommenting these configuration properties in hbase-site.xml. The hfile.block.cache.size was set to .5 to keep math simple.
<!--LRU Cache-->

<!--Bucket cache-->
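The exact property values from those runs are not reproduced above; for reference, an hbase-site.xml fragment for the CombinedBlockCache:Offheap deploy would look roughly like the following (values illustrative; the Lru-only deploy instead sets only hfile.block.cache.size, and the offheap engine additionally needs direct memory granted to the JVM, e.g. via HBASE_OFFHEAPSIZE in hbase-env.sh):

<property>
  <name>hfile.block.cache.size</name>
  <value>0.1</value>  <!-- fraction of the java heap given to the onheap L1 cache -->
</property>
<property>
  <name>hbase.bucketcache.ioengine</name>
  <value>offheap</value>  <!-- 'heap' or 'file:/path' select the other L2 modes -->
</property>
<property>
  <name>hbase.bucketcache.size</name>
  <value>4096</value>  <!-- MB given to the L2 BucketCache -->
</property>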


The CombinedBlockCache has some significant overhead (to be better quantified -- 10%?). There will be slop left over because buckets will not fill to the brim, especially when block sizes vary.  This was observed while running the loadings.

I tried to bring on a FullGC by running for more than a day with LruBlockCache fielding mostly BlockCache misses and I failed. Thinking on it, the BlockCache only occupied 4G of an 8G heap. There was always elbow room available (Free block count and maximum block size allocatable settled down to a nice number and held constant). TODO: Retry but with less elbow room available.

Longer running CBC offheap test

Some of the tests above show disturbing GC tendencies -- ever rising GC -- so I ran tests for a longer period.  It turns out that BucketCache throws an OOME in long-running tests. You need HBASE-11678 BucketCache ramCache fills heap after running a few hours (fixed in hbase-0.98.6+). After the fix, the below test ran until the cluster was pulled out from under the loading:
GC over three days. Throughput over three days.
Here are some long running tests with bucket cache onheap (ioengine=heap). The throughput is less and GC is higher.
GC over three days (onheap). Throughput over three days (onheap).


Does LruBlockCache get more erratic as heap size grows?  Nick's post implies that it does.
Auto-sizing of L1, onheap cache.

HBASE-11331 [blockcache] lazy block decompression has a report attached that evaluates the current state of the attached patch to keep blocks compressed while in the BlockCache.  It finds that with the patch enabled, "More GC. More CPU. Slower. Less throughput. But less i/o.", but there is an issue with compression block pooling that likely impinges on performance.

Review serialization in and out of L2 cache. No serialization?  Because we can take blocks from HDFS without copying onheap?

Friday Apr 11, 2014

The Effect of ColumnFamily, RowKey and KeyValue Design on HFile Size

By Doug Meil, HBase Committer and Thomas Murphy


One of the most common questions in the HBase user community is estimating disk footprint of tables, which translates into HFile size – the internal file format in HBase.

We designed an experiment at Explorys where we ran combinations of design time options (rowkey length, column name length, row storage approach) and runtime options (HBase ColumnFamily compression, HBase data block encoding options) to determine these factors’ effects on the resultant HFile size in HDFS.

HBase Environment

CDH4.3.0 (HBase

Design Time Choices

  1. Rowkey

    1. Thin

      1. 16-byte MD5 hash of an integer.

    2. Fat

      1. 64-byte SHA-256 hash of an integer.

    3. Note: neither of these is a realistic rowkey for a real application, but they were chosen because they are easy to generate and one is a lot bigger than the other.

  2. Column Names

    1. Thin

      1. 2-3 character column names (c1, c2).

    2. Fat

      1. 10 characters, randomly chosen but consistent for all rows.

    3. Note: it is advisable to have small column names, but most people don’t start that way, so we have this as an option.

  3. Row Storage Approach

    1. Key Value Per Column

      1. This is the traditional way of storing data in HBase.

    2. One Key Value per row.

      1. Actually, two.

      2. One KV has an Avro serialized byte array containing all the data from the row.

      3. Another KV holds an MD5 hash of the version of the Avro schema.

Run Time

  1. Column Family Compression

    1. None

    2. GZ

    3. LZ4

    4. LZO

    5. Snappy

    6. Note: it is generally advisable to use compression, but what if you didn’t? So we tested that too.

  2. HBase Block Encoding

    1. None

    2. Prefix

    3. Diff

    4. Fast Diff

    5. Note: most people aren’t familiar with HBase Data Block Encoding. Primarily intended for squeezing more data into the block cache, it has effects on HFile size too. See HBASE-4218 for more detail.

1000 rows were generated for each combination of table parameters. Not a ton of data, but we don’t necessarily need a ton of data to see the varying size of the table. There were 30 columns per row comprised of 10 strings (each filled with 20 bytes of random characters), 10 integers (random numbers) and 10 longs (also random numbers).

HBase blocksize was 128k.


The easiest way to navigate the results is to compare specific cases, progressing from an initial implementation of a table to options for production.

Case #1: Fat Rowkey and Fat Column Names, Now What?

This is where most people start with HBase. Rowkeys are not as optimal as they should be (i.e., the Fat rowkey case) and column names are also inflated (Fat column-names).

Without CF Compression or Data Block Encoding, the baseline HFile size is 6.2 Mb.






What if we just changed CF compression?

This drastically changes the HFile footprint. Snappy compression reduces the HFile size from 6.2 Mb to 1.8 Mb, for example.

However, we shouldn’t be too quick to celebrate. Remember that this is just the disk footprint. Over the wire the data is uncompressed, so 6.2 Mb is still being transferred from RegionServer to Client when doing a Scan over the entire table.

What if we just changed data block encoding?

Compression isn’t the only option though. Even without compression, we can change the data block encoding and also achieve HFile reduction. All options are an improvement over the 6.2 Mb baseline.

The following table shows the results of all remaining CF compression / data block encoding combinations.

Case #2: What if we re-designed the column names (and left the rowkey alone)?

Let’s assume that we re-designed our column names but left the rowkey alone. Using the “thin” column names without CF compression or data block encoding results in an HFile 5.8 Mb in size. This is an improvement from the original 6.2 Mb baseline. It doesn’t seem like much, but it’s still a 6.5% reduction in the eventual wire-transfer footprint.






Applying Snappy compression can reduce the HFile size further:

Case #3: What if we re-designed the rowkey (and left the column names alone)?

In this example, what if we only redesigned the rowkey? Using the “thin” rowkey, the result is an HFile size of 4.9 Mb, down from the 6.2 Mb baseline, a 21% reduction. Not a small savings!






Applying Snappy compression can reduce the HFile size further:

However, note that the resulting HFile size with Snappy and no data block encoding (1.7 Mb) is very similar in size to the baseline approach (i.e., fat rowkeys, fat column-names) with Snappy and no data block encoding (1.8 Mb). Why? The CF compression can compensate on disk for a lot of bloat in rowkeys and column names.

Case #4: What if we re-designed both the rowkey and the column names?

By this time we’ve learned enough HBase to know that we need to have efficient rowkeys and column-names. This produces an HFile that is 4.4 Mb, a 29% savings over the baseline of 6.2 Mb.





Applying Snappy compression can reduce the HFile size further:

Again, the on-disk footprint with compression isn’t radically different from the others, as compression can compensate to a large degree for rowkey and column name bloat.

Case #5: KeyValue Storage Approach (e.g., 1 KV vs. KV-per-Column)

What if we did something radical and changed how we stored the data in HBase? With this approach, we are using a single KeyValue per row holding all of the columns of data for the row instead of a KeyValue per column (the traditional way).

The resulting HFile, even uncompressed and without Data Block Encoding, is radically smaller at 1.4 Mb compared to 6.2 Mb.






Adding Snappy compression and Data Block Encoding makes the resulting HFile size even smaller.

Compare the 1.1 Mb Snappy without encoding to the 1.7 Mb Snappy encoded Thin rowkey/Thin column-name case.


Although Compression and Data Block Encoding can wallpaper over bad rowkey and column-name decisions in terms of HFile size, you will pay the price for this in terms of data transfer from RegionServer to Client. Also, concealing the size penalty brings with it a performance penalty each time the data is accessed or manipulated. So, the old advice about correctly designing rowkeys and column names still holds.

In terms of KeyValue approach, having a single KeyValue per row presents significant savings both in terms of data transfer (RegionServer to Client) as well as HFile size. However, there is a consequence with this approach in having to update each row entirely, and old versions of rows are also stored in their entirety (i.e., as opposed to column-by-column changes). Furthermore, it is impossible to scan on select columns; the whole row must be retrieved and deserialized to access any information stored in the row. The importance of understanding this tradeoff cannot be over-stated, and is something that must be evaluated on an application-by-application basis.

Software engineering is an art of managing tradeoffs, so there isn’t necessarily one “best” answer. Importantly, this experiment only measures the file size and not the time or processor load penalties imposed by the use of compression, encoding, or Avro. The results generated in this test are still based on certain assumptions and your mileage may vary.

Here is the data if interested: http://people.apache.org/~dmeil/HBase_HFile_Size_2014_04.csv

Monday Mar 24, 2014

HBaseCon2014, the HBase Community Event, May 5th in San Francisco


By the HBaseCon Program Committee and Justin Kestelyn

HBaseCon 2014 (May 5, 2014 in San Francisco) is coming up fast.  It is shaping up to be another grand day out for the HBase community with a dense agenda spread over four tracks of case studies, dev, ops, and ecosystem.  Check out our posted agenda grid. As in previous years, the program committee favored presentations on open source, shipping technologies and descriptions of in-production systems. The schedule is chock-a-block with a wide diversity of deploy types (industries, applications, scale!) and interesting dev and operations stories.  Come to the conference and learn from your peer HBase users, operators and developers.

The general session will lead off with a welcome by Mike Olson of Cloudera (Cloudera is the host for this HBase community conference).

Next up will be a talk by the leads of the BigTable team at Google, Avtandil Garakanidze and Carter Page.  They will present on BigTable, the technology that HBase is based on.  Bigtable is “...the world’s largest multi-purpose database, supporting 90 percent of Google’s applications around the world.”  They will give a brief overview of “...Bigtable evolution since it was originally described in an OSDI ’06 paper, its current use cases at Google, and future directions.”

Then come our own Liyin Tang of Facebook and Lars Hofhansl of Salesforce.com.  Liyin will speak on “HydraBase: Facebook’s Highly Available and Strongly Consistent Storage Service Based on Replicated HBase Instances”.  HBase powers multiple mission-critical online applications at Facebook. This talk will cover the design of HydraBase (including the replication protocol), an analysis of a failure scenario, and a plan for contributions to the HBase trunk.  Lars will explain how Salesforce.com’s scalability requirements led it to HBase and the multiple use cases for HBase there today. You’ll also learn how Salesforce.com works with the HBase community, and get a detailed look into its operational environment.

We then break up into sessions where you will be able to learn about operations best practices, new features in depth, and what is being built on top of HBase. You can also hear how Pinterest builds large scale HBase apps up in AWS, HBase at Bloomberg, OPower, RocketFuel, Xiaomi, Yahoo! and others, as well as a nice little vignette on how HBase is an enabling technology at OCLC replacing all instances of Oracle powering critical services for the libraries of the world.

There will be an optional pre-conference prep for attendees who are new to HBase by Jesse Anderson (Cloudera University).  Jesse’s talk will offer a brief Cliff’s Notes-level HBase overview before the General Session that covers architecture, API, and schema design.

Register now for HBaseCon 2014 at: https://hbasecon2014.secure.mfactormeetings.com/registration/

A big shout out goes out to our sponsors – Continuuity, Hortonworks, Intel, LSI, MapR, Salesforce.com, Splice Machine, WibiData (Gold); Facebook, Pepperdata (Silver); ASF (Community); O’Reilly Media, The Hive, NoSQL Weekly (Media) — without which HBaseCon would be impossible!

Wednesday Feb 19, 2014

Apache HBase 0.98.0 is released

hbase-0.98.0 is now available for download from the Apache mirrors and its artifacts are available in the Apache Maven repository[Read More]

Friday Jan 10, 2014

HBase Cell Security

By Andrew Purtell, HBase Committer and member of the Intel HBase Team


Apache HBase is “the Apache Hadoop database”, a horizontally scalable nonrelational datastore built on top of components offered by the Apache Hadoop ecosystem, notably Apache ZooKeeper and Apache Hadoop HDFS. Although HBase therefore offers first class Hadoop integration, and is often chosen for that reason, it has come into its own as a good choice for high scale data storage of record. HBase is often the kernel of big data infrastructure.

There are many advantages offered by HBase: it is free open source software, it offers linear and modular scalability, it has strictly consistent reads and writes, automatic and configurable sharding and automatic failover are core concerns, and it has a deliberate tolerance for operation on “commodity” hardware, which is a very significant benefit at high scale. As more organizations are faced with Big Data challenges, HBase is increasingly found in roles formerly occupied by traditional data management solutions, gaining new users with new perspectives and the requirements and challenges of new use cases. Some of those users have a strong interest in security. They may be a healthcare provider operating within a strict regulatory regime regarding the access and sharing of patient information. They might be a consumer web property in a jurisdiction with strong data privacy laws. They could be a military or government agency that must manage a strict separation between multiple levels of information classification.

Access Control Lists and Security Labels

For some time Apache HBase has offered a strong security model based on Kerberos authentication and access control lists (ACLs). When Yahoo first made a version of Hadoop capable of strong authentication available in 2009, my team and I at a former employer, a commercial computer security vendor, were very interested. We needed a scale out platform for distributed computation, but we also needed one that could provide assurance against access of information by unauthorized users or applications. Secure Hadoop was a good fit. We also found in HBase a datastore that could scale along with computation, and offered excellent Hadoop integration, except first we needed to add feature parity and integration with Secure Hadoop. Our work was contributed back to Apache as HBASE-2742 and ZOOKEEPER-938. We also contributed an ACL-based authorization engine as HBASE-3025, and the coprocessor framework upon which it was built as HBASE-2000 and others. As a result, in the Apache family of Big Data storage options, HBase was the first to offer strong authentication and access control. These features have been improved and evolved by the HBase community many times over since then.

An access control list, or ACL, is a list of permissions associated with an object. The ACL specifies which subjects (users or system processes) shall be granted access to those objects, as well as which operations are allowed. Each entry in an ACL describes a subject and the operation(s) the subject is entitled to perform. When a subject requests an operation, HBase first uses Hadoop’s strong authentication support, based on Kerberos, to verify the subject’s identity. HBase then finds the relevant ACLs, and checks the entries of those ACLs to decide whether or not the request is authorized. This is an access control model that provides a lot of assurance, and is a flexible way for describing security policy, but not one that addresses all possible needs. We can do more.

All values written to HBase are stored in what is known as a cell. (“Cell” is used interchangeably with “key-value” or “KeyValue”, mainly for legacy reasons.) Cells are identified by a multidimensional key: { row, column, qualifier, timestamp }. (“Column” is used interchangeably with “family” or “column family”.) The table is implicit in every key, even if not actually present, because all cells are written into columns, and every column belongs to a table. This forms a hierarchical relationship: 

table -> column family -> cell

Users can today grant individual access permissions to subjects on tables and column families. Table permissions apply to all column families and all cells within those families. Column family permissions apply to all cells within the given family. The permissions will be familiar to any DBA: R (read), W (write), C (create), X (execute), A (admin). However for various reasons, until today, cell level permissions were not supported.

Other high scale data storage options in the Apache family, notably Apache Accumulo, take a different approach. Accumulo has a data model almost identical to that of HBase, but implements a security mechanism called cell-level security. Every cell in an Accumulo store can have a label, stored effectively as part of the key, which is used to determine whether a value is visible to a given subject or not. The label is not an ACL, it is a different way of expressing security policy. An ACL says explicitly what subjects are authorized to do what. A label instead turns this on its head and describes the sensitivity of the information to a decision engine that then figures out if the subject is authorized to view data of that sensitivity based on (potentially, many) factors. This enables data of various security levels to be stored within the same row, and users of varying degrees of access to query the same table, while enforcing strict separation between multiple levels of information classification. HBase users might approximate this model using ACLs, but it would be labor intensive and error prone.

New HBase Cell Security Features

Happily our team here at Intel has been busy extending HBase with cell level security features. First, contributed as HBASE-8496, HBase can now store arbitrary metadata for a cell, called tags, along with the cell. Then, as of HBASE-7662, HBase can store into and apply ACLs from cell tags, extending the current HBase ACL model down to the cell. Then, as of HBASE-7663, HBase can store visibility expressions into tags, providing cell-level security capabilities similar to Apache Accumulo, with API and shell support that will be familiar to Accumulo users. Finally, we have also contributed transparent server side encryption, as HBASE-7544, for additional assurance against accidental leakage of data at rest. We are working with the HBase community to make these features available in the next major release of HBase, 0.98.

Let’s talk a bit more now about HBase visibility labels and per-cell ACLs work.

HFile version 3

In order to take advantage of any cell level access control features, it will be necessary to store data in the new HFile version, 3. HFile version 3 is very similar to HFile version 2 except it also has support for persisting and retrieving cell tags, optional dictionary based compression of tag contents, and the HBASE-7544 encryption feature. Enabling HFile version 3 is as easy as adding a single configuration setting to all HBase site XML files, followed by a rolling restart. All existing HFile version 2 files will be read normally. New files will be written in the version 3 format. Although HFile version 3 will be marked as experimental throughout the HBase 0.98 release cycle, we have found it to be very stable under high stress conditions on our test clusters.
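Concretely, the setting is the HFile format version in hbase-site.xml:

<property>
  <name>hfile.format.version</name>
  <value>3</value>
</property>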

HBase Visibility Labels

We have introduced a new coprocessor, the VisibilityController, which can be used on its own or in conjunction with HBase’s AccessController (responsible for ACL handling). The VisibilityController determines, based on the label metadata stored in cell tags and the labels associated with a given subject, whether the user is authorized to view the cell. The maximal set of labels granted to a user is managed with new shell commands getauths, setauths, and clearauths, and stored in a new HBase system table. Accumulo users will find the new HBase shell commands familiar.

When storing or mutating a cell, the HBase user can now add visibility expressions, using a backwards compatible extension to the HBase API. (By backwards compatible, we mean older servers will simply ignore the new cell metadata, as opposed to throwing an exception or failing.)

Mutation#setCellVisibility(new CellVisibility(String labelExpression));

The visibility expression can contain labels joined with the logical operators ‘&’, ‘|’ and ‘!’. Parentheses ‘(‘ and ‘)’ can be used to specify precedence. For example, consider the label set { confidential, secret, topsecret, probationary }, where the first three are sensitivity classifications and the last describes whether an employee is probationary or not. If a cell is stored with this visibility expression:

( secret | topsecret ) & !probationary

Then any user associated with the secret or topsecret label will be able to view the cell, as long as the user is not also associated with the probationary label. Furthermore, any user only associated with the confidential label, whether probationary or not, will not see the cell or even know of its existence. Accumulo users will also find HBase visibility expressions familiar, though HBase provides a superset of the boolean operators.
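
To make that concrete, here is a minimal sketch of writing a cell that carries the expression above through the client API; the table, row, and column names are placeholders, and an open table handle is assumed:

    // Sketch: assumes an open table handle "table" and the usual client imports
    // (org.apache.hadoop.hbase.client.*, org.apache.hadoop.hbase.util.Bytes,
    //  org.apache.hadoop.hbase.security.visibility.CellVisibility).
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
    put.setCellVisibility(new CellVisibility("( secret | topsecret ) & !probationary"));
    table.put(put);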

We build the user’s label set in the RPC context when a request is first received by the HBase RegionServer. How users are associated with labels is pluggable. The default plugin passes through labels specified in Authorizations added to the Get or Scan. This will also be familiar to Accumulo users. 

Get#setAuthorizations(new Authorizations(String,…));

Scan#setAuthorizations(new Authorizations(String,…));

Authorizations not in the maximal set of labels granted to the user are dropped. From this point, visibility expression processing is very fast, using set operations.
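
For example, a reader holding the secret label might scan like this (again a sketch with placeholder names); cells whose visibility expressions are not satisfied by the supplied authorizations are simply not returned:

    // Sketch: request the "secret" authorization for this scan.
    Scan scan = new Scan();
    scan.setAuthorizations(new Authorizations("secret"));
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      // only cells visible under the effective label set appear here
    }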

In the future we envision additional plugins which may interrogate an external source when building the effective label set for a user, for example LDAP or Active Directory. Consider our earlier example. Perhaps the sensitivity classifications are attached when cells are stored into HBase, but the probationary label, determined by the user’s employment status, is provided by an external authorization service.

HBase Cell ACLs

We have extended the existing HBase ACL model to the cell level.

When storing or mutating a cell, the HBase user can now add ACLs, using a backwards compatible extension to the HBase API.

Mutation#setACL(String user, Permission perms);

Like at the table or column family level, a subject is granted permissions to the cell. Any number of permissions for any number of users (or groups using @group notation) can be added.

From then on, access checks for operations on the cell include the permissions recorded in the cell’s ACL tag, using union-of-permission semantics.
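
As a sketch, attaching a cell ACL at write time looks like the following; the user name and permission are illustrative, and an open table handle is assumed:

    // Sketch: grant "alice" read access to just this cell. Table or column family
    // grants, if any, are still checked first (see below).
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
    put.setACL("alice", new Permission(Permission.Action.READ));
    table.put(put);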

First we check table or column family level permissions*. If they grant access, we can early out before going to the block cache or disk to check the cell for ACLs. Table designers and security architects can therefore optimize for the common case by granting users permissions at the table or column family level. However, if some cells do require more fine grained control and neither table nor column family checks succeed, we will enumerate the cells covered by the operation. By “covered”, we mean we ensure that every location which would be visibly modified by the operation has a cell ACL in place that grants access. We can stop at the first cell ACL that does not grant access.

For a Put, Increment, or Append we check the permissions for the most recent visible version. “Visible” means not covered by a delete tombstone. We treat the ACLs in each Put as timestamped like any other HBase value. A new ACL in a new Put applies to that Put. It doesn't change the ACL of any previous Put. This allows simple evolution of security policy over time without requiring expensive updates. To change the ACL at a specific { row, column, qualifier, timestamp } coordinate, a new value with a new ACL must be stored to that location exactly.
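
For instance, to replace the ACL at an existing coordinate, a new Put can be written at exactly that timestamp (a sketch; the timestamp value and names are illustrative):

    // Sketch: overwrite the value at an exact { row, family, qualifier, timestamp }
    // coordinate, carrying the new ACL for that cell.
    long ts = 1382400000000L; // the existing cell's timestamp
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), ts, Bytes.toBytes("value"));
    put.setACL("alice", new Permission(Permission.Action.READ, Permission.Action.WRITE));
    table.put(put);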

For Increments and Appends, we do the same thing as with Puts, except we will propagate any ACLs on the previous value unless the operation carries a new ACL explicitly.

Finally, for a Delete, we require write permissions on all cells covered by the delete. Unlike in the case of other mutations we need to check all visible prior versions, because a major compaction could remove them. If the user doesn't have permission to overwrite any of the visible versions ("visible", again, is defined as not covered by a tombstone already) then we have to disallow the operation.

* - For the sake of coherent explanation, this overlooks an additional feature. Optionally, on a per-operation basis, how cell ACLs are factored into the authorization decision can be flipped around. Instead of first checking table or column family level permissions, we enumerate the set of cell ACLs covered by the operation first, and only if there are no grants there do we check for table or column family permissions. This is useful for cases where a user is not granted table or column family permissions on a table and the cell level ACLs instead provide exceptional access. The default is useful for cases where the user is granted table or column family permissions and cell level ACLs might withhold authorization. The default is likely to perform better. Again, which strategy is used can be specified on a per-operation basis.


This blog post was republished from https://communities.intel.com/community/datastack/blog/2013/10/29/hbase-cell-security

Friday Oct 25, 2013

Phoenix Guest Blog


James Taylor 

I'm thrilled to be guest blogging today about Phoenix, a BSD-licensed open source SQL skin for HBase that powers the HBase use cases at Salesforce.com. As opposed to relying on map-reduce to execute SQL queries, as other similar projects do, Phoenix compiles your SQL query into a series of HBase scans and orchestrates the running of those scans to produce regular JDBC result sets. On top of that, you can add secondary indexes to your tables to transform what would normally be a full table scan into a series of point gets (and we all know how well HBase performs with those).

Getting Started

To get started using Phoenix, follow these directions:

  • Download and expand the latest phoenix-2.1.0-install.tar from the download page
  • Add the phoenix-2.1.0.jar to the classpath of every HBase region server. An easy way to do this is to copy it into the HBase lib directory.
  • Restart all region servers
  • Start the terminal interface bundled with the distribution against your cluster:
          $ bin/sqlline.sh localhost
  • Create a Phoenix table:
          create table stock_price(ticker varchar(6), price decimal, date date);
  • Load some data either through one of our CSV loading tools, or by mapping an existing HBase table to a Phoenix table.
  • Run some queries:
          select * from stock_price;
          select * from stock_price where ticker='CRM';
          select ticker, avg(price) from stock_price 
              where date between current_date()-30 and current_date() 
              group by ticker;
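
Since Phoenix exposes a standard JDBC driver, the same queries can also be issued from Java. A minimal sketch, assuming the Phoenix client jar is on the classpath and ZooKeeper runs on localhost:

    // Sketch: connect through the Phoenix JDBC driver and run one of the queries above.
    // Assumes imports from java.sql (Connection, DriverManager, ResultSet).
    Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
    ResultSet rs = conn.createStatement().executeQuery(
        "select * from stock_price where ticker='CRM'");
    while (rs.next()) {
      System.out.println(rs.getString("ticker") + " " + rs.getBigDecimal("price"));
    }
    conn.close();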


Take a look at the recent announcement of our release, which includes support for secondary indexing, peruse our FAQs, and join our mailing list.


