Apache Accumulo

Wednesday December 14, 2016

Accumulo blog moved to project website

This Accumulo blog has moved to the Accumulo project website.  All previous blog posts are available in the news archive on the project website. This site will no longer be updated with new blog posts.

Monday April 06, 2015

Replicating data across Accumulo clusters

This post was moved to the Accumulo project site.

Traditionally, Apache Accumulo can only operate within the confines of a single physical location. The primary reason for this restriction is that Accumulo relies heavily on Apache ZooKeeper for distributed lock management and some distributed state. Due to the consistent nature of ZooKeeper and its protocol, it doesn't handle wide-area networks (WAN) well. As such, Accumulo suffers the same problems operating over a WAN.

Data-Center Replication is a new feature, to be included in the upcoming Apache Accumulo 1.7.0, which aims to address the limitation of Accumulo to one local-area network (LAN). The implementation makes a number of decisions with respect to consistency and availability which aim to avoid impacting the normal "local" operations of the primary Accumulo instance. That is to say, replication was designed in such a way that enabling the feature on an instance should not affect the performance of that system. However, this comes at a cost of consistency across all replicas. Replication from one instance to others is performed lazily. Succinctly, replication in Accumulo can be described as an eventually-consistent system and not a strongly-consistent system (while an individual Accumulo instance remains strongly-consistent).

Because replication is performed lazily, the data to replicate must be persisted in some form until the actual replication takes place. Accumulo's write-ahead log (WAL) files are used for this purpose. The append-only nature of these files makes them obvious candidates for reuse without the need to persist the data in another form for replication. The only internal change necessary to support this is altering the conditions under which the Accumulo garbage collector will delete WAL files. Using WAL files also has the benefit of making HDFS capacity the limiting factor in how "lazy" replication can be. This means that the amount of time replication can be offline or stalled is limited only by the amount of extra HDFS space available, which is typically ample.


Terminology

Before getting into details on the feature, it will help to define some basic terminology. Data in Accumulo is replicated from a "primary" Accumulo instance to a "peer" Accumulo instance. Each instance here is a normal Accumulo instance -- each instance is only differentiated by a few new configuration values. Users ingest data into the primary instance, and that data will eventually be replicated to a peer. Each instance requires a unique name to identify itself among all Accumulo instances replicating with each other. Replication from a primary to a peer is defined on a per-table basis -- that is, the configuration states that tableA on the primary will be replicated to tableB on the peer. A primary can have multiple peers defined, e.g. tableA on the primary will be replicated to tableB on peer1 and tableC on peer2.
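
As a rough sketch of what this per-table configuration looks like from the Java client API (the property names and the AccumuloReplicaSystem configuration string below follow the 1.7.0 replication documentation, but treat the specific values as illustrative placeholders):

  import org.apache.accumulo.core.client.Connector;

  // A minimal sketch of wiring up replication from a primary to one peer.
  // The peer name, ZooKeeper hosts, and remote table id are placeholders.
  void configureReplication(Connector primary) throws Exception {
    // Give this instance a unique name among all replicating instances
    primary.instanceOperations().setProperty("replication.name", "primary");

    // Define a peer named "peer1", serviced by the AccumuloReplicaSystem
    primary.instanceOperations().setProperty("replication.peer.peer1",
        "org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,"
            + "peer_instance_name,zkhost1:2181,zkhost2:2181");

    // Replicate tableA on the primary to the table with id "2" on peer1
    primary.tableOperations().setProperty("tableA", "table.replication", "true");
    primary.tableOperations().setProperty("tableA", "table.replication.target.peer1", "2");
  }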

Overview

Internally, replication is made up of a few components that together form the user-facing feature: the management of data ingested on the primary which needs to be replicated, the assignment of replication work within the primary, the execution of that work within the primary to send the data to a peer, and the application of the data to the appropriate table within the peer.

State Management on Primary

The most important state to manage for replication is tracking the data that was ingested in the primary. This is what ensures that all of the data will eventually be replicated to the necessary peer(s). This state is kept in both the Accumulo metadata table and a new table in the accumulo namespace: replication. Through the use of an Accumulo Combiner on these tables, updates to the replication state are simple updates to the replication table. This makes management of the state machine across all of the nodes within the Accumulo instance extremely simple. For example, TabletServers reporting that data was ingested into a write-ahead log, the Master preparing data to be replicated, and a TabletServer reporting that data has been replicated to the peer are all updates to the replication table.

To "seed" the state machine, TabletServers first write to the metadata table at the end of a minor compaction. The Master will read records from the metadata table and add them to the replication table. Each Key-Value pair in the replication table represents a WAL's current state within the replication "state machine" with different column families representing different states. For example, one column family represents the status of a WAL file being replicated to a specific peer while a different column family represents the status of a WAL file being replicated to all necessary peers.

The Master is the primary driver of this state machine, reading the replication table and making the necessary updates repeatedly. This allows the Master to maintain a constant amount of memory with respect to the amount of data that needs to be replicated. The only limitation on persisted state for replication is the size of the replication table itself and the amount of space the WAL files on HDFS consume.

RPC from primary to peer

Like the other remote procedure calls in Accumulo, Apache Thrift is used to make RPCs from the primary Accumulo instance to a peer instance. The purpose of these methods is to send the relevant data from a WAL file to the peer. The Master advertises units of replication work, a WAL file that needs to be replicated to a single peer, and all TabletServers in the primary instance will try to reserve, and then perform, that work. ZooKeeper provides this feature to us with very little code in Accumulo.

Once a TabletServer obtains the work, it will read through the WAL file extracting updates only for the table in this unit of work and send the updates across the wire to a TabletServer in the peer. The TabletServer on the primary asks the active Master in the peer for a TabletServer to communicate with. As such, ignoring some very quick interactions with the Master, RPC for replication is primarily a TabletServer to TabletServer operation which means that replication should scale in performance with respect to the number of available TabletServers on the primary and peer.

The amount of data read from a WAL and sent to the peer per RPC is a configurable parameter defaulting to 50MB. Increasing the amount of data read at a time will have a large impact on the amount of memory consumed by a TabletServer when using replication, so take care when altering this property. It is also important to note that the Thrift server used for the purposes of replication is completely separate from the Thrift server used by clients. The replication and client service servers will not compete against one another for RPC resources.

Replay of data on peer

After a TabletServer on the primary invokes an RPC to a TabletServer on the peer, but before that RPC completes, the TabletServer on the peer must apply the updates it received to the local table. The TabletServer on the peer constructs a BatchWriter and simply applies the updates to the table. In the event of an error in writing the data, the RPC will return in error and it will be retried by a TabletServer on the primary. As such, in these failure conditions, it is possible that data will be applied on the peer multiple times. For this reason, the use of Accumulo Combiners on tables being replicated is nearly always a bad idea and will result in inconsistencies between the primary and replica.
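
Conceptually, the peer-side application of those updates is just the standard BatchWriter pattern. The sketch below illustrates that pattern only; it is not the actual TabletServer code:

  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.BatchWriterConfig;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.data.Mutation;

  // Illustrative only: apply a batch of replicated updates to a local table on the peer.
  void applyReplicatedUpdates(Connector peer, String table, Iterable<Mutation> updates) throws Exception {
    BatchWriter writer = peer.createBatchWriter(table, new BatchWriterConfig());
    try {
      for (Mutation m : updates) {
        writer.addMutation(m); // a retried RPC may apply the same mutation more than once
      }
    } finally {
      writer.close(); // a failure here causes the RPC to return in error and be retried
    }
  }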

Because there are many TabletServers, each with its own BatchWriter, potential throughput for replication on the peer should be equivalent to the ingest throughput observed by clients normally ingesting data uniformly into Accumulo.


Complex replication configurations

So far, we've only touched on configurations which have a single primary and one to many peers; however, the feature allows multiple primary instances in addition to multiple peers. This primary-primary configuration allows data to be replicated in both directions instead of just one. This can be extended even further to allow replication between a trio of instances: primaryA replicates to primaryB, which replicates to primaryC, which replicates to primaryA. This aspect is supported by recording, inside each Mutation, the provenance of which systems have already seen that update. In "cyclic" replication setups, this prevents updates from being replicated indefinitely.

Supporting these cycles allows for different collections of users to access physically separated instances and eventually see the changes made by other groups. For example, consider two instances of Accumulo, one in New York City and another in San Francisco. Users on the west coast can use the San Francisco instance while users on the east coast can use the instance in New York. With the two instances configured to replicate to each other, data created by east coast users will eventually be seen by west coast users and vice versa.


Conclusion and future work

The addition of the replication feature fills a large gap in the architecture of Accumulo where the system does not easily operate across WANs. While strong consistency between a primary and a peer is sacrificed, the common case of using replication for disaster recovery favors availability of the system over strong consistency and has the added benefit of not significantly impacting the ingest performance on the primary instance. Replication provides active backup support while enabling Accumulo to automatically share data between instances across large physical distances.

One interesting detail about the implementation of this feature is that the code which performs replication between two Accumulo instances, the AccumuloReplicaSystem, is pluggable via the ReplicaSystem interface. It is reasonable to consider other implementations which can automatically replicate data from Accumulo to other systems for purposes of backup or additional query functionality through other data management systems. For example, Accumulo could be used to automatically replicate data to other indexing systems such as Lucene or even relational databases for advanced query functionality. Certain implementations of the ReplicaSystem could perform special filtering to limit the set of columns replicated to certain systems, resulting in a subset of the complete dataset stored in one Accumulo instance without forcing clients to write the data to multiple systems. Each of these considerations is only theoretical at this point; however, the potential for advancement is definitely worth investigating.

Friday March 20, 2015

Balancing Groups of Tablets

This post was moved to the Accumulo project site.

Accumulo has a pluggable tablet balancer that decides where tablets should be placed.  Accumulo's default configuration spreads each table's tablets evenly and randomly across the tablet servers.  Each table can configure a custom balancer that does something different.

For some applications to perform optimally, sub-ranges of a table need to be spread evenly across the cluster.  Over the years I have run into multiple use cases for this situation.  The latest use case was bad performance on the Fluo Stress Test.  This test stores a tree in an Accumulo table and creates multiple tablets for each level in the tree.  In parallel, the test reads data from one level and writes it up to the next level.  Figure 1 below shows an example of tablet servers hosting tablets for different levels of the tree.  Under this scenario if many threads are reading data from level 2 and writing up to level 1, only Tserver 1 and Tserver 2 will be utilized.  So in this scenario 50% of the tablet servers are idle.

Figure 1

ACCUMULO-3439 remedied this situation with the introduction of the GroupBalancer and RegexGroupBalancer which will be available in Accumulo 1.7.0.  These balancers allow a user to arbitrarily group tablets.  Each group defined by the user will be evenly spread across the tablet servers.  Also, the total number of groups on each tablet server is minimized.   As tablets are added or removed from the table, the balancer will migrate tablets to satisfy these goals.  Much of the complexity in the GroupBalancer code comes from trying to minimize the number of migrations needed to reach a good state.

A GroupBalancer could be configured for the table in figure 1 in such a way that it grouped tablets by level.  If this were done, the result may look like Figure 2 below.  With this tablet to tablet server mapping, many threads reading from level 2 and writing data up to level 1 would utilize all of the tablet servers yielding better performance.

Figure 2

README.rgbalancer provides a good example of configuring and using the RegexGroupBalancer.  If a regular expression cannot accomplish the needed grouping, then a grouping function can be written in Java by extending GroupBalancer.  RegexGroupBalancer provides a good example of how to do this.
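
As a sketch, configuring the RegexGroupBalancer for a table whose rows begin with a two-digit level prefix might look like the following from the Java API (the property names mirror the README.rgbalancer example; verify them against your release):

  import org.apache.accumulo.core.client.Connector;

  // A sketch of grouping tablets by a two-digit level prefix in the row.
  void configureRegexGroupBalancer(Connector conn, String table) throws Exception {
    conn.tableOperations().setProperty(table, "table.balancer",
        "org.apache.accumulo.server.master.balancer.RegexGroupBalancer");
    // Tablets are grouped by the portion of the end row matched by the capture group
    conn.tableOperations().setProperty(table,
        "table.custom.balancer.group.regex.pattern", "(\\d\\d).*");
    // Tablets whose end row does not match the pattern fall into a default group
    conn.tableOperations().setProperty(table,
        "table.custom.balancer.group.regex.default", "none");
  }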

When using a GroupBalancer, keep in mind how Accumulo automatically splits tablets.  When Accumulo decides to split a tablet, it chooses the shortest possible row prefix from the tablet data that yields a good split point.  Therefore it's possible that a split point shorter than what the GroupBalancer expects could be chosen.  The best way to avoid this situation is to pre-split the table such that it precludes this possibility.
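
For example, pre-splitting so that every split point carries the full group prefix could be done with a sketch like this (the two-digit prefix format is assumed for illustration):

  import java.util.TreeSet;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.hadoop.io.Text;

  // A sketch of adding split points that always contain the two-digit group prefix.
  void preSplitByLevel(Connector conn, String table, int levels) throws Exception {
    TreeSet<Text> splits = new TreeSet<Text>();
    for (int level = 0; level <= levels; level++) {
      splits.add(new Text(String.format("%02d:", level)));
    }
    conn.tableOperations().addSplits(table, splits);
  }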

The Fluo Stress test is a very abstract use case.  A more concrete use case for the group balancer would be using it to ensure tablets storing geographic data are spread out evenly.  For example, consider GeoWave's Accumulo Persistence Model.  Tablets could be balanced such that bins related to different regions are spread out evenly.  For example, tablets related to each continent could be assigned a group, ensuring data related to each continent is evenly spread across the cluster.  Alternatively, each Tier could be spread evenly across the cluster.

Tuesday September 02, 2014

Generating Keystores for configuring Accumulo with SSL

This post was moved to the Accumulo project site.

One of the major features added in Accumulo 1.6.0 was the ability to configure Accumulo so that the Thrift communications will run over SSL. Apache Thrift is the remote procedure call library that is leveraged for both intra-server communication and client communication with Accumulo. Issuing these calls over a secure socket ensures that unwanted actors cannot inspect the traffic sent across the wire. Given the sometimes sensitive nature of data stored in Accumulo and the authentication details for users, ensuring that no prying eyes have access to these communications is critical.

Due to the complex and deployment-specific nature of the security model for some systems, Accumulo expects users to provide their own certificates, guaranteeing that they are, in fact, secure. However, for those who want security but do not already operate within the confines of an established security infrastructure, OpenSSL and the Java keytool command can be used to generate the necessary components to enable wire encryption.

To enable SSL with Accumulo, it is necessary to generate a certificate authority and certificates which are signed by that authority. Typically, each client and server has its own certificate which provides the finest level of control over a secure cluster when the certificates are properly secured.

Generate a Certificate Authority

The certificate authority (CA) is what controls what certificates can be used to authenticate with each other. To create a secure connection with two certificates, each certificate must be signed by a certificate authority in the "truststore" (A Java KeyStore which contains at least one Certificate Authority's public key). When creating your own certificate authority, a single CA is typically sufficient (and would result in a single public key in the truststore). Alternatively, a third party can also act as a certificate authority (to add an additional layer of security); however, these are typically not a free service.

Below is an example of creating a certificate authority and adding its public key to a Java KeyStore to provide to Accumulo.

# Create a private key
openssl genrsa -des3 -out root.key 4096

# Create a certificate request using the private key
openssl req -x509 -new -key root.key -days 365 -out root.pem

# Generate a Base64-encoded version of the PEM just created
openssl x509 -outform der -in root.pem -out root.der

# Import the key into a Java KeyStore
keytool -import -alias root-key -keystore truststore.jks -file root.der

# Remove the DER formatted key file (as we don't need it anymore)
rm root.der

Remember to protect root.key and never distribute it as the private key is the basis for your circle of trust. The keytool command will prompt you about whether or not the certificate should be trusted: enter "yes". The truststore.jks file, a "truststore", is meant to be shared with all parties communicating with one another. The password provided to the truststore verifies that the contents of the truststore have not been tampered with.

Generate a certificate/keystore per host

For each host in the system, it's desirable to generate a separate certificate; typically, this corresponds to a certificate per host. Additionally, each client connecting to the Accumulo instance running with SSL should be issued its own certificate. Issuing individual certificates to each entity gives you the control to revoke or reissue certificates to clients as necessary, without widespread interruption.

# Create the private key for our server
openssl genrsa -out server.key 4096

# Generate a certificate signing request (CSR) with our private key
openssl req -new -key server.key -out server.csr

# Use the CSR and the CA to create a certificate for the server (a reply to the CSR)
openssl x509 -req -in server.csr -CA root.pem -CAkey root.key -CAcreateserial -out server.crt -days 365

# Use the certificate and the private key for our server to create PKCS12 file
openssl pkcs12 -export -in server.crt -inkey server.key -certfile server.crt -name 'server-key' -out server.p12

# Create a Java KeyStore for the server using the PKCS12 file (private key)
keytool -importkeystore -srckeystore server.p12 -srcstoretype pkcs12 -destkeystore server.jks -deststoretype JKS

# Remove the PKCS12 file as we don't need it
rm server.p12

# Import the CA-signed certificate to the keystore
keytool -import -trustcacerts -alias server-crt -file server.crt -keystore server.jks

These commands create a private key for the server, generate a certificate signing request from that private key, use the certificate authority to create a certificate from the signing request, and then build a Java KeyStore containing the certificate and the private key for our server. This, paired with the truststore, provides what is needed to configure Accumulo servers to run over SSL. The private key (server.key), the certificate signed by the CA (server.crt), and the keystore (server.jks) should all be restricted so that they are only accessible by the user running Accumulo on the host they were generated for. Use chown and chmod to protect the files, and do not distribute them over insecure networks.

Configure Accumulo Servers

Now that the Java KeyStores have been created with the necessary information, the Accumulo configuration must be updated so that Accumulo creates the Thrift server over SSL instead of a normal socket. In accumulo-site.xml, configure the following:

<property>
  <name>rpc.javax.net.ssl.keyStore</name>
  <value>/path/to/server.jks</value>
</property>
<property>
  <name>rpc.javax.net.ssl.keyStorePassword</name>
  <value>server_password</value>
</property>
<property>
  <name>rpc.javax.net.ssl.trustStore</name>
  <value>/path/to/truststore.jks</value>
</property>
<property>
  <name>rpc.javax.net.ssl.trustStorePassword</name>
  <value>truststore_password</value>
</property>
<property>
  <name>instance.rpc.ssl.enabled</name>
  <value>true</value>
</property>

The keystore and truststore paths are both absolute paths on the local filesystem (not HDFS). Remember that the server keystore should only be readable by the user running Accumulo and, if you place plaintext passwords in accumulo-site.xml, make sure that accumulo-site.xml is also not globally readable. To keep these passwords out of accumulo-site.xml, consider configuring your system with the new Hadoop CredentialProvider class; see ACCUMULO-2464 for more information, which will be available in Accumulo 1.6.1.

Also, be aware that if unique passwords are used for each server when generating the certificates, this will result in different accumulo-site.xml files for each host. Unique configuration files per host add much complexity to the configuration management of your instance. A CredentialProvider, a Hadoop feature which allows passwords to be acquired from alternate systems, can help alleviate the need for unique accumulo-site.xml files on each host. A Java KeyStore can be created using the CredentialProvider tools, which removes the need to store passwords in accumulo-site.xml; the configuration can instead point to a CredentialProvider URI which is consistent across hosts.

Configure Accumulo Clients

To configure Accumulo clients, use $HOME/.accumulo/config. This is a simple Java properties file: each line is a configuration, key and value can be separated by a space, and lines beginning with a # symbol are ignored. For example, if we generated a certificate and placed it in a keystore (as described above), we would generate the following file for the Accumulo client.

instance.rpc.ssl.enabled true
rpc.javax.net.ssl.keyStore  /path/to/client-keystore.jks
rpc.javax.net.ssl.keyStorePassword  client-password
rpc.javax.net.ssl.trustStore  /path/to/truststore.jks
rpc.javax.net.ssl.trustStorePassword  truststore-password

When creating a ZooKeeperInstance, the implementation will automatically look for this file and set up a connection using the properties defined in this configuration file. The ClientConfiguration class also contains methods that can be used instead of a configuration file on the filesystem. Again, the paths to the keystore and truststore are on the local filesystem, not HDFS.
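
The same settings can also be supplied in code. A minimal sketch, assuming the keystore and truststore created earlier (instance name, hosts, paths, and passwords are placeholders):

  import org.apache.accumulo.core.client.ClientConfiguration;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.ZooKeeperInstance;
  import org.apache.accumulo.core.client.security.tokens.PasswordToken;

  // A sketch of enabling SSL through ClientConfiguration instead of $HOME/.accumulo/config.
  Connector connectOverSsl() throws Exception {
    ClientConfiguration clientConf = ClientConfiguration.loadDefault()
        .withInstance("myInstance")
        .withZkHosts("zkhost1:2181,zkhost2:2181")
        .withSsl(true)
        .withKeystore("/path/to/client-keystore.jks", "client-password", null)
        .withTruststore("/path/to/truststore.jks", "truststore-password", null);
    ZooKeeperInstance instance = new ZooKeeperInstance(clientConf);
    return instance.getConnector("user", new PasswordToken("password"));
  }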

Wednesday July 09, 2014

Functional reads over Accumulo

 This post was moved to the Accumulo project site.

Table structure is a common area of discussion between all types of Accumulo users. In the relational database realm, there was often a straightforward way that most users could agree upon that would be ideal to store and query some dataset. Data was identified by its schema, some fixed set of columns where each value within that column had some given characteristic. One of the big pushes behind the "NoSQL" movement was a growing pain in representing evolving data within a static schema. Applications like Accumulo removed that notion for a more flexible layout where the columns vary per row, but this flexibility often sparks debates about how data is "best" stored, debates which often end without a clear-cut winner.

In general, I've found that, with new users to Accumulo, it's difficult to move beyond the basic concept of GETs and PUTs of some value for a key. Rightfully so, it's analogous to a spreadsheet: get or update the cell in the given row and column. However, there's a big difference in that the spreadsheet is running on your local desktop, instead of running across many machines. In the same way, while a local spreadsheet application has some similar functionality to Accumulo, it doesn't really make sense to think about using Accumulo as you would a spreadsheet application. Personally, I've developed a functional-programming-inspired model which I tend to follow when implementing applications against Accumulo. The model encourages simple, efficient and easily testable code, mainly as a product of modeling the client interactions against Accumulo's APIs.

Read APIs

Accumulo has two main classes for reading data from an Accumulo table: the Scanner and BatchScanner. Both accept Range(s) which limit the data read from the table based on a start and stop Key. Only data from the table that falls within those start and stop keys will be returned to the client. The reason that we have two "types" of classes to read data is that a Scanner will return data from a single Range in sorted order whereas the BatchScanner accepts multiple Ranges and returns the data unordered. In terms of Java language specifics, both the Scanner and BatchScanner are also Iterables, which return a Java Iterator that can be easily passed to some other function, transformation or for-loop.
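
A minimal sketch of both read paths (table name, range, and thread count are arbitrary):

  import java.util.Collections;
  import java.util.Map;
  import org.apache.accumulo.core.client.BatchScanner;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.security.Authorizations;

  void readExamples(Connector conn) throws Exception {
    // Scanner: a single Range, entries returned in sorted order
    Scanner scanner = conn.createScanner("mytable", Authorizations.EMPTY);
    scanner.setRange(new Range("row_a", "row_m"));
    for (Map.Entry<Key,Value> entry : scanner) {
      // lightweight, per-entry processing on the client
    }

    // BatchScanner: many Ranges read in parallel, entries returned unordered
    BatchScanner batch = conn.createBatchScanner("mytable", Authorizations.EMPTY, 8);
    batch.setRanges(Collections.singleton(new Range("row_a", "row_m")));
    for (Map.Entry<Key,Value> entry : batch) {
      // same Iterable style, but data arrives from many tablets at once
    }
    batch.close(); // the BatchScanner holds threads and must be closed
  }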

Having both a sorted, synchronous stream and an unsorted stream of Key-Value pairs from many servers in parallel allows for a variety of algorithms to be implemented against Accumulo. Both constructs provide transparency as to where the data came from and encourage light-weight processing of those results on the client.

Accumulo Iterators

One notable feature of Accumulo is the SortedKeyValueIterator interface, or, more succinctly, Accumulo Iterators. Typically, these iterators run inside of the TabletServer process and perform much of the heavy lifting. Iterators are used to implement a breadth of internal features such as merged file reads, visibility label filtering, versioning, and more. However, users also have the ability to leverage this server-side processing mechanism to deploy their own custom code.

One interesting detail about these iterators is that they each have an implicit source which provides them data to operate on. This source is also a SortedKeyValueIterator, which means that the "local" SortedKeyValueIterator can use its own API on its data source. With this implicit hierarchy, Iterators act in concert with each other in some fixed order - they are stackable. The order in which Iterators are constructed, controlled by an Iterator's priority, determines the order of the stack. An Iterator uses its "source" Iterator to read data, performs some operation, and then passes it on (the next element could be a client or another Iterator). The design behind iterators deserves its own blog post; however, the concept to see here is that iterators are best designed to be as stateless as possible (transformations, filters, or aggregations that always net the same results given the same input).
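
Because such operations are just stateless predicates or transformations over Key-Value pairs, a custom iterator can be quite small. Below is a sketch of a stateless filter; the column family and value it matches are made up for illustration:

  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.iterators.Filter;

  // A stateless server-side filter: given the same input it always nets the same result.
  public class IndexPageFilter extends Filter {
    @Override
    public boolean accept(Key k, Value v) {
      // Keep only entries recording a visit to the index page
      return k.getColumnFamily().toString().equals("page")
          && v.toString().equals("/index.html");
    }
  }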

Functional Influences

In practice, these two concepts mesh very well with each other. Data read from a table can be thought of as a "stream" which came from some number of operations on the server. For a Scanner, this stream of data is backed by one tablet at a time to preserve the sorted order of the table. In the case of the BatchScanner, this is happening in parallel across many tablets from many tabletservers, with the client receiving data from many distinct hosts at one time. Likewise, the Scanner and BatchScanner APIs also encourage stateless processing of this data by presenting the data as a Java Iterator. Exposing explicit batches of Key-Value pairs would encourage blocking processing of each batch, which would be counter-intuitive to the server-side processing model. The Iterator approach creates a more seamless implementation paradigm on both the client and the server.

When we take a step back from Object-Oriented Java and start to think about applications in a Functional mindset, it becomes clear how these APIs encourage functional-esque code. We are less concerned about mutability and encapsulation, and more concerned about stateless operations over some immutable data. Modeling our client code like this helps encourage parallelism, since applying such operations in a multi-threaded environment is much simpler.

Practical Application

I started out talking about schemas and table layouts which might seem a bit unrelated to this discussion on the functional influences in the Accumulo API. Any decisions made on a table structure must take query requirements with respect to the underlying data into account. As a practical application of what might otherwise seem like pontification, let's consider a hypothetical system that processes clickstream data using Accumulo.

Clickstream data refers to logging users who visit a website, typically for the purpose of understanding usage patterns. If a website is thought of as a directed graph, where an anchor on one page which links to another page is an edge in that graph, a user's actions on that website can be thought of as a "walk" over that graph. In managing a website, it's typically very useful to understand usage patterns of your site: what page is most common? which links are most commonly clicked? what changes to a page make users act differently?

Now, let's abstractly consider that we store this clickstream data in Accumulo. Let's not go into specifics, but say we retain the typical row-with-columns idea: each row represents some user visiting a page on your website using a globally unique identifier. Each column would contain some information about that visit: the user who is visiting the website, the page they're visiting, the page they came from, the web-browser user-agent string, etc. Say you're the owner of this website, and you recently made a modification to your website which added a prominent link to some new content on the front-page. You want to know how many people are visiting your new content with this new link you've added, so we want to answer the question "how many times was our new link on the index page clicked by any user?". For the purposes of this example, let's assume we don't have any index tables which might help us answer this query more efficiently.

Let's think about this query in terms of stateless operations and performing as much of a reduction in data returned to the client as possible. We have a few basic steps:

  1. Filter: Ignore all clickstream events that are not for the index page.
  2. Filter: Ignore all clickstream events that are not for the given anchor.
  3. Aggregation: Only a sum of the occurrences is needed, not the full record.

The beauty in using Accumulo is that all three of these operations can be performed inside of the tablet server process without returning any unnecessary data to the client. Unwanted records can be easily skipped, while each record that matches our criteria is reduced to a single "+1" counter. Instead of returning each full record to the client, the tablet server can combine these counts together and simply return a sum to the client for the Range of data that was read.

The other perk of thinking about the larger problem in discrete steps is that it is easily parallelized. Assuming we have many tablet servers hosting the tablets which make up our clickstream table, we can easily run this query in parallel across them all using the BatchScanner. Additionally, because we've reduced our initial problem from a large collection of records to a stream of partial sums, we've drastically reduced the amount of work that must be performed on our (single) client. Each key-value pair returned by a server is a partial-sum which can be combined together in a very lightweight operation (in both memory and computation) as the result is made available. The client then has the simple task of performing one summation. We took a hard problem and performed an extreme amount of heavy lifting server-side while performing next-to-no computation in our client which is great for web applications or thin clients.
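
A sketch of what this query could look like on the client is shown below. The table layout is the hypothetical one described above; the RegExFilter is a built-in iterator, while the counting step would be a custom SortedKeyValueIterator in practice (the class name used here is made up) that reduces the filtered entries on each server to a partial sum:

  import java.util.Collections;
  import java.util.Map;
  import org.apache.accumulo.core.client.BatchScanner;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.iterators.user.RegExFilter;
  import org.apache.accumulo.core.security.Authorizations;

  long countNewLinkClicks(Connector conn) throws Exception {
    BatchScanner scanner = conn.createBatchScanner("clickstream", Authorizations.EMPTY, 16);
    scanner.setRanges(Collections.singleton(new Range())); // the whole table, read in parallel

    // Steps 1 and 2: server-side filtering with the built-in RegExFilter
    IteratorSetting filter = new IteratorSetting(30, "newLinkClicks", RegExFilter.class);
    RegExFilter.setRegexs(filter, null, "page", null, "/index\\.html#new-link", false);
    scanner.addScanIterator(filter);

    // Step 3: a hypothetical custom iterator that emits a partial count instead of full records
    scanner.addScanIterator(new IteratorSetting(40, "partialSum", "com.example.CountingIterator"));

    // The client only combines the partial sums returned by the servers
    long total = 0;
    for (Map.Entry<Key,Value> partial : scanner) {
      total += Long.parseLong(partial.getValue().toString());
    }
    scanner.close();
    return total;
  }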

Tiered Computation

This type of algorithm, a multi-stage computation, becomes very common when working with Accumulo because of the ability to push large amounts of computation to each tablet server. Tablet servers can compute aggregations, filters and/or transformations very "close" to the actual data, returning some reduced view of the data being read. Even when some function is very efficient, computing it over large data sets can still be extremely time-consuming. Eliminating unwanted data as early as possible can often outweigh even the most optimal algorithms due to the orders-of-magnitude difference in the speed of CPU over disk and network.

It's important to remember that this idea isn't new, though. The above model is actually very reminiscent of the MapReduce paradigm, just applied with different constraints. The types of problems efficiently solvable by MapReduce are also a super-set of what is possible with one representation of data stored in Accumulo. This also isn't a recommendation to replace MapReduce: Accumulo Iterators are not a complete replacement for it (a tool is rarely a 100% "better" replacement for another). In fact, Accumulo Iterators are often used as another level of computation to make an existing MapReduce job more efficient, typically through the AccumuloInputFormat.

We've identified a category of problems - a function is applied to a batch of key-value pairs which reduces the complexity of a question asked over a distributed dataset - in which the features and APIs of Accumulo lend themselves extremely well to solving in an efficient and simple manner. The ability to leverage Accumulo to perform these computations requires foresight into the types of questions that are to be asked of a dataset, the structure of the dataset within Accumulo, and the reduction of a larger problem into discrete functions which are each applied to the dataset by an Accumulo Iterator.

Wednesday June 25, 2014

Scaling Accumulo With Multi-Volume Support

 This post was moved to the Accumulo project site.

MapReduce is a commonly used approach to querying or analyzing large amounts of data. Typically MapReduce jobs are created using some set of files in HDFS to produce a result. When new files come in, they get added to the set, and the job gets run again. A common Accumulo approach to this scenario is to load all of the data into a single instance of Accumulo.

A single instance of Accumulo can scale quite large [1,2] to accommodate high levels of ingest and query. The manner in which ingest is performed typically depends on latency requirements. When the desired latency is small, inserts are performed directly into Accumulo. When the desired latency is allowed to be large, then a bulk style of ingest [3] can be used. There are other factors to consider as well, but they are outside the scope of this article.

On large clusters using the bulk style of ingest, input files are typically batched into MapReduce jobs to create a set of output RFiles for import into Accumulo. The number of files per job is typically determined by the required latency and the number of MapReduce tasks that the cluster can complete in the given time-frame. The resulting RFiles, when imported into Accumulo, are added to the list of files for their associated tablets. Depending on the configuration, this will cause Accumulo to major compact these tablets. If the configuration is tweaked to allow more files per tablet, to reduce the major compactions, then more files need to be opened at query time when performing scans on the tablet. Note that no single node is burdened by the file management, but the number of file operations in aggregate is very large. If each server has several hundred tablets, and there are a thousand tablet servers, and each tablet compacts some files every few imports, we easily have 50,000 file operations (create, allocate a block, rename and delete) every ingest cycle.

In addition to the NameNode operations caused by bulk ingest, other Accumulo processes (e.g. master, gc) require interaction with the NameNode. Single processes, like the garbage collector, can be starved of responses from the NameNode as the NameNode is limited on the number of concurrent operations. It is not unusual for an operator's request for “hadoop fs -ls /accumulo” to take a minute before returning results during the peak file-management periods. In particular, the file garbage collector can fall behind, not finishing a cycle of unreferenced file removal before the next ingest cycle creates a new batch of files to be deleted.

The Hadoop community addressed the NameNode bottleneck issue with HDFS federation[4] which allows a datanode to serve up blocks for multiple namenodes. Additionally, ViewFS allows clients to communicate with multiple namenodes through the use of a client-side mount table. This functionality was insufficient for Accumulo in the 1.6.0 release as ViewFS works at a directory level; as an example, /dirA is mapped to one NameNode and /dirB is mapped to another, and Accumulo uses a single HDFS directory for its storage.

Multi-Volume support (MVS), included in 1.6.0, includes the changes that allow Accumulo to work across multiple HDFS clusters (called volumes in Accumulo) while continuing to use a single HDFS directory. A new property, instance.volumes, can be configured with multiple HDFS nameservices and Accumulo will use them all to balance out NameNode operations. The nameservices configured in instance.volumes may optionally use the High Availability NameNode feature as it is transparent to Accumulo. With MVS you have two options to horizontally scale your Accumulo instance. You can use an HDFS cluster with Federation and multiple NameNodes or you can use separate HDFS clusters.

By default Accumulo will perform round-robin file allocation for each tablet, spreading the files across the different volumes. The file balancer is pluggable, allowing for custom implementations. For example, if you don't use Federation and use multiple HDFS clusters, you may want to allocate all files for a particular table to one volume.

Comments in the JIRA[5] regarding backups could lead to follow-on work. With the inclusion of snapshots in HDFS, you could easily envision an application that quiesces the database or some set of tables, flushes their entries from memory, and snapshots their directories. These snapshots could then be copied to another HDFS instance either for an on-disk backup, or bulk-imported into another instance of Accumulo for testing or some other use.

The example configuration below shows how to set up Accumulo with HA NameNodes and Federation, as it is likely the most complex. We had to reference several web sites, one of the HDFS mailing lists, and the source code to find all of the configuration parameters that were needed. The configuration below includes two sets of HA namenodes, each set servicing an HDFS nameservice in a single HDFS cluster. In the example below, nameserviceA is serviced by name nodes 1 and 2, and nameserviceB is serviced by name nodes 3 and 4.

[1] http://ieeexplore.ieee.org/zpl/login.jsp?arnumber=6597155

[2] http://www.pdl.cmu.edu/SDI/2013/slides/big_graph_nsa_rd_2013_56002v1.pdf

[3] http://accumulo.apache.org/1.6/examples/bulkIngest.html

[4] https://issues.apache.org/jira/browse/HDFS-1052

[5] https://issues.apache.org/jira/browse/ACCUMULO-118


- By Dave Marion and Eric Newton

core-site.xml:

  <property>
    <name>fs.defaultFS</name>
    <value>viewfs:///</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.default.link./nameserviceA</name>
    <value>hdfs://nameserviceA</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.default.link./nameserviceB</name>
      <value>hdfs://nameserviceB</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.default.link./nameserviceA/accumulo/instance_id</name>
      <value>hdfs://nameserviceA/accumulo/instance_id</value>
    <description>Workaround for ACCUMULO-2719</description>
  </property>
  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence(hdfs:22)      
           shell(/bin/true)</value>
  </property>
  <property>   
    <name>dfs.ha.fencing.ssh.private-key-files</name>
      <value><PRIVATE_KEY_LOCATION></value>
  </property>
  <property>
    <name>dfs.ha.fencing.ssh.connect-timeout</name>
    <value>30000</value>
  </property>
  <property>
    <name>ha.zookeeper.quorum</name>   
    <value>zkHost1:2181,zkHost2:2181,zkHost3:2181</value>
  </property>

hdfs-site.xml:

  <property>
    <name>dfs.nameservices</name>
    <value>nameserviceA,nameserviceB</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.nameserviceA</name>
    <value>nn1,nn2</value>
  </property>
  <property> 
    <name>dfs.ha.namenodes.nameserviceB</name>
    <value>nn3,nn4</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameserviceA.nn1</name>
    <value>host1:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameserviceA.nn2</name>
    <value>host2:8020</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.nameserviceA.nn1</name>
    <value>host1:50070</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.nameserviceA.nn2</name>
    <value>host2:50070</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameserviceB.nn3</name>
    <value>host3:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameserviceB.nn4</name>
    <value>host4:8020</value>
  </property>
  <property> 
    <name>dfs.namenode.http-address.nameserviceB.nn3</name>
    <value>host3:50070</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.nameserviceB.nn4</name>
    <value>host4:50070</value>
  </property>
  <property> 
    <name>dfs.namenode.shared.edits.dir.nameserviceA.nn1</name>
    <value>qjournal://jHost1:8485;jHost2:8485;jHost3:8485/nameserviceA</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir.nameserviceA.nn2</name>   
    <value>qjournal://jHost1:8485;jHost2:8485;jHost3:8485/nameserviceA</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir.nameserviceB.nn3</name>
    <value>qjournal://jHost1:8485;jHost2:8485;jHost3:8485/nameserviceB</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir.nameserviceB.nn4</name>
    <value>qjournal://jHost1:8485;jHost2:8485;jHost3:8485/nameserviceB</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.nameserviceA</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property> 
    <name>dfs.client.failover.proxy.provider.nameserviceB</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled.nameserviceA</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled.nameserviceB</name>
    <value>true</value>
  </property>


accumulo-site.xml:

  <property>
    <name>instance.volumes</name>   
    <value>hdfs://nameserviceA/accumulo,hdfs://nameserviceB/accumulo</value>
  </property>

Tuesday May 27, 2014

Getting Started with Apache Accumulo 1.6.0

 This post was moved to the Accumulo project site.

On May 12th, 2014, the Apache Accumulo project happily announced version 1.6.0 to the community. This is a new major release for the project which contains numerous new features and fixes. For the full list of notable changes, I'd recommend that you check out the release notes that were published alongside the release itself. For this post, I'd like to cover some of the changes that have been made at the installation level and will be new to users who are already familiar with the project.

Download the release

As always, you can find our releases on the downloads page at http://accumulo.apache.org/downloads/.  You have the choice of downloading the source and building it yourself, or choosing the binary tarball which already contains pre-built jars for use.

Native Maps

One of the major components of the original BigTable design was an "In-Memory Map" which provided fast insert and read operations. Accumulo implements this using a C++ sorted map with a custom allocator which is invoked by the TabletServer using JNI. Each TabletServer uses its own "native" map. It is highly desirable to use this native map as it comes with a notable performance increase over a Java map (which is the fallback when the Accumulo shared library is not found) in addition to greatly reducing the TabletServer's JVM garbage collector stress when ingesting data.

In previous versions, the binary tarball contained a pre-compiled version of the native library (under lib/native/). Shipping a compiled binary was a convenience, but it also caused much confusion when it didn't work on systems whose GCC toolchains were different from, and incompatible with, the one the binary was built against. As such, we have stopped bundling the pre-built shared library in favor of users building this library on their own, and instead include an accumulo-native.tar.gz file within the lib directory which contains the necessary files to build the library yourself.

To reduce the burden on users, we've also introduced a new script inside of the bin directory:

  build_native_map.sh

Invoking this script will automatically unpack, build and install the native map in $ACCUMULO_HOME/lib/native. If you've used older versions of Accumulo, you will also notice that the library name is different in an attempt to better follow standard conventions: libaccumulo.so on Linux and libaccumulo.dylib on Mac OS X.

Example Configurations

Apache Accumulo still bundles a set of example configuration files in conf/examples. Each sub-directory contains the complete set of files to run on a single node with the named memory limitations. For example, the files contained in conf/examples/3GB/native-standalone will run Accumulo on a single node, with native maps (don't forget to build them first!), within a total memory footprint of 3GB. Copy the contents of one of these directories into conf/ and make sure that your relevant installation details (e.g. HADOOP_PREFIX, JAVA_HOME, etc) are properly set in accumulo-env.sh. For example:

  cp $ACCUMULO_HOME/conf/examples/3GB/native-standalone/* $ACCUMULO_HOME/conf

Alternatively, a new script, bootstrap_config.sh, was also introduced that can be invoked instead of manually copying files. It will step through a few choices (memory usage, in-memory map type, and Hadoop major version), and then automatically create the configuration files for you.

  $ACCUMULO_HOME/bin/bootstrap_config.sh

One notable change in these scripts over previous versions is that they default to using Apache Hadoop 2 packaging details, such as the Hadoop conf directory and jar locations. It is highly recommended by the community that you use Apache Accumulo 1.6.0 with at least Apache Hadoop 2.2.0, most notably, to ensure that you will not lose data in the face of power failure. If you are still running on a Hadoop 1 release (1.2.1), you will need to edit both accumulo-env.sh and accumulo-site.xml. There are comments in each file which instruct you what needs to be changed.

Starting Accumulo

Initializing and starting Accumulo hasn't changed at all! After you have created the configuration files and, if you're using them, built the native maps, run:

  accumulo init

This will prompt you to name your Accumulo instance and set the Accumulo root user's password, then start Accumulo using

  $ACCUMULO_HOME/bin/start-all.sh

Saturday May 03, 2014

The Accumulo ClassLoader

 This post was moved to the Accumulo project site.

The Accumulo classloader is an integral part of the software. The classloader is created before each of the services (master, tserver, gc, etc) are started and it is set as the classloader for that service. The classloader was rewritten in version 1.5 and this article will explain the new behavior.

First, some history

The classloader in version 1.4 used a simple hierarchy of two classloaders that would load classes from locations specified by two properties. The locations specified by the "general.classpaths" property would be used to create a parent classloader and locations specified by the "general.dynamic.classpaths" property were used to create a child classloader. The child classloader would monitor the specified locations for changes and when a change occurred the child classloader would be replaced with a new instance. Classes that referenced the orphaned child classloader would continue to work and the classloader would be garbage collected when no longer referenced. The diagram below shows the relationship between the classloaders in Accumulo 1.4.


The only place where the dynamic classloader would come into play is for user iterators and their dependencies. The general advice for using this classloader would be to put the jars containing your iterators in the dynamic location. Everything else that does not change very often or would require a restart should be put into the non-dynamic location.

There are a couple of things to note about the classloader in 1.4. First, if you modified the dynamic locations too often, you would run out of perm-gen space. This is likely due to unreferenced classes not being unloaded from the JVM. This is captured in ACCUMULO-599. Secondly, when you modified files in dynamic locations within the same cycle, it would on occasion miss the second change.

Out with the old, in with the new

The Accumulo classloader was rewritten in version 1.5. It maintains the same dynamic capability and includes a couple of new features. The classloader uses Commons VFS so that it can load jars and classes from a variety of sources, including HDFS. Being able to load jars from one location (hdfs, http, etc) will make it easier to deploy changes to your cluster. Additionally, we introduced the notion of classloader contexts into Accumulo. This is not a new concept for anyone that has used an application server, but the implementation is a little different for Accumulo.

The hierarchy set up by the new classloader uses the same property names as the old classloader. In the most basic configuration the locations specified by "general.classpaths" are used to create the root of the application classloader hierarchy. This classloader is a URLClassLoader and it does not support dynamic reloading. If you only specify this property, then you are loading all of your jars from the local file system and they will not be monitored for changes. We will call this top level application classloader the SYSTEM classloader. Next, a classloader is created that supports VFS sources and reloading. The parent of this classloader is the SYSTEM classloader and we will call this the VFS classloader. If the "general.vfs.classpaths" property is set, the VFS classloader will use this location. If the property is not set, it will use the value of "general.dynamic.classpaths" with a default value of $ACCUMULO_HOME/lib/ext to support backwards compatibility. The diagram below shows the relationship between the classloaders in Accumulo 1.5.


Running Accumulo From HDFS

If you have defined "general.vfs.classpaths" in your Accumulo configuration, then you can use the bootstrap_hdfs.sh script in the bin directory to seed HDFS with the Accumulo jars. A couple of jars will remain on the local file system for starting services. Now when you start up Accumulo the master, gc, tracer, and all of the tablet servers will get their jars and classes from HDFS. The bootstrap_hdfs.sh script sets the replication on the directory, but you may want to set it higher after bootstrapping. An example configuration setting would be:

  <property>
    <name>general.vfs.classpaths</name>
    <value>hdfs://localhost:8020/accumulo/system-classpath</value>
    <description>Configuration for a system level vfs classloader. Accumulo jars can be configured here and loaded out of HDFS.</description>
  </property>

About Contexts

You can also define classloader contexts in your accumulo-site.xml file. A context is defined by a user supplied name and it references locations like the other classloader properties. When a context is defined in the configuration, it can then be applied to one or more tables. When a context is applied to a table, then a classloader is created for that context. If multiple tables use the same context, then they share the context classloader. The context classloader is a child to the VFS classloader created above.

The goal here is to enable multiple tenants to share the same Accumulo instance. For example, we may have a context called 'app1' which references the jars for application A. We may also have another context called app2 which references the jars for application B. By default the context classloader delegates to the VFS classloader. This behavior may be overridden as seen in the app2 example below. The context classloader also supports reloading like the VFS classloader.

  <property>
    <name>general.vfs.context.classpath.app1</name>
    <value>hdfs://localhost:8020/applicationA/classpath/.*.jar,file:///opt/applicationA/lib/.*.jar</value>
    <description>Application A classpath, loads jars from HDFS and local file system</description>
  </property>

  <property>
    <name>general.vfs.context.classpath.app2.delegation=post</name>
    <value>hdfs://localhost:8020/applicationB/classpath/.*.jar,http://my-webserver/applicationB/.*.jar</value>
    <description>Application B classpath, loads jars from HDFS and HTTP, does not delegate to parent first</description>
  </property>

Context classloaders do not have to be defined in the accumulo-site.xml file. The "general.vfs.context.classpath.{context}" property can be defined on the table either programmatically or manually in the shell. Then set the "table.classpath.context" property on your table.
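
For example, a sketch of applying a context to a table from the Java API (the HDFS location is a placeholder; the equivalent can be done with the config command in the shell):

  import org.apache.accumulo.core.client.Connector;

  // A sketch of defining a context on a table and pointing the table at it.
  void applyClassLoaderContext(Connector conn, String table) throws Exception {
    conn.tableOperations().setProperty(table,
        "general.vfs.context.classpath.app1",
        "hdfs://localhost:8020/applicationA/classpath/.*.jar");
    conn.tableOperations().setProperty(table, "table.classpath.context", "app1");
  }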

Known Issues

Remember the two issues I mentioned above? Well, they are still a problem.

  • ACCUMULO-1507 is tracking VFS-487 for frequent modifications to files.
  • If you start running out of perm-gen space, take a look at ACCUMULO-599 and try applying the JVM settings for class unloading.
  • Additionally, there is an issue with the bootstrap_hdfs.sh script detailed in ACCUMULO-2761. There is a workaround listed in the issue.

Please email the dev list for comments and questions.

By Dave Marion
