Saturday, June 15, 2013

Elastic Load Balancing (ELB) with a Java Web Application + Tomcat + Session Stickiness

Suppose you have a web application and you want to deploy it in Amazon cloud environment with load balance support. The whole process is pretty straight-forward and it generally doesn't take much time.

For this post, I'm using Apache Tomcat web server and I already have a war file from my HelloWorld application. 

Here is the Tomcat version I'm using:


I'm using two instances and I have extracted my tomcat zip file into /opt/ folder in each of those two instances. I have also placed HelloWorld.jar file into /opt/apache-tomcat-7.0.39/webapps folder.


Now, I will go to each of those two instances and will start tomcat server. After some minutes (or seconds) I should see my deployed web application is up and running. Which means, I can navigate to these URLs and able to see Log-In screen (initial page of my web app).

  • http://ip.address.instance-1:8080/HelloWorld/login.jsp
  • http://ip.address.instance-2:8080/HelloWorld/login.jsp

All of the above steps which I described so far, have nothing to do with Elastic Load Balancing (ELB). Just like everyone, I just deployed a web app in tomcat server. Before I start showing steps for ELB, I'm assuming your web application is also up and running and you can navigate through URLs separately.


Create Load Balancer

Step#1: On AWS EC2 console, click on the Load Balancer option under "Network & Security" section. If you do not have any ELB yet, you will see an empty list. Click on "Create Load Balancer" button.


Step#2: Write a name of your Load Balancer, this name will be used when it will create a default link. I'm also creating this Load Balancer inside my Virtual Private Cloud (VPC) that's why I'm selecting a specific VPC Id. By default, you might see only port 80 in the listerner configuration list, I have added port 8080 as my web app is running on port 8080. Add appropriate port based on your web application and click "Continue".


Step#3: This screen is dedicated to Health check configuration. Based on configuration, ELB will ping that path with that port to check the health condition and if it fails it will automatically remove your instances from the load balancer.


Since, Log-In is the default screen of my application (welcome page), so I'm using the path of Login screen as my ping path.

Step#4: Choose your Subnet id based on where you want to use your Load Balancer. For my case, subnet-2e961843 is my expected Subnet id.

Step#5: Next screen will ask you to select your security groups. I already have a security group for my VPC and I'm using it here too.

Step#6: In the "Add EC2 Instances" section, add the instances in where you already deployed Tomcat and your web application.

Step#7: This screen is for review purpose. Once you review it you can finally create your load balancer by clicking on the "Create" button.

Step#8: Once you create your balancer, it will redirect you to Load Balancer list and now you will see your newly created load balancer in the list. DNS Name column shows newly created DNS Name for your load balancer and you should be able to navigate it with proper port.

So for my case, I can navigate my load balancer by using:

http://helloworld-353060791.us-east-1.elb.amazonaws.com:8080/HelloWorld/login.jsp


Sticky Session:
Since you are using Tomcat with load balancer, it's pretty obvious that you might want to enable sticky session with session replication in Tomcat. My web application is a Spring MVC application and it uses Spring Security for all type of authorizations and authentications. If I directly go to the Log-In screen of my load balancer and try to authenticate, it might not work. It's expected as Tomcat gets confused when sending request and response in multiple instances. If I enable sticky session I will not face this issue.

You can do it with the help of AWS EC2 console. Open the Load Balancer screen and select your newly created load balancer.

If you look carefully at the port configuration part, you will see "Stickiness: Disabled" for all of your ports. By default, stickiness is disabled for all the ports you select for load balancer. Now click on the "edit" button of the port on where you want to enable stickiness. For my case, it will be port 8080. Once you click on the "edit" button, it will ask you how you want to enable session stickiness. You can either choose Load Balancer Generated Cookie Stickiness or Application Generated Cookie Stickiness. For my simple application, I have selected "Load Balancer Generated Cookie Stickiness" and I entered 86400 as my cookie expiration period which is a day in seconds.

After you enable it, you should be able to test your session stickiness. For my case, now I'm able to successfully authenticate to my application.

Some considerations: Sometimes you might see your load balancer is down or the link is not working or shows no page. In that case, best way to quickly test is to check each of the instance where tomcat is running and check whether you can access them individually (e.g. http://ip.address.instance-1:8080/HelloWorld/login.jsp). If you find that each of the instance is up and running, you can try removing them from your load balancer and add them again. Remember, "Status" section under "Description" tab of your load balancer does not get updated instantly. It takes some time and it waits for the result of the next health check. So wait few minutes until you see "Status: N of N instances in service".
.

That's pretty much it! This is the very basic AWS Load Balancer example with minimum configuration of Tomcat + Session Stickiness. Once its working for you, you can try other options (highly encouraged) and see how it works for you.


Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me :)

Saturday, June 1, 2013

Cassandra Performance Tuning

In my previous post, I discussed about how to stress test Cassandra. In this post, I will discuss on some easy steps to tune-up its performance. I'm a big fan of Cassandra. It is optimized for very fast and highly available data write. There are so many things you can do to optimize its write and read performance further. But today, I will only discuss on some major and easy tune-up steps which you can apply easily.


Dedicated Commit Log Disk: I think this is the first tune-up you may want to try as it gives you a significant performance improvement. But before changing commit log destination it would be better to know it gives performance boost. Cassandra write operations are occurred on a commit log on disk and then to an in-memory table structure called Memtable. When thresholds are reached, that Memtable is flushed to a disk in a format called SSTable. So if you separate out Commit Log locations, it will isolate Commit Log I/O traffics from other Cassandra Reads, Memtables and SSTables traffics. Remember, after the flush, the Commit Log is no longer needed and is deleted. So the Commit Log disk doesn't need to be large. It just need to be in the size where it can holds Memtable data before its flushed. You can follow the following steps to change commit log location for Cassandra.

