Entries tagged [accumulo]

Monday April 06, 2015

Replicating data across Accumulo clusters

This post was moved to the Accumulo project site.

Traditionally, Apache Accumulo can only operate within the confines of a single physical location. The primary reason for this restriction is that Accumulo relies heavily on Apache ZooKeeper for distributed lock management and some distributed state. Due to the consistent nature of ZooKeeper and its protocol, it doesn't handle wide-area networks (WAN) well. As such, Accumulo suffers the same problems operating over a WAN.

Data-Center Replication is a new feature, to be included in the upcoming Apache Accumulo 1.7.0, which aims to address the limitation of Accumulo to one local-area network (LAN). The implementation makes a number of decisions with respect to consistency and availability which aim to avoid interfering with the normal "local" operations of the primary Accumulo instance. That is to say, replication was designed in such a way that enabling the feature on an instance should not affect the performance of that system. However, this comes at a cost of consistency across all replicas. Replication from one instance to others is performed lazily. Succinctly, replication in Accumulo can be described as an eventually-consistent system and not a strongly-consistent system (an Accumulo instance is strongly-consistent).

Because replication is performed lazily, the data to replicate must be persisted in some form until the actual replication takes place. Accumulo's write-ahead log (WAL) files are used for this purpose. The append-only nature of these files makes them obvious candidates for reuse without the need to persist the data in another form for replication. The only internal change to Accumulo needed to support this is to the conditions under which the Accumulo garbage collector will delete WAL files. Using WAL files also has the benefit of making HDFS capacity the limiting factor in how "lazy" replication can be. This means that the amount of time replication can be offline or stalled is limited only by the amount of extra HDFS space available, which is typically ample.


Before getting into details on the feature, it will help to define some basic terminology. Data in Accumulo is replicated from a "primary" Accumulo instance to a "peer" Accumulo instance. Each instance here is a normal Accumulo instance -- each instance is only differentiated by a few new configuration values. Users ingest data into the primary instance, and that data will eventually be replicated to a peer. Each instance requires a unique name to identify itself among all Accumulo instances replicating with each other. Replication from a primary to a peer is defined on a per-table basis -- that is, the configuration states that tableA on the primary will be replicated to tableB on the peer. A primary can have multiple peers defined, e.g. tableA on the primary can be replicated to tableB on peer1 and tableC on peer2.


Internally, replication is comprised of a few components to make up the user-facing feature: the management of data ingested on the primary which needs to be replicated, the assignment of replication work within the primary, the execution of that work within the primary to send the data to a peer, and the application of the data to the appropriate table within the peer.

State Management on Primary

The most important state to manage for replication is tracking the data that was ingested in the primary. This is what ensures that all of the data will be eventually replicated to the necessary peer(s). This state is kept in both the Accumulo metadata table and a new table in the accumulo namespace: replication. Through the use of an Accumulo Combiner on these tables, updates to the replication state are simple updates to the replication table. This makes management of the state machine across all of the nodes within the Accumulo instance extremely simple. For example, TabletServers reporting that data was ingested into a write-ahead log, the Master preparing data to be replicated and the TabletServer reporting that data has been replicated to the peer are all updates to the replication table.

To "seed" the state machine, TabletServers first write to the metadata table at the end of a minor compaction. The Master will read records from the metadata table and add them to the replication table. Each Key-Value pair in the replication table represents a WAL's current state within the replication "state machine" with different column families representing different states. For example, one column family represents the status of a WAL file being replicated to a specific peer while a different column family represents the status of a WAL file being replicated to all necessary peers.
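One way to picture why Combiner-based updates keep this state machine simple is a merge function over partial status records. The following is a minimal sketch, not Accumulo's actual schema: the `WalStatus` class and its fields (`replicatedUpTo`, `ingestedUpTo`, `closed`) are hypothetical names chosen for illustration. Because the merge is commutative and idempotent, each node can write only what it observed, in any order, and the combined record still converges.

```java
// Illustrative only: WalStatus and its fields are hypothetical stand-ins,
// not the records Accumulo actually stores in the replication table.
final class WalStatus {
    final long replicatedUpTo; // WAL offset already sent to the peer
    final long ingestedUpTo;   // WAL offset known to hold data needing replication
    final boolean closed;      // true once nothing more will be appended to the WAL

    WalStatus(long replicatedUpTo, long ingestedUpTo, boolean closed) {
        this.replicatedUpTo = replicatedUpTo;
        this.ingestedUpTo = ingestedUpTo;
        this.closed = closed;
    }

    // Merge two partial observations of the same WAL file.
    // Order of arguments does not matter, and merging a record twice is harmless.
    static WalStatus merge(WalStatus a, WalStatus b) {
        return new WalStatus(
            Math.max(a.replicatedUpTo, b.replicatedUpTo),
            Math.max(a.ingestedUpTo, b.ingestedUpTo),
            a.closed || b.closed);
    }

    // A WAL can be released once it is closed and everything in it was sent.
    boolean fullyReplicated() {
        return closed && replicatedUpTo >= ingestedUpTo;
    }

    public static void main(String[] args) {
        WalStatus ingested = new WalStatus(0, 100, false);  // reported at ingest time
        WalStatus sent = new WalStatus(100, 0, false);      // reported after sending to the peer
        WalStatus closedWal = new WalStatus(0, 0, true);    // reported when the WAL is closed
        WalStatus merged = merge(merge(ingested, sent), closedWal);
        System.out.println("fully replicated: " + merged.fullyReplicated());
    }
}
```

In this model, the three kinds of reports mentioned above (ingest, work preparation, completed replication) never need to read the current state before writing, which is the property the Combiner provides.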

The Master is the primary driver of this state machine, reading the replication table and making the necessary updates repeatedly. This allows the Master to maintain a constant amount of memory with respect to the amount of data that needs to be replicated. The only limitation on persisted state for replication is the size of the replication table itself and the amount of space the WAL files on HDFS consume.

RPC from primary to peer

Like the other remote procedure calls in Accumulo, Apache Thrift is used to make RPCs from the primary Accumulo instance to a peer instance. The purpose of these methods is to send the relevant data from a WAL file to the peer. The Master advertises units of replication work, a WAL file that needs to be replicated to a single peer, and all TabletServers in the primary instance will try to reserve, and then perform, that work. ZooKeeper provides this feature to us with very little code in Accumulo.
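The reserve-then-perform step can be sketched with an in-memory map. This is only an analogy under stated assumptions: a `ConcurrentHashMap` stands in for the coordination that ZooKeeper provides in Accumulo, and `WorkQueue`, `tryReserve` and `release` are hypothetical names, not Accumulo classes or methods.

```java
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the reservation step. A ConcurrentHashMap plays the role that
// ZooKeeper plays in Accumulo; WorkQueue and its methods are hypothetical.
final class WorkQueue {
    // unit of work (a WAL file plus a target peer) -> TabletServer holding it
    private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

    // Returns true only for the first server that claims the unit of work.
    boolean tryReserve(String workUnit, String tabletServer) {
        return owners.putIfAbsent(workUnit, tabletServer) == null;
    }

    // Gives the work up, but only if the caller is the current owner.
    void release(String workUnit, String tabletServer) {
        owners.remove(workUnit, tabletServer);
    }

    public static void main(String[] args) {
        WorkQueue queue = new WorkQueue();
        String work = "wal-0001|peer1";
        System.out.println("tserver-a reserved: " + queue.tryReserve(work, "tserver-a"));
        System.out.println("tserver-b reserved: " + queue.tryReserve(work, "tserver-b"));
    }
}
```

Every TabletServer can race for every unit of work, and the atomic claim guarantees that exactly one of them performs it at a time.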

Once a TabletServer obtains the work, it will read through the WAL file extracting updates only for the table in this unit of work and send the updates across the wire to a TabletServer in the peer. The TabletServer on the primary asks the active Master in the peer for a TabletServer to communicate with. As such, ignoring some very quick interactions with the Master, RPC for replication is primarily a TabletServer to TabletServer operation which means that replication should scale in performance with respect to the number of available TabletServers on the primary and peer.

The amount of data read from a WAL and sent to the peer per RPC is a configurable parameter defaulting to 50MB. Increasing the amount of data read at a time will have a large impact on the amount of memory consumed by a TabletServer when using replication, so take care when altering this property. It is also important to note that the Thrift server used for the purposes of replication is completely separate from the Thrift server used by clients. Replication and the client service servers will not compete against one another for RPC resources.
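The memory trade-off follows from how a size-bounded batch is assembled: everything in the current batch is held in memory until it is sent. A minimal sketch, assuming updates are already serialized to byte arrays; `WalBatcher` and `batches` are hypothetical names, and the real reader works on WAL entries rather than raw arrays.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: groups serialized updates into size-bounded batches,
// one batch per RPC. Not part of Accumulo.
final class WalBatcher {
    // Each batch stays at or under maxBytes, except that a single update
    // larger than maxBytes is sent in a batch of its own.
    static List<List<byte[]>> batches(List<byte[]> updates, long maxBytes) {
        List<List<byte[]>> result = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] update : updates) {
            // Flush before adding an update that would push the batch over the limit.
            if (!current.isEmpty() && currentBytes + update.length > maxBytes) {
                result.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(update);
            currentBytes += update.length;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<byte[]> updates = List.of(new byte[40], new byte[40], new byte[40]);
        System.out.println("batches: " + batches(updates, 100).size());
    }
}
```

A larger limit means fewer RPCs but a larger buffer held per in-flight unit of work, which is why the property deserves care.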

Replay of data on peer

After a TabletServer on the primary invokes an RPC to a TabletServer on the peer, but before that RPC completes, the TabletServer on the peer must apply the updates it received to the local table. The TabletServer on the peer constructs a BatchWriter and simply applies the updates to the table. In the event of an error in writing the data, the RPC will return in error and it will be retried by a TabletServer on the primary. As such, in these failure conditions, it is possible that data will be applied on the peer multiple times. For this reason, the use of Accumulo Combiners on tables being replicated is nearly always a bad idea, as it will result in inconsistencies between the primary and replica.
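The effect of a retried RPC on a combined table can be shown with a toy model. This is a sketch under simplifying assumptions: a `HashMap` stands in for a table, `applySumming` models a table with a summing Combiner, and `applyOverwriting` models a table where a later write to a cell replaces the earlier one. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of replaying the same batch twice on the peer. Not Accumulo code.
final class ReplayDemo {
    // Table with a summing Combiner: every applied update is added in.
    static Map<String, Long> applySumming(List<Map.Entry<String, Long>> updates) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> u : updates) {
            table.merge(u.getKey(), u.getValue(), Long::sum);
        }
        return table;
    }

    // Plain table: a later write to the same cell replaces the earlier one.
    static Map<String, Long> applyOverwriting(List<Map.Entry<String, Long>> updates) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> u : updates) {
            table.put(u.getKey(), u.getValue());
        }
        return table;
    }

    // Models a failed RPC whose batch was applied and then retried in full.
    static <T> List<T> twice(List<T> batch) {
        List<T> replayed = new ArrayList<>(batch);
        replayed.addAll(batch);
        return replayed;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> batch = List.of(Map.entry("count", 5L), Map.entry("count", 7L));
        System.out.println("summing, applied once:  " + applySumming(batch));
        System.out.println("summing, replayed:      " + applySumming(twice(batch)));
        System.out.println("overwriting, replayed:  " + applyOverwriting(twice(batch)));
    }
}
```

Replaying is harmless when applying an update is idempotent, and a summing Combiner is exactly the case where it is not.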

Because there are many TabletServers, each with their own BatchWriter, potential throughput for replication on the peer should be equivalent to the ingest throughput observed by clients normally ingesting data uniformly into Accumulo.

Complex replication configurations

So far, we've only touched on configurations which have a single primary and one to many peers; however, the feature allows multiple primary instances in addition to multiple peers. This primary-primary configuration allows data to be replicated in both directions instead of just one. This can be extended even further to allow replication between a trio of instances: primaryA replicates to primaryB which replicates to primaryC which replicates to primaryA. This is supported by recording, inside each Mutation, the provenance of which systems have already seen the update. In "cyclic" replication setups, this prevents updates from being replicated indefinitely.
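The cycle-breaking idea can be sketched as a set of instance names carried with each update. This is a simplified model, not the actual Mutation class: `ReplicatedUpdate`, `seenBy`, `shouldSendTo` and `hopsAroundRing` are hypothetical names used only to show why the provenance makes a ring terminate.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for an update that carries its replication provenance.
final class ReplicatedUpdate {
    final String payload;
    // Names of the instances that already hold this update.
    private final Set<String> seenBy = new HashSet<>();

    ReplicatedUpdate(String payload, String originInstance) {
        this.payload = payload;
        seenBy.add(originInstance);
    }

    // An update is never sent to an instance that has already seen it.
    boolean shouldSendTo(String peer) {
        return !seenBy.contains(peer);
    }

    void markSeenBy(String peer) {
        seenBy.add(peer);
    }

    // Pushes one update around a ring (ring[i] replicates to ring[i + 1],
    // the last replicates to the first) and counts how many sends happen.
    static int hopsAroundRing(String[] ring) {
        ReplicatedUpdate update = new ReplicatedUpdate("row1", ring[0]);
        int hops = 0;
        int at = 0;
        while (true) {
            int next = (at + 1) % ring.length;
            if (!update.shouldSendTo(ring[next])) {
                return hops; // the cycle is cut here
            }
            update.markSeenBy(ring[next]);
            hops++;
            at = next;
        }
    }

    public static void main(String[] args) {
        String[] ring = {"primaryA", "primaryB", "primaryC"};
        System.out.println("sends before the cycle is cut: " + hopsAroundRing(ring));
    }
}
```

Without the provenance check, the update written on primaryA would arrive back at primaryA and start another lap.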

Supporting these cycles allows for different collections of users to access physically separated instances and eventually see the changes made by other groups. For example, consider two instances of Accumulo, one in New York City and another in San Francisco. Users on the west coast can use the San Francisco instance while users on the east coast can use the instance in New York. With the two instances configured to replicate to each other, data created by east coast users will eventually be seen by west coast users and vice versa.

Conclusion and future work

The addition of the replication feature fills a large gap in the architecture of Accumulo where the system does not easily operate across WANs. While strong consistency between a primary and a peer is sacrificed, the common case of using replication for disaster recovery favors availability of the system over strong consistency and has the added benefit of not significantly impacting the ingest performance on the primary instance. Replication provides active backup support while enabling Accumulo to automatically share data between instances across large physical distances.

One interesting detail about the implementation of this feature is that the code which performs replication between two Accumulo instances, the AccumuloReplicaSystem, is pluggable via the ReplicaSystem interface. It is reasonable to consider other implementations which can automatically replicate data from Accumulo to other systems for purposes of backup or additional query functionality through other data management systems. For example, Accumulo could be used to automatically replicate data to other indexing systems such as Lucene or even relational databases for advanced query functionality. Certain implementations of the ReplicaSystem could perform special filtering to limit the set of columns replicated to certain systems resulting in a subset of the complete dataset stored in one Accumulo instance without forcing clients to write the data to multiple systems. Each of these considerations is only theoretical at this point; however, the potential for advancement is definitely worth investigating.

Friday March 20, 2015

Balancing Groups of Tablets

This post was moved to the Accumulo project site.

Accumulo has a pluggable tablet balancer that decides where tablets should be placed.  Accumulo's default configuration spreads each table's tablets evenly and randomly across the tablet servers.  Each table can configure a custom balancer that does something different.

For some applications to perform optimally, sub-ranges of a table need to be spread evenly across the cluster.  Over the years I have run into multiple use cases for this situation.  The latest use case was bad performance on the Fluo Stress Test.  This test stores a tree in an Accumulo table and creates multiple tablets for each level in the tree.  In parallel, the test reads data from one level and writes it up to the next level.  Figure 1 below shows an example of tablet servers hosting tablets for different levels of the tree.  Under this scenario if many threads are reading data from level 2 and writing up to level 1, only Tserver 1 and Tserver 2 will be utilized.  So in this scenario 50% of the tablet servers are idle.

Figure 1

ACCUMULO-3439 remedied this situation with the introduction of the GroupBalancer and RegexGroupBalancer which will be available in Accumulo 1.7.0.  These balancers allow a user to arbitrarily group tablets.  Each group defined by the user will be evenly spread across the tablet servers.  Also, the total number of groups on each tablet server is minimized.   As tablets are added or removed from the table, the balancer will migrate tablets to satisfy these goals.  Much of the complexity in the GroupBalancer code comes from trying to minimize the number of migrations needed to reach a good state.
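The first of these goals, spreading each group evenly, can be sketched in a few lines. This is a deliberately simplified model and not the GroupBalancer algorithm: it assigns everything from scratch, ignores existing tablet locations and migration cost, and does not try to minimize the number of groups per server. `GroupSpread`, `assign` and the "default" group name are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of "each group is spread evenly"; not Accumulo code.
final class GroupSpread {
    // Groups tablets by the first capture group of the pattern applied to a
    // tablet identifier, then deals each group's tablets round-robin to servers.
    static Map<String, List<String>> assign(List<String> tablets, Pattern groupPattern, List<String> servers) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String tablet : tablets) {
            Matcher m = groupPattern.matcher(tablet);
            String group = m.matches() ? m.group(1) : "default";
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(tablet);
        }

        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String server : servers) {
            assignment.put(server, new ArrayList<>());
        }

        // One shared counter keeps both the per-group and the total counts
        // within one tablet of each other across servers.
        int next = 0;
        for (List<String> groupTablets : groups.values()) {
            for (String tablet : groupTablets) {
                assignment.get(servers.get(next % servers.size())).add(tablet);
                next++;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> tablets = List.of("L1:a", "L1:b", "L1:c", "L1:d", "L2:a", "L2:b", "L2:c", "L2:d");
        List<String> servers = List.of("tserver1", "tserver2", "tserver3", "tserver4");
        System.out.println(assign(tablets, Pattern.compile("(L\\d):.*"), servers));
    }
}
```

With four servers and four tablets per tree level, every server ends up hosting one tablet from each level, so work that touches a single level involves all servers.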

A GroupBalancer could be configured for the table in figure 1 in such a way that it grouped tablets by level.  If this were done, the result may look like Figure 2 below.  With this tablet to tablet server mapping, many threads reading from level 2 and writing data up to level 1 would utilize all of the tablet servers yielding better performance.

Figure 2

README.rgbalancer provides a good example of configuring and using the RegexGroupBalancer.  If a regular expression cannot accomplish the needed grouping, then a grouping function can be written in Java by extending GroupBalancer.  RegexGroupBalancer provides a good example of how to do this.

When using a GroupBalancer, how Accumulo automatically splits tablets must be kept in mind.  When Accumulo decides to split a tablet, it chooses the shortest possible row prefix from the tablet data that yields a good split point.  Therefore it's possible that a split point that is shorter than what is expected by a GroupBalancer could be chosen.  The best way to avoid this situation is to pre-split the table such that it precludes this possibility.

The Fluo Stress test is a very abstract use case.  A more concrete use case for the group balancer would be using it to ensure tablets storing geographic data were spread out evenly.  For example consider GeoWave's Accumulo Persistence Model.  Tablets could be balanced such that bins related to different regions are spread out evenly.  For example tablets related to each continent could be assigned a group ensuring data related to each continent is evenly spread across the cluster.  Alternatively, each Tier could be spread evenly across the cluster.

Wednesday July 09, 2014

Functional reads over Accumulo

This post was moved to the Accumulo project site.

Table structure is a common area of discussion between all types of Accumulo users. In the relational database realm, there was often a straightforward way that most users could agree upon that would be ideal to store and query some dataset. Data was identified by its schema, some fixed set of columns where each value within that column had some given characteristic. One of the big pushes behind the "NoSQL" movement was a growing pain in representing evolving data within a static schema. Applications like Accumulo removed that notion for a more flexible layout where the columns vary per row, but this flexibility often sparks debates about how data is "best" stored that often end without a clear-cut winner.

In general, I've found that, with new users to Accumulo, it's difficult to move beyond the basic concept of GETs and PUTs of some value for a key. Rightfully so, it's analogous to a spreadsheet: get or update the cell in the given row and column. However, there's a big difference in that the spreadsheet is running on your local desktop, instead of running across many machines. In the same way, while a local spreadsheet application has some similar functionality to Accumulo, it doesn't really make sense to think about using Accumulo as you would a spreadsheet application. Personally, I've developed a functional-programming-inspired model which I tend to follow when implementing applications against Accumulo. The model encourages simple, efficient and easily testable code, mainly as a product of modeling the client interactions against Accumulo's APIs.

Read APIs

Accumulo has two main classes for reading data from an Accumulo table: the Scanner and BatchScanner. Both accept Range(s) which limit the data read from the table based on a start and stop Key. Only data from the table that falls within those start and stop keys will be returned to the client. The reason that we have two "types" of classes to read data is that a Scanner will return data from a single Range in sorted order whereas the BatchScanner accepts multiple Ranges and returns the data unordered. In terms of Java language specifics, both the Scanner and BatchScanner are also Iterables, which return a Java Iterator that can be easily passed to some other function, transformation or for-loop.

Having both a sorted, synchronous stream and an unsorted stream of Key-Value pairs from many servers in parallel allows for a variety of algorithms to be implemented against Accumulo. Both constructs allow for the transparency in where the data came from and encourage light-weight processing of those results on the client.

Accumulo Iterators

One notable feature of Accumulo is the SortedKeyValueIterator interface, or, more succinctly, Accumulo Iterators. Typically, these iterators run inside of the TabletServer process and perform much of the heavy lifting. Iterators are used to implement a breadth of internal features such as merged file reads, visibility label filtering, versioning, and more. However, users also have the ability to leverage this server-side processing mechanism to deploy their own custom code.

One interesting detail about these iterators is that they each have an implicit source which provides them data to operate on. This source is also a SortedKeyValueIterator which means that the "local" SortedKeyValueIterator can use its own API on its data source. With this implicit hierarchy, Iterators act in concert with each other in some fixed order - they are stackable. The order in which Iterators are constructed, controlled by an Iterator's priority, determines the order of the stack. An Iterator uses its "source" Iterator to read data, performs some operation, and then passes it on (the next element could be a client or another Iterator). The design behind iterators deserves its own blog post; however, the concept to see here is that iterators are best designed as stateless as possible (transformations, filters, or aggregations that always net the same results given the same input).
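The stacking idea can be illustrated with plain `java.util.Iterator`. This is an analogy only: `FilteringIterator` is a hypothetical class, and the real SortedKeyValueIterator interface has additional responsibilities (such as seeking to a range) that are left out here. Each layer reads from its source, applies one stateless decision, and exposes the same interface to whatever sits above it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Plain-Java stand-in for one layer in an iterator stack. Not the
// SortedKeyValueIterator interface; it only shows the source/wrapper shape.
final class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> source;  // the layer below this one
    private final Predicate<T> keep;   // stateless decision applied to each element
    private T nextElement;
    private boolean hasNextElement;

    FilteringIterator(Iterator<T> source, Predicate<T> keep) {
        this.source = source;
        this.keep = keep;
        advance();
    }

    // Pulls from the source until an element passes the filter.
    private void advance() {
        hasNextElement = false;
        while (source.hasNext()) {
            T candidate = source.next();
            if (keep.test(candidate)) {
                nextElement = candidate;
                hasNextElement = true;
                return;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return hasNextElement;
    }

    @Override
    public T next() {
        if (!hasNextElement) {
            throw new NoSuchElementException();
        }
        T result = nextElement;
        advance();
        return result;
    }

    public static void main(String[] args) {
        Iterator<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator();
        // Lower layer keeps even numbers; the layer stacked on top keeps those above 4.
        Iterator<Integer> evens = new FilteringIterator<>(data, n -> n % 2 == 0);
        Iterator<Integer> top = new FilteringIterator<>(evens, n -> n > 4);
        List<Integer> out = new ArrayList<>();
        while (top.hasNext()) {
            out.add(top.next());
        }
        System.out.println(out);
    }
}
```

Because each layer depends only on its input, the layers can be tested in isolation and combined in any order that makes sense for the query.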

Functional Influences

In practice, these two concepts mesh very well with each other. Data read from a table can be thought of as a "stream" which came from some number of operations on the server. For a Scanner, this stream of data is backed by one tablet at a time to preserve sorted-order of the table. In the case of the BatchScanner, this is happening in parallel across many tablets from many tabletservers, with the client receiving data from many distinct hosts at one time. Likewise, the Scanner and BatchScanner APIs also encourage stateless processing of this data by presenting the data as a Java Iterator. Exposing explicit batches of Key-Value pairs would encourage blocking processing of each batch, which would run counter to the server-side processing model. Presenting an Iterator instead creates a more seamless implementation paradigm on both the client and the server.

When we take a step back from Object-Oriented Java and start to think about applications in a Functional mindset, it becomes clear how these APIs encourage functional-esque code. We are less concerned about mutability and encapsulation, and more concerned about stateless operations over some immutable data. Modeling our client code like this helps encourage parallelism as application in some multi-threaded environment is much simpler.

Practical Application

I started out talking about schemas and table layouts which might seem a bit unrelated to this discussion on the functional influences in the Accumulo API. Any decisions made on a table structure must take query requirements with respect to the underlying data into account. As a practical application of what might otherwise seem like pontification, let's consider a hypothetical system that processes clickstream data using Accumulo.

Clickstream data refers to logging users who visit a website, typically for the purpose of understanding usage patterns. If a website is thought of as a directed graph, where an anchor on one page which links to another page is an edge in that graph, a user's actions on that website can be thought of as a "walk" over that graph. In managing a website, it's typically very useful to understand usage patterns of your site: what page is most common? which links are most commonly clicked? what changes to a page make users act differently?

Now, let's abstractly consider that we store this clickstream data in Accumulo. Let's not go into specifics, but say we retain the typical row-with-columns idea: each row represents some user visiting a page on your website using a globally unique identifier. Each column would contain some information about that visit: the user who is visiting the website, the page they're visiting, the page they came from, the web-browser user-agent string, etc. Say you're the owner of this website, and you recently made a modification to your website which added a prominent link to some new content on the front-page. You want to know how many people are visiting your new content with this new link you've added, so we want to answer the question "how many times was our new link on the index page clicked by any user?". For the purposes of this example, let's assume we don't have any index tables which might help us answer this query more efficiently.

Let's think about this query in terms of stateless operations and performing as much of a reduction in data returned to the client as possible. We have a few basic steps:

  1. Filter: Ignore all clickstream events that are not for the index page.
  2. Filter: Ignore all clickstream events that are not for the given anchor.
  3. Aggregation: Only a sum of the occurrences is needed, not the full record.

The beauty in using Accumulo is that all three of these operations can be performed inside of the tablet server process without returning any unnecessary data to the client. Unwanted records can be easily skipped, while each record that matches our criteria is reduced to a single "+1" counter. Instead of returning each full record to the client, the tablet server can combine these counts together and simply return a sum to the client for the Range of data that was read.

The other perk of thinking about the larger problem in discrete steps is that it is easily parallelized. Assuming we have many tablet servers hosting the tablets which make up our clickstream table, we can easily run this query in parallel across them all using the BatchScanner. Additionally, because we've reduced our initial problem from a large collection of records to a stream of partial sums, we've drastically reduced the amount of work that must be performed on our (single) client. Each key-value pair returned by a server is a partial-sum which can be combined together in a very lightweight operation (in both memory and computation) as the result is made available. The client then has the simple task of performing one summation. We took a hard problem and performed an extreme amount of heavy lifting server-side while performing next-to-no computation in our client which is great for web applications or thin clients.
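The filter, filter, aggregate pipeline and the client-side summation can be modeled with Java streams. This is a sketch of the shape of the computation only: the `Click` class, its fields, and the in-memory lists standing in for tablets are hypothetical, and real server-side work would be done by Accumulo Iterators rather than by stream operations in the client JVM.

```java
import java.util.List;

// In-memory model of the clickstream query. Not Accumulo code.
final class ClickCount {
    // Hypothetical, flattened click event; a real table would spread
    // these fields across columns of a row.
    static final class Click {
        final String page;
        final String anchor;

        Click(String page, String anchor) {
            this.page = page;
            this.anchor = anchor;
        }
    }

    // The work one tablet server would do for its tablet:
    // two stateless filters followed by a reduction to a single number.
    static long partialSum(List<Click> tablet, String page, String anchor) {
        return tablet.stream()
            .filter(c -> c.page.equals(page))       // step 1: only the index page
            .filter(c -> c.anchor.equals(anchor))   // step 2: only the new link
            .count();                               // step 3: keep a count, not the records
    }

    // The work left for the client: add up one partial sum per tablet.
    // parallelStream() loosely mimics results arriving from many servers at once.
    static long total(List<List<Click>> tablets, String page, String anchor) {
        return tablets.parallelStream()
            .mapToLong(tablet -> partialSum(tablet, page, anchor))
            .sum();
    }

    public static void main(String[] args) {
        List<List<Click>> tablets = List.of(
            List.of(new Click("index", "new-content"), new Click("index", "about")),
            List.of(new Click("blog", "new-content"), new Click("index", "new-content")));
        System.out.println(total(tablets, "index", "new-content"));
    }
}
```

Addition is associative and commutative, so the order in which partial sums arrive does not matter, which is what makes the unordered BatchScanner a good fit.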

Tiered Computation

This type of algorithm, a multi-stage computation, becomes very common when working with Accumulo because of the ability to push large amounts of computation to each tablet server. Tablet servers can compute aggregations, filters and/or transformations very "close" to the actual data, returning some reduced view of the data being read. Even when some function is very efficient, computing it over large data sets can still be extremely time-consuming. Eliminating unwanted data as early as possible can often outweigh even the most optimal algorithms due to the orders of magnitude difference in the speed of CPU over disk and network.

It's important to remember that this idea isn't new, though. The above model is actually very reminiscent of the MapReduce paradigm, just applied with different constraints. The set of problems efficiently solvable by MapReduce is also a super-set of what is possible with one representation of data stored in Accumulo. This also isn't a claim that Accumulo Iterators are a complete replacement for MapReduce (a tool is rarely a 100% "better" replacement for another). In fact, Accumulo Iterators are often used as another level of computation to make an existing MapReduce job more efficient, typically through the AccumuloInputFormat.

We've identified a category of problems - a function is applied to a batch of key-value pairs which reduces the complexity of a question asked over a distributed dataset - in which the features and APIs of Accumulo lend themselves extremely well to solving in an efficient and simple manner. The ability to leverage Accumulo to perform these computations requires foresight into the types of questions that are to be asked of a dataset, the structure of the dataset within Accumulo, and the reduction of a larger problem into discrete functions which are each applied to the dataset by an Accumulo Iterator.

Tuesday May 27, 2014

Getting Started with Apache Accumulo 1.6.0

This post was moved to the Accumulo project site.

On May 12th, 2014, the Apache Accumulo project happily announced version 1.6.0 to the community. This is a new major release for the project which contains numerous new features and fixes. For the full list of notable changes, I'd recommend that you check out the release notes that were published alongside the release itself. For this post, I'd like to cover some of the changes that have been made at the installation level that are a change for users who are already familiar with the project.

Download the release

Like always, you can find our releases on our downloads page at http://accumulo.apache.org/downloads/.  You have the choice of downloading the source and building it yourself, or choosing the binary tarball which already contains pre-built jars for use.

Native Maps

One of the major components of the original BigTable design was an "In-Memory Map" which provided fast insert and read operations. Accumulo implements this using a C++ sorted map with a custom allocator which is invoked by the TabletServer using JNI. Each TabletServer uses its own "native" map. It is highly desirable to use this native map as it comes with a notable performance increase over a Java map (which is the fallback when the Accumulo shared library is not found) in addition to greatly reducing the TabletServer's JVM garbage collector stress when ingesting data.

In previous versions, the binary tarball contained a pre-compiled version of the native library (under lib/native/). Shipping a compiled binary was a convenience but also left much confusion when it didn't work on systems which had different, incompatible versions of GCC toolchains installed than what the binary was built against. As such, we have stopped bundling the pre-built shared library in favor of users building this library on their own, and instead include an accumulo-native.tar.gz file within the lib directory which contains the necessary files to build the library yourself.

To reduce the burden on users, we've also introduced a new script inside of the bin directory:


Invoking this script will automatically unpack, build and install the native map in $ACCUMULO_HOME/lib/native. If you've used older versions of Accumulo, you will also notice that the library name is different in an attempt to better follow standard conventions: libaccumulo.so on Linux and libaccumulo.dylib on Mac OS X.

Example Configurations

Apache Accumulo still bundles a set of example configuration files in conf/examples. Each sub-directory contains the complete set of files to run on a single node with the named memory limitations. For example, the files contained in conf/examples/3GB/native-standalone will run Accumulo on a single node, with native maps (don't forget to build them first!), within a total memory footprint of 3GB. Copy the contents of one of these directories into conf/ and make sure that your relevant installation details (e.g. HADOOP_PREFIX, JAVA_HOME, etc) are properly set in accumulo-env.sh. For example:

  cp $ACCUMULO_HOME/conf/examples/3GB/native-standalone/* $ACCUMULO_HOME/conf

Alternatively, a new script, bootstrap_config.sh, was also introduced that can be invoked instead of manually copying files. It will step through a few choices (memory usage, in-memory map type, and Hadoop major version), and then automatically create the configuration files for you.


One notable change in these scripts over previous versions is that they default to using Apache Hadoop 2 packaging details, such as the Hadoop conf directory and jar locations. It is highly recommended by the community that you use Apache Accumulo 1.6.0 with at least Apache Hadoop 2.2.0, most notably, to ensure that you will not lose data in the face of power failure. If you are still running on a Hadoop 1 release (1.2.1), you will need to edit both accumulo-env.sh and accumulo-site.xml. There are comments in each file which instruct you what needs to be changed.

Starting Accumulo

Initializing and starting Accumulo hasn't changed at all! After you have created the configuration files and, if you're using them, built the native maps, run:

  accumulo init

This will prompt you to name your Accumulo instance and set the Accumulo root user's password, then start Accumulo using



