Avoiding The Mess In the Hadoop Cluster (Part 1)
Author: Adam Kawa (Originally appeared at http://getindata.com/blog/post/avoiding-the-mess-from-the-hadoop-cluster-part-1/)
In the first part we describe possible open-source solutions for data cataloguing, data discovery and process scheduling such as Apache Hive, HCatalog and Apache Falcon.
Mess in the Hadoop cluster
We start with a true story that happened around a year ago. That day I wanted to use Hive to analyse terabytes of data in a 3-year-old Hadoop cluster to find an answer to a simple but important question. My query was straightforward: it joined three production tables and counted the occurrences of some event.
I estimated that roughly 10 minutes would be needed to implement this simple query. Unfortunately, problems arose very quickly and slowed me down badly. First of all, I wasn’t able to quickly discover which datasets I should process and where they were located. When looking at HDFS and Hive, I saw many directories and tables whose names suggested that they held the data I needed, but I wasn’t 100% sure of that. Some looked like junk, some like samples, some like duplicates and some like production datasets. To learn what a given dataset was about I had to send a group email to all analysts, because the true owner of a given dataset wasn’t stated anywhere. There was no good documentation available, so I simply guessed the meaning of the fields and their relationships with the fields of related tables. After several hours, a few emails and confirmations from +2 analysts, I found the data that I needed!
I implemented my HiveQL query very quickly, but it ran very slowly. It eventually finished successfully and returned the numbers that I wanted to see. I thought that it would be nice to schedule this query every week to check later whether the numbers were still great. In this case, scheduling the query would require me to implement a small piece of Python code in a framework called Luigi and to modify the Unix crontab files that trigger the computation. It’s not that difficult, but slightly time-consuming, so I gave up again and decided that I could simply run this query manually whenever it was needed.
Wild Wild West
This made me sad. I had all the data needed to solve a (simple) business problem in Hadoop, but I wasted not only my own time searching for and understanding the input data, but also somebody else’s time by asking where the input data was.
The core reason for the above is the lack of good practices that are introduced early enough and continuously enforced. It’s surprising, but proper data management and easy scheduling of processes are serious challenges that all data-driven companies face, yet many of them under-prioritise. Although turning a blind eye to these aspects might not cause trouble when your data is small, it definitely becomes a nightmare when the number of your datasets, processes and data analysts is large. At scale, this kind of big data technical debt becomes more expensive!
Knowing some of the possible problems caused by bad practices, my colleagues and I started thinking about how to avoid them and what open-source tools could address them. Thankfully, there are a number of useful (but still less widely adopted) tools from the Hadoop ecosystem that help you to discover datasets more quickly, schedule processes more easily, smoothly migrate from one file format to another (without even touching your parsing code) and much more. We will cover them in this blog series.
First of all, you should give an identity to your datasets, so that analysts can easily find the data without time-consuming investigation, understand the meaning of its fields and be sure what a given dataset is about.
Perhaps you might want to hear about an innovative tool that automatically makes HDFS self-documented? Well, the tool that we propose isn’t revolutionary at all. Our simple idea is to add each production dataset to Hive. By creating a Hive table on top of your data in HDFS, you can supply a lot of information about it: its name, fields, types and comments, the location, the data format, various properties in the form of key-value pairs and a meaningful description of the dataset.
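To make this concrete, here is a minimal sketch in Python that renders a HiveQL CREATE EXTERNAL TABLE statement carrying all of that metadata. The table name, columns, path and property keys are hypothetical and only illustrate the idea; the clause order follows the Hive DDL as I know it, so please check it against the Hive version you run.

```python
def quote(text):
    """Quote a HiveQL string literal, escaping backslashes and single quotes."""
    return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"


def create_table_ddl(name, description, columns, file_format, location, properties):
    """Render a CREATE EXTERNAL TABLE statement that documents a dataset.

    columns is a list of (column name, Hive type, comment) tuples and
    properties is a dict of free-form key-value pairs (owner, source, ...).
    """
    cols = ",\n".join(
        "  `{}` {} COMMENT {}".format(col, typ, quote(comment))
        for col, typ, comment in columns
    )
    props = ",\n".join(
        "  {} = {}".format(quote(key), quote(value))
        for key, value in sorted(properties.items())
    )
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS {name} (\n{cols}\n)\n"
        "COMMENT {description}\n"
        "STORED AS {file_format}\n"
        "LOCATION {location}\n"
        "TBLPROPERTIES (\n{props}\n)"
    ).format(
        name=name,
        cols=cols,
        description=quote(description),
        file_format=file_format,
        location=quote(location),
        props=props,
    )


if __name__ == "__main__":
    # Hypothetical dataset, used only as an illustration.
    print(create_table_ddl(
        name="logs.page_views",
        description="One row per page view, loaded daily from the web servers",
        columns=[
            ("user_id", "BIGINT", "Identifier of the logged-in user"),
            ("url", "STRING", "Requested URL"),
        ],
        file_format="ORC",
        location="/data/production/page_views",
        properties={"owner": "analytics-team", "contact": "analytics@example.com"},
    ))
```

The resulting statement can be run once in Hive; afterwards DESCRIBE FORMATTED on the table shows the description, the column comments and the properties to anyone who is looking for the dataset.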
A nice side effect of adding your production datasets to Hive is that everyone can process them using SQL-like queries in Hive, Presto, Impala, Spark SQL and take advantage of BI tools that often integrate with Hadoop through Hive.
Reading data from Hive tables
When your datasets are nicely abstracted by Hive tables, you should benefit from this abstraction as often as possible. Therefore, the Hive Metastore becomes increasingly important and should always be up and running. Apart from that, all frameworks that wish to process your data should integrate with Hive to learn where a given dataset is located, what its file format is and so on. Obviously, HiveQL queries integrate with Hive perfectly, but in the case of other tools like Spark, Scalding, Pig, MapReduce or Sqoop, you need some kind of adapter that knows how to talk to Hive.
In the case of Pig, Scalding and MapReduce (and some others), HCatalog becomes this adapter. Thanks to HCatalog, you don’t need to hardcode or parametrize the path or format of the dataset: just specify the name of the dataset, which is the same as the name of its Hive table.
Currently, Spark Core doesn’t integrate with HCatalog well. This is no big reason to worry, though, because you can use Spark SQL in the Hive context instead. Spark SQL allows you to fetch the interesting data from the Hive table using an SQL-like query. Later, you can seamlessly process this data using the Spark Core API in Scala, Java or Python.
Web UI to your data
Thanks to Hive, you have a central and nicely documented repository of your datasets. Thanks to HCatalog (or Spark SQL), your Hive tables can be accessed by the most popular Big Data frameworks. Unfortunately, neither Hive nor HCatalog offers an out-of-the-box web UI to search, discover and learn about your datasets.
There is a small temptation to implement such a web UI yourself, because it doesn’t seem to be that hard. An alternative approach is to use a ready-made solution, and one of them is Apache Falcon.
Falcon allows you to define datasets and submit them to the Falcon server, so it can “manage” them. In Falcon’s nomenclature, datasets are called feeds. For each feed, you can specify which Hive table (or HDFS dataset) it corresponds to. You can tag a dataset and later use these tags when searching. You can define who the owner of the dataset is, write its description, specify the group that a given dataset belongs to, etc. This information is somewhat redundant with what you specify in Hive tables, but there is the idea that Falcon could scan the Hive Metastore, automatically create a feed for each Hive table and inherit its properties. Falcon also lets you supply additional properties of your datasets that mean nothing to Hive, but are used by Falcon for popular data management tasks (e.g. retention), which we cover later.
In Falcon, a dataset can be described by writing a relatively short XML file, or by filling in a simple form in the web UI. This new (and improved) web UI is currently in the code-review phase under the FALCON-790 ticket, but it should be committed to the trunk before the end of 2015Q1 (hopefully).
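To give a feeling for how short such an XML file can be, the sketch below builds a feed definition with Python’s standard xml.etree module. The element and attribute names are written from my recollection of Falcon’s feed entity specification, and the feed name, tags, cluster name and table URI are made up, so treat it as an assumption-laden illustration and validate the output against the feed XSD of your Falcon version.

```python
import xml.etree.ElementTree as ET

# Namespace of feed entities, as I recall it from the Falcon entity specification.
FEED_NS = "uri:falcon:feed:0.1"


def build_feed(name, description, tags, groups, owner, unix_group,
               cluster, table_uri, retention):
    """Return the XML of a Falcon feed that points at a Hive table."""
    feed = ET.Element("feed", {"name": name, "description": description,
                               "xmlns": FEED_NS})
    # Tags are comma-separated key=value pairs that can be searched later.
    ET.SubElement(feed, "tags").text = ",".join(
        "{}={}".format(key, value) for key, value in sorted(tags.items()))
    ET.SubElement(feed, "groups").text = groups
    ET.SubElement(feed, "frequency").text = "days(1)"
    ET.SubElement(feed, "timezone").text = "UTC"
    clusters = ET.SubElement(feed, "clusters")
    source = ET.SubElement(clusters, "cluster", {"name": cluster, "type": "source"})
    ET.SubElement(source, "validity",
                  {"start": "2015-01-01T00:00Z", "end": "2099-01-01T00:00Z"})
    # Retention means nothing to Hive, but Falcon uses it to evict old instances.
    ET.SubElement(source, "retention", {"limit": retention, "action": "delete"})
    ET.SubElement(feed, "table", {"uri": table_uri})
    ET.SubElement(feed, "ACL", {"owner": owner, "group": unix_group,
                                "permission": "0755"})
    ET.SubElement(feed, "schema", {"location": "/none", "provider": "none"})
    return ET.tostring(feed, encoding="unicode")


if __name__ == "__main__":
    # All names below are hypothetical.
    print(build_feed(
        name="page-views",
        description="One row per page view, loaded daily",
        tags={"layer": "raw", "source": "web"},
        groups="clickstream",
        owner="analytics",
        unix_group="hadoop",
        cluster="primary-cluster",
        table_uri="catalog:logs:page_views#ds=${YEAR}-${MONTH}-${DAY}",
        retention="days(90)",
    ))
```

The printed XML can be saved to a file and submitted to the Falcon server as a feed entity.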
Once feeds are added to Falcon, you can see and search them by name, tag and status using the web UI and CLI. Currently, the web UI isn’t mind-blowing, but probably there isn’t anything more suitable in open-source projects today. Naturally, it can be extended by the community, so that additional search criteria are possible e.g. field name, field comment, partition field, file format, ownership, compression codec, directory name, total size, the last modification or access time, the number of applications that process the feed.
Automatic data retention
Apart from the nice web UI, Falcon offers multiple useful features related to data management. For example, you can define the retention period for a dataset to enforce that each instance of this dataset is automatically removed from Hive/HDFS after a specified period of time since its creation or last access. Thanks to that, you won’t keep old or unused datasets on disk, and you won’t have to schedule a spontaneous “HDFS cleaning day” to urgently remove useless data when the free HDFS disk space falls below a critical threshold (let’s say, 10% of total disk space), or implement your own cleaning scripts for that.
You tested your Hive query, ran it once and saw that it works. Now you want to schedule it periodically.
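The decision behind retention is simple, and the small Python sketch below shows it for the “time since creation” variant. It is not Falcon’s code, only an illustration of the rule; the paths and dates are hypothetical.

```python
from datetime import datetime, timedelta


def expired_instances(instances, retention, now):
    """Return the paths of dataset instances that are older than the retention period.

    instances maps an instance path to its creation time.
    """
    cutoff = now - retention
    return sorted(path for path, created in instances.items() if created < cutoff)


if __name__ == "__main__":
    # Hypothetical daily instances of one dataset.
    instances = {
        "/data/page_views/2014-11-15": datetime(2014, 11, 15),
        "/data/page_views/2015-02-20": datetime(2015, 2, 20),
    }
    for path in expired_instances(instances, timedelta(days=90), datetime(2015, 3, 1)):
        print("would delete", path)
```

With Falcon you only declare the retention period on the feed; a check of this kind is then performed for you on a schedule instead of living in a home-grown cleaning script.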
There are a couple of scheduling tools especially built for Hadoop. While Oozie, Azkaban and Luigi (which is often combined with cron) are probably the most popular and widely-adopted ones, the project that we found interesting is … Falcon.
Assuming that your datasets are described as feeds, Falcon allows you to define and schedule processes that process these feeds. For each process, you define what the input and output feeds are, how often the process should be executed, how to retry it when something fails (i.e. how many times to retry and what the intervals between attempts are), what the type of the process is, and more. Out of the box, Falcon can schedule Hive queries, Pig scripts and Oozie workflows (native support for other frameworks like Spark or MapReduce is to be added soon, but you can still schedule them through Oozie shell/ssh actions).
The integration between Falcon and Oozie is very close. Falcon isn’t actually a scheduler built from scratch; it simply delegates most of the scheduling responsibilities to Oozie, but gives us a somewhat nicer, shorter and more powerful scheduling API.
Falcon also helps you to keep track of the execution of the recent instances of your process. It shows you which instances finished successfully, which failed, which are still waiting for the execution time and/or the input dataset, which timed out and so on. For these events, Falcon sends JMS messages: you can consume them from ActiveMQ and possibly convert them into emails and/or alerts to let yourself or your colleagues know when something important happens.
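As an illustration, the Python sketch below generates a process definition that runs a Hive script weekly and retries it three times. The element names, the retry policy and the instance expressions such as yesterday(0,0) come from my reading of Falcon’s process entity specification; the process, cluster and feed names and the script path are hypothetical. Please validate the result against the process XSD of your Falcon version before relying on it.

```python
import xml.etree.ElementTree as ET

# Namespace of process entities, as I recall it from the Falcon entity specification.
PROCESS_NS = "uri:falcon:process:0.1"


def build_process(name, cluster, frequency, input_feed, output_feed,
                  hive_script, attempts, delay):
    """Return the XML of a Falcon process that runs a Hive script on a schedule."""
    process = ET.Element("process", {"name": name, "xmlns": PROCESS_NS})
    clusters = ET.SubElement(process, "clusters")
    target = ET.SubElement(clusters, "cluster", {"name": cluster})
    ET.SubElement(target, "validity",
                  {"start": "2015-01-05T00:00Z", "end": "2099-01-01T00:00Z"})
    ET.SubElement(process, "parallel").text = "1"
    ET.SubElement(process, "order").text = "FIFO"
    ET.SubElement(process, "frequency").text = frequency
    ET.SubElement(process, "timezone").text = "UTC"
    # Which feed instances the process reads and writes (illustrative window).
    inputs = ET.SubElement(process, "inputs")
    ET.SubElement(inputs, "input", {"name": "input", "feed": input_feed,
                                    "start": "yesterday(0,0)",
                                    "end": "yesterday(0,0)"})
    outputs = ET.SubElement(process, "outputs")
    ET.SubElement(outputs, "output", {"name": "output", "feed": output_feed,
                                      "instance": "yesterday(0,0)"})
    ET.SubElement(process, "workflow", {"engine": "hive", "path": hive_script})
    # How many times to retry and how long to wait between attempts.
    ET.SubElement(process, "retry", {"policy": "periodic", "delay": delay,
                                     "attempts": str(attempts)})
    return ET.tostring(process, encoding="unicode")


if __name__ == "__main__":
    # All names below are hypothetical.
    print(build_process(
        name="weekly-event-count",
        cluster="primary-cluster",
        frequency="days(7)",
        input_feed="page-views",
        output_feed="event-counts",
        hive_script="/apps/reports/event_count.hql",
        attempts=3,
        delay="minutes(10)",
    ))
```

Compared with a cron entry plus a custom script, everything the scheduler needs (inputs, outputs, frequency, retries) sits in one declarative document.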
Please note that I don’t claim that scheduling a process in Falcon is easier than in Luigi, Azkaban or Oozie. It definitely requires some effort, time, practice and discipline, but once you do so, you get many benefits for free (some of them will be described in the remaining part of this blog post series).
Since Falcon has a wealth of information about your processes and their input and output feeds, it becomes easy to keep track of how your data is transformed through the pipeline. Falcon displays this information in the form of a graph that shows the “lineage”. You can navigate through this graph by clicking each vertex. The lineage helps you quickly answer questions such as where the data came from, where and how it is currently used and what the consequences of removing it are, all without the need to send emails to your colleagues.
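Conceptually, answering “what happens if I remove this dataset?” is a walk over the lineage graph. The following Python sketch shows the idea on a tiny hand-written graph; it does not use Falcon’s API, and the feed and process names are invented.

```python
from collections import deque


def downstream(edges, start):
    """Return every vertex reachable from start, in breadth-first order.

    edges maps a vertex (feed or process) to the vertices that directly
    depend on it.
    """
    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        vertex = queue.popleft()
        for dependant in edges.get(vertex, []):
            if dependant not in seen:
                seen.add(dependant)
                order.append(dependant)
                queue.append(dependant)
    return order


if __name__ == "__main__":
    # Hypothetical lineage: feeds and processes alternate along each path.
    edges = {
        "feed:raw_clicks": ["process:sessionize"],
        "process:sessionize": ["feed:sessions"],
        "feed:sessions": ["process:weekly_report", "process:recommender"],
        "process:weekly_report": ["feed:report"],
    }
    print(downstream(edges, "feed:raw_clicks"))
```

Everything returned by the function would be affected by removing the starting feed, which is exactly the kind of question that otherwise ends up in a group email.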
Another useful (but still in the code-review phase) feature is “triaging” issues related to data-processing. Imagine that your dataset hasn’t been generated yet and you want to discover why. In this scenario, Falcon could easily walk up the dependency tree to identify the root cause such as a failed or still running parent process and display it in a consumable form to the users.
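The idea of walking up the dependency tree can be sketched in a few lines of Python. This is only my illustration of the concept, not how Falcon implements it; the status names and all entity names are assumptions.

```python
def find_root_causes(feed, producer, inputs, status, seen=None):
    """Walk up from a missing feed and return the processes that block it.

    producer maps a feed to the process that generates it, inputs maps a
    process to its input feeds and status maps a process to one of
    "SUCCEEDED", "FAILED", "RUNNING" or "WAITING" (assumed status names).
    """
    seen = set() if seen is None else seen
    process = producer.get(feed)
    if process is None or process in seen:
        return []
    seen.add(process)
    state = status.get(process, "UNKNOWN")
    if state in ("FAILED", "RUNNING"):
        # This process is the reason why the feed is not there yet.
        return [(process, state)]
    if state == "WAITING":
        # The process itself is fine; look at what it is waiting for.
        causes = []
        for upstream_feed in inputs.get(process, []):
            causes.extend(
                find_root_causes(upstream_feed, producer, inputs, status, seen))
        return causes
    return []


if __name__ == "__main__":
    # Hypothetical pipeline state.
    producer = {"report": "weekly_report", "sessions": "sessionize",
                "users": "import_users"}
    inputs = {"weekly_report": ["sessions", "users"], "sessionize": ["raw_clicks"]}
    status = {"weekly_report": "WAITING", "sessionize": "FAILED",
              "import_users": "SUCCEEDED"}
    print(find_root_causes("report", producer, inputs, status))
```

In this made-up example the report is late because the sessionize process failed two steps earlier, which is the kind of answer the triaging feature is meant to surface.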
The second part
In the second (and last) part of this blog series, we explain how to smoothly change the format of your datasets and highlight more advanced features of Falcon such as SLA, data backups, disaster recovery and late data handling, as well as future enhancements and ideas. We will describe some disadvantages of Falcon as well. Stay tuned!
I would like to thank Josh Baer and Piotr Krewski for a technical review of this article.
This blog is reproduced from http://getindata.com/blog/post/avoiding-the-mess-from-the-hadoop-cluster-part-1/ with permission from the author. Images used in the blog are from http://pixabay.com/en and are licensed under "CC0 Public Domain".
Starting Apache Falcon in distributed mode
Apache Falcon can be deployed in two modes, embedded mode and distributed mode. Both modes have their uses, which are described below:
1. Embedded mode: Embedded mode is also called standalone mode. This setup is useful when you have only one cluster from which you want data to be managed and processed. All requests such as submit, delete and schedule go to the single server that is started.
To start Falcon in distributed mode, the following steps are required:
1. Build Falcon in distributed mode
2. Edit the startup property for the Falcon server or servers
3. Edit the runtime property for Falcon prism
4. Example
Step 1. Build Falcon in distributed mode
Please follow the steps here for building Falcon in distributed mode. If the above step is successful, you should have 3 ".tar.gz" files at the end, one each for Falcon prism, server and client, in the target dirs.
Step 2. Edit startup property for Falcon server
To start Falcon in distributed mode, we need to give logical names to the servers we are starting. These names need to be set in startup.properties for the given server. Add the following property to "conf/startup.properties" in the server package you have untarred.
*.current.colo=< colo name >
A colo is a colocation, that is, a physical data centre location where one or more clusters may be configured. If you are planning to start more than one server, add the above to the conf/startup.properties of each server. Remember the colo name you have set here: the same name should be used in cluster.xml when submitting a cluster later via prism. Start the servers using the commands given here.
Step 3. Edit runtime property for Falcon prism
Untar the Falcon prism package. Once the servers have been configured and started, prism needs to be configured with information on where these servers are running. To do that, add the properties below to the prism "conf/runtime.properties".
*.all.colos=< colo name 1 >, < colo name 2 >
*.falcon.< colo name 1 >.endpoint=http://< hostname 1 >:15000
*.falcon.< colo name 2 >.endpoint=http://< hostname 2 >:15000
Start Falcon prism using the commands given here.
Example:
If you are configuring a Falcon setup with 1 prism and 2 servers, with cluster names cluster1 and cluster2, the following needs to be added:
1. Edit the startup.properties of server one and add *.current.colo=colo1
2. Edit the startup.properties of server two and add *.current.colo=colo2
3. Start the servers.
4. Edit the prism runtime.properties and add
*.all.colos=colo1, colo2
*.falcon.colo1.endpoint=http://< hostname 1 >:15000
*.falcon.colo2.endpoint=http://< hostname 2 >:15000
5. Start Falcon prism.
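When there are more than a couple of servers, writing these lines by hand gets error-prone, because the colo names in the prism file must match the ones given to the servers. The short Python sketch below derives both sets of lines from one list of (colo, hostname) pairs. The hostnames are hypothetical, and port 15000 is simply the one used in the example above.

```python
def server_startup_line(colo):
    """Line to add to conf/startup.properties of the server in the given colo."""
    return "*.current.colo={}".format(colo)


def prism_runtime_lines(servers):
    """Lines to add to conf/runtime.properties of prism.

    servers is an ordered list of (colo name, hostname) pairs.
    """
    lines = ["*.all.colos={}".format(", ".join(colo for colo, _ in servers))]
    for colo, host in servers:
        lines.append("*.falcon.{}.endpoint=http://{}:15000".format(colo, host))
    return lines


if __name__ == "__main__":
    # Hypothetical hosts for the two-server example.
    servers = [("colo1", "falcon1.example.com"), ("colo2", "falcon2.example.com")]
    for colo, host in servers:
        print("{}: {}".format(host, server_startup_line(colo)))
    print("\n".join(prism_runtime_lines(servers)))
```

Keeping the list in one place means that a colo renamed on a server cannot silently go out of sync with the prism configuration.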
Note:
By default, the Falcon server starts with its own embedded ActiveMQ. If for any reason you do not want the embedded ActiveMQ (for example, when starting more than one server on the same host), it can be disabled.
Step 1. Disable the start of ActiveMQ in the Falcon server start script: edit the "bin/falcon-start" script and add FALCON_OPTS="-Dfalcon.embeddedmq=false"
Step 2. In startup.properties of the Falcon server, add your own ActiveMQ URL.