Step#1: Mount a separate partition for commit log
Step#2: Make sure you give expected ownership and access on that drive
Step#3: Edit Cassandra configuration file which can be found at conf/cassandra.yaml. You will find a property "CommitLogDirectory", update it based on your mount location. For my case, it will be:
CommitLogDirectory: /mnt/commitlog
Step#4: Restart your Cassandra cluster.


Increasing Java Heap Size: Cassandra runs on JVM. So you might face out of memory issues when you run a heavy load on Cassandra. There is also a rule of thumb about how you want to keep your heap size.
  • Heap Size = 1/2 of System Memory when System Memory < 2GB
  • Heap Size = 1GB when System Memory >= 2GB and <= 4GB
  • Heap Size = 1/4 of System Memory(but not more than 8GB) when System Memory >4GB
Remember, just a larger heap size might not give you a performance boost. So a well-tuned Java heap size is very important. To change the Java heap size, you need to update cassandra-env.sh file and then restart Cassandra cluster again. If you are using Opscenter, you should see the updated heap size on one of the Opscenter's metrics.


Tune Concurrent Reads and Writes: Staged Event-Driven Architecture(SEDA) is used for implementing Cassandra. It breaks the application into stages. Concurrent readers and writers control the maximum number of threads allocated to a particular stage. So having an optimal concurrent reads and concurrent writes value will improve Cassandra performance. But raising these values beyond the limit will decrease Cassandra performance. These values are highly tied with CPU cores of the system. As like, Java heap size, there is also a rule of thumb about how to select these values:
  • Concurrent Reads: 4 concurrent reads per processor core
  • Concurrent Writes: Most of the time you do not need it as write is usually fast. If needed, you can set the value to equal or higher than the concurrent reads.
To change the value, you need to update conf/cassandra.yaml configuration file. There are two parameters present for these two: ConcurrentReaders and ConcurrentWriters. Update those values based on your system and restart Cassandra to take the effect.


Tune-Up Key Cache: For each of the column families, key cache holds the location of row keys in memory. Since keys are usually small, it can store a large cache without using much memory. Each cache hit results in less disk activity. 200000 is the default key cache size of Cassandra and its enabled by default. You can alter the default value by following:


You can monitor key cache performance by using nodetool cfstats command.



Tune-Up Row Cache: In Cassandra, row cache is disabled by default. Row cache holds the entire content of the date in memory. So a column family with large rows could easily consume system memory and could impact Cassandra performance, that's why its disabled by default and should be remain disabled in most of the cases. But if your column data is too small then using row cache will significantly improve performance as row cache keeps the most accessed rows hot in memory. To enable row cache, you can alter your column family and can pass number of rows for row cache.

You can also monitor it by using nodetool cfstats command like above (watch for ."Row cache hit rate").


Conclusion: As I said early, these are only some of the tune-up steps, there are more (high performing RAID level, file system optimization, disabling swap memory, memory mapped disk modes and so on). But I gave you something you can start with, once you find out improved Cassandra performance you can try the rest of the tuning. Cassandra is highly scalable and scaling up is done by enhancing each node (more RAM, high network throughput, SSD, disk size, etc). Remember, if you are using AWS EC2 instance do not expect much performance improvement if you are using medium or small type instance as they are not optimized for better I/O or network, use xlarge+ instance instead.

And finally, DO NOT forget to check the Cassandra Performance and Scalability slides by Adrian Cockcroft.


Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me :)

Saturday, May 25, 2013

Cassandra Stress Test

In this post, I will go through how you can quickly stress test your Cassandra performance. Before you go for tuning your Cassandra you might want to see how well its performing so far or where its slowing down. You can definitely write a benchmark tool which inserts some random data and reads it after that and measure performance based on time. When I first asked to stress test Cassandra, I was writing pretty much same kind of tool. But in the middle I found an existing code which stress test Cassandra and which is good enough to start with. It's basically a pom based Java project which uses Hector (my project also use Hector - A Java Client for Cassandra).

You can directly go here to get more information about how its written and how to run it:

But if you just want a quick way to run it, you can follow the following steps:

Step#1: Install It

Step#2: Run It:
What the above command doing is:
  • Inserting (-o insert) 1000000 records (-n) into column family StressStandard which has 10 columns (-c)
  • Using 5 threads (-t) and each batch size is 1000(-b)
  • So each thread is getting 1000000 / 5 = 200000 inserts, as the batch size is 1000, so each thread is actually inserting 200000 / 1000 = 200 times.
After it inserts 1000000, it will show you a brief stat of data insertion performance. For the above test, it took around 3 minutes to insert all records (no optimization), which was 140.87 write request per seconds with bandwidth 15730.39 kb/sec. You can also test read performance, as well as some other Hector's API performance (rangeslice, multiget, etc).

I played with this stress tool a lot and later I converted it based on my needs(to work with my Cassandra keyspace andcolumn families) and ran it for my stress test. I highly recommend you to use this stress tool, it will serve most of the basic cases.



Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me :)

Friday, May 17, 2013

Chunk data import / Incremental Import in Sqoop

Recently I faced an issue while importing data from oracle with Sqoop. So far it was working fine till I faced a new requirement. Before discussing about the new requirement, let me quickly write about how it's currently working.

Currently I am running Sqoop from Oozie but I am not using coordinator job. So I am executing each Oozie job manually from command prompt.

You can check these links if you want to know how to run Sqoop and Oozie together.
In our option parameter file, I have a field something like this below:
--where
ID <= 1000000
For each run, I used to change that field manually and re-run my Oozie job.

New Requirement

Now, what I have asked to do is run my Oozie job through coordinator and import block-wise/chunk data from Oracle. Based on the current requirement, what I'm trying to achieve is to import list of rows from M to N. Ideally for each run, I'm targeting to import 15 millions rows from that specific table and Hadoop will process those records and will be ready to process another batch before the following run.

As an example:
1st run: 1 to 20
2nd run: 21 to 40
3rd run: 41 to 60
and so on...

