Saturday, May 25, 2013

Cassandra Stress Test

In this post, I will go through how you can quickly stress test your Cassandra cluster. Before you start tuning Cassandra, you might want to see how well it is performing so far and where it is slowing down. You can certainly write a benchmark tool that inserts some random data, reads it back, and measures performance based on time. When I was first asked to stress test Cassandra, I started writing pretty much that kind of tool. But midway through I found existing code that stress tests Cassandra and is good enough to start with. It's basically a pom-based Java project that uses Hector (my project also uses Hector, a Java client for Cassandra).

You can go directly here to get more information about how it's written and how to run it:

But if you just want a quick way to run it, you can follow these steps:

Step#1: Install It

Step#2: Run It:
Here is what the above command does:
  • Inserts (-o insert) 1000000 records (-n) into the column family StressStandard, which has 10 columns (-c)
  • Uses 5 threads (-t) with a batch size of 1000 (-b)
  • So each thread gets 1000000 / 5 = 200000 inserts and, because the batch size is 1000, each thread actually performs 200000 / 1000 = 200 batch inserts.
After it inserts 1000000 records, it shows a brief summary of the insertion performance. For the above test, it took around 3 minutes to insert all records (no optimization), which came to 140.87 write requests per second with a bandwidth of 15730.39 kb/sec. You can also test read performance, as well as the performance of some other Hector API operations (rangeslice, multiget, etc.).
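
The per-thread arithmetic above can be checked with a small sketch. This is only an illustration in Python, not part of the stress tool; the function name is made up, and it assumes the record count divides evenly across threads and batches.

```python
def split_inserts(total_records, threads, batch_size):
    """Return (inserts per thread, batch requests per thread).

    Assumes total_records divides evenly by threads, and the
    per-thread share divides evenly by batch_size.
    """
    per_thread = total_records // threads
    batches_per_thread = per_thread // batch_size
    return per_thread, batches_per_thread


# Values from the run described above: -n 1000000, -t 5, -b 1000
per_thread, batches = split_inserts(1_000_000, 5, 1000)
print(per_thread, batches)  # 200000 200
```

Changing the thread count or batch size here is a quick way to see how many batch requests each thread would issue before you start a long run.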

I played with this stress tool a lot and later adapted it to my needs (to work with my Cassandra keyspace and column families) and ran it for my own stress test. I highly recommend this stress tool; it will cover most of the basic cases.

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or you run into any issues, please do not hesitate to contact me :)

Friday, May 17, 2013

Chunk data import / Incremental Import in Sqoop

Recently I faced an issue while importing data from Oracle with Sqoop. It had been working fine until I got a new requirement. Before discussing the new requirement, let me quickly describe how it currently works.

Currently I am running Sqoop from Oozie, but I am not using a coordinator job. So I execute each Oozie job manually from the command prompt.

You can check these links if you want to know how to run Sqoop and Oozie together.
In our option parameter file, I have a field something like this:
ID <= 1000000
For each run, I used to change that field manually and re-run my Oozie job.

New Requirement

Now, what I have been asked to do is run my Oozie job through a coordinator and import data from Oracle block-wise, in chunks. Based on the current requirement, what I'm trying to achieve is to import the rows from M to N. Ideally, each run imports 15 million rows from that specific table, and Hadoop processes those records and is ready for another batch before the following run.

As an example:
1st run: 1 to 20
2nd run: 21 to 40
3rd run: 41 to 60
and so on...
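
The run-to-range mapping in this example can be written down as a short formula. The sketch below is only an illustration in Python; the function name is hypothetical, and the chunk size of 20 is the one from the example above (the real target was 15 million rows per run).

```python
def chunk_range(run_number, chunk_size):
    """Return the inclusive (first_row, last_row) range for a 1-based run number."""
    start = (run_number - 1) * chunk_size + 1
    end = run_number * chunk_size
    return start, end


for run in (1, 2, 3):
    print(run, chunk_range(run, 20))
# 1 (1, 20)
# 2 (21, 40)
# 3 (41, 60)
```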

The first thing I explored was the "--boundary-query" parameter that comes with Sqoop. From the documentation: "By default sqoop will use query select min(<split-by>), max(<split-by>) from <table name> to find out boundaries for creating splits. In some cases this query is not the most optimal so you can specify any arbitrary query returning two numeric columns using --boundary-query argument."

After spending some time on it and discussing it on the Sqoop mailing list, I learned that incremental import does not work with chunks. It imports everything since the last import (more specifically, everything from --last-value to the end).

Then I decided to create a shell action in Oozie that updates the appropriate parameter after each Sqoop execution, so that the following Sqoop run gets new options for its import.

So I made some changes to my option parameter file (option.par), and here is the new one:
To store the current index value and the chunk size, I used another properties file:
My shell script increases the value of startIndex by chunkSize. Here is the script I wrote for this:

One thing I want to add here: when a script modifies a file while running through Oozie, it is actually a cached version of the file in HDFS that gets updated. That's why I had to copy those files back to their original location in HDFS. Also, behind the scenes a mapred user does the work, but I'm running the Oozie job as the ambari_qa user (note: I'm using Hortonworks Hadoop, HDP 1.2.0). That's why I had to give permissions on those files back to all users.
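
The index update described above (advancing startIndex by chunkSize after each run) can be sketched as follows. This is a Python illustration of the idea, not the original shell script. The names startIndex and chunkSize come from the post, but the key=value layout of the properties file and the function name are assumptions.

```python
def advance_start_index(properties_text):
    """Return the properties text with startIndex increased by chunkSize.

    Assumes simple key=value lines; comment lines (starting with '#')
    and blank lines are dropped in this sketch.
    """
    props = {}
    for line in properties_text.splitlines():
        if "=" in line and not line.lstrip().startswith("#"):
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    props["startIndex"] = str(int(props["startIndex"]) + int(props["chunkSize"]))
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


before = "startIndex=1\nchunkSize=20\n"
print(advance_start_index(before))
# startIndex=21
# chunkSize=20
```

In the setup described in this post, the updated file would still have to be copied back to its original HDFS location after each run, as explained above.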

Here is my Oozie workflow (workflow.xml):
I put everything inside my Oozie application path in HDFS. Here is my folder structure:
Don't forget to grant "write" permission when you first put it in HDFS. Now you can run the Oozie workflow by executing this:
[ambari_qa@ip-10-0-0-91 ~]$ oozie job -oozie http://ip-10-0-0-91:11000/oozie -config -run
Here is the file:
That's it! Now every time you execute the Oozie job, it will import a new chunk of data from Oracle. How I run it as a coordinator job is something I will cover in another post. Jarcec mentioned in one of the Sqoop user mailing list threads that Sqoop will have this feature soon, but I'm not sure about its time frame. So I had to use this work-around. It worked for me, and I hope it will work for you too!

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or you run into any issues, please do not hesitate to contact me.

Saturday, April 6, 2013

Configure Ganglia for multiple clusters in Unicast mode

In my previous post, Setting up Ganglia in CentOS environment, I talked about how to set up Ganglia. At that time, I used only a single cluster for the whole setup. But it's highly unlikely that you have only a single cluster in your development/production environment. Suppose you have two clusters, 1. Storm and 2. Kafka, and you want to monitor all of their nodes through a single Ganglia UI. You do not have to install Ganglia multiple times for that; you just need to configure it. It would have been much easier if AWS supported multicast, but as it doesn't, you need a work-around in unicast mode to monitor multiple clusters in one single Ganglia.

The idea behind this work-around is pretty straightforward. Suppose I have two clusters, cluster#1 - Storm and cluster#2 - Kafka, and their respective IP addresses are:
  • - Storm Cluster (supervisor 1)
  • - Storm Cluster (supervisor 2)
  • - Storm Cluster (supervisor 3)
  • - Storm Cluster (nimbus)
  • - Kafka Cluster
  • - Kafka Cluster
  • - Kafka Cluster
  • - my client machine

What I am going to do is configure each cluster to send its collected data (gmond) to one specific node of that cluster only, and configure the gmetad daemon so that it collects data only from that designated node (gmond daemon) of each cluster. Ganglia will categorize each cluster's data by the unique cluster name defined in the gmond.conf file.
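
To make the idea concrete, here is a minimal Python sketch that produces one gmetad data_source line per cluster, each pointing at that cluster's designated collecting node. The cluster names, the IP addresses (taken from the 192.0.2.0/24 documentation range), and the function name are placeholders, not the values from my setup. Port 8649 is the usual gmond port, and the 60 second interval matches the one used in the single-cluster post.

```python
def data_source_line(cluster_name, collector_host, port=8649, interval=60):
    """Build a gmetad.conf data_source line for one cluster."""
    return f'data_source "{cluster_name}" {interval} {collector_host}:{port}'


# Hypothetical collecting node per cluster (placeholder addresses)
collectors = {
    "Kafka": "192.0.2.10",
    "Storm": "192.0.2.20",
}

for name, host in collectors.items():
    print(data_source_line(name, host))
# data_source "Kafka" 60 192.0.2.10:8649
# data_source "Storm" 60 192.0.2.20:8649
```

The cluster name in each line has to match the name set in the cluster section of that cluster's gmond.conf, otherwise the hosts will not be grouped as expected.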

As you can see in the above figure, all of the Kafka cluster's data is sent to one specific Kafka node, and all of the Storm cluster's data is sent to one of the Storm nodes. The client machine runs the gmetad daemon, and I will configure that daemon to look at two data sources, one per cluster, whose source IP addresses are those two collecting nodes for Kafka and Storm respectively.

I'm assuming that you have already set up Ganglia and it's running as expected, so I am not going to discuss what the gmond.conf and gmetad.conf files are. In case you have not set it up yet, you might want to take a look at this post
This is my gmond.conf file (only the part which I modified), which I'm using for all Kafka hosts (this file is unique for each host per cluster):
And here is my gmond.conf file for all Storm hosts (this file is unique for each host per cluster):

Notice that I'm using a unique host address in udp_send_channel for each cluster. Now I need to tell my gmetad daemon to collect data from those two host addresses. Here is my gmetad.conf file:
You are done! Now restart all gmond daemons and the gmetad daemon and wait for a few minutes.
Once you navigate to your Ganglia UI URL, you should be able to see your grid and the list of your clusters in the drop-down.

You can dig further to see each host of each cluster:

There is another work-around that you can also try to get a better understanding of Ganglia. In that case you use a separate port number for each cluster. Here, I'm distinguishing each cluster's data source by IP address, but in that work-around you can have a single IP address for all clusters with multiple port numbers. You can try that work-around as an exercise :).

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or you run into any issues, please do not hesitate to contact me.

Monday, April 1, 2013

Setting up Ganglia in CentOS

Ganglia is a scalable distributed monitoring system for high-performance computing systems such as clusters and Grids (ref). Installing and configuring Ganglia is very straightforward. It has two major parts:

Gmond (Ganglia monitoring daemon): Runs on every single node, collects the data, and sends it to the meta daemon node.

Gmetad (Ganglia meta daemon): Runs on a head (or client) node and gathers the data from all monitoring nodes so it can be displayed on the UI.

Assume I have a 4-node cluster and one of the nodes also works as the client. So, I will install the Ganglia PHP UI on that machine.

Here are their IP addresses and the list of services I am going to install on them:
  • - client node (gmetad, gmond, ui)
  • - monitoring node (gmond)
  • - monitoring node (gmond)
  • - monitoring node (gmond)

On client node:
--> Install the meta daemon, monitoring daemon and web UI by executing:
--> If they are not available, then you might need to add the EPEL repository to your machine.

On monitoring node:
--> Install the monitoring daemon by:


At this point everything is installed, and now you need to configure Ganglia.
  • /etc/ganglia/gmetad.conf --- configuration file for gmetad daemon
  • /etc/ganglia/gmond.conf --- configuration file for gmond daemon

I have updated only the following part of the gmond.conf file on each monitoring node.
Notice that I have commented out mcast_join and bind because multicast is not supported by AWS EC2, so unicast is the only option for Ganglia. All monitoring nodes therefore send their collected data to the node that collects it (the node running the gmetad daemon).

In the gmetad.conf file I have updated this:
data_source "Cloud for Beginners" 60
Here I'm telling the meta daemon the name of the cluster (the name must match so that hosts are organized by cluster), the IP address and port of the host from which data will be collected, and the polling interval (collect data every 60 seconds).
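
To show how the pieces of a data_source line fit together, here is a small Python sketch that splits such a line into cluster name, polling interval, and host list. It is not how gmetad itself parses its configuration, only an illustration of the field order; the function name and the address 192.0.2.10:8649 are placeholders.

```python
import shlex


def parse_data_source(line):
    """Split a gmetad data_source line into (name, interval, hosts).

    interval is None when the line does not specify one.
    """
    tokens = shlex.split(line)  # keeps the quoted cluster name as one token
    if len(tokens) < 2 or tokens[0] != "data_source":
        raise ValueError("not a data_source line")
    name = tokens[1]
    rest = tokens[2:]
    interval = None
    if rest and rest[0].isdigit():
        interval = int(rest[0])
        rest = rest[1:]
    return name, interval, rest


print(parse_data_source('data_source "Cloud for Beginners" 60 192.0.2.10:8649'))
# ('Cloud for Beginners', 60, ['192.0.2.10:8649'])
```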

You are done! Now start the monitoring daemon on all nodes and the meta daemon on the client node.
After 1-2 minutes you should be able to see all your monitoring data through:

You might want to change the boot configuration so that the gmetad and gmond daemons start at boot:

Common Issue: 
If gmetad is not starting up, you can check the log by:
In the log you might see the error "Please make sure that /var/lib/ganglia/rrds is owned by nobody"; in that case you need to execute this:

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or you run into any issues, please do not hesitate to contact me.

Thursday, March 21, 2013

A basic Oozie coordinator job

Suppose you want to run your workflow every two hours or once per day; that is where a coordinator job comes in very handy. There are several more use cases for the Oozie coordinator. Today I'm just showing you how to write a very basic Oozie coordinator job.

I'm assuming that you are already familiar with Oozie and have a workflow ready to be used in a coordinator job. For this tutorial, my Oozie workflow is a shell-based action workflow. I want to execute a shell script every two hours, starting from today, for the next 10 days. My workflow.xml is already inside an HDFS directory.
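
To get a feel for what "every two hours for the next 10 days" means in terms of runs, here is a small Python sketch that lists the nominal run times. It only models the schedule and does not talk to Oozie; the start date and the function name are placeholders, and it assumes the end time is exclusive.

```python
from datetime import datetime, timedelta


def nominal_times(start, end, frequency):
    """Yield each scheduled run time from start up to (not including) end."""
    current = start
    while current < end:
        yield current
        current += frequency


start = datetime(2013, 3, 21, 0, 0)   # placeholder start time
end = start + timedelta(days=10)      # "next 10 days"
runs = list(nominal_times(start, end, timedelta(hours=2)))

print(len(runs))   # 120
print(runs[0])     # 2013-03-21 00:00:00
print(runs[-1])    # 2013-03-30 22:00:00
```

So a two-hour frequency over 10 days comes to 120 workflow runs, which is worth keeping in mind when you size whatever the shell script does.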
Without the coordinator, I'm currently running it like this:
Here is my file:
Now I want to run this workflow with a coordinator. The Oozie Coordinator Engine is responsible for the coordinator job, and the input of the engine is a Coordinator App. At least two files are required for each Coordinator App:
  1. coordinator.xml - The definition of the coordinator job. What triggers your workflow (time based or input based), how long it will continue, the workflow wait time - all of this information needs to be written in the coordinator.xml file.
  2. - Contains properties for the coordinator job; behaves the same as the job.properties file.
Based on my requirement, here is my coordinator.xml file:
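Since I need to pass a properties file for the coordinator job, I cannot pass the previous properties file at the same time. That's why I need to move all properties from the workflow's properties file to the coordinator's properties file. Remember one thing: the coordinator's properties file must have a property that specifies the location of the coordinator.xml file (similar to the application path property in the workflow's properties file). After moving those properties, my file became: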

As you noticed, I set the application path in oozie.coord.application.path, and that path contains the coordinator.xml file.
Now I'm pretty much set. If I execute the coordinator job now, it will execute the coordinator app located in the coordinator application path. The coordinator app has a tag <workflow><app-path>.... </app-path></workflow> which specifies the actual workflow location. At that location, I have my workflow.xml file. So that workflow.xml will be triggered according to how I defined the job in the coordinator.xml file.

I'm submitting my coordinator job by:
Once your coordinator job is running successfully, I highly recommend going through this document and trying out some other use cases and alternatives.

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or you run into any issues, please do not hesitate to contact me.

Friday, March 8, 2013

Oozie Example: Sqoop Actions

To run a Sqoop action through Oozie, you need at least two files: a) workflow.xml and b) a properties file. But if you prefer to pass Sqoop options through a parameter file, then you also need to copy that parameter file into your Oozie workflow application folder. I prefer to use a parameter file for Sqoop, so I'm passing that file too.

In this tutorial, what I'm trying to achieve is to run a Sqoop action that exports data from HDFS to Oracle. In my previous post, I already wrote about how to do import/export between HDFS & Oracle. Before running a Sqoop action through Oozie, make sure Sqoop itself is working without any errors. Once it works without Oozie, try it through Oozie using a Sqoop action.

I'm assuming that when you execute the following line, it runs successfully and the data is loaded into Oracle without any error:
A successful Sqoop export should end with the following message on the console:
In my Oracle database, I have already created the table "Emp_Record", which mirrors the data present in HDFS (under the /user/ambari-qa/example/output/sqoop/emprecord folder). That means each row in the HDFS files represents a row in the table. The data in HDFS is tab delimited, and each field corresponds to a column in the table "Emp_Record". To know more about this, please check my previous post, as I'm using the same table and HDFS files here.

So, here is my option.par file which I'm using for my Sqoop export:
And my workflow.xml file:
As you can see, all the Sqoop options that we generally use on the command line can be passed as arguments using the <arg> tag. If you do not want to use a parameter file, then you need to pass each option in a separate <arg> tag like:
Since I'm using a parameter file for this Sqoop action, I also need to put it inside the Oozie workflow application path and reference it through a <file> tag. So, my Oozie workflow application path becomes:
Finally my file for this workflow:
The execution command for this Oozie workflow is the same as for the others:
A common issue: 

If your Sqoop job is failing, you need to check your log. Most of the time (I'm using the Hortonworks distribution) you might see this error message in the log:
This happens when the required Oracle library file is not in Oozie's classpath. When running Sqoop on its own, you fix this by manually copying the required ojdbc6.jar file to Sqoop's lib folder "/usr/lib/sqoop/lib". When running Sqoop through Oozie, you need to do the same thing, but in Oozie's Sqoop lib folder. You can do that by executing:
Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or you run into any issues, please do not hesitate to contact me.

Sunday, March 3, 2013

Oozie Example: Java Action / MapReduce Jobs

Running a Java action through Oozie is very easy, but there are some things you need to consider before you run one. In this tutorial, I'm going to execute a very simple Java action. I have a JAR file, TestMR.jar, which is a MapReduce application, so it will be executed on the Hadoop cluster as a MapReduce job.

The TestMR.jar file has a class with a public static void main(String args[]) method that starts the whole application. To run a Java action, you need to pass the main Java class name through the <main-class> tag.

This is the workflow.xml file for a Java action with the minimum number of parameters:

Your Java action has to be configured with <job-tracker> and <name-node>. And as you know, Hadoop will throw an exception if the output folder already exists. That's why I'm using the <prepare> tag, which deletes the output folder before execution. My jar also takes command line arguments. One of the arguments is "-r 6", which sets how many reducers I want to use for the MR job. So I'm using the "<arg>" tag to pass command line arguments. You can have multiple <arg> tags for a single Java action. As with other actions, for the workflow to take the "ok" transition, the main Java application needs to complete without any error. If it throws an exception, the workflow takes the "error" transition.

Now for the folder structure inside HDFS. When Oozie executes any action, it automatically adds all JAR files and native libraries from the "/lib" folder to its classpath. Here, "/lib" is a sub-folder inside the Oozie workflow application path. So, if "java-action" is the workflow application path, then the structure would be:
- java-action
- java-action/workflow.xml
- java-action/lib

In my HDFS, I have:
And here is my file:
That's pretty much it! Now you can execute your workflow by:
Remember, this is a very basic and simple workflow for running a Java action through Oozie. You can do a lot more than this using the other options Oozie provides. Once you are able to run a simple workflow, I recommend going through the Oozie documentation and trying some workflows with different settings.

Consideration: Be careful about what you have inside your "/lib" folder. If the version of a library your application uses conflicts with the version of the same library that Hadoop uses, it will throw errors, and those types of errors are hard to track down. To avoid them, it is better to match your library files with the versions you have inside the "/usr/lib/[hadoop/hive/hbase/oozie]/lib" folders on your client node.
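
One way to spot such conflicts before running the workflow is to compare jar file names from your "/lib" folder against the ones on the client node. The Python sketch below is only a rough illustration: it assumes jars are named name-version.jar, the function names are made up, and the jar names in the example are hypothetical, not taken from my cluster.

```python
import re

# Matches names like "guava-11.0.2.jar"; jars without a version are skipped
JAR_PATTERN = re.compile(r"^(?P<name>.+?)-(?P<version>\d[\w.\-]*)\.jar$")


def jar_versions(jar_names):
    """Map library name to version for every jar that follows name-version.jar."""
    versions = {}
    for jar in jar_names:
        match = JAR_PATTERN.match(jar)
        if match:
            versions[match.group("name")] = match.group("version")
    return versions


def find_conflicts(app_jars, cluster_jars):
    """Return {library: (app_version, cluster_version)} where versions differ."""
    app = jar_versions(app_jars)
    cluster = jar_versions(cluster_jars)
    return {
        name: (app[name], cluster[name])
        for name in app
        if name in cluster and app[name] != cluster[name]
    }


# Hypothetical file listings, for illustration only
app_lib = ["TestMR.jar", "guava-14.0.jar", "commons-lang-2.4.jar"]
cluster_lib = ["guava-11.0.2.jar", "commons-lang-2.4.jar"]
print(find_conflicts(app_lib, cluster_lib))
# {'guava': ('14.0', '11.0.2')}
```

The two lists could come from listing the workflow's lib folder in HDFS and the matching lib folder on the client node.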

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or you run into any issues, please do not hesitate to contact me.