First thing which I started exploring is to use "--boundary-query" parameter which comes with sqoop. From their documents: "By default sqoop will use query select min(<split-by>), max(<split-by>) from <table name> to find out boundaries for creating splits. In some cases this query is not the most optimal so you can specify any arbitrary query returning two numeric columns using --boundary-query argument."

After spending some time on it and discussing in Sqoop mailing list, I came to know that incremental import is not working with chunks. It imports everything since last import (more specifically, everything from --last-value to end).

Then I decided to create a shell action in Oozie which will update the appropriate parameter after each execution of Sqoop, so that following Sqoop runs will have a new options for its import.

So I made some changes in my option parameter file (option.par) and here is the new one:
To store current index value and chunk-size, I used another property based file import.properties:
My shell script will update the value of startIndex by the chunkSize. Here is the script (script.sh) which I wrote for this:

I want to add something here is that when you are modifying a file by a script and running through Oozie, a cache version of the file in HDFS actually being updated. That's why I had to copy back those files to my original location of HDFS. Again, behind the scene, a mapred user is doing the work but I'm running the oozie job as ambari_qa user (note: I'm using Hortonworks Hadoop, HDP 1.2.0). That's why I had to give back all the permissions on those files to all users.

Here is my Oozie workflow (workflow.xml):
I put everything inside my Oozie application path in HDFS. Here is my folder structure:
Don't forget to give the "write" permission when you first put it inside HDFS. Now you can run the Oozie workflow by executing this:
[ambari_qa@ip-10-0-0-91 ~]$ oozie job -oozie http://ip-10-0-0-91:11000/oozie -config job.properties -run
Here is the job.properties file:
This is it! Now every time you execute the Oozie job, it will import a new chunk of data from Oracle. How I'm running it as a coordinator job, I will put them in another post. Jarcec mentioned in one of the Sqoop user mail threads that Sqoop will have this feature soon but I'm not sure it's time frame. So I had to do this work around. It worked for me, I hope it will work for you too!


Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.

Saturday, April 6, 2013

Configure Ganglia for multiple clusters in Unicast mode

In my previous post I talked about how to: Setting up Ganglia in CentOS environment. At that time, I used only a single cluster for the whole setup. But it's highly unlikely that you have only a single cluster in your development/production environment. Consider you have two clusters - 1. Storm 2. Kafka and you want to monitor all of these cluster nodes through a single Ganglia UI. You do not have to install Ganglia multiple times for that, you just need to configure your Ganglia. It would have been much easier if AWS supports multicast but as it doesn't support multicast, you need to do a work-around in unicast mode to achieve monitoring multiple clusters in one single Ganglia.

The idea behind this work-around is pretty straightforward. Suppose I have two clusters: cluster#1 - Storm and cluster#2 - Kafka and their respective IP addresses are:

10.0.0.194 - Storm Cluster (supervisor 1)
10.0.0.195 - Storm Cluster (supervisor 2)
10.0.0.196 - Storm Cluster (supervisor 3)
10.0.0.182 - Storm Cluster (nimbus)
10.0.0.249 - Kafka Cluster
10.0.0.250 - Kafka Cluster
10.0.0.251 - Kafka Cluster
10.0.0.33 - my client machine

What I am going to do is, I will configure each of the cluster to send collected data (gmond) to one of their specific node only and configure the gmetad daemon in a way that it can collects the data only from a designated node (gmond daemon) from each cluster. Ganglia will categorize each cluster data by their unique cluster name defined in gmond.conf file.


As you can see in the above figure that all Kakfa cluster's data is sending to one specific node - 10.0.0.249 and all Storm cluster's data is sending to one of its node - 10.0.0.182. Client machine (10.0.0.33) is running gmetad daemon and I will configure that daemon so that it can look for two data sources for two clusters where their source IP addresses will be 10.0.0.249 and 10.0.0.182 for Kafka and Storm respectively.

I'm assuming that you already setup your Ganglia and it's running as expected. So I am not going to discuess about what is gmond.conf and gmetad.conf files. In case if you have not setup yet, you might want to take a look at this post
This is my gmond.conf file (only the part which I modified) which I'm using for all Kafka hosts (this file is unique for each host per cluster):
And here is my gmond.conf file for all Storm hosts (this file is unique for each host per cluster):

You notice that I'm using unique host address for udp_send_channel for each cluster. Now, I need to tell my gmetad daemon to look for those two host address to collect data from. Here is my gmetad.conf file:
You are done! Now restart all gmond daemons and gmetad daemon and wait for few minutes.
Once you navigate to your Ganglia UI url you should be able to see your grid and list of your clusters in the drop-down.



You can dig further to see each of your host for each cluster:





There is another work-around which you can also try to get a better understanding of Ganglia. In that case you need to use separate port number for each cluster. Here, I'm distinguishing each cluster's data source per IP address, but in that work-around you can have a single IP address for all clusters but multiple port numbers. You can try that work-around as an exercise :).



Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.



Monday, April 1, 2013

Setting up Ganglia in CentOS

Ganglia is a scalable distributed monitoring system for high-performance computing systems such as clusters and Grids (ref). Installing and configuring Ganglia is very straight-forward. It has two major parts:

Gmond (Ganglia monitoring daemon): Runs on every single node and collects the data and sends to meta daemon node.

Gmetad (Ganglia meta daemon): Runs on a head (or client) node and gathers the data from all monitoring nodes and displays it on UI.

Assume I have 4 nodes cluster and one of the nodes also works as client. So, I will install the Ganglia PHP UI on that machine.

Here are their IP addresses and list of services I am going to install on them:
  • 10.0.0.33 - client node (gmetad, gmond, ui)
  • 10.0.0.194 - monitoring node (gmond)
  • 10.0.0.195 - monitoring node (gmond)
  • 10.0.0.196 - monitoring node (gmond)

On client node:
--> Install meta daemon, monitoring daemon and web UI by executing:
--> If they are not available, then you might need to install EPEL repositories to your machine.

On monitoring node:
--> Install monitoring daemon by:

Configuration:

By this point, everything is installed and now you need to configure your Ganglia.
  • /etc/ganglia/gmetad.conf --- configuration file for gmetad daemon
  • /etc/ganglia/gmond.conf --- configuration file for gmond daemon

I have updated only the following part on gmond.conf file in each monitoring node.
Notice that I have commented out mcast_join and bind because multicast is not supported by AWS EC2 and unicast is only the option for Ganglia. So, all monitoring nodes are sending collected data to the node (10.0.0.33) which is collecting data (nodes which is running gmetad daemon).

On gmetad.conf file I have updated this:
data_source "Cloud for Beginners" 60  10.0.0.33:8649
Here I'm telling to meta daemon the name of the cluster (name should be matched to organize list of hosts by cluster) and host's IP address and port from where data will be collected from and duration (collect data in every 60 seconds).

You are done! Now start monitoring daemon and meta daemon in all nodes.
After 1-2 minutes you should be able to see all your monitoring data through:

You might want to change boot configuration so that gmetad and gmond daemons will be started at boot:

Common Issue: 
In case if you are facing that the gmetad is not starting up, you can check the log by:
In log you might see "Please make sure that /var/lib/ganglia/rrds is owned by nobody" error, in that case you need to execute this:



Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.


Thursday, March 21, 2013

A basic Oozie coordinator job

Suppose you want to run your workflow in every two hours or once per day, at that point coordinator job comes out very handy. There are several more use cases where you can use Oozie coordinator. Today I'm just showing you how to write a very basic Oozie coordinator job.

I'm assuming that you are already familiar with Oozie and have an workflow ready to be used as coodinator job. For this tutorial, my Oozie workflow is a shell-based action workflow. I want to execute a shell script in every two hours starting from today to next 10 days. My workflow.xml is already inside the a HDFS directory.
Without the coordinator, I'm currently running it like this:
Here is my job.properties file:
Now I want to run this workflow with coordinator. Oozie Coordinator Engine is responsible for the coordinator job and the input of the engine is a Coordinator App. At least two files are required for each Coordinator App:
  1. coordinator.xml - Definition of coordinator job is defined in this file. Based on what(time based or input based) your workflow will trigger, how long it will continue, workflow wait time - all of this information need to be written on this coordinator.xml file.
  2. coordinator.properties - Contain properties for coordinator job, behaves same as job.properfiles file.
Based on my requirement, here is my coordinator.xml file:
As I need to pass coordinator.properties file for a coordinator job, I cannot pass previous job.properties file at the same time. That's why I need to move all properties from the job.properties file to coordinator.properties file. Remember one thing, coordinator.properties file must have a property which specifics the location of coordinator.xml file (similar to oozie.wf.application.path in job.properties). After moving those properties my coordinator.properties file became:

As you noticed I mentioned application path oozie.coord.application.path and that path contains the cooridnator.xml file.
Now I'm pretty much set. Now if I execute a coordinator job now it will execute the coordinator app located in the coordinator application path. Coordinator app has a tag <workflow><app-path>.... </app-path></workflow> which specifics the actual workflow location. At that location, I have my workflow.xml file. So that workflow.xml will be triggered based on  how I define the job in coordinator.xml file.

I'm submitting my coordinator job by:
If you are running your coordinator job successfully, I highly recommend you to go through this document and try out some other use cases and alternatives.


Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.

Friday, March 8, 2013

Oozie Example: Sqoop Actions

To run a Sqoop action through Oozie, you at least need two files, a) workflow.xml and b) job.properties. But if you prefer to pass Sqoop options through a parameter file, then you also need to copy that parameter file into your Oozie workflow application folder. I prefer to use a parameter file for Sqoop so I'm passing that file too.

In this tutorial, what I'm trying to achieve is to run a Sqoop action which will export data from HDFS to Oracle. On my previous post, I already wrote about how to do import/export between HDFS & Oracle. Before run Sqoop action through Oozie, make sure your Sqoop is working without any errors. Once it's working without Oozie, then try it through Oozie by using Sqoop action.

I'm assuming when you execute the following line, it executes successfully and data loaded into Oracle without any error:
A successful Sqoop export should be ended with the following message on the console:
In my Oracle, I have already created the specific table "Emp_Record" which resembles the data present in HDFS (under /user/ambari-qa/example/output/sqoop/emprecord folder). That means, each rows on the HDFS files represent a row in the table. Again, the data in HDFS is tab delimited and each column represents a column in the table "Emp_Record". To know more about this, please check my previous post as I'm using the same table and HDFS files here.

So, here is my option.par file which I'm using for my Sqoop export:
And my workflow.xml file:
As you see, all the Sqoop commands which we generally use on the command line, can be passed as an argument by using <arg> tag. If you do not want to use parameter file, then you need to pass each of the command in a separate <arg> tag like: Since I'm using a parameter file for this Sqoop action, I also need to put it inside the Oozie workflow application path and have to mention this file through a <file> tag.  So, my Oozie workflow application path becomes:
Finally my job.properties file for this workflow:
Execution command for this Oozie workflow will be same as others:
A common issue: 

If your Sqoop job is failing, then you need to check your log. Most of the time(I'm using Hortonworks distribution) you might face this error message on the log:
This is happens when you do not have required Oracle library file in Oozie's classpath. In that case, you need to manually copy the required ojdbc6.jar file to Sqoop's lib folder "/usr/lib/sqoop/lib". While running Sqoop through Oozie, you need to do the same thing but in Oozie's Sqoop lib folder. You can do that by executing:
Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.

Sunday, March 3, 2013

Oozie Example: Java Action / MapReduce Jobs

Running a Java action through Oozie is very easy. But there are some things you need to consider before you run your Java action. In this tutorial, I'm going to execute a very simple Java action. I have a JAR file TestMR.jar which is a MapReduce application. So this application will be executed on the Hadoop cluster as a Map-Reduce job. 

TestMR.jar file has a class TestMR.java which has a public static void main method(String args[]) that initiates the whole application. To run a Java action, you need to pass the main Java class name through the tag <main-class>.

This is the workflow.xml file for a Java action with minimum number of parameters:

Your Java action has to be configured with <job-tracker> and <name-node>. And as you know, Hadoop will throw exceptions if the output folder is already exists. That's why I'm using <prepare> tag which will delete the output folder before execution. My jar also takes command line arguments. One of the argument is "-r 6" which means how many reducers I want to use for the MR job. So I'm using "<arg>" tag to pass command line arguments. You can have multiple <arg> for a single Java action. As like other actions, to indicate a "ok" transition, the main Java application needs to be completed without any error. If it throws any exception, the workflow will indicate a "error" transition.

Now comes to the folder structure inside HDFS. When Oozie executes any action, it automatically adds all JAR files and native libraries from the "/lib" folder to its classpath. Here, "/lib" folder is a sub-folder inside Oozie workflow application path. So, if "java-action" is the workflow application path then the structure would be:
- java-action
- java-action/workflow.xml
- java-action/lib

In my HDFS, I have:
And here is my job.properties file:
That's pretty much it! Now you can execute your workflow by:
Remember, this is a very basic and simple workflow to run a Java action through Oozie. You can do a lot more than these by using several other options provided by Oozie. Once you are able to run a simple workflow, I would recommend you to go through Oozie documentation and try some workflows with different settings.

Consideration: Be careful about what you have inside your "/lib" folder. If the version of the library which you are using for your application conflicts with Hadoop's library file's version, it will throw errors and those type of errors are hard to find. To avoid those kind of errors, better to match your library files with the versions you have inside "/usr/lib/[hadoop/hive/hbase/oozie]/lib" folder on your client node.


Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.

Friday, March 1, 2013

Oozie Example: Hive Actions

Running Hive through Oozie is pretty straight-forward and it's getting much simpler day-by-day. 1st time when I used it(old versions) I faced some issues mostly related to classpath though I resolved them. But when I used the recent versions (Hive 0.8+, Oozie 3.3.2+), I only faced 1 or 2 issues at most.

In this example, I'm going to execute a very simple Hive script through Oozie. I have a Hive table "temp" and it's currently empty. The script will load some data from HDFS to that specific hive table.
And here is the content of the script.hql:
Now you need to setup your Oozie workflow app folder. You need one very important file to execute Hive action through Oozie which is hive-site.xml. When Oozie executes a Hive action, it needs Hive's configuration file. You can provide multiple configurations file in a single action. You can find your Hive configuration file from "/etc/hive/conf.dist/hive-site.xml" (default location). Copy that file and put it inside your workflow application path in HDFS. Here is the list of files that I have in my Oozie Hive action's workflow application folder.
And here is my workflow.xml file:
Look at the <job-xml> tag, since I'm putting hive-site.xml in my application path, so I'm just passing the file name not the whole location. If you want to keep that file in some other location of your HDFS, then you can pass the whole HDFS path there too. In older version of Hive, user had to provide the hive-default.xml file by using property key oozie.hive.defaults while running Oozie Hive action, but from now on (Hive 0.8+) it's not required anymore.

Here I'm using another tag <param>, which is not required but I'm using it just to show how to pass parameter among hive script, job properties and workflow. If you are using any parameter variable inside your hive script, it needs to pass through the hive action. So you can do, either:
  • <param>INPUT_PATH=${inputPath}</param> (where inputPath can be passed through job properties) , Or
  • <param>INPUT_PATH=/user/ambari-qa/input/temp</param>

Inside my HDFS, "/hive-input/temp" folder contains files which need to be loaded to Hive table:
And here is my job.properties file:
That's it! You can now run your Hive workflow by executing this on the client node:

Two common issues:
You might face some issues if the required jar files are not present inside "/user/oozie/share/lib/hive" folder (HDFS). One of the commons issue is not having the hcatalog* jar files in that folder. In that case you will see something like this in the log:
In that case, you need to manually copy those required jar files into that folder. You can do that by following:

Another common issue you might face is:
SemanticException [Error 10001]: Table not found
Even though you can see your table is exists, you might see this error when running through Oozie. Most of the time it happens when your Hive is not properly pointing to the right metastore. Most of the time, the problem goes away when you copy the correct hive-site.xml into hive lib folder inside HDFS. Make sure you check your hive-site.xml file to see all properties are correctly set. Like,  "hive.metastore.uris", "javax.jdo.option.ConnectionUR", "javax.jdo.option.ConnectionDriverName". But me and other users (Hive action failing in oozie) also found out that the above error message is ambiguous and doesn't give much insight. If the expected jar files are not present in the share lib folder, hive also throws the same error message! So be careful about what you have in the classpath when running hive through Oozie.

Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.

Monday, February 25, 2013

Setup a Storm cluster on Amazon EC2

Storm - Real-time  Hadoop! Yes, you can call it in that way. As you know, Hadoop provides a set of general primitives for doing batch processing, Storm also provides a set of primitives for doing real-time computation. It's a very powerful tool and pretty straight forward to setup a Storm cluster. If you want to setup a Strom cluster on Amazon EC2, you should try Nathan's storm-deploy project first which will auto deploy a Storm cluster on EC2 based on your configurations. But if you want to manually deploy a Storm cluster, you can follow these steps (if you want more detailed information, you can also follow original documentation of Storm):

Let me show you my machine's current configuration first:
  • Machine type: m1.large (for supervisor) and m1.small (for nimbus)
  • OS: 64-bit CentOS 6.3
  • JRE Version: 1.6.0_43
  • JDK Version: 1.6.0_43
  • Python Version: 2.6.6


For this tutorial, I am going to setup a 3-node Storm cluster. IP addresses of each hosts and my targetted configurations is:

10.0.0.194 - StormSupervisor1
10.0.0.195 - StormSupervisor2
10.0.0.196 - StormSupervisor3
10.0.0.182 - StormNimbus

Storm depends on Zookeeper for coordinating the cluster. I have already installed Zookeeper in each of those above hosts. Now apply each of the following steps on all Supervisor and Nimbus nodes:


A. Install ZeroMQ 2.1.7

Step #A-1: Download zeromq-2.1.7.tar.gz from http://download.zeromq.org/.

Step #A-2: Extract the gzip file:
[root@ip-10-0-0-194 tool]# tar -zxvf zeromq-2.1.7.tar.gz
Step #A-3: Build ZeroMQ and update the library:
Note: If you are facing "cannot link with -luuid, install uuid-dev." error when you are executing "./configure", then you need to install it. You can install it by executing "yum install libuuid-devel".


B. Install JZMQ

Step #B-1: Get the project from the Git by executing:
Step #B-2: Install it:

C. Setup Storm

Step #C-1: Download the latest version (for this tutorial, I'm using 0.8.1 version) from https://github.com/nathanmarz/storm/downloads.

Step #C-2: Unzip the downloaded zip file:
[root@ip-10-0-0-194 tool]# unzip storm-0.8.1.zip
Step #C-3: Now change the configuration based on your environment. Default location of the main Storm configuration file is: "/storm/conf/storm.yaml". Any setting you write on this file will overwrite default configuration file. Here is what I changed in the storm.yaml file:
Note: I have created the "storm" folder manually inside "/var" directory.

At this point, you are ready to start your Storm cluster. Here, I installed and setup everything on a single instance first (supervisor1 - 10.0.0.194) and then I created AMI from that instance and later created rest of the two supervisors and one nimbus node from that AMI.

Launch daemons by using the storm script (bin/storm) on each nodes. I started nimbus and UI daemons on the nimbus host and supervisor daemon on each of the supervisor nodes.

  • bin/storm nimbus on 10.0.0.182
  • bin/storm ui on 10.0.0.182
  • bin/storm supervisor on 10.0.0.194,10.0.0.195,10.0.0.196



You can see Storm UI by navigating to your nimbus host: http://{nimbus host}:8080. For my case, it was: http://54.208.24.209:8080 (here, 54.208.24.209 is the public IP address of my nimbus host).



Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.


Saturday, February 23, 2013

Install Opscenter in CentOS environment

In my previous post, I talked about how to install Cassandra in CentOS environment. This is the follow-up post of my previous post and here I am going to show you how to install OpsCenter in the same environment. "OpsCenter is a browser-based user interface for monitoring, administering, and configuring multiple Cassandra clusters in a single, centralized management console (ref)". 

Last time, I installed Cassandra cluster on these nodes:
  • cassandra node1 -> 10.0.0.57
  • cassandra node1 -> 10.0.0.58
  • cassandra node1 -> 10.0.0.59
Now I am going to install OpsCenter agents on these nodes and will treat 10.0.0.57 as my client node (that means, OpsCenter console will be deployed on that host). Before OpsCenter installation, make sure your Cassandra cluster is up and running successfully.

Step #1: Create a new yum repository definition for DataStax OpsCenter in 10.0.0.57 node.
[root@ip-10-0-0-57 ~]# vim /etc/yum.repos.d/datastax.repo
Step #2: Write the edition you want to install in the datastax.repo file (I am installing OpsCenter community edition):
Step #3: Install the OpsCenter pacakge:
[root@ip-10-0-0-57 ~]# yum install opscenter-free
The above steps will install the most recent OpsCenter community edition in your system. But I want to install a specific version of OpsCenter today (appropriate for the Cassandra version which I installed earlier). So to do that, at first I need to check the list of versions for OpsCenter which are available now:
I wanted to install 3.0.2-1 version. So, I'm installing it by:
[root@ip-10-0-0-57 ~]# yum install opscenter-free-3.0.2-1
Step #4:  If you do not want to try with the repository and want to install manually any specific version of OpsCenter, in that case you can download the rpm file from http://rpm.datastax.com/community/noarch/ and can install it by:
[root@ip-10-0-0-57 ~]# yum install opscenter-free-3.0.2-1.noarch.rpm
Step #5: Now configure your opscenter configuration file (/etc/opscenter/opscenterd.conf) to mention your web server's IP address or hostname:
Step #6: Now start your OpsCenter by:
Step #7: Now you can see your OpsCenter console by navigating to http://<opscenter_host>:8888. For my case, it would be: http://54.208.29.59:8888 as it's the public IP address for the host 10.0.0.57.

Step #8: Wait, you are not done yet! You still need to install your OpsCenter agents. For the first time when you open your OpsCenter console, it will ask you whether you want to create a new cluster or want to use existing cluster. In previous post, I installed a cluster with the name "Simple Cluster". So I want to install that existing cluster for my OpsCenter. So, I'm selecting "Use Existing Cluster" option.

Step #9: Now, you need to pass a list of hostnames or IP address of my cluster in each line at a time (leave other fields as they are):

Step #10: At this point you should be able to see your OpsCenter console.

Note that on top of your console, there is a notification labeled as "0 of 3 agents connected" with a link called "fix". This is because, none of your OpsCenter agents are installed yet. Click on that link and install agents automatically.

Step #11: Enter appropriate credentials of your machine. For my case, I'm writing "root" as my username and pasting my private key (including the commented part, i.e. --BEGIN RSA PRIVATE KEY):

Click on "Done" and finally install it by clicking on the "Install on all nodes" button.

Step #12: Accept all the fingerprints by clicking on the "Accept Fingerprints" button. Once you click on that button, each of your host will be downloading agent package by connecting to the internet and then it will install and configure agent in the system.

At the end of the installation you should be able to see a successful message on your screen:


As you see that each of the host downloads the agent package from internet, it is required that each of your host can talk to internet. If you do not have that setup, you can also install OpsCenter agent manually.

At the end of your installation you shouldn't be seen "0 of 3 agents connected" notification anymore.

Finally you are done! There are so many things you can do in OpsCenter. I highly recommend you to play with it and do some experiments by changing/applying different settings and configurations. It also comes with a large set of very good performance metrics, so do not forget to check those metrics too.



Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.

Thursday, February 21, 2013

Sqoop import/export from/to Oracle

I love Sqoop! It's a fun tool to work with and it's very powerful. Importing and exporting data from/to Oracle by Sqoop is pretty straightforward. One crucial thing you need to remember when working with Sqoop and Oracle together, that is using all capital letters for Oracle table names. Otherwise, Sqoop will not recognize Oracle tables.

This is my database (Oracle) related information:
  • URL: 10.0.0.24
  • db_name (SID): test
  • Schema: Employee_Reporting
  • Username: username1
  • Password: password1
  • Tablename: employee ( I'm going to export this table to HDFS by Sqoop)

Import from Oracle to HDFS:
Let's go through this option file (option.par):
You can see most of the parameters are self-explanatory. Notice that I'm providing table name in all capital letters. How you want to see the imported columns in HDFS? For that, we need to use --fields-terminated-by parameter. Here I'm passing "\t" for that parameter, which means that the column or field for each rows will be tab delimited after import. Sqoop will generate a class(with a list of setter and getter) to invoke the employee object, the name of that class name is defined by --class-name parameter. So in this case, it will create a class named Employee.java inside com.example.reporting package. I'm using --verbose parameter to print out information while Sqoop is working. It's not mandatory and you can ignore it if you want. --split-by parameter represents the name of the column which I want to use for splitting the import data. Here, ID is the primary key of the table Employee. You can use any WHERE clause for your import, in that case you need to pass that with the --where parameter. For the above example, it will import all rows from the table Employee where ID is less than or equal to 100000 (e.g. importing 100000 rows). You need to mention a HDFS location which will be used as a destination directory for the imported data (--target-dir parameter). Remember one thing here is that the target directory should not be existing prior to run import command otherwise Sqoop will throw an error. The last parameter -m represents the number of map tasks to run in parallel for the entire import job.

Once you have your option file ready, you can execute the Sqoop import command as:
sqoop import --options-file option.par
Using option file is not mandatory, I'm just using it for my convenience. You can also pass each of the parameter from your console and execute the import job. Example:

Export from Hive to Oracle:
For export, I will be using some of the parameters which I used during import as they are common for both import and export job. Assume I processed(by MR jobs) the data generated by import job and inserted them into Hive tables. Now I want to export those Hive tables to Oracle. If you are familiar with Hive, you may know that Hive moves/copies data to its warehouse folder by default. I'm using Hortonworks's distribution and for my case Hive'e warehouse folder is located at: "/apps/hive/warehouse/emp_record". Here, emp_record is one of the Hive table I want to export from.

I have already created a matching table "Emp_Record" in my Oracle inside the same schema "Employee_Reporting". To export the Hive table, I'm executing the following command:
Notice that instead of using --target-dir, I'm using --export-dir, this is the location of the Hive table's warehouse folder and data will be exporting from there.

Now assume, inside the warehouse directory, I have a file 00000_1 (which contains the data of Hive table Emp_Record) and some of its lines are:
As you can see, each of columns/fields are tab delimited and each of the rows are separated by a new line. Again we see here that there is an entire row which contains null as their values (Ideally you might not have null values as you might want to filter those values from your M-R jobs). But say we have all kind of values, so we need to tell Sqoop how to treat each of those values. Because of that, I'm using --input-fields-terminated-by parameter to inform that the fields are tab delimited and --input-lines-terminated-by parameter to distinguish rows. Again from Sqoop side, there are two kinds of column - string column and non-string column. Sqoop needs to know what string value is interpreting a null value. Because of that I'm using --input-null-string and --input-null-non-string parameters for two column types and passing '\\N' as their value because for my case '\\N' is null.


Wrapping Up:
Sometimes you will face some issues during export when your Oracle table has a very tight constraint (e.g. not null, time-stamp, expecting value in specific format, etc). In that case, the best idea is to export the Hive table/HDFS data to a temporary Oracle table without any modification to make Sqoop happy :). And then write a SQL script to convert and filter those data to your expected values and load them into your main Oracle table.

The above two examples are just showing a very basic import and export Sqoop job. There are a lot of setting in Sqoop you can use. Once you are able to run export/import job successfully, I would recommend you to try the same job with different parameters and see how it goes. You can find all available options in the Sqoop user guide.


Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.

Wednesday, February 13, 2013

Install Cassandra Cluster within 30 minutes

Installing Cassandra cluster is pretty straight forward and you will find a lot of detailed documentations on DataStax site. But if you still find it overwhelming, you can follow the steps I mention here. It's just a sorted version of all of the steps you need to install a basic setup of Cassandra cluster. But I still recommend you to read DataStax documents for detailed information.

I am using a EBS backed CentOS AMI for this setup. You can choose different AMI based on your needs.

Please check the current Cassandra version if you want to setup the most latest one. Today, I am going to install Cassandra 1.2.3 version.

Step #1: Create an instance in Amazon AWS. I choose the following one for this setup:

EBS Backed CentOS 6.3

This is an EBS backed 64-bit 6.3 CentOS AMI. For me, the IP address which was assigned for this server: 10.0.0.57.

Step #2: Once you created this instance, by default it will not allow you to login as root user. So, login to that server as ec2-user.

Step #3: Now allow root login on that machine:
Using username "ec2-user".
Authenticating with public key "imported-openssh-key"
[ec2-user@ip-10-0-0-57 ~]$ sudo su
[root@ip-10-0-0-57 ec2-user]# cd /root/.ssh
[root@ip-10-0-0-57 .ssh]# cp /home/ec2-user/.ssh/authorized_keys .
cp: overwrite `./authorized_keys'? y
[root@ip-10-0-0-57 .ssh]# chmod 700 /root/.ssh
[root@ip-10-0-0-57 .ssh]# chmod 640 /root/.ssh/authorized_keys
[root@ip-10-0-0-57 .ssh]# service sshd restart
Step #4: Now you should be able to login as root user on that server. So, login again as root user and check the current Java version:
[root@ip-10-0-0-57 ~]# java -version
The AMI which I used for this server didn't have Java pre-installed. So, I'm going to install last version of Java 1.6 on this server. It is recommended not to use Java 1.7 for Cassandra.

Step #5: Download Java rpm (jre-6u43-linux-x64-rpm.bin) from the following location:


Step #6: Copy your downloaded rpm to that server by using WinSCP. (You can use any other tool if you want).

Step #7: Give required permissions to that rpm and install it:
[root@ip-10-0-0-57 ~]# chmod a+x jre-6u43-linux-x64-rpm.bin
[root@ip-10-0-0-57 ~]# ./jre-6u43-linux-x64-rpm.bin
Step #8: Java should be installed successfully and it should show you expected information:
[root@ip-10-0-0-57 ~]# java -version
java version "1.6.0_43"
Java(TM) SE Runtime Environment (build 1.6.0_43-b01)
Java HotSpot(TM) 64-Bit Server VM (build 20.14-b01, mixed mode)
Step #9: Now download Cassandra 1.2.3 rpm from the following location and copy it to your server by WinSCP:

DataStax Cassandra 1.2.3 rpm

You can also install it by using DataStax repository (check DataStax Cassandra manual for that), I'm just following this way as a preference.

Step #10: After you copy the Cassandra rpm to your server, install it:
[root@ip-10-0-0-57 ~]# yum install cassandra12-1.2.3-1.noarch.rpm
Step #11: Cassandra should be installed successfully and you can check the status by this (By default, Cassandra server is stopped right after its installed):
[root@ip-10-0-0-57 ~]# /etc/init.d/cassandra status
cassandra is stopped
Step #12: During Cassandra installation, it downloads OpenJDK version of Java as a dependency and it will overwrite your java setting. So follow these to set back your Java to use Oracle version:

Step #13: At this point, your Cassandra is ready to start. But before you start your Cassandra, you need to update its configuration file for your cluster. All configuration file are present in the location "/etc/cassandra/conf" of your server by default if its packaged install. For the cluster, I will change one of the file from there which is cassandra.yaml. The cassandra.yaml is the main configuration file for Cassandra. 

There are so many properties you can change in the main configuration file and you can find its details here: http://www.datastax.com/docs/1.2/configuration/node_configuration. But as I am just installing the most basic version of Cassandra cluster, I will change only the following property:
  • cluster_name: Name of your cluster. It will be same for all hosts or instances.
  • initial_token: Used in versions prior to 1.2. For this setup, I will manually set this value. But that is not required and you can setup your cluster by using both initial_token & num_tokens property. You can read more about it on the web and can try out different configs once you are familiar with Cassandra.
  • partitioner: It determine which node to store the data on. Remember, paritioner cannot be changed without loading your all data. So configure your correct partitioner before initializing your cluster.
  • seed_provider: A list of comma-delimited IP addresses to use as contact points when a node joins a cluster. This value should be unique for all your hosts.
  • listen_address: Local IP address of each host.
  • rpc_address: Listener address for client connections. Make it 0.0.0.0 to listens on all configured interfaces.
  • endpoint_snitch: Sets which snitch Cassandra uses for locating nodes and routing requests. Since, I'm installing Cassandra cluster on AWS with a single region, I will be using EC2Snitch. Again, this value should be unique for all of your hosts.
So, after modifying, here is my updated configuration file (reflects only changes which I made):
Step #14:

Murmur3Partitioner: This is a new partitioner which is available from Cassandra 1.2 version. initial_token value is depends on the partitioner you are using. To generate initial_token value for Murmur3Partitioner, you can run the following commands:
Note: Here, 3 is the number of nodes which I will be using for my cluster setup. Change that value based on your needs.

RandomPartitioner: If you want to use RandomParitioner then in that case you can generate your initial_token value by using the tooken-generator tool which comes with Cassandra installation:
Step #15: Now, I am going to create a new AMI from my current instance so that when I create a new instance from that, I will have Cassandra installation ready with expected configuration.

Since, I'm installing a 3 node cluster, I will create two new instances from the AMI which I just created. So, I got IP addresses like this:

- cassandra node1 -> 10.0.0.57
- cassandra node2 -> 10.0.0.58
- cassandra node3 -> 10.0.0.59

If you are using Amazon Virtual Private Cloud (VPC), then you have the option to choose specific IP address based on your needs.

Step #16: Remember, even though you created instances from the AMI, you still need to change some values in cassandra.yaml file which varies based on hosts. Those are:

Node #2:
initial_token: -3074457345618258603
listen_address: 10.0.0.58
Node #3:
initial_token: 3074457345618258602
listen_address: 10.0.0.59
Step #17: You may need to update your "/etc/hosts" file in case your hostname is not configured. I have updated that file in each server like this:
Step #18: That's it! Now you are ready to start your Cassandra. Execute this in each node to start your cluster:
[root@ip-10-0-0-57 ~]# /etc/init.d/cassandra start
Starting cassandra: OK
Step #19: You can check the status of your Cassandra by executing this:


Two major things here you need to look on the status, those are "Owns" and "Status" columns. You see here all nodes are up and sharing same percentage of the total ownership. 

As I said at the beginning, this is the most basic (or minimum) setup of Cassandra cluster. There are a lot of things you can change, tune and modify based on your needs. Once you are familiar with basic Cassandra, I highly recommend you to do some experiments by using different configurations. The more you try the more you learn!

In another post, I will write about how to install DataStax OpsCenter to monitor this Cassandra Cluster.

Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me.